Skip to content

Commit

Permalink
Reset substreams/firehose block ingestor backoff (#5047)
Browse files Browse the repository at this point in the history
* Reset substreams/firehose block ingestor backoff
fixes #4976

* fix cursor comparison
  • Loading branch information
YaroShkvorets authored Mar 15, 2024
1 parent 3c03b3f commit 1b94a73
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
9 changes: 7 additions & 2 deletions chain/substreams/src/block_ingestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,14 @@ impl BlockIngestor for SubstreamsBlockIngestor {
// If the error is retryable it will print the error and return the cursor
// therefore if we get an error here it has to be a fatal error.
// This is a bit brittle and should probably be improved at some point.
let res = self.process_blocks(latest_cursor, stream).await;
let res = self.process_blocks(latest_cursor.clone(), stream).await;
match res {
Ok(cursor) => latest_cursor = cursor,
Ok(cursor) => {
if cursor.as_ref() != latest_cursor.as_ref() {
backoff.reset();
latest_cursor = cursor;
}
}
Err(BlockStreamError::Fatal(e)) => {
error!(
self.logger,
Expand Down
6 changes: 5 additions & 1 deletion graph/src/blockchain/firehose_block_ingestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,11 @@ where
info!(logger, "Blockstream connected, consuming blocks");

// Consume the stream of blocks until an error is hit
latest_cursor = self.process_blocks(latest_cursor, stream).await
let cursor = self.process_blocks(latest_cursor.clone(), stream).await;
if cursor != latest_cursor {
backoff.reset();
latest_cursor = cursor;
}
}
Err(e) => {
error!(logger, "Unable to connect to endpoint: {:#}", e);
Expand Down

0 comments on commit 1b94a73

Please sign in to comment.