diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index 66ab846ab1a..4383ce17b5c 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -240,6 +240,13 @@ pub struct EnvVars { pub firehose_disable_extended_blocks_for_chains: Vec, pub block_write_capacity: usize, + + /// Set by the environment variable `GRAPH_FIREHOSE_FETCH_BLOCK_RETRY_LIMIT`. + /// The default value is 10. + pub firehose_block_fetch_retry_limit: usize, + /// Set by the environment variable `GRAPH_FIREHOSE_FETCH_BLOCK_TIMEOUT_SECS`. + /// The default value is 60 seconds. + pub firehose_block_fetch_timeout: u64, } impl EnvVars { @@ -330,6 +337,8 @@ impl EnvVars { inner.firehose_disable_extended_blocks_for_chains, ), block_write_capacity: inner.block_write_capacity.0, + firehose_block_fetch_retry_limit: inner.firehose_block_fetch_retry_limit, + firehose_block_fetch_timeout: inner.firehose_block_fetch_timeout, }) } @@ -493,6 +502,10 @@ struct Inner { firehose_disable_extended_blocks_for_chains: Option, #[envconfig(from = "GRAPH_NODE_BLOCK_WRITE_CAPACITY", default = "4_000_000_000")] block_write_capacity: NoUnderscores, + #[envconfig(from = "GRAPH_FIREHOSE_FETCH_BLOCK_RETRY_LIMIT", default = "10")] + firehose_block_fetch_retry_limit: usize, + #[envconfig(from = "GRAPH_FIREHOSE_FETCH_BLOCK_TIMEOUT_SECS", default = "60")] + firehose_block_fetch_timeout: u64, } #[derive(Clone, Debug)] diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs index 45ff9c045ee..2c622daeb36 100644 --- a/graph/src/firehose/endpoints.rs +++ b/graph/src/firehose/endpoints.rs @@ -16,7 +16,7 @@ use async_trait::async_trait; use futures03::StreamExt; use http0::uri::{Scheme, Uri}; use itertools::Itertools; -use slog::{error, info, Logger}; +use slog::{error, info, trace, Logger}; use std::{collections::HashMap, fmt::Display, ops::ControlFlow, sync::Arc, time::Duration}; use tokio::sync::OnceCell; use tonic::codegen::InterceptedService; @@ -33,6 +33,7 @@ use crate::components::network_provider::NetworkDetails; use crate::components::network_provider::ProviderCheckStrategy; use crate::components::network_provider::ProviderManager; use crate::components::network_provider::ProviderName; +use crate::prelude::retry; /// This is constant because we found this magic number of connections after /// which the grpc connections start to hang. @@ -425,7 +426,7 @@ impl FirehoseEndpoint { } pub async fn load_blocks_by_numbers( - &self, + self: Arc, numbers: Vec, logger: &Logger, ) -> Result, anyhow::Error> @@ -435,21 +436,39 @@ impl FirehoseEndpoint { let mut blocks = Vec::with_capacity(numbers.len()); for number in numbers { - debug!( + let provider_name = self.provider.as_str(); + + trace!( logger, "Loading block for block number {}", number; - "provider" => self.provider.as_str(), + "provider" => provider_name, ); - match self.get_block_by_number::(number, logger).await { + let retry_log_message = format!("get_block_by_number for block {}", number); + let endpoint_for_retry = self.cheap_clone(); + + let logger_for_retry = logger.clone(); + let logger_for_error = logger.clone(); + + let block = retry(retry_log_message, &logger_for_retry) + .limit(ENV_VARS.firehose_block_fetch_retry_limit) + .timeout_secs(ENV_VARS.firehose_block_fetch_timeout) + .run(move || { + let e = endpoint_for_retry.cheap_clone(); + let l = logger_for_retry.clone(); + Box::pin(async move { e.get_block_by_number::(number, &l).await }) + }) + .await; + + match block { Ok(block) => { blocks.push(block); } Err(e) => { error!( - logger, + logger_for_error, "Failed to load block number {}: {}", number, e; - "provider" => self.provider.as_str(), + "provider" => provider_name, ); return Err(anyhow::format_err!( "failed to load block number {}: {}",