Skip to content

Commit

Permalink
changed Pull to PullBlock in MAnifestStore and correctd unittest
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Dec 23, 2023
1 parent 7c3aad2 commit 913c73d
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 17 deletions.
9 changes: 5 additions & 4 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (p *Blox) StoreCid(ctx context.Context, l ipld.Link, limit int) error {
}
p.h.Peerstore().AddAddrs(provider.ID, []multiaddr.Multiaddr{addr}, peerstore.ConnectedAddrTTL)
log.Debugw("Started Pull in StoreCid", "from", provider.ID, "l", l)
err = p.ex.Pull(ctx, provider.ID, l)
err = p.ex.PullBlock(ctx, provider.ID, l)
if err != nil {
log.Errorw("Error happened in pulling from provider", "err", err)
continue
Expand Down Expand Up @@ -223,8 +223,10 @@ func (p *Blox) StoreManifest(ctx context.Context, links []blockchain.LinkWithLim

// FetchAvailableManifestsAndStore fetches available manifests and stores them.
func (p *Blox) FetchAvailableManifestsAndStore(ctx context.Context, maxCids int) error {
childCtx, cancel := context.WithTimeout(ctx, 4*time.Minute)
defer cancel() // It's good practice to call cancel to free resources if the childCtx finishes before the timeout
// Fetch the available manifests for a specific pool_id
availableLinks, err := p.bl.HandleManifestsAvailable(ctx, p.topicName, maxCids)
availableLinks, err := p.bl.HandleManifestsAvailable(childCtx, p.topicName, maxCids)
if err != nil {
return fmt.Errorf("failed to fetch available manifests: %w", err)
}
Expand All @@ -235,11 +237,10 @@ func (p *Blox) FetchAvailableManifestsAndStore(ctx context.Context, maxCids int)
}

// Attempt to store the fetched manifests
err = p.StoreManifest(ctx, availableLinks, maxCids)
err = p.StoreManifest(childCtx, availableLinks, maxCids)
if err != nil {
return fmt.Errorf("failed to store manifests: %w", err)
}

return nil
}

Expand Down
31 changes: 18 additions & 13 deletions blox/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func startMockServer(addr string) *http.Server {
"manifest_metadata": map[string]interface{}{
"job": map[string]string{
"engine": "IPFS",
"uri": "bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34",
"uri": "bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji",
"work": "Storage",
},
},
Expand All @@ -161,7 +161,7 @@ func startMockServer(addr string) *http.Server {
"manifest_metadata": map[string]interface{}{
"job": map[string]string{
"engine": "IPFS",
"uri": "bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy",
"uri": "bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4",
"work": "Storage",
},
},
Expand Down Expand Up @@ -217,9 +217,9 @@ func startMockServer(addr string) *http.Server {
// Prepare the ContextID based on the CID
var contextID string
switch cid {
case "bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy":
case "bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4":
contextID = base64.StdEncoding.EncodeToString([]byte("12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX"))
case "bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34":
case "bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji":
contextID = base64.StdEncoding.EncodeToString([]byte("12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM"))
default:
http.Error(w, "Not Found", http.StatusNotFound)
Expand Down Expand Up @@ -412,10 +412,12 @@ func Example_poolDiscoverPeersViaPubSub() {

// Wait until the nodes discover each other
for {
if len(h1.Peerstore().Peers()) == 3 &&
len(h2.Peerstore().Peers()) == 3 &&
len(h3.Peerstore().Peers()) == 3 {
if len(h1.Peerstore().Peers()) == 4 &&
len(h2.Peerstore().Peers()) == 4 &&
len(h3.Peerstore().Peers()) == 4 {
break
} else {
log.Infow("h1.Peerstore().Peers() is waitting", "h1.Peerstore().Peers()", h1.Peerstore().Peers())
}
select {
case <-ctx.Done():
Expand Down Expand Up @@ -447,18 +449,21 @@ func Example_poolDiscoverPeersViaPubSub() {
// Instantiated node in pool 1 with ID: 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
// Instantiated node in pool 1 with ID: 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
// Instantiated node in pool 1 with ID: 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX
// 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM peerstore contains 3 nodes:
// 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM peerstore contains 4 nodes:
// - 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
// - 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX
// - 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
// 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX peerstore contains 3 nodes:
// - 12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R
// 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX peerstore contains 4 nodes:
// - 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
// - 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
// - 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX
// 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX peerstore contains 3 nodes:
// - 12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R
// 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX peerstore contains 4 nodes:
// - 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
// - 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
// - 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX
// - 12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R
}

func updatePoolName(newPoolName string) error {
Expand Down Expand Up @@ -802,7 +807,7 @@ func Example_storeManifest() {
}()

const poolName = "1"
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
defer cancel()

// Elevate log level to show internal communications.
Expand Down Expand Up @@ -1154,8 +1159,8 @@ func Example_storeManifest() {
// from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
// content: {"that":false}
// Found bafyr4iaab3lel4ykjcyzqajx5np2uluetwvfyv3ujupxt5qs57owhpo6ty on 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
// Stored manifest: {"cid":["bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"],"pool_id":1}
// Stored manifest: {"cid":["bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"],"pool_id":1}
// Stored manifest: {"cid":["bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji"],"pool_id":1}
// Stored manifest: {"cid":["bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji"],"pool_id":1}
}

type Config struct {
Expand Down

0 comments on commit 913c73d

Please sign in to comment.