Skip to content

Commit

Permalink
More fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
leboiko committed Jan 20, 2025
1 parent 14dbe54 commit 3cbe319
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ spec:
serviceAccountName: secrets-access-sa
containers:
- name: histoflux
image: ghcr.io/0xintuition/histoflux:1.0.32
image: ghcr.io/0xintuition/histoflux:1.0.36
imagePullPolicy: Always
volumeMounts:
- name: secrets-store-inline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ spec:
serviceAccountName: secrets-access-sa
containers:
- name: indexer-and-cache-migration
image: ghcr.io/0xintuition/indexer-and-cache-migrations:1.0.32
image: ghcr.io/0xintuition/indexer-and-cache-migrations:1.0.36
imagePullPolicy: Always
volumeMounts:
- name: secrets-store-inline
Expand Down
41 changes: 18 additions & 23 deletions histoflux/src/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,6 @@ impl SqsProducer {
let mut listener = PgListener::connect(&self.env.database_url).await?;
listener.listen("raw_logs_channel").await?;

// Get current timestamp before processing historical
let start_time = chrono::Utc::now();

info!("Start pulling historical records");
self.process_historical_records().await?;

Expand All @@ -209,7 +206,7 @@ impl SqsProducer {
info!("Waiting for notifications");
match listener.recv().await {
Ok(notification) => {
self.process_notification(notification, start_time).await?;
self.process_notification(notification).await?;
}
Err(e) => {
// Log the error but continue the loop
Expand All @@ -226,30 +223,28 @@ impl SqsProducer {
async fn process_notification(
&self,
notification: PgNotification,
start_time: chrono::DateTime<chrono::Utc>,
) -> Result<(), HistoFluxError> {
info!("Processing notification: {:?}", notification);
let payload: NotificationPayload = serde_json::from_str(notification.payload())?;
info!("Payload: {:?}", payload);

if payload.raw_log.block_timestamp < start_time.timestamp() {
// Convert numeric fields to strings if RawLog expects them as strings
let raw_log = RawLog::builder()
.gs_id(payload.raw_log.gs_id.to_string())
.block_number(payload.raw_log.block_number)
.block_hash(payload.raw_log.block_hash)
.transaction_hash(payload.raw_log.transaction_hash)
.transaction_index(payload.raw_log.transaction_index)
.log_index(payload.raw_log.log_index)
.address(payload.raw_log.address)
.data(payload.raw_log.data)
.topics(payload.raw_log.topics)
.block_timestamp(payload.raw_log.block_timestamp)
.build();
let message = serde_json::to_string(&raw_log)?;
self.send_message(message).await?;
info!("Sent message to SQS");
}
// Convert numeric fields to strings if RawLog expects them as strings
let raw_log = RawLog::builder()
.gs_id(payload.raw_log.gs_id.to_string())
.block_number(payload.raw_log.block_number)
.block_hash(payload.raw_log.block_hash)
.transaction_hash(payload.raw_log.transaction_hash)
.transaction_index(payload.raw_log.transaction_index)
.log_index(payload.raw_log.log_index)
.address(payload.raw_log.address)
.data(payload.raw_log.data)
.topics(payload.raw_log.topics)
.block_timestamp(payload.raw_log.block_timestamp)
.build();
let message = serde_json::to_string(&raw_log)?;
self.send_message(message).await?;
info!("Sent message to SQS");

Ok(())
}
}
Expand Down

0 comments on commit 3cbe319

Please sign in to comment.