Skip to content

Commit

Permalink
token holder scan parallelized, holder providers simplifyed, fixing G…
Browse files Browse the repository at this point in the history
…etEndpoint web3Pool
  • Loading branch information
lucasmenendez committed Apr 11, 2024
1 parent 0ff59d8 commit 5a35070
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 184 deletions.
93 changes: 24 additions & 69 deletions cmd/census3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
"github.com/vocdoni/census3/db"
"github.com/vocdoni/census3/internal"
"github.com/vocdoni/census3/scanner"
"github.com/vocdoni/census3/scanner/providers"
"github.com/vocdoni/census3/scanner/providers/farcaster"
"github.com/vocdoni/census3/scanner/providers/gitcoin"
gitcoinDB "github.com/vocdoni/census3/scanner/providers/gitcoin/db"
"github.com/vocdoni/census3/scanner/providers/manager"
"github.com/vocdoni/census3/scanner/providers/poap"
"github.com/vocdoni/census3/scanner/providers/web3"
"go.vocdoni.io/dvote/log"
Expand All @@ -33,6 +33,7 @@ type Census3Config struct {
poapAPIEndpoint, poapAuthToken string
gitcoinEndpoint string
gitcoinCooldown time.Duration
scannerConcurrentTokens int
scannerCoolDown time.Duration
adminToken string
initialTokens string
Expand Down Expand Up @@ -60,6 +61,7 @@ func main() {
var strWeb3Providers string
flag.StringVar(&strWeb3Providers, "web3Providers", "", "the list of URL's of available web3 providers")
flag.DurationVar(&config.scannerCoolDown, "scannerCoolDown", 120*time.Second, "the time to wait before next scanner iteration")
flag.IntVar(&config.scannerConcurrentTokens, "scannerConcurrentTokens", 5, "the number of tokens to scan concurrently")
flag.StringVar(&config.adminToken, "adminToken", "", "the admin UUID token for the API")
flag.StringVar(&config.initialTokens, "initialTokens", "", "path of the initial tokens json file")
flag.BoolVar(&config.farcaster, "farcaster", false, "enables farcaster support")
Expand Down Expand Up @@ -112,6 +114,10 @@ func main() {
panic(err)
}
config.listOfWeb3Providers = strings.Split(pviper.GetString("web3Providers"), ",")
if err := pviper.BindPFlag("scannerConcurrentTokens", flag.Lookup("scannerConcurrentTokens")); err != nil {
panic(err)
}
config.scannerConcurrentTokens = pviper.GetInt("scannerConcurrentTokens")
if err := pviper.BindPFlag("scannerCoolDown", flag.Lookup("scannerCoolDown")); err != nil {
panic(err)
}
Expand Down Expand Up @@ -149,75 +155,31 @@ func main() {
if err != nil {
log.Fatal(err)
}

// start the holder scanner with the database and the providers
hc := scanner.NewScanner(database, w3p, config.scannerCoolDown)

// init the provider manager
pm := manager.NewProviderManager()
// init the web3 token providers
erc20Provider := new(web3.ERC20HolderProvider)
if err := erc20Provider.Init(web3.Web3ProviderConfig{Endpoints: w3p}); err != nil {
log.Fatal(err)
return
}
erc721Provider := new(web3.ERC721HolderProvider)
if err := erc721Provider.Init(web3.Web3ProviderConfig{Endpoints: w3p}); err != nil {
log.Fatal(err)
return
}
erc777Provider := new(web3.ERC777HolderProvider)
if err := erc777Provider.Init(web3.Web3ProviderConfig{Endpoints: w3p}); err != nil {
log.Fatal(err)
return
}

// set the providers in the scanner and the API
if err := hc.SetProviders(erc20Provider, erc721Provider, erc777Provider); err != nil {
log.Fatal(err)
return
}
apiProviders := map[uint64]providers.HolderProvider{
erc20Provider.Type(): erc20Provider,
erc721Provider.Type(): erc721Provider,
erc777Provider.Type(): erc777Provider,
}
web3ProviderConf := web3.Web3ProviderConfig{Endpoints: w3p}
pm.AddProvider(new(web3.ERC20HolderProvider).Type(), web3ProviderConf)
pm.AddProvider(new(web3.ERC721HolderProvider).Type(), web3ProviderConf)
pm.AddProvider(new(web3.ERC777HolderProvider).Type(), web3ProviderConf)
// init POAP external provider
if config.poapAPIEndpoint != "" {
poapProvider := new(poap.POAPHolderProvider)
if err := poapProvider.Init(poap.POAPConfig{
pm.AddProvider(new(poap.POAPHolderProvider).Type(), poap.POAPConfig{
APIEndpoint: config.poapAPIEndpoint,
AccessToken: config.poapAuthToken,
}); err != nil {
log.Fatal(err)
return
}
if err := hc.SetProviders(poapProvider); err != nil {
log.Fatal(err)
return
}
apiProviders[poapProvider.Type()] = poapProvider
})
}
if config.gitcoinEndpoint != "" {
gitcoinDatabase, err := gitcoinDB.Init(config.dataDir, "gitcoinpassport.sql")
if err != nil {
log.Fatal(err)
}
// init Gitcoin external provider
gitcoinProvider := new(gitcoin.GitcoinPassport)
if err := gitcoinProvider.Init(gitcoin.GitcoinPassportConf{
pm.AddProvider(new(gitcoin.GitcoinPassport).Type(), gitcoin.GitcoinPassportConf{
APIEndpoint: config.gitcoinEndpoint,
Cooldown: config.gitcoinCooldown,
DB: gitcoinDatabase,
}); err != nil {
log.Fatal(err)
return
}
if err := hc.SetProviders(gitcoinProvider); err != nil {
log.Fatal(err)
return
}
apiProviders[gitcoinProvider.Type()] = gitcoinProvider
})
}

// if farcaster is enabled, init the farcaster database and the provider
var farcasterDB *farcaster.DB
if config.farcaster {
Expand All @@ -226,21 +188,13 @@ func main() {
if err != nil {
log.Fatal(err)
}
farcasterProvider := new(farcaster.FarcasterProvider)
if err := farcasterProvider.Init(farcaster.FarcasterProviderConf{
pm.AddProvider(new(farcaster.FarcasterProvider).Type(), farcaster.FarcasterProviderConf{
Endpoints: w3p,
DB: farcasterDB,
}); err != nil {
log.Fatal(err)
return
}
if err := hc.SetProviders(farcasterProvider); err != nil {
log.Fatal(err)
return
}
apiProviders[farcasterProvider.Type()] = farcasterProvider
})
}

// start the holder scanner with the database and the provider manager
hc := scanner.NewScanner(database, w3p, pm, config.scannerCoolDown)
// if the admin token is not defined, generate a random one
if config.adminToken != "" {
if _, err := uuid.Parse(config.adminToken); err != nil {
Expand All @@ -259,7 +213,7 @@ func main() {
DataDir: config.dataDir,
Web3Providers: w3p,
GroupKey: config.connectKey,
HolderProviders: apiProviders,
HolderProviders: pm.Providers(),
AdminToken: config.adminToken,
})
if err != nil {
Expand All @@ -272,7 +226,8 @@ func main() {
}
log.Info("initial tokens created, or at least tried to")
}()
go hc.Start(ctx)
// start the holder scanner
go hc.Start(ctx, config.scannerConcurrentTokens)

metrics.NewCounter(fmt.Sprintf("census3_info{version=%q,chains=%q}",
internal.Version, w3p.String())).Set(1)
Expand Down
93 changes: 93 additions & 0 deletions scanner/providers/manager/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package manager

// package manager provides a manager for providers of different types
// and a way to add and get them by type concurrently safe. It initializes a new
// provider based on the type and the configuration provided every time that a
// provider is requested to avoid data races. It also provides a way to get all
// the provider types and all the providers initialized at once.

import (
"fmt"
"sync"

"github.com/vocdoni/census3/scanner/providers"
"github.com/vocdoni/census3/scanner/providers/farcaster"
"github.com/vocdoni/census3/scanner/providers/gitcoin"
"github.com/vocdoni/census3/scanner/providers/poap"
"github.com/vocdoni/census3/scanner/providers/web3"
)

type ProviderManager struct {
confs sync.Map
}

// NewProviderManager creates a new provider manager
func NewProviderManager() *ProviderManager {
return &ProviderManager{}
}

// AddProvider adds a new provider configuration to the manager assigned to the
// specific type provided
func (m *ProviderManager) AddProvider(providerType uint64, conf any) {
m.confs.Store(providerType, conf)
}

// GetProvider returns a provider based on the type provided. It initializes the
// provider based on the configuration stored in the manager. It initializes a
// new provider every time to avoid data races. It returns an error if the
// provider type is not found or if the provider cannot be initialized.
func (m *ProviderManager) GetProvider(providerType uint64) (providers.HolderProvider, error) {
// load the configuration for the provider type
conf, ok := m.confs.Load(providerType)
if !ok {
return nil, fmt.Errorf("provider type %d not found", providerType)
}
// initialize the provider based on the type
var provider providers.HolderProvider
switch providerType {
case providers.CONTRACT_TYPE_ERC20:
provider = &web3.ERC20HolderProvider{}
case providers.CONTRACT_TYPE_ERC721:
provider = &web3.ERC721HolderProvider{}
case providers.CONTRACT_TYPE_ERC777:
provider = &web3.ERC777HolderProvider{}
case providers.CONTRACT_TYPE_POAP:
provider = &poap.POAPHolderProvider{}
case providers.CONTRACT_TYPE_GITCOIN:
provider = &gitcoin.GitcoinPassport{}
case providers.CONTRACT_TYPE_FARCASTER:
provider = &farcaster.FarcasterProvider{}
default:
return nil, fmt.Errorf("provider type %d not found", providerType)
}
// initialize the provider with the specific configuration
if err := provider.Init(conf); err != nil {
return nil, err
}
return provider, nil
}

// GetProviderTypes returns all the provider types stored in the manager as a
// slice of uint64.
func (m *ProviderManager) GetProviderTypes() []uint64 {
types := []uint64{}
m.confs.Range(func(t, _ any) bool {
types = append(types, t.(uint64))
return true
})
return types
}

// Providers returns all the providers stored in the manager associated to their
// types as a map of uint64 to HolderProvider.
func (m *ProviderManager) Providers() map[uint64]providers.HolderProvider {
providers := make(map[uint64]providers.HolderProvider)
for _, t := range m.GetProviderTypes() {
provider, err := m.GetProvider(t)
if err != nil {
panic(err)
}
providers[t] = provider
}
return providers
}
76 changes: 44 additions & 32 deletions scanner/providers/web3/web3_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,55 +153,67 @@ func (nm *Web3Pool) DelEndoint(uri string) {
// is found, it resets the available flag for all, resets the next available to
// the first one and returns it.
func (nm *Web3Pool) GetEndpoint(chainID uint64) (*Web3Endpoint, bool) {
// Cases:
// - is there any available endpoint for the chainID?
// - yes, continue
// - no, reset the available flag for all the endpoints, return the first
// one and set the second one as the next available (if there is one)
// - do the next available endpoint exists?
// - yes, continue
// - no, return the first one and set the second one as the next
// available (if there is one)
// - update the next available endpoint to the next one
// - is the current endpoint available?
// - yes, return it
// - no, start again
nm.endpointsMtx.RLock()
defer nm.endpointsMtx.RUnlock()
next, ok := nm.nextAvailable.Load(chainID)
// check if there is any available endpoint for the chainID
unavailable, ok := nm.unavailable.Load(chainID)
if ok && len(unavailable.([]int)) == len(nm.endpoints[chainID]) {
// if all the endpoints are unavailable, reset the available flag for
// all the endpoints, set the second one as the next available (if
// there) and return the first one
nm.unavailable.Delete(chainID)
if len(nm.endpoints[chainID]) > 1 {
nm.nextAvailable.Store(chainID, 1)
} else {
nm.nextAvailable.Store(chainID, 0)
}
return nm.endpoints[chainID][0], true
}
// get the next available endpoint for the chainID
currentEndpointIdx, ok := nm.nextAvailable.Load(chainID)
if !ok {
if _, ok := nm.endpoints[chainID]; !ok {
// if there is no next available endpoint, set the second one as the next
// available (if there is one) and return the first one, if there is no
// endpoint, return false
if len(nm.endpoints[chainID]) == 0 {
return nil, false
}
endpoint := nm.endpoints[chainID][0]
if endpoint == nil {
return nil, false
if len(nm.endpoints[chainID]) > 1 {
nm.nextAvailable.Store(chainID, 1)
} else {
nm.nextAvailable.Store(chainID, 0)
}
// if no available endpoint is found, set all the endpoints as available
// and return the first one
nm.unavailable.Delete(chainID)
nm.nextAvailable.Store(chainID, 0)
return nm.endpoints[chainID][0], true
}
endpointIdx, ok := next.(int)
if !ok {
return nil, false
}
// use the next available endpoint with the following endpoint for chainID
// if there are no more endpoints, use the first one as the next available
if _, ok := nm.endpoints[chainID]; !ok {
nm.nextAvailable.Delete(chainID)
return nil, false
}
endpoint := nm.endpoints[chainID][endpointIdx]
if endpoint == nil {
nm.nextAvailable.Delete(chainID)
return nil, false
}
// if the endpoint is available, set the next available to the next one
nextAvailable := endpointIdx + 1
// update the next available endpoint to the next one
nextAvailable := currentEndpointIdx.(int) + 1
if nextAvailable >= len(nm.endpoints[chainID]) {
nextAvailable = 0
}
nm.nextAvailable.Store(chainID, nextAvailable)
// if the endpoint is not available, return call the method again to get the
// next available endpoint
if unavailable, ok := nm.unavailable.Load(chainID); ok {
// check if the current endpoint is available
if unavailable != nil {
for _, unavailableIdx := range unavailable.([]int) {
if unavailableIdx == endpointIdx {
if unavailableIdx == currentEndpointIdx.(int) {
return nm.GetEndpoint(chainID)
}
}
}
// if it is available, return it
return endpoint, true
// return the current endpoint
return nm.endpoints[chainID][currentEndpointIdx.(int)], true
}

// DisableEndpoint method sets the available flag to false for the URI provided
Expand Down
Loading

0 comments on commit 5a35070

Please sign in to comment.