Skip to content

Commit

Permalink
Update blox.go
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Feb 26, 2024
1 parent 82cd6b0 commit ffe8d96
Showing 1 changed file with 136 additions and 18 deletions.
154 changes: 136 additions & 18 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,28 @@ import (
"errors"
"fmt"
"net/http"
"os"
"path/filepath"
"strings"
"time"

"github.com/functionland/go-fula/announcements"
"github.com/functionland/go-fula/blockchain"
"github.com/functionland/go-fula/common"
"github.com/functionland/go-fula/exchange"
"github.com/functionland/go-fula/ping"
"github.com/functionland/go-fula/wap/pkg/wifi"
"github.com/ipfs/boxo/path"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multicodec"
)

var log = logging.Logger("fula/blox")
Expand Down Expand Up @@ -280,6 +285,85 @@ func (p *Blox) FetchAvailableManifestsAndStore(ctx context.Context, maxCids int)
return nil
}

func (p *Blox) GetLastCheckedTime() (time.Time, error) {
lastCheckedFile := "/internal/.last_time_ipfs_checked"
info, err := os.Stat(lastCheckedFile)
if err != nil {
if os.IsNotExist(err) {
// File does not exist, return zero time
return time.Time{}, nil
}
return time.Time{}, err
}
return info.ModTime(), nil
}

// ListModifiedStoredBlocks lists only the folders that have been modified after the last check time
// and returns the filenames of the files created after the last check time in those folders.
func (p *Blox) ListModifiedStoredBlocks(lastChecked time.Time) ([]string, error) {
blocksDir := "/uniondrive/badgerds/blocks"
var modifiedFiles []string

err := filepath.Walk(blocksDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() || strings.HasSuffix(info.Name(), ".temp") {
// Ignore non-directories and directories with ".temp" suffix
return nil
}
if info.ModTime().After(lastChecked) {
// If the directory was modified after the last check time, walk its contents
err := filepath.Walk(path, func(subPath string, subInfo os.FileInfo, subErr error) error {
if subErr != nil {
return subErr
}
if !subInfo.IsDir() && subInfo.ModTime().After(lastChecked) {
// Include file if it's not a directory and was created after the last check time
modifiedFiles = append(modifiedFiles, subPath)
}
return nil
})
if err != nil {
return err
}
}
return nil
})
if err != nil {
return nil, err
}
return modifiedFiles, nil
}

// GetCidv1FromBlockFilename extracts CIDv1 from block filename
func (p *Blox) GetCidv1FromBlockFilename(filename string) (cid.Cid, error) {
// Implement the logic to extract CIDv1 from filename
// For example, you can use regular expressions or string manipulation
// This is just a placeholder implementation
// Adjust it according to your actual filename format
// Here's a sample implementation:
base := filepath.Base(filename)
b58 := "B" + strings.ToUpper(strings.TrimSuffix(base, filepath.Ext(base)))
cidV0, err := cid.Decode(b58)
if err != nil {
fmt.Println("Error encoding to cidV0:", err)
return cid.Cid{}, err
}
cidV1 := cid.NewCidV1(uint64(multicodec.DagPb), cidV0.Hash())
return cidV1, nil
}

// UpdateLastCheckedTime updates the last checked time
func (p *Blox) UpdateLastCheckedTime() error {
lastCheckedFile := "/internal/.last_time_ipfs_checked"
err := os.WriteFile(lastCheckedFile, []byte(time.Now().Format(time.RFC3339)), 0644)
if err != nil {
return err
}
return nil
}

func (p *Blox) Start(ctx context.Context) error {
ctx = network.WithUseTransient(ctx, "fx.exchange")
// implemented topic validators with chain integration.
Expand Down Expand Up @@ -343,41 +427,75 @@ func (p *Blox) Start(ctx context.Context) error {
p.ctx, p.cancel = context.WithCancel(ctx)

// Starting a new goroutine for periodic task
log.Debug("Called wg.Add in FetchAvailableManifestsAndStore")
log.Debug("Called wg.Add in blox.start")
p.wg.Add(1)
go func() {
log.Debug("Called wg.Done in FetchAvailableManifestsAndStore")
log.Debug("Called wg.Done in blox.start")
defer p.wg.Done()
defer log.Debug("Start blox FetchAvailableManifestsAndStore go routine is ending")
ticker := time.NewTicker(5 * time.Minute)
defer log.Debug("Start blox blox.start go routine is ending")
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()

for {
select {
case <-ticker.C:
hasFreeSpace := true
// This block will execute every 5 minutes
freeSpace, err := wifi.GetBloxFreeSpace()
// instead of FetchAvailableManifestsAndStore See what are the new manifests from ther last time we checked in blockstore
// A method that checks the last time we checked for stored blocks file
// then it checks the stored files under /uniondrive/badgerds/blocks/{folders that are not .temp}
// then for those folders that the change date/time is after the time we last checked it reads the files names that are created after the last time we checked
// thne it passes each filename to a method called GetCidv1FromBlockFilename(string filename) (string, error) and receives the cidv1 of each file and put all in an array []string
// Then it calls the method:_, err := p.bl.HandleManifestBatchStore(ctx, p.topicName, storedCids)
// If no error, it stores the time/date in a file under /internal/.last_time_ipfs_checked so that it can use it in the next run
// ListStoredBlocks, and GetCidv1FromBlockFilename
lastCheckedTime, err := p.GetLastCheckedTime()
if err != nil {
if freeSpace.Size != 0 {
if freeSpace.UsedPercentage > 90 {
hasFreeSpace = false
log.Errorf("Error retrieving last checked time: %v", err)
continue
}

storedFiles, err := p.ListModifiedStoredBlocks(lastCheckedTime)
if err != nil {
log.Errorf("Error listing stored blocks: %v", err)
continue
}

var storedLinks []datamodel.Link
for _, filename := range storedFiles {
fi, err := os.Stat(filename)
if err != nil {
log.Errorf("Error getting file info for %s: %v", filename, err)
continue
}
if fi.ModTime().After(lastCheckedTime) {
cidv1, err := p.GetCidv1FromBlockFilename(filename)
if err != nil {
log.Errorf("Error extracting CIDv1 from filename %s: %v", filename, err)
continue
}

storedLinks = append(storedLinks, cidlink.Link{Cid: cidv1})
}
}
if hasFreeSpace {
if err := p.FetchAvailableManifestsAndStore(ctx, 50); err != nil {
log.Errorw("Error in FetchAvailableManifestsAndStore", "err", err)
// Handle the error or continue based on your requirement
}

// Call HandleManifestBatchStore method
_, err = p.bl.HandleManifestBatchStore(context.TODO(), p.topicName, storedLinks)
if err != nil {
log.Errorf("Error calling HandleManifestBatchStore: %v", err)
continue
}

// Update the last checked time
err = p.UpdateLastCheckedTime()
if err != nil {
log.Errorf("Error updating last checked time: %v", err)
}
case <-ctx.Done():
// This will handle the case where the parent context is canceled
log.Info("Stopping periodic FetchAvailableManifestsAndStore due to context cancellation")
log.Info("Stopping periodic blox.start due to context cancellation")
return
case <-p.ctx.Done():
// This will handle the case where the parent context is canceled
log.Info("Stopping periodic FetchAvailableManifestsAndStore due to context cancellation")
log.Info("Stopping periodic blox.start due to context cancellation")
return
}
}
Expand Down

0 comments on commit ffe8d96

Please sign in to comment.