Skip to content

Commit

Permalink
feature: enqueue strategy holders endpoint (#203)
Browse files Browse the repository at this point in the history
* including list holders as a background queue api task
  • Loading branch information
lucasmenendez authored Jun 28, 2024
1 parent b4e614e commit 4a68370
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 53 deletions.
3 changes: 2 additions & 1 deletion api/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ const (
getStrategiesTimeout = time.Second * 10
getStrategyTimeout = time.Second * 10
getTokensStrategyTimeout = time.Second * 10
getStrategyHoldersTimeout = time.Second * 20
checkStrategyHoldersTimeout = time.Second * 20
getStrategyHoldersTimeout = time.Minute * 5
// tokens
getTokensTimeout = time.Second * 20
createTokenTimeout = time.Second * 10
Expand Down
117 changes: 74 additions & 43 deletions api/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ func (capi *census3API) initStrategiesHandlers() error {
return err
}
if err := capi.endpoint.RegisterMethod("/strategies/{strategyID}/holders", "GET",
api.MethodAccessTypePublic, capi.listStrategyHolders); err != nil {
api.MethodAccessTypePublic, capi.launchStrategyHolders); err != nil {
return err
}
if err := capi.endpoint.RegisterMethod("/strategies/{strategyID}/holders/queue/{queueID}", "GET",
api.MethodAccessTypePublic, capi.enqueueStrategyHolders); err != nil {
return err
}
if err := capi.endpoint.RegisterMethod("/strategies/estimation", "POST",
Expand Down Expand Up @@ -550,29 +554,18 @@ func (capi *census3API) getStrategy(msg *api.APIdata, ctx *httprouter.HTTPContex
return ctx.Send(res, api.HTTPstatusOK)
}

// listStrategyHolders function handler returns the list of the holders of the
// strategy ID provided. It returns a 400 error if the provided
func (capi *census3API) listStrategyHolders(msg *api.APIdata, ctx *httprouter.HTTPContext) error {
func (capi *census3API) launchStrategyHolders(_ *api.APIdata, ctx *httprouter.HTTPContext) error {
// get provided strategyID
iStrategyID, err := strconv.Atoi(ctx.URLParam("strategyID"))
if err != nil {
return ErrMalformedStrategyID.WithErr(err)
}
strategyID := uint64(iStrategyID)
// get token information from the database
internalCtx, cancel := context.WithTimeout(ctx.Request.Context(), getStrategyHoldersTimeout)
checkCtx, cancel := context.WithTimeout(ctx.Request.Context(), checkStrategyHoldersTimeout)
defer cancel()
tx, err := capi.db.RO.BeginTx(internalCtx, nil)
if err != nil {
return ErrCantGetStrategyHolders.WithErr(err)
}
defer func() {
if err := tx.Rollback(); err != nil {
log.Errorw(err, "error rolling back tokens transaction")
}
}()
qtx := capi.db.QueriesRO.WithTx(tx)
strategy, err := qtx.StrategyByID(internalCtx, strategyID)

strategy, err := capi.db.QueriesRO.StrategyByID(checkCtx, strategyID)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrNotFoundStrategy.WithErr(err)
Expand All @@ -582,40 +575,78 @@ func (capi *census3API) listStrategyHolders(msg *api.APIdata, ctx *httprouter.HT
if strategy.Predicate == "" {
return ErrInvalidStrategyPredicate.With("empty predicate")
}
strategyTokens, err := qtx.StrategyTokens(internalCtx, strategyID)
if err != nil {
return ErrCantGetStrategyHolders.WithErr(err)
}
strategyTokensBySymbol := map[string]*StrategyToken{}
for _, token := range strategyTokens {
strategyTokensBySymbol[token.TokenAlias] = &StrategyToken{
ID: common.BytesToAddress(token.TokenID).String(),
ChainID: token.ChainID,
ExternalID: token.ExternalID,
MinBalance: token.MinBalance,
// import the strategy from IPFS in background generating a queueID
queueID := capi.queue.Enqueue()
go func() {
bgCtx, cancel := context.WithTimeout(context.Background(), getStrategyHoldersTimeout)
defer cancel()
strategyTokens, err := capi.db.QueriesRO.StrategyTokens(bgCtx, strategyID)
if err != nil {
if ok := capi.queue.Fail(queueID, ErrCantGetStrategyHolders.WithErr(err)); !ok {
log.Errorf("error updating list strategy holders queue %s", queueID)
}
return
}
}
strategyHolders, _, _, err := capi.CalculateStrategyHolders(
internalCtx, strategy.Predicate, strategyTokensBySymbol, nil)
strategyTokensBySymbol := map[string]*StrategyToken{}
for _, token := range strategyTokens {
strategyTokensBySymbol[token.TokenAlias] = &StrategyToken{
ID: common.BytesToAddress(token.TokenID).String(),
ChainID: token.ChainID,
ExternalID: token.ExternalID,
MinBalance: token.MinBalance,
}
}
strategyHolders, _, _, err := capi.CalculateStrategyHolders(
bgCtx, strategy.Predicate, strategyTokensBySymbol, nil)
if err != nil {
if ok := capi.queue.Fail(queueID, ErrEvalStrategyPredicate.WithErr(err)); !ok {
log.Errorf("error updating list strategy holders queue %s", queueID)
}
return
}
if len(strategyHolders) == 0 {
if ok := capi.queue.Fail(queueID, ErrNoStrategyHolders); !ok {
log.Errorf("error updating list strategy holders queue %s", queueID)
}
return
}
holders := make(map[string]string)
// parse and encode holders
for addr, balance := range strategyHolders {
holders[addr.String()] = balance.String()
}
if ok := capi.queue.Done(queueID, holders); !ok {
log.Errorf("error updating list strategy holders queue %s", queueID)
}
}()
// encode and send the queueID
res, err := json.Marshal(QueueResponse{QueueID: queueID})
if err != nil {
return ErrEvalStrategyPredicate.WithErr(err)
return ErrEncodeQueueItem.WithErr(err)
}
if len(strategyHolders) == 0 {
return ErrNoStrategyHolders
return ctx.Send(res, api.HTTPstatusOK)
}

func (capi *census3API) enqueueStrategyHolders(msg *api.APIdata, ctx *httprouter.HTTPContext) error {
// parse queueID from url
queueID := ctx.URLParam("queueID")
if queueID == "" {
return ErrMalformedStrategyQueueID
}
// init response struct with the no pagination information and empty list
// of holders
holdersResponse := GetStrategyHoldersResponse{
Holders: make(map[string]string),
// try to get and check if the strategy is in the queue
queueItem, exists := capi.queue.IsDone(queueID)
if !exists {
return ErrNotFoundStrategy.Withf("the ID %s does not exist in the queue", queueID)
}
// parse and encode holders
for addr, balance := range strategyHolders {
holdersResponse.Holders[addr.String()] = balance.String()
// check if it is not finished or some error occurred
if queueItem.Done && queueItem.Error == nil {
// remove the item from the queue and the censusID from the data
capi.queue.Dequeue(queueID)
}
// encode and send the response
res, err := json.Marshal(holdersResponse)
// encode item response and send it
res, err := json.Marshal(queueItem)
if err != nil {
return ErrEncodeTokenHolders.WithErr(err)
return ErrEncodeQueueItem.WithErr(err)
}
return ctx.Send(res, api.HTTPstatusOK)
}
Expand Down
2 changes: 2 additions & 0 deletions apiclient/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
CreateStrategyURI = "/strategies"
// GetTokenHoldersByStrategyURI is the URI for getting token holders of a given strategy
GetTokenHoldersByStrategyURI = "/strategies/%d/holders"
// GetTokenHoldersByStrategyURI is the URI for getting token holders of a given strategy
GetTokenHoldersByStrategyQueueURI = "/strategies/%d/holders/queue/%s"

// Censuses endpoints:

Expand Down
88 changes: 79 additions & 9 deletions apiclient/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"bytes"
"encoding/json"
"fmt"
"math/big"
"net/http"

"github.com/ethereum/go-ethereum/common"
"github.com/vocdoni/census3/api"
"github.com/vocdoni/census3/helpers/queue"
"go.vocdoni.io/dvote/log"
)

Expand Down Expand Up @@ -78,38 +81,105 @@ func (c *HTTPclient) Strategy(strategyID uint64) (*api.Strategy, error) {
return strategyResponse, nil
}

// HoldersByStrategy returns the holders of a strategy
func (c *HTTPclient) HoldersByStrategy(strategyID uint64) (*api.GetStrategyHoldersResponse, error) {
// HoldersByStrategy method queries the API for the holders of a strategy, it
// receives the strategyID and returns the queueID of the task and an error if
// something went wrong. The status of the task can be checked with the
// HoldersByStrategyQueue method.
func (c *HTTPclient) HoldersByStrategy(strategyID uint64) (string, error) {
// construct the URL to the API with the given parameters
endpoint := fmt.Sprintf(GetTokenHoldersByStrategyURI, strategyID)
u, err := c.constructURL(endpoint)
if err != nil {
return nil, fmt.Errorf("%w: %w", ErrConstructingURL, err)
return "", fmt.Errorf("%w: %w", ErrConstructingURL, err)
}
// create the request and send it, if there is an error or the status code
// is not 200, return an error
req, err := http.NewRequest(http.MethodGet, u, nil)
if err != nil {
return nil, fmt.Errorf("%w: %w", ErrCreatingRequest, err)
return "", fmt.Errorf("%w: %w", ErrCreatingRequest, err)
}
res, err := c.c.Do(req)
if err != nil {
return nil, fmt.Errorf("%w: %w", ErrMakingRequest, err)
return "", fmt.Errorf("%w: %w", ErrMakingRequest, err)
}
defer func() {
if err := res.Body.Close(); err != nil {
log.Errorf("error closing response body: %v", err)
}
}()
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("%w: %s", ErrNoStatusOk,
return "", fmt.Errorf("%w: %s", ErrNoStatusOk,
fmt.Errorf("%d %s", res.StatusCode, http.StatusText(res.StatusCode)))
}
holdersResponse := &api.GetStrategyHoldersResponse{}
holdersResponse := &api.QueueResponse{}
if err := json.NewDecoder(res.Body).Decode(holdersResponse); err != nil {
return nil, fmt.Errorf("%w: %w", ErrDecodingResponse, err)
return "", fmt.Errorf("%w: %w", ErrDecodingResponse, err)
}
return holdersResponse.QueueID, nil
}

// HoldersByStrategyQueue method checks the status of the query for the holders
// of a strategy from the API, it receives the strategyID and the queueID and
// returns a map of addresses and amounts, a boolean indicating if the queue
// task is completed and an error if something went wrong.
func (c *HTTPclient) HoldersByStrategyQueue(strategyID uint64, queueID string) (
map[common.Address]*big.Int, bool, error,
) {
if strategyID == 0 {
return nil, false, fmt.Errorf("%w: strategyID is required", ErrBadInputs)
}
if queueID == "" {
return nil, false, fmt.Errorf("%w: queueID is required", ErrBadInputs)
}
// construct the URL to the API with the given parameters
endpoint := fmt.Sprintf(GetTokenHoldersByStrategyQueueURI, strategyID, queueID)
u, err := c.constructURL(endpoint)
if err != nil {
return nil, false, fmt.Errorf("%w: %w", ErrConstructingURL, err)
}
// create the request and send it, if there is an error or the status code
// is not 200, return an error
req, err := http.NewRequest(http.MethodGet, u, nil)
if err != nil {
return nil, false, fmt.Errorf("%w: %w", ErrCreatingRequest, err)
}
res, err := c.c.Do(req)
if err != nil {
return nil, false, fmt.Errorf("%w: %w", ErrMakingRequest, err)
}
defer func() {
if err := res.Body.Close(); err != nil {
log.Errorf("error closing response body: %v", err)
}
}()
if res.StatusCode != http.StatusOK {
return nil, false, fmt.Errorf("%w: %s", ErrNoStatusOk,
fmt.Errorf("%d %s", res.StatusCode, http.StatusText(res.StatusCode)))
}
// decode the queue response
item := &queue.QueueItem{}
if err := json.NewDecoder(res.Body).Decode(item); err != nil {
return nil, false, fmt.Errorf("%w: %w", ErrDecodingResponse, err)
}
// check if the item is done and if there is an error
if !item.Done {
return nil, false, nil
}
if item.Error != nil {
return nil, true, item.Error
}
// convert the data to a map of addresses and amounts
rawHolders := item.Data.(map[string]string)
holders := make(map[common.Address]*big.Int, len(rawHolders))
for k, v := range rawHolders {
addr := common.HexToAddress(k)
amount := new(big.Int)
if _, ok := amount.SetString(v, 10); !ok {
return nil, true, fmt.Errorf("error converting amount to big.Int")
}
holders[addr] = amount
}
return holdersResponse, nil
return holders, true, nil
}

func (c *HTTPclient) CreateStrategy(request *api.Strategy) (uint64, error) {
Expand Down

0 comments on commit 4a68370

Please sign in to comment.