Skip to content

Commit

Permalink
initial draft of getting cid storageprovider using ipni (#196)
Browse files Browse the repository at this point in the history
* initial draft of getting cid storageprovider using ipni

* upgraded everything

* Update go.sum

* added relay test

* Added connect test as well

* Update example_test.go

* removed custom multiaddr

* updated kubo version

* Update go-check.yml

* Update go-check.yml

* Update go-check.yml

* downgraded fuse version

* updated QR code characters
  • Loading branch information
ehsan6sha authored Dec 14, 2023
1 parent fcafb93 commit b33c9fc
Show file tree
Hide file tree
Showing 17 changed files with 1,244 additions and 453 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ensure-podspec-version.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
git push origin update-version-${{ github.event.release.tag_name }}
- name: Create Pull Request
uses: gh-actions-contrib/create-pull-request@v3
uses: peter-evans/create-pull-request@v3
with:
title: 'Update version to ${{ env.VERSION }}'
body: 'Automatically update version number.'
Expand Down
6 changes: 4 additions & 2 deletions .github/workflows/go-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ jobs:
submodules: recursive
- uses: actions/setup-go@v4
with:
go-version: 1.21.x
go-version: 1.21.4
- name: Install staticcheck
run: go install honnef.co/go/tools/cmd/staticcheck@4970552d932f48b71485287748246cf3237cebdf # 2023.1 (v0.4.0)
run: go install honnef.co/go/tools/cmd/staticcheck@v0.4.6
- name: Clear Go module cache
run: go clean -modcache
- name: Check that go.mod is tidy
run: |
go mod tidy
Expand Down
2 changes: 0 additions & 2 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ jobs:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
tags: |
type=semver,pattern={{version}}
- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@v4
Expand Down
52 changes: 20 additions & 32 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"net/http"
"strconv"
"time"

"github.com/functionland/go-fula/announcements"
Expand Down Expand Up @@ -116,46 +115,30 @@ func (p *Blox) StoreCid(ctx context.Context, l ipld.Link, limit int) error {
log.Warnw("link already exists in datastore", "l", l.String())
return fmt.Errorf("link already exists in datastore %s", l.String())
}
providers, err := p.ex.FindProvidersDht(l)
providers, err := p.ex.FindProvidersIpni(l, p.relays)
if err != nil {
log.Errorw("And Error happened in StoreCid", "err", err)
return err
}
// Iterate over the providers and ping
for _, provider := range providers {
if provider.ID != p.h.ID() {
log.Debugw("Pinging storer", "peerID", provider.ID)
err := p.ex.PingDht(provider.ID)
if err == nil {
//Found a storer, now pull the cid
//TODO: Ideally this should fetch only the cid itseld or the path that is changed with root below it
//TODO: Ideally we should have a mechanism to reserve the pull requests and keep the pulled+requests to a max of replication factor
log.Debugw("Found a storer", "id", provider.ID)
replicasStr := ""
replicasStr, err = p.ex.SearchValueDht(ctx, l.String())
//Found a storer, now pull the cid
//TODO: Ideally this should fetch only the cid itseld or the path that is changed with root below it
//TODO: Ideally we should have a mechanism to reserve the pull requests and keep the pulled+requests to a max of replication factor
log.Debugw("Found a storer", "id", provider.ID)

replicas := len(providers)
log.Debugw("Checking replicas vs limit", "replicas", replicas, "limit", limit)
if replicas < limit {
err = p.ex.Pull(ctx, provider.ID, l)
if err != nil {
log.Warnw("SearchValue returned an error", "err", err)
}
log.Debugw("SearchValue returned value", "val", replicasStr, "for", l.String())
replicas := 0
replicas, err = strconv.Atoi(replicasStr)
if err != nil {
log.Warn(err)
replicas = 0
}
log.Debugw("Checking replicas vs limit", "replicas", replicas, "limit", limit)
if replicas < limit {
newReplicas := replicas + 1
p.ex.PutValueDht(ctx, l.String(), strconv.Itoa(newReplicas))
err = p.ex.Pull(ctx, provider.ID, l)
if err != nil {
log.Errorw("Error happened in pulling from provider", "err", err)
continue
}
return nil
} else {
return fmt.Errorf("limit of %d is reached for %s", limit, l.String())
log.Errorw("Error happened in pulling from provider", "err", err)
continue
}
return nil
} else {
return fmt.Errorf("limit of %d is reached for %s", limit, l.String())
}
} else {
log.Warnw("provider is the same as requestor", "l", l.String())
Expand Down Expand Up @@ -337,6 +320,11 @@ func (p *Blox) GetBlMembers() map[peer.ID]common.MemberStatus {
return p.bl.GetMembers()
}

func (p *Blox) BloxFreeSpace(ctx context.Context, to peer.ID) ([]byte, error) {
//This is for unit testing and no need to call directly
return p.bl.BloxFreeSpace(ctx, to)
}

func (p *Blox) StartAnnouncementServer(ctx context.Context) error {
//This is for unit testing and no need to call directly
p.wg.Add(1)
Expand Down
112 changes: 110 additions & 2 deletions blox/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/functionland/go-fula/blox"
Expand Down Expand Up @@ -209,6 +210,46 @@ func startMockServer(addr string) *http.Server {
}
})

handler.HandleFunc("/cid/", func(w http.ResponseWriter, r *http.Request) {
// Extract the CID from the URL path
cid := strings.TrimPrefix(r.URL.Path, "/cid/")

// Prepare the ContextID based on the CID
var contextID string
switch cid {
case "bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy":
contextID = base64.StdEncoding.EncodeToString([]byte("12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX"))
case "bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34":
contextID = base64.StdEncoding.EncodeToString([]byte("12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM"))
default:
http.Error(w, "Not Found", http.StatusNotFound)
return
}

// Create the response
response := map[string]interface{}{
"MultihashResults": []map[string]interface{}{
{
"Multihash": "HiCJpK9N9aiHbWJ40eq3r0Lns3qhnLSUviVYdcBJD4jWjQ==",
"ProviderResults": []map[string]interface{}{
{
"ContextID": contextID,
"Metadata": "gcA/",
"Provider": map[string]interface{}{
"ID": "12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R",
"Addrs": []string{"/dns/hub.dev.fx.land/tcp/40004/p2p/12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R"},
},
},
},
},
},
}

// Set Content-Type header and send the response
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
})

// Wrap the handlers with the logging middleware
loggedHandler := requestLoggerMiddleware(handler)

Expand Down Expand Up @@ -691,6 +732,66 @@ func updatePoolName(newPoolName string) error {
// Member ID: QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe, Status: 1
}*/

func Example_testMockserver() {
server := startMockServer("127.0.0.1:4000")
defer func() {
// Shutdown the server after test
if err := server.Shutdown(context.Background()); err != nil {
panic(err) // Handle the error as you see fit
}
}()
// Define the URL
url := "http://127.0.0.1:4000/cid/bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy"

// Send a GET request to the server
resp, err := http.Get(url)
if err != nil {
log.Fatal("Error making GET request:", err)
}
defer resp.Body.Close()

// Read the response body
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Fatal("Error reading response body:", err)
}

// Convert the body to a string and log it
fmt.Println("Response:", string(body))

// Unordered output:
// Response: {"MultihashResults":[{"Multihash":"HiCJpK9N9aiHbWJ40eq3r0Lns3qhnLSUviVYdcBJD4jWjQ==","ProviderResults":[{"ContextID":"MTJEM0tvb1dIOXN3amVDeXVSNnV0ektVMVVzcGlXNVJER3pBRnZORHdxa1Q1YlVId3V4WA==","Metadata":"gcA/","Provider":{"Addrs":["/dns/hub.dev.fx.land/tcp/40004/p2p/12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R"],"ID":"12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R"}}]}]}
}

func Example_encode64Test() {
originalString := "12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX"

// Encode to Base64
encodedString := base64.StdEncoding.EncodeToString([]byte(originalString))
fmt.Println("Encoded:", encodedString)

// Decode from Base64
decodedBytes, err := base64.StdEncoding.DecodeString(encodedString)
if err != nil {
fmt.Println("Decode error:", err)
return
}
decodedString := string(decodedBytes)
fmt.Println("Decoded:", decodedString)

// Check if original and decoded are the same
if originalString == decodedString {
fmt.Println("Success: Original and decoded strings are the same.")
} else {
fmt.Println("Error: Original and decoded strings are different.")
}

// Unordered output:
// Encoded: MTJEM0tvb1dIOXN3amVDeXVSNnV0ektVMVVzcGlXNVJER3pBRnZORHdxa1Q1YlVId3V4WA==
// Decoded: 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
// Success: Original and decoded strings are the same.
}

func Example_storeManifest() {
server := startMockServer("127.0.0.1:4000")
defer func() {
Expand Down Expand Up @@ -788,6 +889,7 @@ func Example_storeManifest() {
dht.Resiliency(1),
dht.Mode(dht.ModeAutoServer),
),
exchange.WithIpniGetEndPoint("http://127.0.0.1:4000/cid/"),
),
)
if err != nil {
Expand Down Expand Up @@ -818,6 +920,7 @@ func Example_storeManifest() {
dht.Resiliency(1),
dht.Mode(dht.ModeAutoServer),
),
exchange.WithIpniGetEndPoint("http://127.0.0.1:4000/cid/"),
),
)
if err != nil {
Expand Down Expand Up @@ -1001,6 +1104,11 @@ func Example_storeManifest() {
}
fmt.Printf(" content: %s\n", buf.String())
}
err = n1.ProvideLinkByDht(n2leafLink)
if err != nil {
fmt.Print("Error happened in ProvideLinkByDht")
panic(err)
}
peerlist3, err := n3.FindLinkProvidersByDht(n2leafLink)
if err != nil {
fmt.Print("Error happened in FindLinkProvidersByDht3")
Expand Down Expand Up @@ -1046,8 +1154,8 @@ func Example_storeManifest() {
// from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
// content: {"that":false}
// Found bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4 on 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
// Stored manifest: {"cid":["bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34","bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy"],"pool_id":1}
// Stored manifest: {"cid":["bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34","bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy"],"pool_id":1}
// Stored manifest: {"cid":["bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"],"pool_id":1}
// Stored manifest: {"cid":["bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"],"pool_id":1}
}

type Config struct {
Expand Down
17 changes: 5 additions & 12 deletions cmd/blox/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"os/signal"
"path"
"path/filepath"
"runtime"
"strconv"
"sync"
"syscall"
Expand Down Expand Up @@ -222,7 +221,6 @@ func CreateCustomRepo(ctx context.Context, basePath string, h host.Host, options

conf.Datastore = DefaultDatastoreConfig(options, dsPath, storageMax)

conf.Experimental.GraphsyncEnabled = true
conf.Addresses.Swarm = app.config.ListenAddrs
conf.Identity.PeerID = h.ID().String()
conf.Identity.PrivKey = app.config.Identity
Expand Down Expand Up @@ -336,7 +334,7 @@ func init() {
Name: "ipniPublishDirectAnnounce",
Usage: "The list of IPNI URLs to which make direct announcements.",
Destination: &app.config.directAnnounce,
Value: cli.NewStringSlice("https://announce.relay2.functionyard.fula.network/ingest/announce"),
Value: cli.NewStringSlice("https://cid.contact/ingest/announce"),
}),
altsrc.NewStringFlag(&cli.StringFlag{
Name: "ipniPublisherIdentity",
Expand Down Expand Up @@ -762,6 +760,7 @@ func action(ctx *cli.Context) error {
exchange.WithAllowTransientConnection(app.config.AllowTransientConnection),
exchange.WithIpniPublishDisabled(app.config.IpniPublishDisabled),
exchange.WithIpniPublishInterval(app.config.IpniPublishInterval),
exchange.WithIpniGetEndPoint("https://cid.contact/cid/"),
exchange.WithIpniProviderEngineOptions(
engine.WithHost(ipnih),
engine.WithDatastore(namespace.Wrap(ds, datastore.NewKey("ipni/ads"))),
Expand Down Expand Up @@ -843,15 +842,9 @@ func printMultiaddrAsQR(h host.Host) {
QuietZone: qrterminal.QUIET_ZONE,
}

if runtime.GOOS == "windows" {
// Specific characters for Windows terminal
config.BlackChar = qrterminal.BLACK
config.WhiteChar = qrterminal.WHITE
} else {
// Characters for other terminals (e.g., Unix/Linux)
config.BlackChar = "%%"
config.WhiteChar = " "
}
// Characters for other terminals (e.g., Unix/Linux)
config.BlackChar = qrterminal.BLACK
config.WhiteChar = qrterminal.WHITE

// Generate QR code
qrterminal.GenerateWithConfig(fullAddr, config)
Expand Down
Loading

0 comments on commit b33c9fc

Please sign in to comment.