Skip to content

Commit

Permalink
fix adapter startup (#5503)
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas authored Jun 21, 2024
1 parent 224a29a commit 7b4fff2
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 83 deletions.
12 changes: 9 additions & 3 deletions chain/substreams/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use graph::blockchain::{
BasicBlockchainBuilder, BlockIngestor, BlockTime, EmptyNodeCapabilities, NoopDecoderHook,
NoopRuntimeAdapter,
};
use graph::components::adapter::ChainId;
use graph::components::store::DeploymentCursorTracker;
use graph::env::EnvVars;
use graph::prelude::{BlockHash, CheapClone, Entity, LoggerFactory, MetricsRegistry};
Expand Down Expand Up @@ -66,6 +67,7 @@ impl blockchain::Block for Block {
pub struct Chain {
chain_store: Arc<dyn ChainStore>,
block_stream_builder: Arc<dyn BlockStreamBuilder<Self>>,
chain_id: ChainId,

pub(crate) logger_factory: LoggerFactory,
pub(crate) client: Arc<ChainClient<Self>>,
Expand All @@ -79,13 +81,15 @@ impl Chain {
metrics_registry: Arc<MetricsRegistry>,
chain_store: Arc<dyn ChainStore>,
block_stream_builder: Arc<dyn BlockStreamBuilder<Self>>,
chain_id: ChainId,
) -> Self {
Self {
logger_factory,
client: chain_client,
metrics_registry,
chain_store,
block_stream_builder,
chain_id,
}
}
}
Expand Down Expand Up @@ -192,8 +196,9 @@ impl Blockchain for Chain {
Ok(Box::new(SubstreamsBlockIngestor::new(
self.chain_store.cheap_clone(),
self.client.cheap_clone(),
self.logger_factory.component_logger("", None),
"substreams".into(),
self.logger_factory
.component_logger("SubstreamsBlockIngestor", None),
self.chain_id.clone(),
self.metrics_registry.cheap_clone(),
)))
}
Expand All @@ -204,7 +209,7 @@ impl blockchain::BlockchainBuilder<super::Chain> for BasicBlockchainBuilder {
async fn build(self, _config: &Arc<EnvVars>) -> Chain {
let BasicBlockchainBuilder {
logger_factory,
name: _,
name,
chain_store,
firehose_endpoints,
metrics_registry,
Expand All @@ -216,6 +221,7 @@ impl blockchain::BlockchainBuilder<super::Chain> for BasicBlockchainBuilder {
logger_factory,
client: Arc::new(ChainClient::new_firehose(firehose_endpoints)),
metrics_registry,
chain_id: name,
}
}
}
195 changes: 118 additions & 77 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use ethereum::ProviderEthRpcMetrics;
use graph::anyhow::bail;
use graph::blockchain::client::ChainClient;
use graph::blockchain::{
BasicBlockchainBuilder, Blockchain as _, BlockchainBuilder as _, BlockchainKind, BlockchainMap,
BasicBlockchainBuilder, Blockchain, BlockchainBuilder as _, BlockchainKind, BlockchainMap,
ChainIdentifier,
};
use graph::cheap_clone::CheapClone;
Expand Down Expand Up @@ -318,7 +318,6 @@ pub async fn create_ethereum_networks_for_chain(
ProviderDetails::Web3Call(web3) => (web3, true),
ProviderDetails::Web3(web3) => (web3, false),
_ => {
// parsed_networks.insert_empty(network_name.to_string());
continue;
}
};
Expand Down Expand Up @@ -391,6 +390,15 @@ pub async fn create_ethereum_networks_for_chain(
}))
}

