Skip to content

Commit

Permalink
graphman config check providers (#5517)
Browse files Browse the repository at this point in the history
* graphman config check providers

* graphman chain update genesis
  • Loading branch information
mangas authored Jul 15, 2024
1 parent 0530ce1 commit e3aad48
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 15 deletions.
5 changes: 1 addition & 4 deletions chain/ethereum/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,7 @@ impl EthereumNetworkAdapters {
&self,
required_capabilities: &NodeCapabilities,
) -> impl Iterator<Item = &EthereumNetworkAdapter> + '_ {
let all = self
.manager
.get_all_unverified(&self.chain_id)
.unwrap_or_default();
let all = self.manager.get_all_unverified(&self.chain_id);

Self::available_with_capabilities(all, required_capabilities)
}
Expand Down
15 changes: 10 additions & 5 deletions graph/src/components/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ struct Ident {
chain_id: ChainId,
}

#[derive(Error, Debug, Clone)]
#[derive(Error, Debug, Clone, PartialEq)]
pub enum IdentValidatorError {
#[error("database error: {0}")]
UnknownError(String),
Expand All @@ -89,6 +89,12 @@ pub enum IdentValidatorError {

impl From<anyhow::Error> for IdentValidatorError {
fn from(value: anyhow::Error) -> Self {
Self::from(&value)
}
}

impl From<&anyhow::Error> for IdentValidatorError {
fn from(value: &anyhow::Error) -> Self {
IdentValidatorError::UnknownError(value.to_string())
}
}
Expand Down Expand Up @@ -308,13 +314,12 @@ impl<T: NetIdentifiable + Clone + 'static> ProviderManager<T> {
/// adapters that failed verification. For the most part this should be fine since ideally
/// get_all would have been used before. Nevertheless, it is possible that a misconfigured
/// adapter is returned from this list even after validation.
pub fn get_all_unverified(&self, chain_id: &ChainId) -> Result<Vec<&T>, ProviderManagerError> {
Ok(self
.inner
pub fn get_all_unverified(&self, chain_id: &ChainId) -> Vec<&T> {
self.inner
.adapters
.get(chain_id)
.map(|v| v.iter().map(|v| &v.1).collect())
.unwrap_or_default())
.unwrap_or_default()
}

/// get_all will trigger the verification of the endpoints for the provided chain_id, hence the
Expand Down
46 changes: 46 additions & 0 deletions node/src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use clap::{Parser, Subcommand};
use config::PoolSize;
use git_testament::{git_testament, render_testament};
use graph::bail;
use graph::blockchain::BlockHash;
use graph::cheap_clone::CheapClone;
use graph::components::adapter::ChainId;
use graph::endpoint::EndpointMetrics;
use graph::env::ENV_VARS;
use graph::log::logger_with_levels;
Expand Down Expand Up @@ -33,6 +35,7 @@ use graph_store_postgres::{
SubscriptionManager, PRIMARY_SHARD,
};
use lazy_static::lazy_static;
use std::str::FromStr;
use std::{collections::HashMap, num::ParseIntError, sync::Arc, time::Duration};
const VERSION_LABEL_KEY: &str = "version";

Expand Down Expand Up @@ -435,6 +438,11 @@ pub enum ConfigCommand {
features: String,
network: String,
},

/// Compare the NetIdentifier of all defined adapters with the existing
/// identifiers on the ChainStore.
CheckProviders {},

/// Show subgraph-specific settings
///
/// GRAPH_EXPERIMENTAL_SUBGRAPH_SETTINGS can add a file that contains
Expand Down Expand Up @@ -547,6 +555,16 @@ pub enum ChainCommand {
force: bool,
},

/// Update the genesis block hash for a chain
UpdateGenesis {
#[clap(long, short)]
force: bool,
#[clap(value_parser = clap::builder::NonEmptyStringValueParser::new())]
block_hash: String,
#[clap(value_parser = clap::builder::NonEmptyStringValueParser::new())]
chain_name: String,
},

/// Change the block cache shard for a chain
ChangeShard {
/// Chain name (must be an existing chain, see 'chain list')
Expand Down Expand Up @@ -1149,6 +1167,11 @@ async fn main() -> anyhow::Result<()> {
use ConfigCommand::*;

match cmd {
CheckProviders {} => {
let store = ctx.store().block_store();
let networks = ctx.networks(store.cheap_clone()).await?;
Ok(commands::config::check_provider_genesis(&networks, store).await)
}
Place { name, network } => {
commands::config::place(&ctx.config.deployment, &name, &network)
}
Expand Down Expand Up @@ -1326,6 +1349,29 @@ async fn main() -> anyhow::Result<()> {
shard,
)
}

UpdateGenesis {
force,
block_hash,
chain_name,
} => {
let store_builder = ctx.store_builder().await;
let store = ctx.store().block_store();
let networks = ctx.networks(store.cheap_clone()).await?;
let chain_id = ChainId::from(chain_name);
let block_hash = BlockHash::from_str(&block_hash)?;
commands::chain::update_chain_genesis(
&networks,
store_builder.coord.cheap_clone(),
store,
&logger,
chain_id,
block_hash,
force,
)
.await
}

CheckBlocks { method, chain_name } => {
use commands::check_blocks::{by_hash, by_number, by_range};
use CheckBlockMethod::*;
Expand Down
54 changes: 54 additions & 0 deletions node/src/manager/commands/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@ use std::sync::Arc;
use diesel::sql_query;
use diesel::Connection;
use diesel::RunQueryDsl;
use graph::blockchain::BlockHash;
use graph::blockchain::BlockPtr;
use graph::blockchain::ChainIdentifier;
use graph::cheap_clone::CheapClone;
use graph::components::adapter::ChainId;
use graph::components::adapter::IdentValidator;
use graph::components::store::StoreError;
use graph::prelude::BlockNumber;
use graph::prelude::ChainStore as _;
use graph::prelude::{anyhow, anyhow::bail};
use graph::slog::Logger;
use graph::{components::store::BlockStore as _, prelude::anyhow::Error};
use graph_store_postgres::add_chain;
use graph_store_postgres::connection_pool::PoolCoordinator;
use graph_store_postgres::find_chain;
use graph_store_postgres::update_chain_name;
use graph_store_postgres::BlockStore;
Expand All @@ -21,6 +27,8 @@ use graph_store_postgres::{
command_support::catalog::block_store, connection_pool::ConnectionPool,
};

use crate::network_setup::Networks;

pub async fn list(primary: ConnectionPool, store: Arc<BlockStore>) -> Result<(), Error> {
let mut chains = {
let mut conn = primary.get()?;
Expand Down Expand Up @@ -148,6 +156,52 @@ pub fn remove(primary: ConnectionPool, store: Arc<BlockStore>, name: String) ->
Ok(())
}

pub async fn update_chain_genesis(
networks: &Networks,
coord: Arc<PoolCoordinator>,
store: Arc<BlockStore>,
logger: &Logger,
chain_id: ChainId,
genesis_hash: BlockHash,
force: bool,
) -> Result<(), Error> {
let ident = networks.chain_identifier(logger, &chain_id).await?;
if !genesis_hash.eq(&ident.genesis_block_hash) {
println!(
"Expected adapter for chain {} to return genesis hash {} but got {}",
chain_id, genesis_hash, ident.genesis_block_hash
);
if !force {
println!("Not performing update");
return Ok(());
} else {
println!("--force used, updating anyway");
}
}

println!("Updating shard...");
// Update the local shard's genesis, whether or not it is the primary.
// The chains table is replicated from the primary and keeps another genesis hash.
// To keep those in sync we need to update the primary and then refresh the shard tables.
store.update_ident(
&chain_id,
&ChainIdentifier {
net_version: ident.net_version.clone(),
genesis_block_hash: genesis_hash,
},
)?;

// Update the primary public.chains
println!("Updating primary public.chains");
store.set_chain_identifier(chain_id, &ident)?;

// Refresh the new values
println!("Refresh mappings");
crate::manager::commands::database::remap(&coord, None, None, false).await?;

Ok(())
}

pub fn change_block_cache_shard(
primary_store: ConnectionPool,
store: Arc<BlockStore>,
Expand Down
34 changes: 32 additions & 2 deletions node/src/manager/commands/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{collections::BTreeMap, sync::Arc};
use graph::{
anyhow::{bail, Context},
components::{
adapter::{ChainId, MockIdentValidator},
adapter::{ChainId, IdentValidator, IdentValidatorError, MockIdentValidator, ProviderName},
subgraph::{Setting, Settings},
},
endpoint::EndpointMetrics,
Expand All @@ -16,10 +16,40 @@ use graph::{
slog::Logger,
};
use graph_chain_ethereum::NodeCapabilities;
use graph_store_postgres::DeploymentPlacer;
use graph_store_postgres::{BlockStore, DeploymentPlacer};

use crate::{config::Config, network_setup::Networks};

/// Compare the NetIdentifier of all defined adapters with the existing
/// identifiers on the ChainStore. If a ChainStore doesn't exist it will be show
/// as an error. It's intended to be run again an environment that has already
/// been setup by graph-node.
pub async fn check_provider_genesis(networks: &Networks, store: Arc<BlockStore>) {
println!("Checking providers");
for (chain_id, ids) in networks.all_chain_identifiers().await.into_iter() {
let (_oks, errs): (Vec<_>, Vec<_>) = ids
.into_iter()
.map(|(provider, id)| {
id.map_err(IdentValidatorError::from)
.and_then(|id| store.check_ident(chain_id, &id))
.map_err(|e| (provider, e))
})
.partition_result();
let errs = errs
.into_iter()
.dedup_by(|e1, e2| e1.eq(e2))
.collect::<Vec<(ProviderName, IdentValidatorError)>>();

if errs.is_empty() {
println!("chain_id: {}: status: OK", chain_id);
continue;
}

println!("chain_id: {}: status: NOK", chain_id);
println!("errors: {:?}", errs);
}
}

pub fn place(placer: &dyn DeploymentPlacer, name: &str, network: &str) -> Result<(), Error> {
match placer.place(name, network).map_err(|s| anyhow!(s))? {
None => {
Expand Down
41 changes: 39 additions & 2 deletions node/src/network_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use graph::{
blockchain::{Blockchain, BlockchainKind, BlockchainMap, ChainIdentifier},
cheap_clone::CheapClone,
components::{
adapter::{ChainId, IdentValidator, MockIdentValidator, NetIdentifiable, ProviderManager},
adapter::{
ChainId, IdentValidator, MockIdentValidator, NetIdentifiable, ProviderManager,
ProviderName,
},
metrics::MetricsRegistry,
},
endpoint::EndpointMetrics,
Expand Down Expand Up @@ -131,6 +134,40 @@ impl Networks {
}
}

/// Gets the chain identifier from all providers for every chain.
/// This function is intended for checking the status of providers and
/// whether they match their store counterparts more than for general
/// graph-node use. It may trigger verification (which would add delays on hot paths)
/// and it will also make calls on potentially unveried providers (this means the providers
/// have not been checked for correct net_version and genesis block hash)
pub async fn all_chain_identifiers(
&self,
) -> Vec<(
&ChainId,
Vec<(ProviderName, Result<ChainIdentifier, anyhow::Error>)>,
)> {
let mut out = vec![];
for chain_id in self.adapters.iter().map(|a| a.chain_id()).sorted().dedup() {
let mut inner = vec![];
for adapter in self.rpc_provider_manager.get_all_unverified(chain_id) {
inner.push((adapter.provider_name(), adapter.net_identifiers().await));
}
for adapter in self.firehose_provider_manager.get_all_unverified(chain_id) {
inner.push((adapter.provider_name(), adapter.net_identifiers().await));
}
for adapter in self
.substreams_provider_manager
.get_all_unverified(chain_id)
{
inner.push((adapter.provider_name(), adapter.net_identifiers().await));
}

out.push((chain_id, inner));
}

out
}

pub async fn chain_identifier(
&self,
logger: &Logger,
Expand All @@ -142,7 +179,7 @@ impl Networks {
chain_id: &ChainId,
provider_type: &str,
) -> Result<ChainIdentifier> {
for adapter in pm.get_all_unverified(chain_id).unwrap_or_default() {
for adapter in pm.get_all_unverified(chain_id) {
match adapter.net_identifiers().await {
Ok(ident) => return Ok(ident),
Err(err) => {
Expand Down
Loading

0 comments on commit e3aad48

Please sign in to comment.