Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: multiple gitcoin downloads #196

Merged
merged 2 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/census3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func main() {
DataDir: config.dataDir,
Web3Providers: w3p,
GroupKey: config.connectKey,
HolderProviders: pm.Providers(),
HolderProviders: pm.Providers(ctx),
AdminToken: config.adminToken,
})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions scanner/providers/farcaster/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var (
IterationSyncedCooldown = 60 * time.Second
)

func (p *FarcasterProvider) Init(iconf any) error {
func (p *FarcasterProvider) Init(globalCtx context.Context, iconf any) error {
// parse the config and set the endpoints
conf, ok := iconf.(FarcasterProviderConf)
if !ok {
Expand All @@ -69,7 +69,7 @@ func (p *FarcasterProvider) Init(iconf any) error {
// and the database. By default, the last block is the creation block of the
// key registry, because in the gap between the creation of the ID and Key
// registries, there are no logs to scan.
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
ctx, cancel := context.WithTimeout(globalCtx, 15*time.Second)
defer cancel()
idRegistryLastBlock, err := p.db.QueriesRO.LastBlock(ctx, IdRegistryAddress)
if err != nil {
Expand Down Expand Up @@ -122,7 +122,7 @@ func (p *FarcasterProvider) Init(iconf any) error {
p.contracts.idRegistrySynced.Store(false)
p.contracts.keyRegistrySynced.Store(false)
// start the internal scanner
p.scannerCtx, p.cancelScanner = context.WithCancel(context.Background())
p.scannerCtx, p.cancelScanner = context.WithCancel(globalCtx)
go p.initInternalScanner()
return nil
}
Expand Down
45 changes: 36 additions & 9 deletions scanner/providers/gitcoin/gitcoin_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ const (
metadataTimeout = time.Second * 5
)

// since the scanne can scan multiple tokens concurrently, and every stamp
// is a token, we need to avoid multiple downloads at the same time. this
// variable is used to avoid multiple downloads at the same time using an
// atomic bool
var downloading atomic.Bool

type GitcoinPassport struct {
// public endpoint to download the json
apiEndpoint string
Expand Down Expand Up @@ -67,7 +73,7 @@ type GitcoinPassportConf struct {
// Init initializes the Gitcoin Passport provider with the given config. If the
// config is not of type GitcoinPassportConf, or the API endpoint is missing, it
// returns an error. If the cooldown is not set, it defaults to 6 hours.
func (g *GitcoinPassport) Init(iconf any) error {
func (g *GitcoinPassport) Init(globalCtx context.Context, iconf any) error {
conf, ok := iconf.(GitcoinPassportConf)
if !ok {
return fmt.Errorf("invalid config type")
Expand All @@ -85,7 +91,7 @@ func (g *GitcoinPassport) Init(iconf any) error {
g.cooldown = conf.Cooldown
g.db = conf.DB
// init download variables
g.ctx, g.cancel = context.WithCancel(context.Background())
g.ctx, g.cancel = context.WithCancel(globalCtx)
g.scoresChan = make(chan *GitcoinScore)
g.waiter = new(sync.WaitGroup)
g.synced = atomic.Bool{}
Expand All @@ -99,8 +105,11 @@ func (g *GitcoinPassport) Init(iconf any) error {
if err == nil {
g.lastSyncedTime.Store(lastSync)
}

g.startScoreUpdates()
// if there are other instances downloading, this one does not need to
// start the download process
if !downloading.Load() {
g.startScoreUpdates()
}
return nil
}

Expand Down Expand Up @@ -134,7 +143,7 @@ func (g *GitcoinPassport) HoldersBalances(_ context.Context, stamp []byte, _ uin
) {
// get the current scores from the db, handle the case when the stamp is
// empty and when it is not to get the scores from the db
synced := g.synced.Load()
synced := g.isSynced(true)
totalSupply := big.NewInt(0)
currentScores := make(map[common.Address]*big.Int)
if len(stamp) > 0 {
Expand Down Expand Up @@ -190,9 +199,7 @@ func (g *GitcoinPassport) IsExternal() bool {

// IsSynced returns true if the balances are not empty.
func (g *GitcoinPassport) IsSynced(_ []byte) bool {
g.currentBalancesMtx.RLock()
defer g.currentBalancesMtx.RUnlock()
return len(g.currentBalances) > 0
return g.isSynced(false)
}

// Address returns the address of the Gitcoin Passport contract.
Expand Down Expand Up @@ -395,6 +402,22 @@ func (g *GitcoinPassport) updateLastSync(ctx context.Context) error {
return nil
}

func (g *GitcoinPassport) isSynced(update bool) bool {
if !update {
return g.synced.Load()
}
lastSync, err := g.loadLastSync(g.ctx)
if err != nil {
log.Warnw("error loading last sync time", "err", err)
return g.synced.Load()
}
g.lastSyncedTime.Store(lastSync)
tLastSync := time.Unix(lastSync, 0)
isSynced := time.Since(tLastSync) < g.cooldown
g.synced.Store(isSynced)
return isSynced
}

func (g *GitcoinPassport) startScoreUpdates() {
log.Debug("starting Gitcoin Passport score updates")
g.waiter.Add(1)
Expand Down Expand Up @@ -431,8 +454,12 @@ func (g *GitcoinPassport) startScoreUpdates() {
}

func (g *GitcoinPassport) updateScores() error {
downloading.Store(true)
defer downloading.Store(false)
// download de json from API endpoint
req, err := http.NewRequestWithContext(g.ctx, http.MethodGet, g.apiEndpoint, nil)
internalCtx, cancel := context.WithCancel(g.ctx)
defer cancel()
req, err := http.NewRequestWithContext(internalCtx, http.MethodGet, g.apiEndpoint, nil)
if err != nil {
return fmt.Errorf("error creating request: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions scanner/providers/gitcoin/gitcoin_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestGitcoinPassport(t *testing.T) {
})
// create the provider
provider := new(GitcoinPassport)
c.Assert(provider.Init(GitcoinPassportConf{endpoints["/original"], time.Second * 2, testDB}), qt.IsNil)
c.Assert(provider.Init(ctx, GitcoinPassportConf{endpoints["/original"], time.Second * 2, testDB}), qt.IsNil)
// start the first download
emptyBalances, _, _, _, _, err := provider.HoldersBalances(context.TODO(), nil, 0)
c.Assert(err, qt.IsNil)
Expand All @@ -69,7 +69,7 @@ func TestGitcoinPassport(t *testing.T) {
testDB, err = db.Init(tempDBDir, "gitcoinpassport.sql")
c.Assert(err, qt.IsNil)
newProvider := new(GitcoinPassport)
c.Assert(newProvider.Init(GitcoinPassportConf{endpoints["/updated"], time.Second * 2, testDB}), qt.IsNil)
c.Assert(newProvider.Init(ctx, GitcoinPassportConf{endpoints["/updated"], time.Second * 2, testDB}), qt.IsNil)
// new endpoint with one change
time.Sleep(time.Second * 5)
c.Assert(newProvider.SetLastBalances(context.TODO(), nil, holders, 0), qt.IsNil)
Expand Down
2 changes: 1 addition & 1 deletion scanner/providers/holders_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type HolderProvider interface {
// Init initializes the provider and its internal structures. Initial
// attributes values must be defined in the struct that implements this
// interface before calling this method.
Init(conf any) error
Init(ctx context.Context, conf any) error
// SetRef sets the reference to the provider. It is used to define the
// required token information to interact with the provider.
SetRef(ref any) error
Expand Down
9 changes: 5 additions & 4 deletions scanner/providers/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package manager
// the provider types and all the providers initialized at once.

import (
"context"
"fmt"
"sync"

Expand Down Expand Up @@ -36,7 +37,7 @@ func (m *ProviderManager) AddProvider(providerType uint64, conf any) {
// provider based on the configuration stored in the manager. It initializes a
// new provider every time to avoid data races. It returns an error if the
// provider type is not found or if the provider cannot be initialized.
func (m *ProviderManager) GetProvider(providerType uint64) (providers.HolderProvider, error) {
func (m *ProviderManager) GetProvider(ctx context.Context, providerType uint64) (providers.HolderProvider, error) {
// load the configuration for the provider type
conf, ok := m.confs.Load(providerType)
if !ok {
Expand All @@ -61,7 +62,7 @@ func (m *ProviderManager) GetProvider(providerType uint64) (providers.HolderProv
return nil, fmt.Errorf("provider type %d not found", providerType)
}
// initialize the provider with the specific configuration
if err := provider.Init(conf); err != nil {
if err := provider.Init(ctx, conf); err != nil {
return nil, err
}
return provider, nil
Expand All @@ -80,10 +81,10 @@ func (m *ProviderManager) GetProviderTypes() []uint64 {

// Providers returns all the providers stored in the manager associated to their
// types as a map of uint64 to HolderProvider.
func (m *ProviderManager) Providers() map[uint64]providers.HolderProvider {
func (m *ProviderManager) Providers(ctx context.Context) map[uint64]providers.HolderProvider {
providers := make(map[uint64]providers.HolderProvider)
for _, t := range m.GetProviderTypes() {
provider, err := m.GetProvider(t)
provider, err := m.GetProvider(ctx, t)
if err != nil {
panic(err)
}
Expand Down
17 changes: 13 additions & 4 deletions scanner/providers/poap/poap_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"net/http"
"net/url"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/vocdoni/census3/scanner/providers"
"go.vocdoni.io/dvote/log"
)

const (
defaultRequestTimeout = 30 * time.Second
// POAP_SYMBOL_PREFIX is the prefix of the POAP token symbol to be used in
// with the eventID to compose the token symbol.
POAP_SYMBOL_PREFIX = "POAP"
Expand Down Expand Up @@ -61,6 +63,8 @@ type POAPSnapshot struct {
// POAP API to get the list of POAPs for an event ID and calculate the balances
// of the token holders from the last snapshot.
type POAPHolderProvider struct {
ctx context.Context
cancel context.CancelFunc
apiEndpoint string
accessToken string
snapshots map[string]*POAPSnapshot
Expand All @@ -75,19 +79,19 @@ type POAPConfig struct {
// Init initializes the POAP external provider with the database provided.
// It returns an error if the POAP access token or api endpoint uri is not
// defined.
func (p *POAPHolderProvider) Init(iconf any) error {
func (p *POAPHolderProvider) Init(globalCtx context.Context, iconf any) error {
// parse config
conf, ok := iconf.(POAPConfig)
if !ok {
return fmt.Errorf("bad config type, it must be a POAPConfig struct")
}

if conf.APIEndpoint == "" {
return fmt.Errorf("no POAP URI defined")
}
if conf.AccessToken == "" {
return fmt.Errorf("no POAP access token defined")
}
p.ctx, p.cancel = context.WithCancel(globalCtx)
p.apiEndpoint = conf.APIEndpoint
p.accessToken = conf.AccessToken
p.snapshots = make(map[string]*POAPSnapshot)
Expand Down Expand Up @@ -161,6 +165,7 @@ func (p *POAPHolderProvider) HoldersBalances(_ context.Context, id []byte, delta
// Close method is not implemented in the POAP external provider. By default it
// returns nil error.
func (p *POAPHolderProvider) Close() error {
p.cancel()
return nil
}

Expand Down Expand Up @@ -349,7 +354,9 @@ func (p *POAPHolderProvider) holdersPage(eventID string, offset int) (*POAPAPIRe
q.Add("offset", fmt.Sprint(offset))
endpoint.RawQuery = q.Encode()
// create request and add headers
req, err := http.NewRequest("GET", endpoint.String(), nil)
internalCtx, cancel := context.WithTimeout(p.ctx, defaultRequestTimeout)
defer cancel()
req, err := http.NewRequestWithContext(internalCtx, http.MethodGet, endpoint.String(), nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -392,7 +399,9 @@ func (p *POAPHolderProvider) getEventInfo(eventID string) (*EventAPIResponse, er
return nil, err
}
// create request and add headers
req, err := http.NewRequest("GET", endpoint.String(), nil)
internalCtx, cancel := context.WithTimeout(p.ctx, defaultRequestTimeout)
defer cancel()
req, err := http.NewRequestWithContext(internalCtx, http.MethodGet, endpoint.String(), nil)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion scanner/providers/poap/poap_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestPOAP(t *testing.T) {
})

provider := new(POAPHolderProvider)
c.Assert(provider.Init(POAPConfig{endpoints["/original"], "no-token"}), qt.IsNil)
c.Assert(provider.Init(ctx, POAPConfig{endpoints["/original"], "no-token"}), qt.IsNil)
holders, _, _, _, _, err := provider.HoldersBalances(context.TODO(), nil, 0)
c.Assert(err, qt.IsNil)
c.Assert(len(holders), qt.Equals, len(expectedOriginalHolders))
Expand Down
2 changes: 1 addition & 1 deletion scanner/providers/web3/erc20_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ERC20HolderProvider struct {
synced atomic.Bool
}

func (p *ERC20HolderProvider) Init(iconf any) error {
func (p *ERC20HolderProvider) Init(_ context.Context, iconf any) error {
// parse the config and set the endpoints
conf, ok := iconf.(Web3ProviderConfig)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion scanner/providers/web3/erc721_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ERC721HolderProvider struct {
synced atomic.Bool
}

func (p *ERC721HolderProvider) Init(iconf any) error {
func (p *ERC721HolderProvider) Init(_ context.Context, iconf any) error {
// parse the config and set the endpoints
conf, ok := iconf.(Web3ProviderConfig)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion scanner/providers/web3/erc777_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ERC777HolderProvider struct {
synced atomic.Bool
}

func (p *ERC777HolderProvider) Init(iconf any) error {
func (p *ERC777HolderProvider) Init(_ context.Context, iconf any) error {
// parse the config and set the endpoints
conf, ok := iconf.(Web3ProviderConfig)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (s *Scanner) ScanHolders(ctx context.Context, token ScannerToken) (
internalCtx, cancel := context.WithTimeout(ctx, SCAN_TIMEOUT)
defer cancel()
// get the correct token holder provider for the current token
provider, err := s.providerManager.GetProvider(token.Type)
provider, err := s.providerManager.GetProvider(s.ctx, token.Type)
if err != nil {
return nil, 0, token.LastBlock, token.Synced, nil,
fmt.Errorf("token type %d not supported: %w", token.Type, err)
Expand Down
Loading