Skip to content

Commit

Permalink
Merge pull request #531 from LerianStudio/feature/MIDAZ-530
Browse files Browse the repository at this point in the history
Feature/MIDAZ-530
  • Loading branch information
MartinezAvellan authored Feb 20, 2025
2 parents 73f2ec3 + cc32b57 commit 33fbca5
Show file tree
Hide file tree
Showing 12 changed files with 99 additions and 102 deletions.
2 changes: 1 addition & 1 deletion components/audit/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ RABBITMQ_PORT_HOST=3003
RABBITMQ_PORT_AMPQ=3004
RABBITMQ_DEFAULT_USER=audit
RABBITMQ_DEFAULT_PASS=lerian
RABBITMQ_QUEUE=audit_queue
RABBITMQ_QUEUE=audit.append_log.queue

# TRILLIAN
TRILLIAN_DATABASE_NAME=audit-db
Expand Down
14 changes: 4 additions & 10 deletions components/audit/internal/adapters/rabbitmq/consumer.rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,14 @@ func (cr *ConsumerRoutes) RunConsumers() error {

err := handlerFunc(ctx, msg.Body)
if err != nil {
cr.Logger.Errorf("Error processing message from queue %s: %v", queue, err)
cr.Logger.Errorf("Error processing message, resend messages to queue %s: %v", queue, err)

if nackErr := msg.Nack(false, true); nackErr != nil {
cr.Logger.Errorf("Error nack message from queue %s: %v", queue, nackErr)
}
_ = msg.Nack(false, true)

cr.Logger.Fatalf("Error processing message from queue %s: %v", queue, err)

return
continue
}

if ackErr := msg.Ack(false); ackErr != nil {
cr.Logger.Errorf("Error ack message from queue %s: %v", queue, ackErr)
}
_ = msg.Ack(false)
}
}(queueName, handler)
}
Expand Down
7 changes: 4 additions & 3 deletions components/audit/internal/bootstrap/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"github.com/LerianStudio/midaz/pkg/mmodel"
"github.com/LerianStudio/midaz/pkg/mopentelemetry"
"os"

"github.com/LerianStudio/midaz/components/audit/internal/adapters/rabbitmq"
"github.com/LerianStudio/midaz/components/audit/internal/services"
Expand All @@ -25,7 +26,7 @@ func NewMultiQueueConsumer(routes *rabbitmq.ConsumerRoutes, useCase *services.Us
}

// Registry handlers for each queue
routes.Register("audit_queue", consumer.handleAuditQueue)
routes.Register(os.Getenv("RABBITMQ_QUEUE"), consumer.handleAuditQueue)

return consumer
}
Expand All @@ -35,15 +36,15 @@ func (mq *MultiQueueConsumer) Run(l *pkg.Launcher) error {
return mq.consumerRoutes.RunConsumers()
}

// handleAuditQueue process messages from "audit_queue".
// handleAuditQueue process messages from queue.
func (mq *MultiQueueConsumer) handleAuditQueue(ctx context.Context, body []byte) error {
logger := pkg.NewLoggerFromContext(ctx)
tracer := pkg.NewTracerFromContext(ctx)

ctx, span := tracer.Start(ctx, "consumer.handleAuditQueue")
defer span.End()

logger.Info("Processing message from audit_queue")
logger.Info("Processing message from queue")

var message mmodel.Queue

Expand Down
30 changes: 15 additions & 15 deletions components/infra/rabbitmq/etc/definitions.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,62 +62,62 @@
],
"queues": [
{
"name": "audit_queue",
"name": "audit.append_log.queue",
"vhost": "/",
"durable": true
},
{
"name": "transaction_balance_queue",
"name": "transaction.balance_create.queue",
"vhost": "/",
"durable": true
},
{
"name": "balance_retry_queue",
"name": "transaction.transaction_balance_operation.queue",
"vhost": "/",
"durable": true
}
],
"exchanges": [
{
"name": "audit_exchange",
"name": "audit.append_log.exchange",
"vhost": "/",
"type": "direct",
"durable": true
},
{
"name": "transaction_balance_exchange",
"name": "transaction.balance_create.exchange",
"vhost": "/",
"type": "direct",
"durable": true
},
{
"name": "balance_retry_queue_exchange",
"name": "transaction.transaction_balance_operation.exchange",
"vhost": "/",
"type": "direct",
"durable": true
}
],
"bindings": [
{
"source": "audit_exchange",
"source": "audit.append_log.exchange",
"vhost": "/",
"destination": "audit_queue",
"destination": "audit.append_log.queue",
"destination_type": "queue",
"routing_key": "audit_key"
"routing_key": "audit.append_log.key"
},
{
"source": "transaction_balance_exchange",
"source": "transaction.balance_create.exchange",
"vhost": "/",
"destination": "transaction_balance_queue",
"destination": "transaction.balance_create.queue",
"destination_type": "queue",
"routing_key": "transaction_balance_key"
"routing_key": "transaction.balance_create.key"
},
{
"source": "balance_retry_queue_exchange",
"source": "transaction.transaction_balance_operation.exchange",
"vhost": "/",
"destination": "balance_retry_queue",
"destination": "transaction.transaction_balance_operation.queue",
"destination_type": "queue",
"routing_key": "balance_retry_queue_key"
"routing_key": "transaction.transaction_balance_operation.key"
}
]
}
4 changes: 2 additions & 2 deletions components/onboarding/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ RABBITMQ_PORT_HOST=3003
RABBITMQ_PORT_AMPQ=3004
RABBITMQ_DEFAULT_USER=onboarding
RABBITMQ_DEFAULT_PASS=lerian
RABBITMQ_EXCHANGE=transaction_balance_exchange
RABBITMQ_KEY=transaction_balance_key
RABBITMQ_EXCHANGE=transaction.balance_create.exchange
RABBITMQ_KEY=transaction.balance_create.key

