Skip to content

Commit

Permalink
get stored links from ipfs cluster API and not parsing the folder
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Mar 19, 2024
1 parent 715a86f commit 211a068
Showing 1 changed file with 27 additions and 19 deletions.
46 changes: 27 additions & 19 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/functionland/go-fula/common"
"github.com/functionland/go-fula/exchange"
"github.com/functionland/go-fula/ping"
"github.com/ipfs-cluster/ipfs-cluster/api"
"github.com/ipfs/boxo/path"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -309,6 +310,31 @@ func (p *Blox) GetLastCheckedTime() (time.Time, error) {
return info.ModTime(), nil
}

// This method fetches the pinned items in ipfs-cluster since the lastChecked time
func (p *Blox) ListModifiedStoredLinks(ctx context.Context, lastChecked time.Time) ([]datamodel.Link, error) {
var storedLinks []datamodel.Link
if p.ipfsClusterApi == nil {
log.Errorw("ipfs cluster API is nil", "p.ipfsClusterApi", p.ipfsClusterApi)
return nil, fmt.Errorf("ipfs cluster API is nil")
}

// Create a channel to receive pin info
out := make(chan api.GlobalPinInfo, 1024) // Adjust buffer size as needed
go func() {
defer close(out)
p.ipfsClusterApi.StatusAll(ctx, api.TrackerStatusPinned, true, out)
}()

// Filter pins based on status and creation time
for pinInfo := range out {
if pinInfo.Match(api.TrackerStatusPinned) && pinInfo.Created.After(lastChecked) {
storedLinks = append(storedLinks, cidlink.Link{Cid: pinInfo.Cid.Cid})
}
}

return storedLinks, nil
}

// ListModifiedStoredBlocks lists only the folders that have been modified after the last check time
// and returns the filenames of the files created after the last check time in those folders.
func (p *Blox) ListModifiedStoredBlocks(lastChecked time.Time) ([]string, error) {
Expand Down Expand Up @@ -488,30 +514,12 @@ func (p *Blox) Start(ctx context.Context) error {
}
p.topicName = p.getPoolName()
if p.topicName != "0" {
storedFiles, err := p.ListModifiedStoredBlocks(lastCheckedTime)
storedLinks, err := p.ListModifiedStoredLinks(p.ctx, lastCheckedTime)
if err != nil {
log.Errorf("Error listing stored blocks: %v", err)
continue
}

var storedLinks []datamodel.Link
for _, filename := range storedFiles {
fi, err := os.Stat(filename)
if err != nil {
log.Errorf("Error getting file info for %s: %v", filename, err)
continue
}
if fi.ModTime().After(lastCheckedTime) {
cidv1, err := p.GetCidv1FromBlockFilename(filename)
if err != nil {
log.Errorf("Error extracting CIDv1 from filename %s: %v", filename, err)
continue
}

storedLinks = append(storedLinks, cidlink.Link{Cid: cidv1})
}
}

// Call HandleManifestBatchStore method
_, err = p.bl.HandleManifestBatchStore(context.TODO(), p.topicName, storedLinks)
if err != nil {
Expand Down

0 comments on commit 211a068

Please sign in to comment.