Skip to content

Commit

Permalink
restore queued request
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasmenendez committed Jul 5, 2024
1 parent a8898fd commit d70b1d7
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 59 deletions.
134 changes: 76 additions & 58 deletions api/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ func (capi *census3API) initStrategiesHandlers() error {
return err
}
if err := capi.endpoint.RegisterMethod("/strategies/{strategyID}/holders", "GET",
api.MethodAccessTypePublic, capi.listStrategyHolders); err != nil {
api.MethodAccessTypePublic, capi.launchStrategyHolders); err != nil {
return err
}
if err := capi.endpoint.RegisterMethod("/strategies/{strategyID}/holders/queue/{queueID}", "GET",
api.MethodAccessTypePublic, capi.enqueueStrategyHolders); err != nil {
return err
}
if err := capi.endpoint.RegisterMethod("/strategies/estimation", "POST",
Expand Down Expand Up @@ -136,7 +140,6 @@ func (capi *census3API) getStrategies(msg *api.APIdata, ctx *httprouter.HTTPCont
}
// parse and encode strategies
for _, strategy := range rows {
skipMalformed := false
strategyResponse := &Strategy{
ID: strategy.ID,
Alias: strategy.Alias,
Expand All @@ -146,19 +149,11 @@ func (capi *census3API) getStrategies(msg *api.APIdata, ctx *httprouter.HTTPCont
}
strategyTokens, err := qtx.StrategyTokens(internalCtx, strategy.ID)
if err != nil {
log.Warnw("error getting strategy tokens", "strategyID", strategy.ID)
skipMalformed = true
continue
return ErrCantGetStrategies.WithErr(err)
}
for _, strategyToken := range strategyTokens {
if strategyToken.TokenAlias == "" {
log.Warnw("no token alias",
"strategyID", strategy.ID,
"tokenID", strategyToken.TokenID,
"chainID", strategyToken.ChainID,
"externalID", strategyToken.ExternalID)
skipMalformed = true
break
return ErrCantGetStrategies.With("invalid token symbol")
}
strategyResponse.Tokens[strategyToken.TokenAlias] = &StrategyToken{
ID: common.BytesToAddress(strategyToken.TokenID).String(),
Expand All @@ -168,10 +163,6 @@ func (capi *census3API) getStrategies(msg *api.APIdata, ctx *httprouter.HTTPCont
ExternalID: strategyToken.ExternalID,
}
}
if skipMalformed {
log.Warnw("skipping malformed strategy", "strategyID", strategy.ID)
continue
}
strategiesResponse.Strategies = append(strategiesResponse.Strategies, strategyResponse)
}
res, err := json.Marshal(strategiesResponse)
Expand Down Expand Up @@ -563,29 +554,18 @@ func (capi *census3API) getStrategy(msg *api.APIdata, ctx *httprouter.HTTPContex
return ctx.Send(res, api.HTTPstatusOK)
}

// listStrategyHolders function handler returns the list of the holders of the
// strategy ID provided. It returns a 400 error if the provided
func (capi *census3API) listStrategyHolders(msg *api.APIdata, ctx *httprouter.HTTPContext) error {
func (capi *census3API) launchStrategyHolders(_ *api.APIdata, ctx *httprouter.HTTPContext) error {
// get provided strategyID
iStrategyID, err := strconv.Atoi(ctx.URLParam("strategyID"))
if err != nil {
return ErrMalformedStrategyID.WithErr(err)
}
strategyID := uint64(iStrategyID)
// get token information from the database
internalCtx, cancel := context.WithTimeout(ctx.Request.Context(), getStrategyHoldersTimeout)
checkCtx, cancel := context.WithTimeout(ctx.Request.Context(), checkStrategyHoldersTimeout)
defer cancel()
tx, err := capi.db.RO.BeginTx(internalCtx, nil)
if err != nil {
return ErrCantGetStrategyHolders.WithErr(err)
}
defer func() {
if err := tx.Rollback(); err != nil {
log.Errorw(err, "error rolling back tokens transaction")
}
}()
qtx := capi.db.QueriesRO.WithTx(tx)
strategy, err := qtx.StrategyByID(internalCtx, strategyID)

strategy, err := capi.db.QueriesRO.StrategyByID(checkCtx, strategyID)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrNotFoundStrategy.WithErr(err)
Expand All @@ -595,40 +575,78 @@ func (capi *census3API) listStrategyHolders(msg *api.APIdata, ctx *httprouter.HT
if strategy.Predicate == "" {
return ErrInvalidStrategyPredicate.With("empty predicate")
}
strategyTokens, err := qtx.StrategyTokens(internalCtx, strategyID)
if err != nil {
return ErrCantGetStrategyHolders.WithErr(err)
}
strategyTokensBySymbol := map[string]*StrategyToken{}
for _, token := range strategyTokens {
strategyTokensBySymbol[token.TokenAlias] = &StrategyToken{
ID: common.BytesToAddress(token.TokenID).String(),
ChainID: token.ChainID,
ExternalID: token.ExternalID,
MinBalance: token.MinBalance,
// import the strategy from IPFS in background generating a queueID
queueID := capi.queue.Enqueue()
go func() {
bgCtx, cancel := context.WithTimeout(context.Background(), getStrategyHoldersTimeout)
defer cancel()
strategyTokens, err := capi.db.QueriesRO.StrategyTokens(bgCtx, strategyID)
if err != nil {
if ok := capi.queue.Fail(queueID, ErrCantGetStrategyHolders.WithErr(err)); !ok {
log.Errorf("error updating list strategy holders queue %s", queueID)
}
return
}
}
strategyHolders, _, _, err := capi.CalculateStrategyHolders(
internalCtx, strategy.Predicate, strategyTokensBySymbol, nil)
strategyTokensBySymbol := map[string]*StrategyToken{}
for _, token := range strategyTokens {
strategyTokensBySymbol[token.TokenAlias] = &StrategyToken{
ID: common.BytesToAddress(token.TokenID).String(),
ChainID: token.ChainID,
ExternalID: token.ExternalID,
MinBalance: token.MinBalance,
}
}
strategyHolders, _, _, err := capi.CalculateStrategyHolders(
bgCtx, strategy.Predicate, strategyTokensBySymbol, nil)
if err != nil {
if ok := capi.queue.Fail(queueID, ErrEvalStrategyPredicate.WithErr(err)); !ok {
log.Errorf("error updating list strategy holders queue %s", queueID)
}
return
}
if len(strategyHolders) == 0 {
if ok := capi.queue.Fail(queueID, ErrNoStrategyHolders); !ok {
log.Errorf("error updating list strategy holders queue %s", queueID)
}
return
}
holders := make(map[string]string)
// parse and encode holders
for addr, balance := range strategyHolders {
holders[addr.String()] = balance.String()
}
if ok := capi.queue.Done(queueID, holders); !ok {
log.Errorf("error updating list strategy holders queue %s", queueID)
}
}()
// encode and send the queueID
res, err := json.Marshal(QueueResponse{QueueID: queueID})
if err != nil {
return ErrEvalStrategyPredicate.WithErr(err)
return ErrEncodeQueueItem.WithErr(err)
}
if len(strategyHolders) == 0 {
return ErrNoStrategyHolders
return ctx.Send(res, api.HTTPstatusOK)
}

func (capi *census3API) enqueueStrategyHolders(msg *api.APIdata, ctx *httprouter.HTTPContext) error {
// parse queueID from url
queueID := ctx.URLParam("queueID")
if queueID == "" {
return ErrMalformedStrategyQueueID
}
// init response struct with the no pagination information and empty list
// of holders
holdersResponse := GetStrategyHoldersResponse{
Holders: make(map[string]string),
// try to get and check if the strategy is in the queue
queueItem, exists := capi.queue.IsDone(queueID)
if !exists {
return ErrNotFoundStrategy.Withf("the ID %s does not exist in the queue", queueID)
}
// parse and encode holders
for addr, balance := range strategyHolders {
holdersResponse.Holders[addr.String()] = balance.String()
// check if it is not finished or some error occurred
if queueItem.Done && queueItem.Error == nil {
// remove the item from the queue and the censusID from the data
capi.queue.Dequeue(queueID)
}
// encode and send the response
res, err := json.Marshal(holdersResponse)
// encode item response and send it
res, err := json.Marshal(queueItem)
if err != nil {
return ErrEncodeTokenHolders.WithErr(err)
return ErrEncodeQueueItem.WithErr(err)
}
return ctx.Send(res, api.HTTPstatusOK)
}
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "3.7"

services:
census3:
image: ghcr.io/vocdoni/census3:main
image: ghcr.io/vocdoni/census3:stage
build: .
env_file: ".env"
restart: always
Expand Down

0 comments on commit d70b1d7

Please sign in to comment.