/// Networks as chains will create the necessary chains from the adapter information.
/// There are two major cases that are handled currently:
/// Deep integration chains (explicitly defined on the graph-node like Ethereum, Near, etc):
/// - These can have adapter of any type. Adapters of firehose and rpc types are used by the Chain implementation, aka deep integration
/// - The substreams adapters will trigger the creation of a Substreams chain, the priority for the block ingestor setup depends on the chain, if enabled at all.
/// Substreams Chain(chains the graph-node knows nothing about and are only accessible through substreams):
/// - This chain type is more generic and can only have adapters of substreams type.
/// - Substreams chain are created as a "secondary" chain for deep integrations but in that case the block ingestor should be run by the main/deep integration chain.
/// - These chains will use SubstreamsBlockIngestor by default.
pub async fn networks_as_chains(
config: &Arc<EnvVars>,
blockchain_map: &mut BlockchainMap,
Expand All @@ -405,30 +413,21 @@ pub async fn networks_as_chains(
let adapters = networks
.adapters
.iter()
.sorted_by_key(|a| a.chain_id())
.chunk_by(|a| a.chain_id())
.into_iter()
.map(|(chain_id, adapters)| (chain_id, adapters.into_iter().collect_vec()))
.collect_vec();

let substreams: Vec<&FirehoseAdapterConfig> = networks
.adapters
.iter()
.flat_map(|a| a.as_substreams())
.collect();

let chains = adapters.into_iter().map(|(chain_id, adapters)| {
let adapters: Vec<&AdapterConfiguration> = adapters.into_iter().collect();
let kind = adapters
.iter()
.map(|a| a.kind())
.reduce(|a1, a2| match (a1, a2) {
(BlockchainKind::Substreams, k) => k,
(k, BlockchainKind::Substreams) => k,
(k, _) => k,
})
.first()
.map(|a| a.blockchain_kind())
.expect("validation should have checked we have at least one provider");
(chain_id, adapters, kind)
});

for (chain_id, adapters, kind) in chains.into_iter() {
let chain_store = match store.chain_store(chain_id) {
Some(c) => c,
Expand All @@ -443,6 +442,36 @@ pub async fn networks_as_chains(
}
};

async fn add_substreams<C: Blockchain>(
networks: &Networks,
config: &Arc<EnvVars>,
chain_id: ChainId,
blockchain_map: &mut BlockchainMap,
logger_factory: LoggerFactory,
chain_store: Arc<dyn ChainStore>,
metrics_registry: Arc<MetricsRegistry>,
) {
let substreams_endpoints = networks.substreams_endpoints(chain_id.clone());
if substreams_endpoints.len() == 0 {
return;
}

blockchain_map.insert::<graph_chain_substreams::Chain>(
chain_id.clone(),
Arc::new(
BasicBlockchainBuilder {
logger_factory: logger_factory.clone(),
name: chain_id.clone(),
chain_store,
metrics_registry: metrics_registry.clone(),
firehose_endpoints: substreams_endpoints,
}
.build(config)
.await,
),
);
}

match kind {
BlockchainKind::Arweave => {
let firehose_endpoints = networks.firehose_endpoints(chain_id.clone());
Expand All @@ -453,14 +482,25 @@ pub async fn networks_as_chains(
BasicBlockchainBuilder {
logger_factory: logger_factory.clone(),
name: chain_id.clone(),
chain_store,
chain_store: chain_store.cheap_clone(),
firehose_endpoints,
metrics_registry: metrics_registry.clone(),
}
.build(config)
.await,
),
);

add_substreams::<graph_chain_arweave::Chain>(
networks,
config,
chain_id.clone(),
blockchain_map,
logger_factory.clone(),
chain_store,
metrics_registry.clone(),
)
.await;
}
BlockchainKind::Ethereum => {
// polling interval is set per chain so if set all adapter configuration will have
Expand Down Expand Up @@ -510,6 +550,17 @@ pub async fn networks_as_chains(

blockchain_map
.insert::<graph_chain_ethereum::Chain>(chain_id.clone(), Arc::new(chain));

add_substreams::<graph_chain_ethereum::Chain>(
networks,
config,
chain_id.clone(),
blockchain_map,
logger_factory.clone(),
chain_store,
metrics_registry.clone(),
)
.await;
}
BlockchainKind::Near => {
let firehose_endpoints = networks.firehose_endpoints(chain_id.clone());
Expand All @@ -519,14 +570,25 @@ pub async fn networks_as_chains(
BasicBlockchainBuilder {
logger_factory: logger_factory.clone(),
name: chain_id.clone(),
chain_store,
chain_store: chain_store.cheap_clone(),
firehose_endpoints,
metrics_registry: metrics_registry.clone(),
}
.build(config)
.await,
),
);

add_substreams::<graph_chain_near::Chain>(
networks,
config,
chain_id.clone(),
blockchain_map,
logger_factory.clone(),
chain_store,
metrics_registry.clone(),
)
.await;
}
BlockchainKind::Cosmos => {
let firehose_endpoints = networks.firehose_endpoints(chain_id.clone());
Expand All @@ -536,14 +598,24 @@ pub async fn networks_as_chains(
BasicBlockchainBuilder {
logger_factory: logger_factory.clone(),
name: chain_id.clone(),
chain_store,
chain_store: chain_store.cheap_clone(),
firehose_endpoints,
metrics_registry: metrics_registry.clone(),
}
.build(config)
.await,
),
);
add_substreams::<graph_chain_cosmos::Chain>(
networks,
config,
chain_id.clone(),
blockchain_map,
logger_factory.clone(),
chain_store,
metrics_registry.clone(),
)
.await;
}
BlockchainKind::Starknet => {
let firehose_endpoints = networks.firehose_endpoints(chain_id.clone());
Expand All @@ -553,75 +625,44 @@ pub async fn networks_as_chains(
BasicBlockchainBuilder {
logger_factory: logger_factory.clone(),
name: chain_id.clone(),
chain_store,
chain_store: chain_store.cheap_clone(),
firehose_endpoints,
metrics_registry: metrics_registry.clone(),
}
.build(config)
.await,
),
);
add_substreams::<graph_chain_starknet::Chain>(
networks,
config,
chain_id.clone(),
blockchain_map,
logger_factory.clone(),
chain_store,
metrics_registry.clone(),
)
.await;
}
BlockchainKind::Substreams => {
let substreams_endpoints = networks.substreams_endpoints(chain_id.clone());
blockchain_map.insert::<graph_chain_substreams::Chain>(
chain_id.clone(),
Arc::new(
BasicBlockchainBuilder {
logger_factory: logger_factory.clone(),
name: chain_id.clone(),
chain_store,
metrics_registry: metrics_registry.clone(),
firehose_endpoints: substreams_endpoints,
}
.build(config)
.await,
),
);
}
BlockchainKind::Substreams => {}
}
}

