Skip to content

Commit

Permalink
graphman: Fix rewind for deployment with multiple names
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Jun 20, 2024
1 parent 9b9b8f9 commit 3adb067
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 25 deletions.
8 changes: 4 additions & 4 deletions node/src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,16 @@ pub enum Command {
sleep: Duration,
/// The block hash of the target block
#[clap(
required_unless_present = "start-block",
conflicts_with = "start-block",
required_unless_present = "start_block",
conflicts_with = "start_block",
long,
short = 'H'
)]
block_hash: Option<String>,
/// The block number of the target block
#[clap(
required_unless_present = "start-block",
conflicts_with = "start-block",
required_unless_present = "start_block",
conflicts_with = "start_block",
long,
short = 'n'
)]
Expand Down
53 changes: 32 additions & 21 deletions node/src/manager/commands/rewind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use std::time::Duration;
use std::{collections::HashSet, convert::TryFrom};

use crate::manager::commands::assign::pause_or_resume;
use crate::manager::deployment::{Deployment, DeploymentSearch};
use crate::manager::deployment::DeploymentSearch;
use graph::anyhow::bail;
use graph::components::store::{BlockStore as _, ChainStore as _};
use graph::components::store::{BlockStore as _, ChainStore as _, DeploymentLocator};
use graph::env::ENV_VARS;
use graph::prelude::{anyhow, BlockNumber, BlockPtr};
use graph_store_postgres::command_support::catalog::{self as store_catalog};
Expand All @@ -15,16 +15,20 @@ use graph_store_postgres::{BlockStore, NotificationSender};

async fn block_ptr(
store: Arc<BlockStore>,
searches: &[DeploymentSearch],
deployments: &[Deployment],
locators: &HashSet<(String, DeploymentLocator)>,
searches: &Vec<DeploymentSearch>,
hash: &str,
number: BlockNumber,
force: bool,
) -> Result<BlockPtr, anyhow::Error> {
let block_ptr_to = BlockPtr::try_from((hash, number as i64))
.map_err(|e| anyhow!("error converting to block pointer: {}", e))?;

let chains = deployments.iter().map(|d| &d.chain).collect::<HashSet<_>>();
let chains = locators
.iter()
.map(|(chain, _)| chain)
.collect::<HashSet<_>>();

if chains.len() > 1 {
let names = searches
.iter()
Expand All @@ -33,8 +37,10 @@ async fn block_ptr(
.join(", ");
bail!("the deployments matching `{names}` are on different chains");
}
let chain = chains.iter().next().unwrap();
let chain_store = match store.chain_store(chain) {

let chain = chains.iter().next().unwrap().to_string();

let chain_store = match store.chain_store(&chain) {
None => bail!("can not find chain store for {}", chain),
Some(store) => store,
};
Expand Down Expand Up @@ -78,19 +84,26 @@ pub async fn run(
let subgraph_store = store.subgraph_store();
let block_store = store.block_store();

let mut deployments = Vec::new();
let mut locators = HashSet::new();

for search in &searches {
let results = search.lookup(&primary)?;
if results.len() > 1 {

let deployment_locators: HashSet<(String, DeploymentLocator)> = results
.iter()
.map(|deployment| (deployment.chain.clone(), deployment.locator()))
.collect();

if deployment_locators.len() > 1 {
bail!(
"Multiple deployments found for the search : {}. Try using the id of the deployment (eg: sgd143) to uniquely identify the deployment.",
search
);
}
deployments.extend(results);
locators.extend(deployment_locators);
}

if deployments.is_empty() {
if locators.is_empty() {
println!("No deployments found");
return Ok(());
}
Expand All @@ -101,8 +114,8 @@ pub async fn run(
Some(
block_ptr(
block_store,
&locators,
&searches,
&deployments,
block_hash.as_deref().unwrap_or_default(),
block_number.unwrap_or_default(),
force,
Expand All @@ -112,8 +125,7 @@ pub async fn run(
};

println!("Checking if its safe to rewind deployments");
for deployment in &deployments {
let locator = &deployment.locator();
for (_, locator) in &locators {
let site = conn
.locate_site(locator.clone())?
.ok_or_else(|| anyhow!("failed to locate site for {locator}"))?;
Expand All @@ -133,8 +145,8 @@ pub async fn run(
}

println!("Pausing deployments");
for deployment in &deployments {
pause_or_resume(primary.clone(), &sender, &deployment.locator(), true)?;
for (_, locator) in &locators {
pause_or_resume(primary.clone(), &sender, &locator, true)?;
}

// There's no good way to tell that a subgraph has in fact stopped
Expand All @@ -146,15 +158,14 @@ pub async fn run(
thread::sleep(sleep);

println!("\nRewinding deployments");
for deployment in &deployments {
let loc = deployment.locator();
for (chain, loc) in &locators {
let block_store = store.block_store();
let deployment_details = subgraph_store.load_deployment_by_id(loc.clone().into())?;
let block_ptr_to = block_ptr_to.clone();

let start_block = deployment_details.start_block.or_else(|| {
block_store
.chain_store(&deployment.chain)
.chain_store(chain)
.and_then(|chain_store| chain_store.genesis_block_ptr().ok())
});

Expand All @@ -174,8 +185,8 @@ pub async fn run(
}

println!("Resuming deployments");
for deployment in &deployments {
pause_or_resume(primary.clone(), &sender, &deployment.locator(), false)?;
for (_, locator) in &locators {
pause_or_resume(primary.clone(), &sender, locator, false)?;
}
Ok(())
}

0 comments on commit 3adb067

Please sign in to comment.