From a7e10ff58ed0b12a77e1f2126d30836e7703dc6d Mon Sep 17 00:00:00 2001 From: MartinezAvellan Date: Thu, 20 Feb 2025 10:54:42 +0100 Subject: [PATCH 1/4] feat: select for update with version correctly implemented; :sparkles: --- .../postgres/balance/balance.postgresql.go | 46 ++++++++++++------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/components/transaction/internal/adapters/postgres/balance/balance.postgresql.go b/components/transaction/internal/adapters/postgres/balance/balance.postgresql.go index 9d1c62ef..e62b23f4 100644 --- a/components/transaction/internal/adapters/postgres/balance/balance.postgresql.go +++ b/components/transaction/internal/adapters/postgres/balance/balance.postgresql.go @@ -491,7 +491,7 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ tracer := pkg.NewTracerFromContext(ctx) logger := pkg.NewLoggerFromContext(ctx) - ctx, span := tracer.Start(ctx, "postgres.update_balances") + _, span := tracer.Start(ctx, "postgres.update_balances") defer span.End() db, err := r.connection.GetDB() @@ -501,17 +501,35 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ return err } - tx, err := db.Begin() + tx, err := db.BeginTx(ctx, nil) if err != nil { mopentelemetry.HandleSpanError(&span, "Failed to init balances", err) return err } + defer func() { + if err != nil { + rollbackErr := tx.Rollback() + if rollbackErr != nil { + mopentelemetry.HandleSpanError(&span, "Failed to init balances", rollbackErr) + + logger.Errorf("err on rollback: %v", rollbackErr) + } + } else { + commitErr := tx.Commit() + if commitErr != nil { + mopentelemetry.HandleSpanError(&span, "Failed to init balances", commitErr) + + logger.Errorf("err on commit: %v", commitErr) + } + } + }() + for _, blc := range balances { - query := "SELECT * FROM balance WHERE organization_id = $1 AND ledger_id = $2 AND id = $3 AND deleted_at IS NULL FOR UPDATE" + query := "SELECT * FROM balance WHERE organization_id = $1 AND ledger_id = $2 AND id = $3 AND version = $4 AND deleted_at IS NULL FOR UPDATE" - row := tx.QueryRowContext(ctx, query, organizationID, ledgerID, blc.ID) + row := tx.QueryRowContext(ctx, query, organizationID, ledgerID, blc.ID, blc.Version) var balance BalancePostgreSQLModel err = row.Scan( @@ -534,19 +552,20 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ ) if err != nil { + mopentelemetry.HandleSpanError(&span, "register not found", err) if errors.Is(err, sql.ErrNoRows) { - logger.Errorf("registro não encontrado para ID %s", blc.ID) + + logger.Errorf("register not found for ID %s", blc.ID) return err } - logger.Errorf("erro no select for update: %v", err) + logger.Errorf("err on select for update: %v", err) return err } var updates []string - var args []any updates = append(updates, "available = $"+strconv.Itoa(len(args)+1)) @@ -573,6 +592,8 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ result, err := tx.ExecContext(ctx, queryUpdate, args...) if err != nil { + mopentelemetry.HandleSpanError(&span, "Err on result exec content", err) + logger.Errorf("Err on result exec content: %v", err) return err @@ -580,24 +601,17 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ rowsAffected, err := result.RowsAffected() if err != nil || rowsAffected == 0 { + mopentelemetry.HandleSpanError(&span, "Err or zero rows affected", err) if err == nil { err = sql.ErrNoRows } - logger.Errorf("Err on rows affected: %v", err) + logger.Errorf("Err or zero rows affected: %v", err) return err } } - if commitErr := tx.Commit(); commitErr != nil { - err := pkg.ValidateBusinessError(constant.ErrEntityNotFound, reflect.TypeOf(mmodel.Account{}).Name()) - - mopentelemetry.HandleSpanError(&span, "Failed to commit balances", err) - - return commitErr - } - return nil } From 0bcde2ea41e10aec0697dfe214dbd6383ce06824 Mon Sep 17 00:00:00 2001 From: MartinezAvellan Date: Thu, 20 Feb 2025 11:35:07 +0100 Subject: [PATCH 2/4] feat: first version of select of update with balance rules; :sparkles: --- .../adapters/postgres/balance/balance.mock.go | 74 +++++++++---------- .../postgres/balance/balance.postgresql.go | 43 ++++++++--- .../services/command/update-balance.go | 17 ++--- 3 files changed, 78 insertions(+), 56 deletions(-) diff --git a/components/transaction/internal/adapters/postgres/balance/balance.mock.go b/components/transaction/internal/adapters/postgres/balance/balance.mock.go index 75650f12..9ea3aa24 100644 --- a/components/transaction/internal/adapters/postgres/balance/balance.mock.go +++ b/components/transaction/internal/adapters/postgres/balance/balance.mock.go @@ -13,6 +13,7 @@ import ( context "context" reflect "reflect" + model "github.com/LerianStudio/midaz/pkg/gold/transaction/model" mmodel "github.com/LerianStudio/midaz/pkg/mmodel" http "github.com/LerianStudio/midaz/pkg/net/http" uuid "github.com/google/uuid" @@ -23,7 +24,6 @@ import ( type MockRepository struct { ctrl *gomock.Controller recorder *MockRepositoryMockRecorder - isgomock struct{} } // MockRepositoryMockRecorder is the mock recorder for MockRepository. @@ -44,52 +44,52 @@ func (m *MockRepository) EXPECT() *MockRepositoryMockRecorder { } // Create mocks base method. -func (m *MockRepository) Create(ctx context.Context, balance *mmodel.Balance) error { +func (m *MockRepository) Create(arg0 context.Context, arg1 *mmodel.Balance) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Create", ctx, balance) + ret := m.ctrl.Call(m, "Create", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // Create indicates an expected call of Create. -func (mr *MockRepositoryMockRecorder) Create(ctx, balance any) *gomock.Call { +func (mr *MockRepositoryMockRecorder) Create(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockRepository)(nil).Create), ctx, balance) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockRepository)(nil).Create), arg0, arg1) } // Delete mocks base method. -func (m *MockRepository) Delete(ctx context.Context, organizationID, ledgerID, id uuid.UUID) error { +func (m *MockRepository) Delete(arg0 context.Context, arg1, arg2, arg3 uuid.UUID) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Delete", ctx, organizationID, ledgerID, id) + ret := m.ctrl.Call(m, "Delete", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) return ret0 } // Delete indicates an expected call of Delete. -func (mr *MockRepositoryMockRecorder) Delete(ctx, organizationID, ledgerID, id any) *gomock.Call { +func (mr *MockRepositoryMockRecorder) Delete(arg0, arg1, arg2, arg3 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockRepository)(nil).Delete), ctx, organizationID, ledgerID, id) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockRepository)(nil).Delete), arg0, arg1, arg2, arg3) } // Find mocks base method. -func (m *MockRepository) Find(ctx context.Context, organizationID, ledgerID, id uuid.UUID) (*mmodel.Balance, error) { +func (m *MockRepository) Find(arg0 context.Context, arg1, arg2, arg3 uuid.UUID) (*mmodel.Balance, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Find", ctx, organizationID, ledgerID, id) + ret := m.ctrl.Call(m, "Find", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(*mmodel.Balance) ret1, _ := ret[1].(error) return ret0, ret1 } // Find indicates an expected call of Find. -func (mr *MockRepositoryMockRecorder) Find(ctx, organizationID, ledgerID, id any) *gomock.Call { +func (mr *MockRepositoryMockRecorder) Find(arg0, arg1, arg2, arg3 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Find", reflect.TypeOf((*MockRepository)(nil).Find), ctx, organizationID, ledgerID, id) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Find", reflect.TypeOf((*MockRepository)(nil).Find), arg0, arg1, arg2, arg3) } // ListAll mocks base method. -func (m *MockRepository) ListAll(ctx context.Context, organizationID, ledgerID uuid.UUID, filter http.Pagination) ([]*mmodel.Balance, http.CursorPagination, error) { +func (m *MockRepository) ListAll(arg0 context.Context, arg1, arg2 uuid.UUID, arg3 http.Pagination) ([]*mmodel.Balance, http.CursorPagination, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListAll", ctx, organizationID, ledgerID, filter) + ret := m.ctrl.Call(m, "ListAll", arg0, arg1, arg2, arg3) ret0, _ := ret[0].([]*mmodel.Balance) ret1, _ := ret[1].(http.CursorPagination) ret2, _ := ret[2].(error) @@ -97,15 +97,15 @@ func (m *MockRepository) ListAll(ctx context.Context, organizationID, ledgerID u } // ListAll indicates an expected call of ListAll. -func (mr *MockRepositoryMockRecorder) ListAll(ctx, organizationID, ledgerID, filter any) *gomock.Call { +func (mr *MockRepositoryMockRecorder) ListAll(arg0, arg1, arg2, arg3 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListAll", reflect.TypeOf((*MockRepository)(nil).ListAll), ctx, organizationID, ledgerID, filter) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListAll", reflect.TypeOf((*MockRepository)(nil).ListAll), arg0, arg1, arg2, arg3) } // ListAllByAccountID mocks base method. -func (m *MockRepository) ListAllByAccountID(ctx context.Context, organizationID, ledgerID, accountID uuid.UUID, filter http.Pagination) ([]*mmodel.Balance, http.CursorPagination, error) { +func (m *MockRepository) ListAllByAccountID(arg0 context.Context, arg1, arg2, arg3 uuid.UUID, arg4 http.Pagination) ([]*mmodel.Balance, http.CursorPagination, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListAllByAccountID", ctx, organizationID, ledgerID, accountID, filter) + ret := m.ctrl.Call(m, "ListAllByAccountID", arg0, arg1, arg2, arg3, arg4) ret0, _ := ret[0].([]*mmodel.Balance) ret1, _ := ret[1].(http.CursorPagination) ret2, _ := ret[2].(error) @@ -113,65 +113,65 @@ func (m *MockRepository) ListAllByAccountID(ctx context.Context, organizationID, } // ListAllByAccountID indicates an expected call of ListAllByAccountID. -func (mr *MockRepositoryMockRecorder) ListAllByAccountID(ctx, organizationID, ledgerID, accountID, filter any) *gomock.Call { +func (mr *MockRepositoryMockRecorder) ListAllByAccountID(arg0, arg1, arg2, arg3, arg4 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListAllByAccountID", reflect.TypeOf((*MockRepository)(nil).ListAllByAccountID), ctx, organizationID, ledgerID, accountID, filter) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListAllByAccountID", reflect.TypeOf((*MockRepository)(nil).ListAllByAccountID), arg0, arg1, arg2, arg3, arg4) } // ListByAccountIDs mocks base method. -func (m *MockRepository) ListByAccountIDs(ctx context.Context, organizationID, ledgerID uuid.UUID, ids []uuid.UUID) ([]*mmodel.Balance, error) { +func (m *MockRepository) ListByAccountIDs(arg0 context.Context, arg1, arg2 uuid.UUID, arg3 []uuid.UUID) ([]*mmodel.Balance, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListByAccountIDs", ctx, organizationID, ledgerID, ids) + ret := m.ctrl.Call(m, "ListByAccountIDs", arg0, arg1, arg2, arg3) ret0, _ := ret[0].([]*mmodel.Balance) ret1, _ := ret[1].(error) return ret0, ret1 } // ListByAccountIDs indicates an expected call of ListByAccountIDs. -func (mr *MockRepositoryMockRecorder) ListByAccountIDs(ctx, organizationID, ledgerID, ids any) *gomock.Call { +func (mr *MockRepositoryMockRecorder) ListByAccountIDs(arg0, arg1, arg2, arg3 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListByAccountIDs", reflect.TypeOf((*MockRepository)(nil).ListByAccountIDs), ctx, organizationID, ledgerID, ids) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListByAccountIDs", reflect.TypeOf((*MockRepository)(nil).ListByAccountIDs), arg0, arg1, arg2, arg3) } // ListByAliases mocks base method. -func (m *MockRepository) ListByAliases(ctx context.Context, organizationID, ledgerID uuid.UUID, aliases []string) ([]*mmodel.Balance, error) { +func (m *MockRepository) ListByAliases(arg0 context.Context, arg1, arg2 uuid.UUID, arg3 []string) ([]*mmodel.Balance, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListByAliases", ctx, organizationID, ledgerID, aliases) + ret := m.ctrl.Call(m, "ListByAliases", arg0, arg1, arg2, arg3) ret0, _ := ret[0].([]*mmodel.Balance) ret1, _ := ret[1].(error) return ret0, ret1 } // ListByAliases indicates an expected call of ListByAliases. -func (mr *MockRepositoryMockRecorder) ListByAliases(ctx, organizationID, ledgerID, aliases any) *gomock.Call { +func (mr *MockRepositoryMockRecorder) ListByAliases(arg0, arg1, arg2, arg3 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListByAliases", reflect.TypeOf((*MockRepository)(nil).ListByAliases), ctx, organizationID, ledgerID, aliases) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListByAliases", reflect.TypeOf((*MockRepository)(nil).ListByAliases), arg0, arg1, arg2, arg3) } // SelectForUpdate mocks base method. -func (m *MockRepository) SelectForUpdate(ctx context.Context, organizationID, ledgerID uuid.UUID, balances []*mmodel.Balance) error { +func (m *MockRepository) SelectForUpdate(arg0 context.Context, arg1, arg2 uuid.UUID, arg3 map[string]model.Amount, arg4 string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SelectForUpdate", ctx, organizationID, ledgerID, balances) + ret := m.ctrl.Call(m, "SelectForUpdate", arg0, arg1, arg2, arg3, arg4) ret0, _ := ret[0].(error) return ret0 } // SelectForUpdate indicates an expected call of SelectForUpdate. -func (mr *MockRepositoryMockRecorder) SelectForUpdate(ctx, organizationID, ledgerID, balances any) *gomock.Call { +func (mr *MockRepositoryMockRecorder) SelectForUpdate(arg0, arg1, arg2, arg3, arg4 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectForUpdate", reflect.TypeOf((*MockRepository)(nil).SelectForUpdate), ctx, organizationID, ledgerID, balances) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectForUpdate", reflect.TypeOf((*MockRepository)(nil).SelectForUpdate), arg0, arg1, arg2, arg3, arg4) } // Update mocks base method. -func (m *MockRepository) Update(ctx context.Context, organizationID, ledgerID, id uuid.UUID, balance mmodel.UpdateBalance) error { +func (m *MockRepository) Update(arg0 context.Context, arg1, arg2, arg3 uuid.UUID, arg4 mmodel.UpdateBalance) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Update", ctx, organizationID, ledgerID, id, balance) + ret := m.ctrl.Call(m, "Update", arg0, arg1, arg2, arg3, arg4) ret0, _ := ret[0].(error) return ret0 } // Update indicates an expected call of Update. -func (mr *MockRepositoryMockRecorder) Update(ctx, organizationID, ledgerID, id, balance any) *gomock.Call { +func (mr *MockRepositoryMockRecorder) Update(arg0, arg1, arg2, arg3, arg4 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockRepository)(nil).Update), ctx, organizationID, ledgerID, id, balance) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockRepository)(nil).Update), arg0, arg1, arg2, arg3, arg4) } diff --git a/components/transaction/internal/adapters/postgres/balance/balance.postgresql.go b/components/transaction/internal/adapters/postgres/balance/balance.postgresql.go index e62b23f4..f531da4e 100644 --- a/components/transaction/internal/adapters/postgres/balance/balance.postgresql.go +++ b/components/transaction/internal/adapters/postgres/balance/balance.postgresql.go @@ -6,6 +6,7 @@ import ( "errors" "github.com/LerianStudio/midaz/pkg" "github.com/LerianStudio/midaz/pkg/constant" + goldModel "github.com/LerianStudio/midaz/pkg/gold/transaction/model" "github.com/LerianStudio/midaz/pkg/mmodel" "github.com/LerianStudio/midaz/pkg/mopentelemetry" "github.com/LerianStudio/midaz/pkg/mpointers" @@ -30,7 +31,7 @@ type Repository interface { ListAllByAccountID(ctx context.Context, organizationID, ledgerID, accountID uuid.UUID, filter http.Pagination) ([]*mmodel.Balance, http.CursorPagination, error) ListByAccountIDs(ctx context.Context, organizationID, ledgerID uuid.UUID, ids []uuid.UUID) ([]*mmodel.Balance, error) ListByAliases(ctx context.Context, organizationID, ledgerID uuid.UUID, aliases []string) ([]*mmodel.Balance, error) - SelectForUpdate(ctx context.Context, organizationID, ledgerID uuid.UUID, balances []*mmodel.Balance) error + SelectForUpdate(ctx context.Context, organizationID, ledgerID uuid.UUID, fromTo map[string]goldModel.Amount, operation string) error Update(ctx context.Context, organizationID, ledgerID, id uuid.UUID, balance mmodel.UpdateBalance) error Delete(ctx context.Context, organizationID, ledgerID, id uuid.UUID) error } @@ -487,7 +488,7 @@ func (r *BalancePostgreSQLRepository) ListByAliases(ctx context.Context, organiz } // SelectForUpdate a Balance entity into Postgresql. -func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organizationID, ledgerID uuid.UUID, balances []*mmodel.Balance) error { +func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organizationID, ledgerID uuid.UUID, fromTo map[string]goldModel.Amount, operation string) error { tracer := pkg.NewTracerFromContext(ctx) logger := pkg.NewLoggerFromContext(ctx) @@ -526,10 +527,10 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ } }() - for _, blc := range balances { - query := "SELECT * FROM balance WHERE organization_id = $1 AND ledger_id = $2 AND id = $3 AND version = $4 AND deleted_at IS NULL FOR UPDATE" + for key := range fromTo { + query := "SELECT * FROM balance WHERE organization_id = $1 AND ledger_id = $2 AND alias = $3 AND deleted_at IS NULL FOR UPDATE" - row := tx.QueryRowContext(ctx, query, organizationID, ledgerID, blc.ID, blc.Version) + row := tx.QueryRowContext(ctx, query, organizationID, ledgerID, key) var balance BalancePostgreSQLModel err = row.Scan( @@ -552,20 +553,44 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ ) if err != nil { - mopentelemetry.HandleSpanError(&span, "register not found", err) if errors.Is(err, sql.ErrNoRows) { - - logger.Errorf("register not found for ID %s", blc.ID) + logger.Errorf("registro não encontrado para ID %s", key) return err } - logger.Errorf("err on select for update: %v", err) + logger.Errorf("erro no select for update: %v", err) return err } + calculateBalances := goldModel.OperateBalances(fromTo[key], + goldModel.Balance{ + Scale: balance.Scale, + Available: balance.Available, + OnHold: balance.OnHold, + }, + operation) + + blc := mmodel.Balance{ + ID: balance.ID, + Alias: balance.Alias, + OrganizationID: balance.OrganizationID, + LedgerID: balance.LedgerID, + AssetCode: balance.AssetCode, + Available: calculateBalances.Available, + Scale: calculateBalances.Scale, + OnHold: calculateBalances.OnHold, + AllowSending: balance.AllowSending, + AllowReceiving: balance.AllowReceiving, + AccountType: balance.AccountType, + Version: balance.Version, + CreatedAt: balance.CreatedAt, + UpdatedAt: balance.UpdatedAt, + } + var updates []string + var args []any updates = append(updates, "available = $"+strconv.Itoa(len(args)+1)) diff --git a/components/transaction/internal/services/command/update-balance.go b/components/transaction/internal/services/command/update-balance.go index 33a2d400..bd226dd9 100644 --- a/components/transaction/internal/services/command/update-balance.go +++ b/components/transaction/internal/services/command/update-balance.go @@ -23,19 +23,16 @@ func (uc *UseCase) UpdateBalances(ctx context.Context, organizationID, ledgerID logger.Errorf("Failed to convert balances from struct to JSON string: %v", err.Error()) } - result := make(chan []*mmodel.Balance) - - var balancesToUpdate []*mmodel.Balance + err = uc.BalanceRepo.SelectForUpdate(ctxProcessBalances, organizationID, ledgerID, validate.From, constant.DEBIT) + if err != nil { + mopentelemetry.HandleSpanError(&spanUpdateBalances, "Failed to update balances on database", err) - go goldModel.UpdateBalances(constant.DEBIT, validate.From, balances, result) - rDebit := <-result - balancesToUpdate = append(balancesToUpdate, rDebit...) + logger.Error("Failed to update balances on database", err.Error()) - go goldModel.UpdateBalances(constant.CREDIT, validate.To, balances, result) - rCredit := <-result - balancesToUpdate = append(balancesToUpdate, rCredit...) + return err + } - err = uc.BalanceRepo.SelectForUpdate(ctxProcessBalances, organizationID, ledgerID, balancesToUpdate) + err = uc.BalanceRepo.SelectForUpdate(ctxProcessBalances, organizationID, ledgerID, validate.To, constant.CREDIT) if err != nil { mopentelemetry.HandleSpanError(&spanUpdateBalances, "Failed to update balances on database", err) From 17b02aa091be7b17077bdeeffd538f791bad2de6 Mon Sep 17 00:00:00 2001 From: MartinezAvellan Date: Thu, 20 Feb 2025 12:56:58 +0100 Subject: [PATCH 3/4] feature: add acid to select for update; --- .../adapters/postgres/balance/balance.mock.go | 2 +- .../postgres/balance/balance.postgresql.go | 62 +++++++++---------- .../services/command/update-balance.go | 24 ++++--- pkg/gold/transaction/model/transaction.go | 7 ++- 4 files changed, 50 insertions(+), 45 deletions(-) diff --git a/components/transaction/internal/adapters/postgres/balance/balance.mock.go b/components/transaction/internal/adapters/postgres/balance/balance.mock.go index 9ea3aa24..4fe4ce41 100644 --- a/components/transaction/internal/adapters/postgres/balance/balance.mock.go +++ b/components/transaction/internal/adapters/postgres/balance/balance.mock.go @@ -149,7 +149,7 @@ func (mr *MockRepositoryMockRecorder) ListByAliases(arg0, arg1, arg2, arg3 any) } // SelectForUpdate mocks base method. -func (m *MockRepository) SelectForUpdate(arg0 context.Context, arg1, arg2 uuid.UUID, arg3 map[string]model.Amount, arg4 string) error { +func (m *MockRepository) SelectForUpdate(arg0 context.Context, arg1, arg2 uuid.UUID, arg3 []string, arg4 map[string]model.Amount) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SelectForUpdate", arg0, arg1, arg2, arg3, arg4) ret0, _ := ret[0].(error) diff --git a/components/transaction/internal/adapters/postgres/balance/balance.postgresql.go b/components/transaction/internal/adapters/postgres/balance/balance.postgresql.go index f531da4e..b2413263 100644 --- a/components/transaction/internal/adapters/postgres/balance/balance.postgresql.go +++ b/components/transaction/internal/adapters/postgres/balance/balance.postgresql.go @@ -31,7 +31,7 @@ type Repository interface { ListAllByAccountID(ctx context.Context, organizationID, ledgerID, accountID uuid.UUID, filter http.Pagination) ([]*mmodel.Balance, http.CursorPagination, error) ListByAccountIDs(ctx context.Context, organizationID, ledgerID uuid.UUID, ids []uuid.UUID) ([]*mmodel.Balance, error) ListByAliases(ctx context.Context, organizationID, ledgerID uuid.UUID, aliases []string) ([]*mmodel.Balance, error) - SelectForUpdate(ctx context.Context, organizationID, ledgerID uuid.UUID, fromTo map[string]goldModel.Amount, operation string) error + SelectForUpdate(ctx context.Context, organizationID, ledgerID uuid.UUID, aliases []string, fromTo map[string]goldModel.Amount) error Update(ctx context.Context, organizationID, ledgerID, id uuid.UUID, balance mmodel.UpdateBalance) error Delete(ctx context.Context, organizationID, ledgerID, id uuid.UUID) error } @@ -488,7 +488,7 @@ func (r *BalancePostgreSQLRepository) ListByAliases(ctx context.Context, organiz } // SelectForUpdate a Balance entity into Postgresql. -func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organizationID, ledgerID uuid.UUID, fromTo map[string]goldModel.Amount, operation string) error { +func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organizationID, ledgerID uuid.UUID, aliases []string, fromTo map[string]goldModel.Amount) error { tracer := pkg.NewTracerFromContext(ctx) logger := pkg.NewLoggerFromContext(ctx) @@ -527,13 +527,22 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ } }() - for key := range fromTo { - query := "SELECT * FROM balance WHERE organization_id = $1 AND ledger_id = $2 AND alias = $3 AND deleted_at IS NULL FOR UPDATE" + var balances []BalancePostgreSQLModel - row := tx.QueryRowContext(ctx, query, organizationID, ledgerID, key) + query := "SELECT * FROM balance WHERE organization_id = $1 AND ledger_id = $2 AND alias = ANY($3) AND deleted_at IS NULL FOR UPDATE" + rows, err := tx.QueryContext(ctx, query, organizationID, ledgerID, aliases) + if err != nil { + mopentelemetry.HandleSpanError(&span, "Failed to execute query", err) + + logger.Errorf("Failed to execute query: %v - err: %v", query, err) + return err + } + defer rows.Close() + + for rows.Next() { var balance BalancePostgreSQLModel - err = row.Scan( + if err := rows.Scan( &balance.ID, &balance.OrganizationID, &balance.LedgerID, @@ -550,11 +559,9 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ &balance.CreatedAt, &balance.UpdatedAt, &balance.DeletedAt, - ) - - if err != nil { + ); err != nil { if errors.Is(err, sql.ErrNoRows) { - logger.Errorf("registro não encontrado para ID %s", key) + logger.Errorf("register not found") return err } @@ -564,50 +571,37 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ return err } - calculateBalances := goldModel.OperateBalances(fromTo[key], + balances = append(balances, balance) + } + + for _, balance := range balances { + calculateBalances := goldModel.OperateBalances(fromTo[balance.Alias], goldModel.Balance{ Scale: balance.Scale, Available: balance.Available, OnHold: balance.OnHold, }, - operation) - - blc := mmodel.Balance{ - ID: balance.ID, - Alias: balance.Alias, - OrganizationID: balance.OrganizationID, - LedgerID: balance.LedgerID, - AssetCode: balance.AssetCode, - Available: calculateBalances.Available, - Scale: calculateBalances.Scale, - OnHold: calculateBalances.OnHold, - AllowSending: balance.AllowSending, - AllowReceiving: balance.AllowReceiving, - AccountType: balance.AccountType, - Version: balance.Version, - CreatedAt: balance.CreatedAt, - UpdatedAt: balance.UpdatedAt, - } + fromTo[balance.Alias].Operation) var updates []string var args []any updates = append(updates, "available = $"+strconv.Itoa(len(args)+1)) - args = append(args, blc.Available) + args = append(args, calculateBalances.Available) updates = append(updates, "on_hold = $"+strconv.Itoa(len(args)+1)) - args = append(args, blc.OnHold) + args = append(args, calculateBalances.OnHold) updates = append(updates, "scale = $"+strconv.Itoa(len(args)+1)) - args = append(args, blc.Scale) + args = append(args, calculateBalances.Scale) updates = append(updates, "version = $"+strconv.Itoa(len(args)+1)) - version := blc.Version + 1 + version := balance.Version + 1 args = append(args, version) updates = append(updates, "updated_at = $"+strconv.Itoa(len(args)+1)) - args = append(args, time.Now(), organizationID, ledgerID, blc.ID) + args = append(args, time.Now(), organizationID, ledgerID, balance.ID) queryUpdate := `UPDATE balance SET ` + strings.Join(updates, ", ") + ` WHERE organization_id = $` + strconv.Itoa(len(args)-2) + diff --git a/components/transaction/internal/services/command/update-balance.go b/components/transaction/internal/services/command/update-balance.go index bd226dd9..b68310bb 100644 --- a/components/transaction/internal/services/command/update-balance.go +++ b/components/transaction/internal/services/command/update-balance.go @@ -23,16 +23,26 @@ func (uc *UseCase) UpdateBalances(ctx context.Context, organizationID, ledgerID logger.Errorf("Failed to convert balances from struct to JSON string: %v", err.Error()) } - err = uc.BalanceRepo.SelectForUpdate(ctxProcessBalances, organizationID, ledgerID, validate.From, constant.DEBIT) - if err != nil { - mopentelemetry.HandleSpanError(&spanUpdateBalances, "Failed to update balances on database", err) - - logger.Error("Failed to update balances on database", err.Error()) + fromTo := make(map[string]goldModel.Amount) + for k, v := range validate.From { + fromTo[k] = goldModel.Amount{ + Asset: v.Asset, + Value: v.Value, + Scale: v.Scale, + Operation: constant.DEBIT, + } + } - return err + for k, v := range validate.To { + fromTo[k] = goldModel.Amount{ + Asset: v.Asset, + Value: v.Value, + Scale: v.Scale, + Operation: constant.CREDIT, + } } - err = uc.BalanceRepo.SelectForUpdate(ctxProcessBalances, organizationID, ledgerID, validate.To, constant.CREDIT) + err = uc.BalanceRepo.SelectForUpdate(ctxProcessBalances, organizationID, ledgerID, validate.Aliases, fromTo) if err != nil { mopentelemetry.HandleSpanError(&spanUpdateBalances, "Failed to update balances on database", err) diff --git a/pkg/gold/transaction/model/transaction.go b/pkg/gold/transaction/model/transaction.go index 2d8adc44..67e3d0e4 100644 --- a/pkg/gold/transaction/model/transaction.go +++ b/pkg/gold/transaction/model/transaction.go @@ -34,9 +34,10 @@ type Metadata struct { // swagger:model Amount // @Description Amount is the struct designed to represent the amount of an operation. type Amount struct { - Asset string `json:"asset,omitempty" validate:"required" example:"BRL"` - Value int64 `json:"value,omitempty" validate:"required" example:"1000"` - Scale int64 `json:"scale,omitempty" validate:"gte=0" example:"2"` + Asset string `json:"asset,omitempty" validate:"required" example:"BRL"` + Value int64 `json:"value,omitempty" validate:"required" example:"1000"` + Scale int64 `json:"scale,omitempty" validate:"gte=0" example:"2"` + Operation string `json:"operation,omitempty"` } // @name Amount // Share structure for marshaling/unmarshalling JSON. From 5d83b0dff59175d323b9da77681efa6a3ae453fb Mon Sep 17 00:00:00 2001 From: MartinezAvellan Date: Thu, 20 Feb 2025 13:12:59 +0100 Subject: [PATCH 4/4] fix: make lint and make sec; :bug: --- .../internal/adapters/postgres/balance/balance.postgresql.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/components/transaction/internal/adapters/postgres/balance/balance.postgresql.go b/components/transaction/internal/adapters/postgres/balance/balance.postgresql.go index b2413263..98be3395 100644 --- a/components/transaction/internal/adapters/postgres/balance/balance.postgresql.go +++ b/components/transaction/internal/adapters/postgres/balance/balance.postgresql.go @@ -530,6 +530,7 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ var balances []BalancePostgreSQLModel query := "SELECT * FROM balance WHERE organization_id = $1 AND ledger_id = $2 AND alias = ANY($3) AND deleted_at IS NULL FOR UPDATE" + rows, err := tx.QueryContext(ctx, query, organizationID, ledgerID, aliases) if err != nil { mopentelemetry.HandleSpanError(&span, "Failed to execute query", err) @@ -538,6 +539,7 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ return err } + defer rows.Close() for rows.Next() { @@ -621,6 +623,7 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ rowsAffected, err := result.RowsAffected() if err != nil || rowsAffected == 0 { mopentelemetry.HandleSpanError(&span, "Err or zero rows affected", err) + if err == nil { err = sql.ErrNoRows }