Skip to content

Commit

Permalink
corrected bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Mar 18, 2024
1 parent e7879ed commit 3af1447
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 26 deletions.
7 changes: 4 additions & 3 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func NewFxBlockchain(h host.Host, p *ping.FxPing, a *announcements.FxAnnouncemen
}

func (bl *FxBlockchain) startFetchCheck() {
internal := 1 * time.Minute
internal := 2 * time.Minute
bl.fetchCheckTicker = time.NewTicker(internal) // check every hour, adjust as needed

if bl.wg != nil {
Expand All @@ -161,12 +161,13 @@ func (bl *FxBlockchain) startFetchCheck() {
defer bl.wg.Done() // Decrement the counter when the goroutine completes
}
defer log.Debug("startFetchCheck ticker go routine is ending")

var topic string
for {
select {
case <-bl.fetchCheckTicker.C:
if time.Since(bl.lastFetchTime) >= bl.fetchInterval {
bl.FetchUsersAndPopulateSets(context.Background(), bl.topicName, false, internal)
topic = bl.getPoolName()
bl.FetchUsersAndPopulateSets(context.Background(), topic, false, internal)
bl.lastFetchTime = time.Now() // update last fetch time
}
case <-bl.fetchCheckStop:
Expand Down
14 changes: 13 additions & 1 deletion blockchain/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type (
topicName string
relays []string
updatePoolName func(string) error
getPoolName func() string
fetchFrequency time.Duration //Hours that it should update the list of pool users and pool requests if not called through pubsub
rpc *rpc.HttpApi
ipfsClusterApi ipfsCluster.Client
Expand All @@ -33,6 +34,9 @@ type (
func defaultUpdatePoolName(newPoolName string) error {
return nil
}
func defaultGetPoolName() string {
return "0"
}
func newOptions(o ...Option) (*options, error) {
opts := options{
authorizer: "", // replace with an appropriate default peer.ID
Expand All @@ -47,7 +51,8 @@ func newOptions(o ...Option) (*options, error) {
topicName: "0", // default topic name
relays: []string{}, // default to an empty slice
updatePoolName: defaultUpdatePoolName, // set a default function or leave nil
fetchFrequency: time.Hour * 1, // default frequency, e.g., 1 hour
getPoolName: defaultGetPoolName,
fetchFrequency: time.Hour * 1, // default frequency, e.g., 1 hour
rpc: nil,
ipfsClusterApi: nil,
}
Expand Down Expand Up @@ -160,6 +165,13 @@ func WithUpdatePoolName(updatePoolName func(string) error) Option {
}
}

func WithGetPoolName(getPoolName func() string) Option {
return func(o *options) error {
o.getPoolName = getPoolName
return nil
}
}

// WithStoreDir sets a the store directory we are using for datastore
// Required.
func WithRelays(r []string) Option {
Expand Down
47 changes: 25 additions & 22 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func New(o ...Option) (*Blox, error) {
blockchain.WithFetchFrequency(1*time.Minute),
blockchain.WithTopicName(p.topicName),
blockchain.WithUpdatePoolName(p.updatePoolName),
blockchain.WithGetPoolName(p.getPoolName),
blockchain.WithRelays(p.relays),
blockchain.WithMaxPingTime(p.maxPingTime),
blockchain.WithIpfsClient(p.rpc),
Expand Down Expand Up @@ -485,36 +486,38 @@ func (p *Blox) Start(ctx context.Context) error {
log.Errorf("Error retrieving last checked time: %v", err)
continue
}

storedFiles, err := p.ListModifiedStoredBlocks(lastCheckedTime)
if err != nil {
log.Errorf("Error listing stored blocks: %v", err)
continue
}

var storedLinks []datamodel.Link
for _, filename := range storedFiles {
fi, err := os.Stat(filename)
p.topicName = p.getPoolName()
if p.topicName != "0" {
storedFiles, err := p.ListModifiedStoredBlocks(lastCheckedTime)
if err != nil {
log.Errorf("Error getting file info for %s: %v", filename, err)
log.Errorf("Error listing stored blocks: %v", err)
continue
}
if fi.ModTime().After(lastCheckedTime) {
cidv1, err := p.GetCidv1FromBlockFilename(filename)

var storedLinks []datamodel.Link
for _, filename := range storedFiles {
fi, err := os.Stat(filename)
if err != nil {
log.Errorf("Error extracting CIDv1 from filename %s: %v", filename, err)
log.Errorf("Error getting file info for %s: %v", filename, err)
continue
}

storedLinks = append(storedLinks, cidlink.Link{Cid: cidv1})
if fi.ModTime().After(lastCheckedTime) {
cidv1, err := p.GetCidv1FromBlockFilename(filename)
if err != nil {
log.Errorf("Error extracting CIDv1 from filename %s: %v", filename, err)
continue
}

storedLinks = append(storedLinks, cidlink.Link{Cid: cidv1})
}
}
}

// Call HandleManifestBatchStore method
_, err = p.bl.HandleManifestBatchStore(context.TODO(), p.topicName, storedLinks)
if err != nil {
log.Errorf("Error calling HandleManifestBatchStore: %v", err)
continue
// Call HandleManifestBatchStore method
_, err = p.bl.HandleManifestBatchStore(context.TODO(), p.topicName, storedLinks)
if err != nil {
log.Errorw("Error calling HandleManifestBatchStore", "err", err, "p.topicName", p.topicName, "storedLinks", storedLinks)
continue
}
}

// Update the last checked time
Expand Down
9 changes: 9 additions & 0 deletions blox/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
type (
Option func(*options) error
PoolNameUpdater func(string) error
PoolNameGetter func() string
options struct {
h host.Host
name string
Expand All @@ -37,6 +38,7 @@ type (
exchangeOpts []exchange.Option
relays []string
updatePoolName PoolNameUpdater
getPoolName PoolNameGetter
pingCount int
maxPingTime int
minSuccessRate int
Expand Down Expand Up @@ -220,6 +222,13 @@ func WithUpdatePoolName(updatePoolName PoolNameUpdater) Option {
}
}

func WithGetPoolName(getPoolName PoolNameGetter) Option {
return func(o *options) error {
o.getPoolName = getPoolName
return nil
}
}

func WithPingCount(pc int) Option {
return func(o *options) error {
o.pingCount = pc
Expand Down
14 changes: 14 additions & 0 deletions cmd/blox/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,19 @@ func before(ctx *cli.Context) error {
return os.WriteFile(app.configPath, yc, 0700)
}

func getPoolNameFromConfig() string {
configData, err := os.ReadFile(app.configPath)
if err != nil {
return "0"
}

// Parse the existing config file
if err := yaml.Unmarshal(configData, &app.config); err != nil {
return "0"
}
return app.config.PoolName
}

func updateConfig(p []peer.ID) error {
// Load existing config file
configData, err := os.ReadFile(app.configPath)
Expand Down Expand Up @@ -1234,6 +1247,7 @@ func action(ctx *cli.Context) error {
blox.WithStoreDir(app.config.StoreDir),
blox.WithRelays(app.config.StaticRelays),
blox.WithUpdatePoolName(updatePoolName),
blox.WithGetPoolName(getPoolNameFromConfig),
blox.WithBlockchainEndPoint(app.blockchainEndpoint),
blox.WithSecretsPath(app.secretsPath),
blox.WithPingCount(5),
Expand Down

0 comments on commit 3af1447

Please sign in to comment.