Skip to content

Commit

Permalink
core, graph: add deployment_synced metric
Browse files Browse the repository at this point in the history
  • Loading branch information
isum committed Feb 12, 2025
1 parent 157c291 commit cd71601
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 0 deletions.
11 changes: 11 additions & 0 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ where
}

async fn run_inner(mut self, break_on_restart: bool) -> Result<Self, SubgraphRunnerError> {
self.update_deployment_synced_metric();

// If a subgraph failed for deterministic reasons, before start indexing, we first
// revert the deployment head. It should lead to the same result since the error was
// deterministic.
Expand Down Expand Up @@ -293,6 +295,8 @@ where
res
})?;

self.update_deployment_synced_metric();

// It is possible that the subgraph was unassigned, but the runner was in
// a retry delay state and did not observe the cancel signal.
if block_stream_cancel_handle.is_canceled() {
Expand Down Expand Up @@ -1231,6 +1235,13 @@ where

Ok((mods, processed_data_sources, persisted_data_sources))
}

fn update_deployment_synced_metric(&self) {
self.metrics
.subgraph
.deployment_synced
.record(self.inputs.store.is_deployment_synced());
}
}

#[derive(Debug)]
Expand Down
54 changes: 54 additions & 0 deletions graph/src/components/metrics/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct SubgraphInstanceMetrics {
pub firehose_connection_errors: Counter,
pub stopwatch: StopwatchMetrics,
pub deployment_status: DeploymentStatusMetric,
pub deployment_synced: DeploymentSyncedMetric,

trigger_processing_duration: Box<Histogram>,
blocks_processed_secs: Box<Counter>,
Expand Down Expand Up @@ -91,13 +92,16 @@ impl SubgraphInstanceMetrics {
)
.expect("failed to create blocks_processed_count counter");

let deployment_synced = DeploymentSyncedMetric::register(&registry, subgraph_hash);

Self {
block_trigger_count,
block_processing_duration,
block_ops_transaction_duration,
firehose_connection_errors,
stopwatch,
deployment_status,
deployment_synced,
trigger_processing_duration,
blocks_processed_secs,
blocks_processed_count,
Expand All @@ -120,6 +124,7 @@ impl SubgraphInstanceMetrics {
registry.unregister(self.block_trigger_count.clone());
registry.unregister(self.trigger_processing_duration.clone());
registry.unregister(self.block_ops_transaction_duration.clone());
registry.unregister(Box::new(self.deployment_synced.inner.clone()));
}
}

Expand Down Expand Up @@ -213,3 +218,52 @@ impl DeploymentStatusMetric {
self.inner.set(Self::STATUS_FAILED);
}
}

/// Indicates whether a deployment has reached the chain head since it was deployed.
pub struct DeploymentSyncedMetric {
inner: IntGauge,

// If, for some reason, a deployment reports that it is synced, and then reports that it is not
// synced during an execution, this prevents the metric from reverting to the not synced state.
previously_synced: std::sync::OnceLock<()>,
}

impl DeploymentSyncedMetric {
const NOT_SYNCED: i64 = 0;
const SYNCED: i64 = 1;

/// Registers the metric.
pub fn register(registry: &MetricsRegistry, deployment_hash: &str) -> Self {
let metric = registry
.new_int_gauge(
"deployment_synced",
"Indicates whether a deployment has reached the chain head since it was deployed.\n\
Possible values:\n\
0 - deployment is not synced;\n\
1 - deployment is synced;",
[("deployment", deployment_hash)],
)
.expect("failed to register `deployment_synced` gauge");

Self {
inner: metric,
previously_synced: std::sync::OnceLock::new(),
}
}

/// Records the current sync status of the deployment.
/// Will ignore all values after the first `true` is received.
pub fn record(&self, synced: bool) {
if self.previously_synced.get().is_some() {
return;
}

if synced {
self.inner.set(Self::SYNCED);
let _ = self.previously_synced.set(());
return;
}

self.inner.set(Self::NOT_SYNCED);
}
}

0 comments on commit cd71601

Please sign in to comment.