fn chain_store(
blockchain_map: &BlockchainMap,
kind: &BlockchainKind,
network: ChainId,
) -> anyhow::Result<Arc<dyn ChainStore>> {
let chain_store: Arc<dyn ChainStore> = match kind {
BlockchainKind::Arweave => blockchain_map
.get::<graph_chain_arweave::Chain>(network)
.map(|c| c.chain_store())?,
BlockchainKind::Ethereum => blockchain_map
.get::<graph_chain_ethereum::Chain>(network)
.map(|c| c.chain_store())?,
BlockchainKind::Near => blockchain_map
.get::<graph_chain_near::Chain>(network)
.map(|c| c.chain_store())?,
BlockchainKind::Cosmos => blockchain_map
.get::<graph_chain_cosmos::Chain>(network)
.map(|c| c.chain_store())?,
BlockchainKind::Substreams => blockchain_map
.get::<graph_chain_substreams::Chain>(network)
.map(|c| c.chain_store())?,
BlockchainKind::Starknet => blockchain_map
.get::<graph_chain_starknet::Chain>(network)
.map(|c| c.chain_store())?,
};

Ok(chain_store)
}

for FirehoseAdapterConfig {
chain_id,
kind,
adapters: _,
} in substreams.iter()
{
let chain_store = chain_store(&blockchain_map, kind, chain_id.clone()).expect(&format!(
"{} requires an rpc or firehose endpoint defined",
chain_id
));
let substreams_endpoints = networks.substreams_endpoints(chain_id.clone());

blockchain_map.insert::<graph_chain_substreams::Chain>(
chain_id.clone(),
Arc::new(
BasicBlockchainBuilder {
logger_factory: logger_factory.clone(),
name: chain_id.clone(),
chain_store,
firehose_endpoints: substreams_endpoints,
metrics_registry: metrics_registry.clone(),
}
.build(config)
.await,
),
);
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 7b4fff2

Please sign in to comment.