diff --git a/devops/aws/histoflux/overlays/prod-base/deployment-patch.yaml b/devops/aws/histoflux/overlays/prod-base/deployment-patch.yaml
index 587dfd66..9c806c0a 100644
--- a/devops/aws/histoflux/overlays/prod-base/deployment-patch.yaml
+++ b/devops/aws/histoflux/overlays/prod-base/deployment-patch.yaml
@@ -8,7 +8,7 @@ spec:
       serviceAccountName: secrets-access-sa
       containers:
         - name: histoflux
-          image: ghcr.io/0xintuition/histoflux:1.0.32
+          image: ghcr.io/0xintuition/histoflux:1.0.36
           imagePullPolicy: Always
           volumeMounts:
             - name: secrets-store-inline
diff --git a/devops/aws/services/graphql/overlays/prod-base-sepolia/indexer-migrations.yaml b/devops/aws/services/graphql/overlays/prod-base-sepolia/indexer-migrations.yaml
index 806c27b9..aa1ed4cb 100644
--- a/devops/aws/services/graphql/overlays/prod-base-sepolia/indexer-migrations.yaml
+++ b/devops/aws/services/graphql/overlays/prod-base-sepolia/indexer-migrations.yaml
@@ -8,7 +8,7 @@ spec:
      serviceAccountName: secrets-access-sa
      containers:
        - name: indexer-and-cache-migration
-          image: ghcr.io/0xintuition/indexer-and-cache-migrations:1.0.32
+          image: ghcr.io/0xintuition/indexer-and-cache-migrations:1.0.36
          imagePullPolicy: Always
          volumeMounts:
            - name: secrets-store-inline
diff --git a/histoflux/src/app_context.rs b/histoflux/src/app_context.rs
index 29ee565a..d91c906c 100644
--- a/histoflux/src/app_context.rs
+++ b/histoflux/src/app_context.rs
@@ -196,9 +196,6 @@ impl SqsProducer {
         let mut listener = PgListener::connect(&self.env.database_url).await?;
         listener.listen("raw_logs_channel").await?;
 
-        // Get current timestamp before processing historical
-        let start_time = chrono::Utc::now();
-
         info!("Start pulling historical records");
         self.process_historical_records().await?;
 
@@ -209,7 +206,7 @@ impl SqsProducer {
             info!("Waiting for notifications");
             match listener.recv().await {
                 Ok(notification) => {
-                    self.process_notification(notification, start_time).await?;
+                    self.process_notification(notification).await?;
                 }
                 Err(e) => {
                     // Log the error but continue the loop
@@ -226,30 +223,28 @@ impl SqsProducer {
     async fn process_notification(
         &self,
         notification: PgNotification,
-        start_time: chrono::DateTime<Utc>,
     ) -> Result<(), HistoFluxError> {
         info!("Processing notification: {:?}", notification);
         let payload: NotificationPayload =
             serde_json::from_str(notification.payload())?;
         info!("Payload: {:?}", payload);
-        if payload.raw_log.block_timestamp < start_time.timestamp() {
-            // Convert numeric fields to strings if RawLog expects them as strings
-            let raw_log = RawLog::builder()
-                .gs_id(payload.raw_log.gs_id.to_string())
-                .block_number(payload.raw_log.block_number)
-                .block_hash(payload.raw_log.block_hash)
-                .transaction_hash(payload.raw_log.transaction_hash)
-                .transaction_index(payload.raw_log.transaction_index)
-                .log_index(payload.raw_log.log_index)
-                .address(payload.raw_log.address)
-                .data(payload.raw_log.data)
-                .topics(payload.raw_log.topics)
-                .block_timestamp(payload.raw_log.block_timestamp)
-                .build();
-            let message = serde_json::to_string(&raw_log)?;
-            self.send_message(message).await?;
-            info!("Sent message to SQS");
-        }
+        // Convert numeric fields to strings if RawLog expects them as strings
+        let raw_log = RawLog::builder()
+            .gs_id(payload.raw_log.gs_id.to_string())
+            .block_number(payload.raw_log.block_number)
+            .block_hash(payload.raw_log.block_hash)
+            .transaction_hash(payload.raw_log.transaction_hash)
+            .transaction_index(payload.raw_log.transaction_index)
+            .log_index(payload.raw_log.log_index)
+            .address(payload.raw_log.address)
+            .data(payload.raw_log.data)
+            .topics(payload.raw_log.topics)
+            .block_timestamp(payload.raw_log.block_timestamp)
+            .build();
+        let message = serde_json::to_string(&raw_log)?;
+        self.send_message(message).await?;
+        info!("Sent message to SQS");
+
         Ok(())
     }
 }