# SWAGGER
SWAGGER_TITLE=Onboarding API
Expand Down
12 changes: 6 additions & 6 deletions components/transaction/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ RABBITMQ_PORT_HOST=3003
RABBITMQ_PORT_AMPQ=3004
RABBITMQ_DEFAULT_USER=transaction
RABBITMQ_DEFAULT_PASS=lerian
RABBITMQ_AUDIT_EXCHANGE=audit_exchange
RABBITMQ_AUDIT_KEY=audit_key
RABBITMQ_QUEUE=transaction_balance_queue
RABBITMQ_BALANCE_RETRY_EXCHANGE=balance_retry_queue_exchange
RABBITMQ_BALANCE_RETRY_KEY=balance_retry_queue_key
RABBITMQ_BALANCE_RETRY_QUEUE=balance_retry_queue
RABBITMQ_AUDIT_EXCHANGE=audit.append_log.exchange
RABBITMQ_AUDIT_KEY=audit.append_log.key
RABBITMQ_BALANCE_CREATE_QUEUE=transaction.balance_create.queue
RABBITMQ_TRANSACTION_BALANCE_OPERATION_EXCHANGE=transaction.transaction_balance_operation.exchange
RABBITMQ_TRANSACTION_BALANCE_OPERATION_KEY=transaction.transaction_balance_operation.key
RABBITMQ_TRANSACTION_BALANCE_OPERATION_QUEUE=transaction.transaction_balance_operation.queue

# SWAGGER
SWAGGER_TITLE=Transaction API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,13 +563,12 @@ func (r *BalancePostgreSQLRepository) SelectForUpdate(ctx context.Context, organ
args = append(args, version)

updates = append(updates, "updated_at = $"+strconv.Itoa(len(args)+1))
args = append(args, time.Now(), organizationID, ledgerID, blc.ID, blc.Version)
args = append(args, time.Now(), organizationID, ledgerID, blc.ID)

queryUpdate := `UPDATE balance SET ` + strings.Join(updates, ", ") +
` WHERE organization_id = $` + strconv.Itoa(len(args)-3) +
` AND ledger_id = $` + strconv.Itoa(len(args)-2) +
` AND id = $` + strconv.Itoa(len(args)-1) +
` AND version = $` + strconv.Itoa(len(args)) +
` WHERE organization_id = $` + strconv.Itoa(len(args)-2) +
` AND ledger_id = $` + strconv.Itoa(len(args)-1) +
` AND id = $` + strconv.Itoa(len(args)) +
` AND deleted_at IS NULL`

result, err := tx.ExecContext(ctx, queryUpdate, args...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,14 @@ func (cr *ConsumerRoutes) RunConsumers() error {

err := handlerFunc(ctx, msg.Body)
if err != nil {
cr.Logger.Errorf("Error processing message from queue %s: %v", queue, err)
cr.Logger.Errorf("Error processing message, resend messages to queue %s: %v", queue, err)

if nackErr := msg.Nack(false, true); nackErr != nil {
cr.Logger.Errorf("Error nack message from queue %s: %v", queue, nackErr)
}
_ = msg.Nack(false, true)

cr.Logger.Errorf("Error processing message from queue %s: %v", queue, err)

return
continue
}

if ackErr := msg.Ack(false); ackErr != nil {
cr.Logger.Errorf("Error ack message from queue %s: %v", queue, ackErr)
}
_ = msg.Ack(false)
}
}(queueName, handler)
}
Expand Down
88 changes: 44 additions & 44 deletions components/transaction/internal/bootstrap/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,48 +26,48 @@ const ApplicationName = "transaction"

