add firehose connection headers
YaroShkvorets authored and leoyvens committed Apr 5, 2024
1 parent 4d2479c commit 4218250
Showing 5 changed files with 38 additions and 6 deletions.
2 changes: 1 addition & 1 deletion chain/ethereum/examples/firehose.rs
@@ -54,7 +54,7 @@ async fn main() -> Result<(), Error> {
                 },
                 final_blocks_only: false,
                 ..Default::default()
-            })
+            }, &firehose::ConnectionHeaders::new())
             .await
         {
             Ok(s) => s,
2 changes: 1 addition & 1 deletion graph/src/blockchain/firehose_block_ingestor.rs
@@ -199,7 +199,7 @@ where
                 final_blocks_only: false,
                 transforms: self.default_transforms.iter().map(|t| t.into()).collect(),
                 ..Default::default()
-            })
+            }, &firehose::ConnectionHeaders::new())
             .await;

         match result {
4 changes: 3 additions & 1 deletion graph/src/blockchain/firehose_block_stream.rs
@@ -203,6 +203,8 @@ fn stream_blocks<C: Blockchain, F: FirehoseMapper<C>>(
             debug!(&logger, "Going to check continuity of chain on first block");
         }

+        let headers = firehose::ConnectionHeaders::new().with_deployment(deployment.clone());
+
         // Back off exponentially whenever we encounter a connection error or a stream with bad data
         let mut backoff = ExponentialBackoff::new(Duration::from_millis(500), Duration::from_secs(45));

@@ -240,7 +242,7 @@ fn stream_blocks<C: Blockchain, F: FirehoseMapper<C>>(
             }

             let mut connect_start = Instant::now();
-            let req = endpoint.clone().stream_blocks(request);
+            let req = endpoint.clone().stream_blocks(request, &headers);
             let result = tokio::time::timeout(Duration::from_secs(120), req).await.map_err(|x| x.into()).and_then(|x| x);

             match result {
5 changes: 4 additions & 1 deletion graph/src/blockchain/substreams_block_stream.rs
@@ -4,6 +4,7 @@ use super::block_stream::{
 use super::client::ChainClient;
 use crate::blockchain::block_stream::{BlockStream, BlockStreamEvent};
 use crate::blockchain::Blockchain;
+use crate::firehose::ConnectionHeaders;
 use crate::prelude::*;
 use crate::substreams::Modules;
 use crate::substreams_rpc::{ModulesProgress, Request, Response};
@@ -174,6 +175,8 @@ fn stream_blocks<C: Blockchain, F: BlockStreamMapper<C>>(

         let stop_block_num = manifest_end_block_num as u64;

+        let headers = ConnectionHeaders::new().with_deployment(deployment.clone());
+
         // Back off exponentially whenever we encounter a connection error or a stream with bad data
         let mut backoff = ExponentialBackoff::new(Duration::from_millis(500), Duration::from_secs(45));

@@ -203,7 +206,7 @@ fn stream_blocks<C: Blockchain, F: BlockStreamMapper<C>>(
             };


-            let result = endpoint.clone().substreams(request).await;
+            let result = endpoint.clone().substreams(request, &headers).await;

             match result {
                 Ok(stream) => {
31 changes: 29 additions & 2 deletions graph/src/firehose/endpoints.rs
@@ -7,7 +7,7 @@ use crate::{
     data::value::Word,
     endpoint::{ConnectionType, EndpointMetrics, Provider, RequestLabels},
     firehose::decode_firehose_block,
-    prelude::{anyhow, debug, info},
+    prelude::{anyhow, debug, info, DeploymentHash},
     substreams_rpc,
 };

@@ -26,8 +26,9 @@ use std::{
 };
 use tonic::codegen::InterceptedService;
 use tonic::{
+    Request,
     codegen::CompressionEncoding,
-    metadata::{Ascii, MetadataValue},
+    metadata::{Ascii, MetadataKey, MetadataValue},
     transport::{Channel, ClientTlsConfig},
 };

Expand All @@ -53,6 +54,28 @@ pub struct FirehoseEndpoint {
channel: Channel,
}

#[derive(Debug)]
pub struct ConnectionHeaders(HashMap<MetadataKey<Ascii>, MetadataValue<Ascii>>);

impl ConnectionHeaders {
pub fn new() -> Self {
Self(HashMap::new())
}
pub fn with_deployment(mut self, deployment: DeploymentHash) -> Self {
if let Ok(deployment) = deployment.parse() {
self.0.insert("x-deployment-id".parse().unwrap(), deployment);
}
self
}
pub fn add_to_request<T>(&self, request: T) -> Request<T> {
let mut request = Request::new(request);
self.0.iter().for_each(|(k, v)| {
request.metadata_mut().insert(k, v.clone());
});
request
}
}

#[derive(Clone, Debug, PartialEq, Ord, Eq, PartialOrd)]
pub enum AvailableCapacity {
Unavailable,
@@ -406,8 +429,10 @@ impl FirehoseEndpoint {
     pub async fn stream_blocks(
         self: Arc<Self>,
         request: firehose::Request,
+        headers: &ConnectionHeaders,
     ) -> Result<tonic::Streaming<firehose::Response>, anyhow::Error> {
         let mut client = self.new_stream_client();
+        let request = headers.add_to_request(request);
         let response_stream = client.blocks(request).await?;
         let block_stream = response_stream.into_inner();

@@ -417,8 +442,10 @@ impl FirehoseEndpoint {
     pub async fn substreams(
         self: Arc<Self>,
         request: substreams_rpc::Request,
+        headers: &ConnectionHeaders,
     ) -> Result<tonic::Streaming<substreams_rpc::Response>, anyhow::Error> {
         let mut client = self.new_substreams_client();
+        let request = headers.add_to_request(request);
         let response_stream = client.blocks(request).await?;
         let block_stream = response_stream.into_inner();

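For reference, below is a minimal standalone sketch of what the new ConnectionHeaders type does, written against tonic only. The Headers name, the &str parameter, the placeholder "QmExampleDeploymentHash", and the main harness are illustrative and not part of this commit; the real type lives in graph/src/firehose/endpoints.rs and takes a DeploymentHash. The mechanics are the same: keep a map of ASCII metadata pairs, let with_deployment add an x-deployment-id entry when the value parses as valid ASCII metadata, and let add_to_request copy every entry onto the outgoing tonic::Request so the server side of the connection can read which deployment opened the stream.

    // Standalone sketch mirroring ConnectionHeaders, using only the `tonic` crate.
    use std::collections::HashMap;

    use tonic::metadata::{Ascii, MetadataKey, MetadataValue};
    use tonic::Request;

    struct Headers(HashMap<MetadataKey<Ascii>, MetadataValue<Ascii>>);

    impl Headers {
        fn new() -> Self {
            Self(HashMap::new())
        }

        // Like `with_deployment`: silently skip values that are not valid ASCII metadata.
        fn with_deployment(mut self, deployment: &str) -> Self {
            if let Ok(value) = deployment.parse::<MetadataValue<Ascii>>() {
                self.0.insert("x-deployment-id".parse().unwrap(), value);
            }
            self
        }

        // Like `add_to_request`: wrap the message and copy every stored header
        // into the request's metadata map.
        fn add_to_request<T>(&self, message: T) -> Request<T> {
            let mut request = Request::new(message);
            for (key, value) in &self.0 {
                request.metadata_mut().insert(key.clone(), value.clone());
            }
            request
        }
    }

    fn main() {
        // "QmExampleDeploymentHash" is a placeholder deployment hash.
        let headers = Headers::new().with_deployment("QmExampleDeploymentHash");
        let request = headers.add_to_request(());

        // The gRPC server (or an interceptor) can read the header back from metadata.
        assert_eq!(
            request
                .metadata()
                .get("x-deployment-id")
                .and_then(|v| v.to_str().ok()),
            Some("QmExampleDeploymentHash")
        );
    }

Note that, as in the committed with_deployment, a hash that does not parse as valid ASCII metadata is simply dropped rather than returned as an error.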
