diff --git a/chain/ethereum/examples/firehose.rs b/chain/ethereum/examples/firehose.rs index a95faa39c4c..c6f2b29beeb 100644 --- a/chain/ethereum/examples/firehose.rs +++ b/chain/ethereum/examples/firehose.rs @@ -54,7 +54,7 @@ async fn main() -> Result<(), Error> { }, final_blocks_only: false, ..Default::default() - }) + }, &firehose::ConnectionHeaders::new()) .await { Ok(s) => s, diff --git a/graph/src/blockchain/firehose_block_ingestor.rs b/graph/src/blockchain/firehose_block_ingestor.rs index 163695dbf89..0a85a8bc3e8 100644 --- a/graph/src/blockchain/firehose_block_ingestor.rs +++ b/graph/src/blockchain/firehose_block_ingestor.rs @@ -199,7 +199,7 @@ where final_blocks_only: false, transforms: self.default_transforms.iter().map(|t| t.into()).collect(), ..Default::default() - }) + }, &firehose::ConnectionHeaders::new()) .await; match result { diff --git a/graph/src/blockchain/firehose_block_stream.rs b/graph/src/blockchain/firehose_block_stream.rs index f39e7861733..159eca7666b 100644 --- a/graph/src/blockchain/firehose_block_stream.rs +++ b/graph/src/blockchain/firehose_block_stream.rs @@ -203,6 +203,8 @@ fn stream_blocks>( debug!(&logger, "Going to check continuity of chain on first block"); } + let headers = firehose::ConnectionHeaders::new().with_deployment(deployment.clone()); + // Back off exponentially whenever we encounter a connection error or a stream with bad data let mut backoff = ExponentialBackoff::new(Duration::from_millis(500), Duration::from_secs(45)); @@ -240,7 +242,7 @@ fn stream_blocks>( } let mut connect_start = Instant::now(); - let req = endpoint.clone().stream_blocks(request); + let req = endpoint.clone().stream_blocks(request, &headers); let result = tokio::time::timeout(Duration::from_secs(120), req).await.map_err(|x| x.into()).and_then(|x| x); match result { diff --git a/graph/src/blockchain/substreams_block_stream.rs b/graph/src/blockchain/substreams_block_stream.rs index 33d9dca1ed3..ef55165f790 100644 --- a/graph/src/blockchain/substreams_block_stream.rs +++ b/graph/src/blockchain/substreams_block_stream.rs @@ -4,6 +4,7 @@ use super::block_stream::{ use super::client::ChainClient; use crate::blockchain::block_stream::{BlockStream, BlockStreamEvent}; use crate::blockchain::Blockchain; +use crate::firehose::ConnectionHeaders; use crate::prelude::*; use crate::substreams::Modules; use crate::substreams_rpc::{ModulesProgress, Request, Response}; @@ -174,6 +175,8 @@ fn stream_blocks>( let stop_block_num = manifest_end_block_num as u64; + let headers = ConnectionHeaders::new().with_deployment(deployment.clone()); + // Back off exponentially whenever we encounter a connection error or a stream with bad data let mut backoff = ExponentialBackoff::new(Duration::from_millis(500), Duration::from_secs(45)); @@ -203,7 +206,7 @@ fn stream_blocks>( }; - let result = endpoint.clone().substreams(request).await; + let result = endpoint.clone().substreams(request, &headers).await; match result { Ok(stream) => { diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs index 4756c49f722..db1e6cda623 100644 --- a/graph/src/firehose/endpoints.rs +++ b/graph/src/firehose/endpoints.rs @@ -7,7 +7,7 @@ use crate::{ data::value::Word, endpoint::{ConnectionType, EndpointMetrics, Provider, RequestLabels}, firehose::decode_firehose_block, - prelude::{anyhow, debug, info}, + prelude::{anyhow, debug, info, DeploymentHash}, substreams_rpc, }; @@ -26,8 +26,9 @@ use std::{ }; use tonic::codegen::InterceptedService; use tonic::{ + Request, codegen::CompressionEncoding, - metadata::{Ascii, MetadataValue}, + metadata::{Ascii, MetadataKey, MetadataValue}, transport::{Channel, ClientTlsConfig}, }; @@ -53,6 +54,28 @@ pub struct FirehoseEndpoint { channel: Channel, } +#[derive(Debug)] +pub struct ConnectionHeaders(HashMap, MetadataValue>); + +impl ConnectionHeaders { + pub fn new() -> Self { + Self(HashMap::new()) + } + pub fn with_deployment(mut self, deployment: DeploymentHash) -> Self { + if let Ok(deployment) = deployment.parse() { + self.0.insert("x-deployment-id".parse().unwrap(), deployment); + } + self + } + pub fn add_to_request(&self, request: T) -> Request { + let mut request = Request::new(request); + self.0.iter().for_each(|(k, v)| { + request.metadata_mut().insert(k, v.clone()); + }); + request + } +} + #[derive(Clone, Debug, PartialEq, Ord, Eq, PartialOrd)] pub enum AvailableCapacity { Unavailable, @@ -406,8 +429,10 @@ impl FirehoseEndpoint { pub async fn stream_blocks( self: Arc, request: firehose::Request, + headers: &ConnectionHeaders, ) -> Result, anyhow::Error> { let mut client = self.new_stream_client(); + let request = headers.add_to_request(request); let response_stream = client.blocks(request).await?; let block_stream = response_stream.into_inner(); @@ -417,8 +442,10 @@ impl FirehoseEndpoint { pub async fn substreams( self: Arc, request: substreams_rpc::Request, + headers: &ConnectionHeaders, ) -> Result, anyhow::Error> { let mut client = self.new_substreams_client(); + let request = headers.add_to_request(request); let response_stream = client.blocks(request).await?; let block_stream = response_stream.into_inner();