// Config is the top level configuration struct for the entire application.
type Config struct {
EnvName string `env:"ENV_NAME"`
LogLevel string `env:"LOG_LEVEL"`
ServerAddress string `env:"SERVER_ADDRESS"`
PrimaryDBHost string `env:"DB_HOST"`
PrimaryDBUser string `env:"DB_USER"`
PrimaryDBPassword string `env:"DB_PASSWORD"`
PrimaryDBName string `env:"DB_NAME"`
PrimaryDBPort string `env:"DB_PORT"`
ReplicaDBHost string `env:"DB_REPLICA_HOST"`
ReplicaDBUser string `env:"DB_REPLICA_USER"`
ReplicaDBPassword string `env:"DB_REPLICA_PASSWORD"`
ReplicaDBName string `env:"DB_REPLICA_NAME"`
ReplicaDBPort string `env:"DB_REPLICA_PORT"`
MongoURI string `env:"MONGO_URI"`
MongoDBHost string `env:"MONGO_HOST"`
MongoDBName string `env:"MONGO_NAME"`
MongoDBUser string `env:"MONGO_USER"`
MongoDBPassword string `env:"MONGO_PASSWORD"`
MongoDBPort string `env:"MONGO_PORT"`
CasdoorAddress string `env:"CASDOOR_ADDRESS"`
CasdoorClientID string `env:"CASDOOR_CLIENT_ID"`
CasdoorClientSecret string `env:"CASDOOR_CLIENT_SECRET"`
CasdoorOrganizationName string `env:"CASDOOR_ORGANIZATION_NAME"`
CasdoorApplicationName string `env:"CASDOOR_APPLICATION_NAME"`
CasdoorModelName string `env:"CASDOOR_MODEL_NAME"`
JWKAddress string `env:"CASDOOR_JWK_ADDRESS"`
RabbitURI string `env:"RABBITMQ_URI"`
RabbitMQHost string `env:"RABBITMQ_HOST"`
RabbitMQPortHost string `env:"RABBITMQ_PORT_HOST"`
RabbitMQPortAMQP string `env:"RABBITMQ_PORT_AMPQ"`
RabbitMQUser string `env:"RABBITMQ_DEFAULT_USER"`
RabbitMQPass string `env:"RABBITMQ_DEFAULT_PASS"`
RabbitMQQueue string `env:"RABBITMQ_QUEUE"`
OtelServiceName string `env:"OTEL_RESOURCE_SERVICE_NAME"`
OtelLibraryName string `env:"OTEL_LIBRARY_NAME"`
OtelServiceVersion string `env:"OTEL_RESOURCE_SERVICE_VERSION"`
OtelDeploymentEnv string `env:"OTEL_RESOURCE_DEPLOYMENT_ENVIRONMENT"`
OtelColExporterEndpoint string `env:"OTEL_EXPORTER_OTLP_ENDPOINT"`
RedisHost string `env:"REDIS_HOST"`
RedisPort string `env:"REDIS_PORT"`
RedisUser string `env:"REDIS_USER"`
RedisPassword string `env:"REDIS_PASSWORD"`
EnvName string `env:"ENV_NAME"`
LogLevel string `env:"LOG_LEVEL"`
ServerAddress string `env:"SERVER_ADDRESS"`
PrimaryDBHost string `env:"DB_HOST"`
PrimaryDBUser string `env:"DB_USER"`
PrimaryDBPassword string `env:"DB_PASSWORD"`
PrimaryDBName string `env:"DB_NAME"`
PrimaryDBPort string `env:"DB_PORT"`
ReplicaDBHost string `env:"DB_REPLICA_HOST"`
ReplicaDBUser string `env:"DB_REPLICA_USER"`
ReplicaDBPassword string `env:"DB_REPLICA_PASSWORD"`
ReplicaDBName string `env:"DB_REPLICA_NAME"`
ReplicaDBPort string `env:"DB_REPLICA_PORT"`
MongoURI string `env:"MONGO_URI"`
MongoDBHost string `env:"MONGO_HOST"`
MongoDBName string `env:"MONGO_NAME"`
MongoDBUser string `env:"MONGO_USER"`
MongoDBPassword string `env:"MONGO_PASSWORD"`
MongoDBPort string `env:"MONGO_PORT"`
CasdoorAddress string `env:"CASDOOR_ADDRESS"`
CasdoorClientID string `env:"CASDOOR_CLIENT_ID"`
CasdoorClientSecret string `env:"CASDOOR_CLIENT_SECRET"`
CasdoorOrganizationName string `env:"CASDOOR_ORGANIZATION_NAME"`
CasdoorApplicationName string `env:"CASDOOR_APPLICATION_NAME"`
CasdoorModelName string `env:"CASDOOR_MODEL_NAME"`
JWKAddress string `env:"CASDOOR_JWK_ADDRESS"`
RabbitURI string `env:"RABBITMQ_URI"`
RabbitMQHost string `env:"RABBITMQ_HOST"`
RabbitMQPortHost string `env:"RABBITMQ_PORT_HOST"`
RabbitMQPortAMQP string `env:"RABBITMQ_PORT_AMPQ"`
RabbitMQUser string `env:"RABBITMQ_DEFAULT_USER"`
RabbitMQPass string `env:"RABBITMQ_DEFAULT_PASS"`
RabbitMQBalanceCreateQueue string `env:"RABBITMQ_BALANCE_CREATE_QUEUE"`
OtelServiceName string `env:"OTEL_RESOURCE_SERVICE_NAME"`
OtelLibraryName string `env:"OTEL_LIBRARY_NAME"`
OtelServiceVersion string `env:"OTEL_RESOURCE_SERVICE_VERSION"`
OtelDeploymentEnv string `env:"OTEL_RESOURCE_DEPLOYMENT_ENVIRONMENT"`
OtelColExporterEndpoint string `env:"OTEL_EXPORTER_OTLP_ENDPOINT"`
RedisHost string `env:"REDIS_HOST"`
RedisPort string `env:"REDIS_PORT"`
RedisUser string `env:"REDIS_USER"`
RedisPassword string `env:"REDIS_PASSWORD"`
}

