Skip to content

Commit

Permalink
Merge pull request #536 from LerianStudio/feature/MIDAZ-535
Browse files Browse the repository at this point in the history
Feature/MIDAZ-535
  • Loading branch information
MartinezAvellan authored Feb 20, 2025
2 parents c2b46c6 + 5d83b0d commit a285835
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 77 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"github.com/LerianStudio/midaz/pkg"
"github.com/LerianStudio/midaz/pkg/constant"
goldModel "github.com/LerianStudio/midaz/pkg/gold/transaction/model"
"github.com/LerianStudio/midaz/pkg/mmodel"
"github.com/LerianStudio/midaz/pkg/mopentelemetry"
"github.com/LerianStudio/midaz/pkg/mpointers"
Expand All @@ -30,7 +31,7 @@ type Repository interface {
ListAllByAccountID(ctx context.Context, organizationID, ledgerID, accountID uuid.UUID, filter http.Pagination) ([]*mmodel.Balance, http.CursorPagination, error)
ListByAccountIDs(ctx context.Context, organizationID, ledgerID uuid.UUID, ids []uuid.UUID) ([]*mmodel.Balance, error)
ListByAliases(ctx context.Context, organizationID, ledgerID uuid.UUID, aliases []string) ([]*mmodel.Balance, error)
SelectForUpdate(ctx context.Context, organizationID, ledgerID uuid.UUID, balances []*mmodel.Balance) error
SelectForUpdate(ctx context.Context, organizationID, ledgerID uuid.UUID, aliases []string, fromTo map[string]goldModel.Amount) error
Update(ctx context.Context, organizationID, ledgerID, id uuid.UUID, balance mmodel.UpdateBalance) error
Delete(ctx context.Context, organizationID, ledgerID, id uuid.UUID) error
}
Expand Down Expand Up @@ -487,11 +488,11 @@ func (r *BalancePostgreSQLRepository) ListByAliases(ctx context.Context, organiz
}

// SelectForUpdate a Balance entity into Postgresql.
func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organizationID, ledgerID uuid.UUID, balances []*mmodel.Balance) error {
func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organizationID, ledgerID uuid.UUID, aliases []string, fromTo map[string]goldModel.Amount) error {
tracer := pkg.NewTracerFromContext(ctx)
logger := pkg.NewLoggerFromContext(ctx)

ctx, span := tracer.Start(ctx, "postgres.update_balances")
_, span := tracer.Start(ctx, "postgres.update_balances")
defer span.End()

db, err := r.connection.GetDB()
Expand All @@ -501,20 +502,49 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ
return err
}

tx, err := db.Begin()
tx, err := db.BeginTx(ctx, nil)
if err != nil {
mopentelemetry.HandleSpanError(&span, "Failed to init balances", err)

return err
}

for _, blc := range balances {
query := "SELECT * FROM balance WHERE organization_id = $1 AND ledger_id = $2 AND id = $3 AND deleted_at IS NULL FOR UPDATE"
defer func() {
if err != nil {
rollbackErr := tx.Rollback()
if rollbackErr != nil {
mopentelemetry.HandleSpanError(&span, "Failed to init balances", rollbackErr)

logger.Errorf("err on rollback: %v", rollbackErr)
}
} else {
commitErr := tx.Commit()
if commitErr != nil {
mopentelemetry.HandleSpanError(&span, "Failed to init balances", commitErr)

logger.Errorf("err on commit: %v", commitErr)
}
}
}()

var balances []BalancePostgreSQLModel

query := "SELECT * FROM balance WHERE organization_id = $1 AND ledger_id = $2 AND alias = ANY($3) AND deleted_at IS NULL FOR UPDATE"

row := tx.QueryRowContext(ctx, query, organizationID, ledgerID, blc.ID)
rows, err := tx.QueryContext(ctx, query, organizationID, ledgerID, aliases)
if err != nil {
mopentelemetry.HandleSpanError(&span, "Failed to execute query", err)

logger.Errorf("Failed to execute query: %v - err: %v", query, err)

return err
}

defer rows.Close()

for rows.Next() {
var balance BalancePostgreSQLModel
err = row.Scan(
if err := rows.Scan(
&balance.ID,
&balance.OrganizationID,
&balance.LedgerID,
Expand All @@ -531,11 +561,9 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ
&balance.CreatedAt,
&balance.UpdatedAt,
&balance.DeletedAt,
)

if err != nil {
); err != nil {
if errors.Is(err, sql.ErrNoRows) {
logger.Errorf("registro não encontrado para ID %s", blc.ID)
logger.Errorf("register not found")

return err
}
Expand All @@ -545,25 +573,37 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ
return err
}

balances = append(balances, balance)
}

for _, balance := range balances {
calculateBalances := goldModel.OperateBalances(fromTo[balance.Alias],
goldModel.Balance{
Scale: balance.Scale,
Available: balance.Available,
OnHold: balance.OnHold,
},
fromTo[balance.Alias].Operation)

var updates []string

var args []any

updates = append(updates, "available = $"+strconv.Itoa(len(args)+1))
args = append(args, blc.Available)
args = append(args, calculateBalances.Available)

updates = append(updates, "on_hold = $"+strconv.Itoa(len(args)+1))
args = append(args, blc.OnHold)
args = append(args, calculateBalances.OnHold)

updates = append(updates, "scale = $"+strconv.Itoa(len(args)+1))
args = append(args, blc.Scale)
args = append(args, calculateBalances.Scale)

updates = append(updates, "version = $"+strconv.Itoa(len(args)+1))
version := blc.Version + 1
version := balance.Version + 1
args = append(args, version)

updates = append(updates, "updated_at = $"+strconv.Itoa(len(args)+1))
args = append(args, time.Now(), organizationID, ledgerID, blc.ID)
args = append(args, time.Now(), organizationID, ledgerID, balance.ID)

queryUpdate := `UPDATE balance SET ` + strings.Join(updates, ", ") +
` WHERE organization_id = $` + strconv.Itoa(len(args)-2) +
Expand All @@ -573,31 +613,27 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ

result, err := tx.ExecContext(ctx, queryUpdate, args...)
if err != nil {
mopentelemetry.HandleSpanError(&span, "Err on result exec content", err)

logger.Errorf("Err on result exec content: %v", err)

return err
}

rowsAffected, err := result.RowsAffected()
if err != nil || rowsAffected == 0 {
mopentelemetry.HandleSpanError(&span, "Err or zero rows affected", err)

if err == nil {
err = sql.ErrNoRows
}

logger.Errorf("Err on rows affected: %v", err)
logger.Errorf("Err or zero rows affected: %v", err)

return err
}
}

if commitErr := tx.Commit(); commitErr != nil {
err := pkg.ValidateBusinessError(constant.ErrEntityNotFound, reflect.TypeOf(mmodel.Account{}).Name())

mopentelemetry.HandleSpanError(&span, "Failed to commit balances", err)

return commitErr
}

return nil
}

Expand Down
Loading

0 comments on commit a285835

Please sign in to comment.