Skip to content

Commit

Permalink
query tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Feb 15, 2025
1 parent 7b786cb commit 8d0224a
Show file tree
Hide file tree
Showing 17 changed files with 709 additions and 14 deletions.
20 changes: 15 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@ bs58 = "0.5.1"
chrono = "0.4.38"
clap = { version = "4.5.4", features = ["derive", "env"] }
derivative = "2.2.0"
diesel = { version = "2.2.4", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono", "uuid", "i-implement-a-third-party-backend-and-opt-into-breaking-changes"] }
diesel = { version = "2.2.4", features = [
"postgres",
"serde_json",
"numeric",
"r2d2",
"chrono",
"uuid",
"i-implement-a-third-party-backend-and-opt-into-breaking-changes",
] }
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
diesel-dynamic-schema = { version = "0.2.1", features = ["postgres"] }
diesel_derives = "2.1.4"
Expand All @@ -56,7 +64,10 @@ serde_derive = "1.0.125"
serde_json = { version = "1.0", features = ["arbitrary_precision"] }
serde_regex = "1.1.0"
serde_yaml = "0.9.21"
slog = { version = "2.7.0", features = ["release_max_level_trace", "max_level_trace"] }
slog = { version = "2.7.0", features = [
"release_max_level_trace",
"max_level_trace",
] }
sqlparser = "0.46.0"
strum = { version = "0.26", features = ["derive"] }
syn = { version = "2.0.66", features = ["full"] }
Expand Down
6 changes: 6 additions & 0 deletions graph/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,10 @@ fn main() {
.out_dir("src/substreams_rpc")
.compile(&["proto/substreams-rpc.proto"], &["proto"])
.expect("Failed to compile Substreams RPC proto(s)");

tonic_build::configure()
.out_dir("src/grpc/pb")
.include_file("mod.rs")
.compile(&["proto/tracing.proto"], &["proto"])
.expect("Failed to compile Tracing proto(s)");
}
17 changes: 17 additions & 0 deletions graph/proto/tracing.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
syntax = "proto3";

package graph.tracing.v1;

service Stream {
rpc QueryTrace(Request) returns (stream Trace);
}

message Request {
int32 deployment_id = 1;
}

message Trace {
int32 deployment_id = 1;
string query = 2;
uint64 duration_millis = 3;
}
2 changes: 2 additions & 0 deletions graph/src/components/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub mod metrics;
/// Components dealing with versioning
pub mod versions;

pub mod tracing;

/// A component that receives events of type `T`.
pub trait EventConsumer<E> {
/// Get the event sink.
Expand Down
123 changes: 123 additions & 0 deletions graph/src/components/tracing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use std::{collections::HashMap, sync::atomic::AtomicBool};

use tokio::sync::{mpsc, RwLock};

use super::store::DeploymentId;

const DEFAULT_BUFFER_SIZE: usize = 100;

#[derive(Debug)]
pub struct Subscriptions<T> {
inner: RwLock<HashMap<DeploymentId, mpsc::Sender<T>>>,
}

/// A control structure for managing tracing subscriptions.
#[derive(Debug)]
pub struct TracingControl<T> {
enabled: AtomicBool,
subscriptions: Subscriptions<T>,
default_buffer_size: usize,
}

impl<T> Default for TracingControl<T> {
fn default() -> Self {
Self {
enabled: AtomicBool::new(false),
subscriptions: Subscriptions {
inner: RwLock::new(HashMap::new()),
},
default_buffer_size: DEFAULT_BUFFER_SIZE,
}
}
}

impl<T> TracingControl<T> {
pub fn new(default_buffer_size: Option<usize>) -> Self {
Self {
enabled: AtomicBool::new(false),
subscriptions: Subscriptions {
inner: RwLock::new(HashMap::new()),
},
default_buffer_size: default_buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE),
}
}

/// Creates a channel sender for the given deployment ID. Only one subscription
/// can exist for a given deployment ID. If tracing is disabled or no subscription
/// exists, it will return None. Calling producer when a dead subscription exists
/// will incur a cleanup cost.
pub async fn producer(&self, key: DeploymentId) -> Option<mpsc::Sender<T>> {
if !self.enabled.load(std::sync::atomic::Ordering::Relaxed) {
return None;
}

let subs = self.subscriptions.inner.read().await;
let tx = subs.get(&key);

match tx {
Some(tx) if tx.is_closed() => {
drop(subs);
let mut subs = self.subscriptions.inner.write().await;
subs.remove(&key);

if subs.is_empty() {
self.enabled
.store(false, std::sync::atomic::Ordering::Relaxed);
}

None
}
None => None,
tx => tx.cloned(),
}
}
pub async fn subscribe_with_chan_size(
&self,
key: DeploymentId,
buffer_size: usize,
) -> mpsc::Receiver<T> {
let (tx, rx) = mpsc::channel(buffer_size);
let mut guard = self.subscriptions.inner.write().await;
guard.insert(key, tx);
self.enabled
.store(true, std::sync::atomic::Ordering::Relaxed);

rx
}

/// Creates a new subscription for a given deployment ID. If a subscription already
/// exists, it will be replaced.
pub async fn subscribe(&self, key: DeploymentId) -> mpsc::Receiver<T> {
self.subscribe_with_chan_size(key, self.default_buffer_size)
.await
}
}

#[cfg(test)]
mod test {

use super::*;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;

#[tokio::test]
async fn test_tracing_control() {
let control: TracingControl<()> = TracingControl::default();
let control = Arc::new(control);
assert_eq!(false, control.enabled.load(Relaxed));

let tx = control.producer(DeploymentId(123)).await;
assert!(tx.is_none());

let rx = control.subscribe(DeploymentId(123)).await;
assert_eq!(true, control.enabled.load(Relaxed));

drop(rx);
let tx = control.producer(DeploymentId(123)).await;
assert!(tx.is_none());
assert_eq!(false, control.enabled.load(Relaxed));

_ = control.subscribe(DeploymentId(123)).await;
assert_eq!(true, control.enabled.load(Relaxed));
}
}
4 changes: 2 additions & 2 deletions graph/src/data/query/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ lazy_static! {
pub static ref TRACE_NONE: Arc<Trace> = Arc::new(Trace::None);
}

#[derive(Debug, CacheWeight)]
#[derive(Debug, CacheWeight, Clone)]
pub struct TraceWithCacheStatus {
pub trace: Arc<Trace>,
pub cache_status: CacheStatus,
Expand All @@ -35,7 +35,7 @@ impl HttpTrace {
}
}

#[derive(Debug, CacheWeight)]
#[derive(Debug, CacheWeight, Clone)]
pub enum Trace {
None,
Root {
Expand Down
4 changes: 4 additions & 0 deletions graph/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
use pb::graph::tracing::v1::{Request, Trace};
use tonic::async_trait;

pub mod pb;
Loading

0 comments on commit 8d0224a

Please sign in to comment.