// InitServers initiate http and grpc servers.
Expand Down Expand Up @@ -132,7 +132,7 @@ func InitServers() *Service {
Port: cfg.RabbitMQPortAMQP,
User: cfg.RabbitMQUser,
Pass: cfg.RabbitMQPass,
Queue: cfg.RabbitMQQueue,
Queue: cfg.RabbitMQBalanceCreateQueue,
Logger: logger,
}

Expand Down Expand Up @@ -209,4 +209,4 @@ func InitServers() *Service {
MultiQueueConsumer: multiQueueConsumer,
Logger: logger,
}
}
}
4 changes: 2 additions & 2 deletions components/transaction/internal/bootstrap/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func NewMultiQueueConsumer(routes *rabbitmq.ConsumerRoutes, useCase *command.Use
}

// Registry handlers for each queue
routes.Register(os.Getenv("RABBITMQ_QUEUE"), consumer.handlerBalanceCreateQueue)
routes.Register(os.Getenv("RABBITMQ_BALANCE_RETRY_QUEUE"), consumer.handlerBTOQueue)
routes.Register(os.Getenv("RABBITMQ_BALANCE_CREATE_QUEUE"), consumer.handlerBalanceCreateQueue)
routes.Register(os.Getenv("RABBITMQ_TRANSACTION_BALANCE_OPERATION_QUEUE"), consumer.handlerBTOQueue)

return consumer
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (uc *UseCase) CreateBalanceTransactionOperationsAsync(ctx context.Context,
tracer := pkg.NewTracerFromContext(ctx)

var t transaction.TransactionQueue

for _, item := range data.QueueData {
logger.Infof("Unmarshal account ID: %v", item.ID.String())

Expand Down Expand Up @@ -125,3 +125,12 @@ func (uc *UseCase) CreateBalanceTransactionOperationsAsync(ctx context.Context,

return nil
}

func (uc *UseCase) CreateBTOAsync(ctx context.Context, data mmodel.Queue) {
logger := pkg.NewLoggerFromContext(ctx)

err := uc.CreateBalanceTransactionOperationsAsync(ctx, data)
if err != nil {
logger.Errorf("Failed to create balance transaction operations: %v", err)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ func (uc *UseCase) SendBTOExecuteAsync(ctx context.Context, organizationID, ledg

if _, err := uc.RabbitMQRepo.ProducerDefault(
ctxSendBTOQueue,
os.Getenv("RABBITMQ_BALANCE_RETRY_EXCHANGE"),
os.Getenv("RABBITMQ_BALANCE_RETRY_KEY"),
os.Getenv("RABBITMQ_TRANSACTION_BALANCE_OPERATION_EXCHANGE"),
os.Getenv("RABBITMQ_TRANSACTION_BALANCE_OPERATION_KEY"),
queueMessage,
); err != nil {
mopentelemetry.HandleSpanError(&spanSendBTOQueue, "Failed to send BTO to queue", err)

logger.Fatalf("Failed to send message: %s", err.Error())
logger.Errorf("Failed to send message: %s", err.Error())
}
}

0 comments on commit 33fbca5

Please sign in to comment.