diff --git a/Cargo.lock b/Cargo.lock
index b2ae24e0169..2fa1dab762d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -643,9 +643,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
 
 [[package]]
 name = "bytes"
-version = "1.6.0"
+version = "1.10.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
+checksum = "f61dac84819c6588b558454b194026eb1f09c293b9036ae9b159e74e73ab6cf9"
 dependencies = [
  "serde",
 ]
@@ -1817,6 +1817,7 @@ dependencies = [
  "humantime",
  "hyper 1.4.0",
  "hyper-util",
+ "indexer-watcher",
  "isatty",
  "itertools 0.13.0",
  "lazy_static",
@@ -2017,6 +2018,7 @@ dependencies = [
  "graph-chain-substreams",
  "graph-core",
  "graph-graphql",
+ "graph-server-grpc",
  "graph-server-http",
  "graph-server-index-node",
  "graph-server-json-rpc",
@@ -2079,6 +2081,16 @@ dependencies = [
  "wasmtime",
 ]
 
+[[package]]
+name = "graph-server-grpc"
+version = "0.36.0"
+dependencies = [
+ "graph",
+ "graph-store-postgres",
+ "tokio",
+ "tonic",
+]
+
 [[package]]
 name = "graph-server-http"
 version = "0.36.0"
@@ -2606,9 +2618,9 @@ dependencies = [
 
 [[package]]
 name = "hyper-util"
-version = "0.1.6"
+version = "0.1.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956"
+checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4"
 dependencies = [
  "bytes",
  "futures-channel",
@@ -2619,7 +2631,6 @@ dependencies = [
  "pin-project-lite",
  "socket2",
  "tokio",
- "tower 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)",
  "tower-service 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "tracing",
 ]
@@ -2730,6 +2741,16 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "indexer-watcher"
+version = "0.1.0"
+source = "git+https://github.com/graphprotocol/indexer-rs?tag=indexer-service-rs-v1.4.2#b8b376f07c506c684e22011ca2d4f73c31e7097d"
+dependencies = [
+ "anyhow",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "indexmap"
 version = "1.9.3"
@@ -2970,9 +2991,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67"
 
 [[package]]
 name = "libc"
-version = "0.2.155"
+version = "0.2.169"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
+checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a"
 
 [[package]]
 name = "libredox"
@@ -3127,13 +3148,13 @@ dependencies = [
 
 [[package]]
 name = "mio"
-version = "0.8.11"
+version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
+checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
 dependencies = [
  "libc",
  "wasi",
- "windows-sys 0.48.0",
+ "windows-sys 0.52.0",
 ]
 
 [[package]]
@@ -5138,21 +5159,20 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.38.0"
+version = "1.43.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a"
+checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e"
 dependencies = [
  "backtrace",
  "bytes",
  "libc",
  "mio",
- "num_cpus",
 "parking_lot",
  "pin-project-lite",
  "signal-hook-registry",
  "socket2",
  "tokio-macros",
- "windows-sys 0.48.0",
+ "windows-sys 0.52.0",
 ]
 
 [[package]]
@@ -5167,9 +5187,9 @@ dependencies = [
 
 [[package]]
 name = "tokio-macros"
-version = "2.3.0"
+version = "2.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a"
+checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/Cargo.toml b/Cargo.toml
index 751a19d6213..88b9c046d17 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,7 +34,15 @@ bs58 = "0.5.1"
 chrono = "0.4.38"
 clap = { version = "4.5.4", features = ["derive", "env"] }
 derivative = "2.2.0"
-diesel = { version = "2.2.4", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono", "uuid", "i-implement-a-third-party-backend-and-opt-into-breaking-changes"] }
+diesel = { version = "2.2.4", features = [
+    "postgres",
+    "serde_json",
+    "numeric",
+    "r2d2",
+    "chrono",
+    "uuid",
+    "i-implement-a-third-party-backend-and-opt-into-breaking-changes",
+] }
 diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
 diesel-dynamic-schema = { version = "0.2.1", features = ["postgres"] }
 diesel_derives = "2.1.4"
@@ -56,19 +64,24 @@ serde_derive = "1.0.125"
 serde_json = { version = "1.0", features = ["arbitrary_precision"] }
 serde_regex = "1.1.0"
 serde_yaml = "0.9.21"
-slog = { version = "2.7.0", features = ["release_max_level_trace", "max_level_trace"] }
+slog = { version = "2.7.0", features = [
+    "release_max_level_trace",
+    "max_level_trace",
+] }
 sqlparser = "0.46.0"
 strum = { version = "0.26", features = ["derive"] }
 syn = { version = "2.0.66", features = ["full"] }
 test-store = { path = "./store/test-store" }
 thiserror = "1.0.25"
-tokio = { version = "1.38.0", features = ["full"] }
+tokio = { version = "1", features = ["full"] }
 tonic = { version = "0.11.0", features = ["tls-roots", "gzip"] }
 tonic-build = { version = "0.11.0", features = ["prost"] }
 tower-http = { version = "0.5.2", features = ["cors"] }
 wasmparser = "0.118.1"
 wasmtime = "15.0.1"
+indexer-watcher = { git = "https://github.com/graphprotocol/indexer-rs", tag = "indexer-service-rs-v1.4.2" }
+
 # Incremental compilation on Rust 1.58 causes an ICE on build. As soon as graph node builds again, these can be removed.
 [profile.test]
 incremental = false
diff --git a/graph/Cargo.toml b/graph/Cargo.toml
index 3ea0c0bf349..f210812ecac 100644
--- a/graph/Cargo.toml
+++ b/graph/Cargo.toml
@@ -12,7 +12,9 @@ atomic_refcell = "0.1.13"
 # We require this precise version of bigdecimal. Updating to later versions
 # has caused PoI differences; if you update this version, you will need to
 # make sure that it does not cause PoI changes
-old_bigdecimal = { version = "=0.1.2", features = ["serde"], package = "bigdecimal" }
+old_bigdecimal = { version = "=0.1.2", features = [
+    "serde",
+], package = "bigdecimal" }
 bytes = "1.0.1"
 bs58 = { workspace = true }
 cid = "0.11.1"
@@ -64,14 +66,7 @@ slog-envlogger = "2.1.0"
 slog-term = "2.7.0"
 petgraph = "0.6.5"
 tiny-keccak = "1.5.0"
-tokio = { version = "1.38.0", features = [
-    "time",
-    "sync",
-    "macros",
-    "test-util",
-    "rt-multi-thread",
-    "parking_lot",
-] }
+tokio.workspace = true
 tokio-stream = { version = "0.1.15", features = ["sync"] }
 tokio-retry = "0.3.0"
 toml = "0.8.8"
@@ -92,11 +87,13 @@ defer = "0.2"
 # Our fork contains patches to make some fields optional for Celo and Fantom compatibility.
 # Without the "arbitrary_precision" feature, we get the error `data did not match any variant of untagged enum Response`.
 web3 = { git = "https://github.com/graphprotocol/rust-web3", branch = "graph-patches-onto-0.18", features = [
-    "arbitrary_precision", "test"
+    "arbitrary_precision",
+    "test",
 ] }
 serde_plain = "1.0.2"
 csv = "1.3.0"
 object_store = { version = "0.11.0", features = ["gcp"] }
+indexer-watcher.workspace = true
 
 [dev-dependencies]
 clap.workspace = true
diff --git a/graph/build.rs b/graph/build.rs
index 3cc00c0dc07..79f63c5eb58 100644
--- a/graph/build.rs
+++ b/graph/build.rs
@@ -25,4 +25,10 @@ fn main() {
         .out_dir("src/substreams_rpc")
         .compile(&["proto/substreams-rpc.proto"], &["proto"])
         .expect("Failed to compile Substreams RPC proto(s)");
+
+    tonic_build::configure()
+        .out_dir("src/grpc/pb")
+        .include_file("mod.rs")
+        .compile(&["proto/tracing.proto"], &["proto"])
+        .expect("Failed to compile Tracing proto(s)");
 }
diff --git a/graph/proto/tracing.proto b/graph/proto/tracing.proto
new file mode 100644
index 00000000000..455b2bef323
--- /dev/null
+++ b/graph/proto/tracing.proto
@@ -0,0 +1,19 @@
+syntax = "proto3";
+
+package graph.tracing.v1;
+
+service Stream { rpc QueryTrace(Request) returns (stream Response); }
+
+message Request { int32 deployment_id = 1; }
+
+message Response { repeated Trace traces = 1; }
+
+message Trace {
+  int32 deployment_id = 1;
+  string query = 2;
+  uint64 duration_millis = 3;
+  uint32 children = 4;
+  optional uint64 conn_wait_millis = 5;
+  optional uint64 permit_wait_millis = 6;
+  optional uint64 entity_count = 7;
+}
diff --git a/graph/src/components/mod.rs b/graph/src/components/mod.rs
index 8abdc96f0b0..5561b3142ec 100644
--- a/graph/src/components/mod.rs
+++ b/graph/src/components/mod.rs
@@ -60,6 +60,8 @@ pub mod metrics;
 /// Components dealing with versioning
 pub mod versions;
 
+pub mod tracing;
+
 /// A component that receives events of type `T`.
 pub trait EventConsumer<E> {
     /// Get the event sink.
diff --git a/graph/src/components/tracing.rs b/graph/src/components/tracing.rs
new file mode 100644
index 00000000000..247d42aca01
--- /dev/null
+++ b/graph/src/components/tracing.rs
@@ -0,0 +1,201 @@
+use lazy_static::lazy_static;
+
+use std::{collections::HashMap, sync::Arc, time::Duration};
+
+use tokio::sync::{mpsc, watch::Receiver, RwLock};
+
+use super::store::DeploymentId;
+
+const DEFAULT_BUFFER_SIZE: usize = 100;
+#[cfg(not(test))]
+const INDEXER_WATCHER_INTERVAL: Duration = Duration::from_secs(10);
+#[cfg(test)]
+const INDEXER_WATCHER_INTERVAL: Duration = Duration::from_millis(100);
+lazy_static! {
+    pub static ref TRACING_RUNTIME: tokio::runtime::Runtime =
+        tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .enable_all()
+            .build()
+            .unwrap();
+}
+
+#[derive(Debug, Clone)]
+pub struct Subscriptions<T> {
+    inner: Arc<RwLock<HashMap<DeploymentId, mpsc::Sender<T>>>>,
+}
+
+impl<T> Default for Subscriptions<T> {
+    fn default() -> Self {
+        Self {
+            inner: Arc::new(RwLock::new(HashMap::new())),
+        }
+    }
+}
+
+pub type SubscriptionsWatcher<T> = Receiver<HashMap<DeploymentId, mpsc::Sender<T>>>;
+
+/// A control structure for managing tracing subscriptions.
+#[derive(Debug)]
+pub struct TracingControl<T> {
+    watcher: Receiver<HashMap<DeploymentId, mpsc::Sender<T>>>,
+    subscriptions: Subscriptions<T>,
+    default_buffer_size: usize,
+}
+
+impl<T: Clone + Send + Sync + 'static> TracingControl<T> {
+    /// Starts a new tracing control instance. If an async runtime is not available, a new one will be created.
+    pub fn start() -> Self {
+        Self::new(DEFAULT_BUFFER_SIZE)
+    }
+
+    pub fn new(buffer_size: usize) -> Self {
+        let subscriptions = Subscriptions::default();
+        let subs = subscriptions.clone();
+
+        let watcher = std::thread::spawn(move || {
+            let handle =
+                tokio::runtime::Handle::try_current().unwrap_or(TRACING_RUNTIME.handle().clone());
+
+            handle.block_on(async move {
+                indexer_watcher::new_watcher(INDEXER_WATCHER_INTERVAL, move || {
+                    let subs = subs.clone();
+
+                    async move { Ok(subs.inner.read().await.clone()) }
+                })
+                .await
+            })
+        })
+        .join()
+        .unwrap()
+        .unwrap();
+
+        Self {
+            watcher,
+            subscriptions,
+            default_buffer_size: buffer_size,
+        }
+    }
+
+    /// Returns a producer for a given deployment ID. If the producer is closed, it will return None.
+    /// The producer could still be closed in the meantime.
+    pub fn producer(&self, key: DeploymentId) -> Option<mpsc::Sender<T>> {
+        self.watcher
+            .borrow()
+            .get(&key)
+            .cloned()
+            .filter(|sender| !sender.is_closed())
+    }
+
+    /// Creates a new subscription for a given deployment ID with a given buffer size. If a subscription already
+    /// exists, it will be replaced.
+    pub async fn subscribe_with_chan_size(
+        &self,
+        key: DeploymentId,
+        buffer_size: usize,
+    ) -> mpsc::Receiver<T> {
+        let (tx, rx) = mpsc::channel(buffer_size);
+        let mut guard = self.subscriptions.inner.write().await;
+        guard.insert(key, tx);
+
+        rx
+    }
+
+    /// Creates a new subscription for a given deployment ID. If a subscription already
+    /// exists, it will be replaced.
+    pub async fn subscribe(&self, key: DeploymentId) -> mpsc::Receiver<T> {
+        self.subscribe_with_chan_size(key, self.default_buffer_size)
+            .await
+    }
+}
+
+#[cfg(test)]
+mod test {
+
+    use anyhow::anyhow;
+    use tokio::time::{self, Instant};
+    use tokio_retry::Retry;
+
+    use super::*;
+    use std::sync::Arc;
+
+    #[tokio::test]
+    async fn test_watcher() {
+        let x = time::Instant::now();
+        let x = indexer_watcher::new_watcher(Duration::from_millis(10), move || {
+            let x = x.clone();
+
+            async move {
+                let now = Instant::now();
+                Ok(now.duration_since(x))
+            }
+        })
+        .await
+        .unwrap();
+
+        Retry::spawn(vec![Duration::from_secs(10); 3].into_iter(), move || {
+            let x = x.clone();
+            async move {
+                let count = x.borrow().clone();
+                println!("{}", count.as_millis());
+                Err::<(), _>(anyhow!("millis: {}", count.as_millis()))
+            }
+        })
+        .await
+        .unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_tracing_control() {
+        let control: TracingControl<()> = TracingControl::start();
+        let control = Arc::new(control);
+
+        // produce before subscription
+        let tx = control.producer(DeploymentId(123));
+        assert!(tx.is_none());
+
+        // create a subscription (dropped further down)
+        let rx = control.subscribe(DeploymentId(123)).await;
+
+        let c = control.clone();
+        // retry until the watcher picks up the new subscription
+        let tx = Retry::spawn(vec![INDEXER_WATCHER_INTERVAL; 3].into_iter(), move || {
+            let control = c.clone();
+            async move {
+                match control.producer(DeploymentId(123)) {
+                    Some(sender) if !sender.is_closed() => Ok(sender),
+                    // producer should never return a closed sender
+                    Some(_) => unreachable!(),
+                    None => Err(anyhow!("Sender not created yet")),
+                }
+            }
+        })
+        .await
+        .unwrap();
+        assert!(!tx.is_closed());
+        drop(rx);
+
+        // check subscription is none because channel is closed
+        let tx = control.producer(DeploymentId(123));
+        assert!(tx.is_none());
+
+        // re-create subscription
+        let _rx = control.subscribe(DeploymentId(123)).await;
+
+        // check old subscription was replaced
+        let c = control.clone();
+        let tx = Retry::spawn(vec![INDEXER_WATCHER_INTERVAL; 3].into_iter(), move || {
+            let tx = c.producer(DeploymentId(123));
+            async move {
+                match tx {
+                    Some(sender) if !sender.is_closed() => Ok(sender),
+                    Some(_) => Err(anyhow!("Sender is closed")),
+                    None => Err(anyhow!("Sender not created yet")),
+                }
+            }
+        })
+        .await
+        .unwrap();
+        assert!(!tx.is_closed())
+    }
+}
diff --git a/graph/src/data/query/trace.rs b/graph/src/data/query/trace.rs
index cf2d153dca4..dc8264a10df 100644
--- a/graph/src/data/query/trace.rs
+++ b/graph/src/data/query/trace.rs
@@ -14,7 +14,7 @@ lazy_static! {
     pub static ref TRACE_NONE: Arc<Trace> = Arc::new(Trace::None);
 }
 
-#[derive(Debug, CacheWeight)]
+#[derive(Debug, CacheWeight, Clone)]
 pub struct TraceWithCacheStatus {
     pub trace: Arc<Trace>,
     pub cache_status: CacheStatus,
@@ -35,7 +35,7 @@ impl HttpTrace {
     }
 }
 
-#[derive(Debug, CacheWeight)]
+#[derive(Debug, CacheWeight, Clone)]
 pub enum Trace {
     None,
     Root {
diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs
index 4383ce17b5c..b27ba535d79 100644
--- a/graph/src/env/mod.rs
+++ b/graph/src/env/mod.rs
@@ -247,6 +247,8 @@ pub struct EnvVars {
     /// Set by the environment variable `GRAPH_FIREHOSE_FETCH_BLOCK_TIMEOUT_SECS`.
     /// The default value is 60 seconds.
     pub firehose_block_fetch_timeout: u64,
+    /// Set by the environment variable `GRAPH_NODE_ENABLE_QUERY_TRACING_GRPC`.
+    pub enable_tracing_grpc_server: bool,
 }
 
 impl EnvVars {
@@ -339,6 +341,7 @@ impl EnvVars {
             block_write_capacity: inner.block_write_capacity.0,
             firehose_block_fetch_retry_limit: inner.firehose_block_fetch_retry_limit,
             firehose_block_fetch_timeout: inner.firehose_block_fetch_timeout,
+            enable_tracing_grpc_server: inner.enable_tracing_grpc_server.0,
         })
     }
 
@@ -506,6 +509,8 @@ struct Inner {
     firehose_block_fetch_retry_limit: usize,
     #[envconfig(from = "GRAPH_FIREHOSE_FETCH_BLOCK_TIMEOUT_SECS", default = "60")]
     firehose_block_fetch_timeout: u64,
+    #[envconfig(from = "GRAPH_NODE_ENABLE_QUERY_TRACING_GRPC", default = "false")]
+    enable_tracing_grpc_server: EnvVarBoolean,
 }
 
 #[derive(Clone, Debug)]
diff --git a/graph/src/grpc/mod.rs b/graph/src/grpc/mod.rs
new file mode 100644
index 00000000000..ffaf2083e26
--- /dev/null
+++ b/graph/src/grpc/mod.rs
@@ -0,0 +1 @@
+pub mod pb;
diff --git a/graph/src/grpc/pb/graph.tracing.v1.rs b/graph/src/grpc/pb/graph.tracing.v1.rs
new file mode 100644
index 00000000000..47e589c5bef
--- /dev/null
+++ b/graph/src/grpc/pb/graph.tracing.v1.rs
@@ -0,0 +1,326 @@
+// This file is @generated by prost-build.
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct Request {
+    #[prost(int32, tag = "1")]
+    pub deployment_id: i32,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct Response {
+    #[prost(message, repeated, tag = "1")]
+    pub traces: ::prost::alloc::vec::Vec<Trace>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct Trace {
+    #[prost(int32, tag = "1")]
+    pub deployment_id: i32,
+    #[prost(string, tag = "2")]
+    pub query: ::prost::alloc::string::String,
+    #[prost(uint64, tag = "3")]
+    pub duration_millis: u64,
+    #[prost(uint32, tag = "4")]
+    pub children: u32,
+    #[prost(uint64, optional, tag = "5")]
+    pub conn_wait_millis: ::core::option::Option<u64>,
+    #[prost(uint64, optional, tag = "6")]
+    pub permit_wait_millis: ::core::option::Option<u64>,
+    #[prost(uint64, optional, tag = "7")]
+    pub entity_count: ::core::option::Option<u64>,
+}
+/// Generated client implementations.
+pub mod stream_client {
+    #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
+    use tonic::codegen::*;
+    use tonic::codegen::http::Uri;
+    #[derive(Debug, Clone)]
+    pub struct StreamClient<T> {
+        inner: tonic::client::Grpc<T>,
+    }
+    impl StreamClient<tonic::transport::Channel> {
+        /// Attempt to create a new client by connecting to a given endpoint.
+        pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
+        where
+            D: TryInto<tonic::transport::Endpoint>,
+            D::Error: Into<StdError>,
+        {
+            let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
+            Ok(Self::new(conn))
+        }
+    }
+    impl<T> StreamClient<T>
+    where
+        T: tonic::client::GrpcService<tonic::body::BoxBody>,
+        T::Error: Into<StdError>,
+        T::ResponseBody: Body<Data = Bytes> + Send + 'static,
+        <T::ResponseBody as Body>::Error: Into<StdError> + Send,
+    {
+        pub fn new(inner: T) -> Self {
+            let inner = tonic::client::Grpc::new(inner);
+            Self { inner }
+        }
+        pub fn with_origin(inner: T, origin: Uri) -> Self {
+            let inner = tonic::client::Grpc::with_origin(inner, origin);
+            Self { inner }
+        }
+        pub fn with_interceptor<F>(
+            inner: T,
+            interceptor: F,
+        ) -> StreamClient<InterceptedService<T, F>>
+        where
+            F: tonic::service::Interceptor,
+            T::ResponseBody: Default,
+            T: tonic::codegen::Service<
+                http::Request<tonic::body::BoxBody>,
+                Response = http::Response<
+                    <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
+                >,
+            >,
+            <T as tonic::codegen::Service<
+                http::Request<tonic::body::BoxBody>,
+            >>::Error: Into<StdError> + Send + Sync,
+        {
+            StreamClient::new(InterceptedService::new(inner, interceptor))
+        }
+        /// Compress requests with the given encoding.
+        ///
+        /// This requires the server to support it otherwise it might respond with an
+        /// error.
+        #[must_use]
+        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
+            self.inner = self.inner.send_compressed(encoding);
+            self
+        }
+        /// Enable decompressing responses.
+        #[must_use]
+        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
+            self.inner = self.inner.accept_compressed(encoding);
+            self
+        }
+        /// Limits the maximum size of a decoded message.
+        ///
+        /// Default: `4MB`
+        #[must_use]
+        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
+            self.inner = self.inner.max_decoding_message_size(limit);
+            self
+        }
+        /// Limits the maximum size of an encoded message.
+        ///
+        /// Default: `usize::MAX`
+        #[must_use]
+        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
+            self.inner = self.inner.max_encoding_message_size(limit);
+            self
+        }
+        pub async fn query_trace(
+            &mut self,
+            request: impl tonic::IntoRequest<super::Request>,
+        ) -> std::result::Result<
+            tonic::Response<tonic::codec::Streaming<super::Response>>,
+            tonic::Status,
+        > {
+            self.inner
+                .ready()
+                .await
+                .map_err(|e| {
+                    tonic::Status::new(
+                        tonic::Code::Unknown,
+                        format!("Service was not ready: {}", e.into()),
+                    )
+                })?;
+            let codec = tonic::codec::ProstCodec::default();
+            let path = http::uri::PathAndQuery::from_static(
+                "/graph.tracing.v1.Stream/QueryTrace",
+            );
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(GrpcMethod::new("graph.tracing.v1.Stream", "QueryTrace"));
+            self.inner.server_streaming(req, path, codec).await
+        }
+    }
+}
+/// Generated server implementations.
+pub mod stream_server {
+    #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
+    use tonic::codegen::*;
+    /// Generated trait containing gRPC methods that should be implemented for use with StreamServer.
+    #[async_trait]
+    pub trait Stream: Send + Sync + 'static {
+        /// Server streaming response type for the QueryTrace method.
+        type QueryTraceStream: tonic::codegen::tokio_stream::Stream<
+                Item = std::result::Result<super::Response, tonic::Status>,
+            >
+            + Send
+            + 'static;
+        async fn query_trace(
+            &self,
+            request: tonic::Request<super::Request>,
+        ) -> std::result::Result<tonic::Response<Self::QueryTraceStream>, tonic::Status>;
+    }
+    #[derive(Debug)]
+    pub struct StreamServer<T: Stream> {
+        inner: _Inner<T>,
+        accept_compression_encodings: EnabledCompressionEncodings,
+        send_compression_encodings: EnabledCompressionEncodings,
+        max_decoding_message_size: Option<usize>,
+        max_encoding_message_size: Option<usize>,
+    }
+    struct _Inner<T>(Arc<T>);
+    impl<T: Stream> StreamServer<T> {
+        pub fn new(inner: T) -> Self {
+            Self::from_arc(Arc::new(inner))
+        }
+        pub fn from_arc(inner: Arc<T>) -> Self {
+            let inner = _Inner(inner);
+            Self {
+                inner,
+                accept_compression_encodings: Default::default(),
+                send_compression_encodings: Default::default(),
+                max_decoding_message_size: None,
+                max_encoding_message_size: None,
+            }
+        }
+        pub fn with_interceptor<F>(
+            inner: T,
+            interceptor: F,
+        ) -> InterceptedService<Self, F>
+        where
+            F: tonic::service::Interceptor,
+        {
+            InterceptedService::new(Self::new(inner), interceptor)
+        }
+        /// Enable decompressing requests with the given encoding.
+        #[must_use]
+        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
+            self.accept_compression_encodings.enable(encoding);
+            self
+        }
+        /// Compress responses with the given encoding, if the client supports it.
+        #[must_use]
+        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
+            self.send_compression_encodings.enable(encoding);
+            self
+        }
+        /// Limits the maximum size of a decoded message.
+        ///
+        /// Default: `4MB`
+        #[must_use]
+        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
+            self.max_decoding_message_size = Some(limit);
+            self
+        }
+        /// Limits the maximum size of an encoded message.
+        ///
+        /// Default: `usize::MAX`
+        #[must_use]
+        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
+            self.max_encoding_message_size = Some(limit);
+            self
+        }
+    }
+    impl<T, B> tonic::codegen::Service<http::Request<B>> for StreamServer<T>
+    where
+        T: Stream,
+        B: Body + Send + 'static,
+        B::Error: Into<StdError> + Send + 'static,
+    {
+        type Response = http::Response<tonic::body::BoxBody>;
+        type Error = std::convert::Infallible;
+        type Future = BoxFuture<Self::Response, Self::Error>;
+        fn poll_ready(
+            &mut self,
+            _cx: &mut Context<'_>,
+        ) -> Poll<std::result::Result<(), Self::Error>> {
+            Poll::Ready(Ok(()))
+        }
+        fn call(&mut self, req: http::Request<B>) -> Self::Future {
+            let inner = self.inner.clone();
+            match req.uri().path() {
+                "/graph.tracing.v1.Stream/QueryTrace" => {
+                    #[allow(non_camel_case_types)]
+                    struct QueryTraceSvc<T: Stream>(pub Arc<T>);
+                    impl<T: Stream> tonic::server::ServerStreamingService<super::Request>
+                    for QueryTraceSvc<T> {
+                        type Response = super::Response;
+                        type ResponseStream = T::QueryTraceStream;
+                        type Future = BoxFuture<
+                            tonic::Response<Self::ResponseStream>,
+                            tonic::Status,
+                        >;
+                        fn call(
+                            &mut self,
+                            request: tonic::Request<super::Request>,
+                        ) -> Self::Future {
+                            let inner = Arc::clone(&self.0);
+                            let fut = async move {
+                                <T as Stream>::query_trace(&inner, request).await
+                            };
+                            Box::pin(fut)
+                        }
+                    }
+                    let accept_compression_encodings = self.accept_compression_encodings;
+                    let send_compression_encodings = self.send_compression_encodings;
+                    let max_decoding_message_size = self.max_decoding_message_size;
+                    let max_encoding_message_size = self.max_encoding_message_size;
+                    let inner = self.inner.clone();
+                    let fut = async move {
+                        let inner = inner.0;
+                        let method = QueryTraceSvc(inner);
+                        let codec = tonic::codec::ProstCodec::default();
+                        let mut grpc = tonic::server::Grpc::new(codec)
+                            .apply_compression_config(
+                                accept_compression_encodings,
+                                send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
+                            );
+                        let res = grpc.server_streaming(method, req).await;
+                        Ok(res)
+                    };
+                    Box::pin(fut)
+                }
+                _ => {
+                    Box::pin(async move {
+                        Ok(
+                            http::Response::builder()
+                                .status(200)
+                                .header("grpc-status", "12")
+                                .header("content-type", "application/grpc")
+                                .body(empty_body())
+                                .unwrap(),
+                        )
+                    })
+                }
+            }
+        }
+    }
+    impl<T: Stream> Clone for StreamServer<T> {
+        fn clone(&self) -> Self {
+            let inner = self.inner.clone();
+            Self {
+                inner,
+                accept_compression_encodings: self.accept_compression_encodings,
+                send_compression_encodings: self.send_compression_encodings,
+                max_decoding_message_size: self.max_decoding_message_size,
+                max_encoding_message_size: self.max_encoding_message_size,
+            }
+        }
+    }
+    impl<T: Stream> Clone for _Inner<T> {
+        fn clone(&self) -> Self {
+            Self(Arc::clone(&self.0))
+        }
+    }
+    impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            write!(f, "{:?}", self.0)
+        }
+    }
+    impl<T: Stream> tonic::server::NamedService for StreamServer<T> {
+        const NAME: &'static str = "graph.tracing.v1.Stream";
+    }
+}
diff --git a/graph/src/grpc/pb/mod.rs b/graph/src/grpc/pb/mod.rs
new file mode 100644
index 00000000000..611e514e404
--- /dev/null
+++ b/graph/src/grpc/pb/mod.rs
@@ -0,0 +1,8 @@
+// This file is @generated by prost-build.
+pub mod graph {
+    pub mod tracing {
+        pub mod v1 {
+            include!("graph.tracing.v1.rs");
+        }
+    }
+}
diff --git a/graph/src/lib.rs b/graph/src/lib.rs
index 04872aab196..3de1fea3d9d 100644
--- a/graph/src/lib.rs
+++ b/graph/src/lib.rs
@@ -37,6 +37,8 @@ pub mod env;
 
 pub mod ipfs;
 
+pub mod grpc;
+
 /// Wrapper for spawning tasks that abort on panic, which is our default.
 mod task_spawn;
 pub use task_spawn::{
diff --git a/node/Cargo.toml b/node/Cargo.toml
index 820ed8405a8..0c9b641f1da 100644
--- a/node/Cargo.toml
+++ b/node/Cargo.toml
@@ -32,6 +32,7 @@ graph-server-http = { path = "../server/http" }
 graph-server-index-node = { path = "../server/index-node" }
 graph-server-json-rpc = { path = "../server/json-rpc" }
 graph-server-websocket = { path = "../server/websocket" }
+graph-server-grpc = { path = "../server/grpc" }
 graph-server-metrics = { path = "../server/metrics" }
 graph-store-postgres = { path = "../store/postgres" }
 graphman-server = { workspace = true }
diff --git a/node/src/main.rs b/node/src/main.rs
index 870cce97318..8862c0ed3f5 100644
--- a/node/src/main.rs
+++ b/node/src/main.rs
@@ -153,6 +153,9 @@ async fn main() {
     // Obtain metrics server port
     let metrics_port = opt.metrics_port;
 
+    // Obtain the query tracing gRPC server port
+    let tracing_grpc_port = opt.grpc_port;
+
     // Obtain the fork base URL
     let fork_base = match &opt.fork_base {
         Some(url) => {
@@ -516,6 +519,12 @@ async fn main() {
     // Run the index node server
     graph::spawn(async move { index_node_server.start(index_node_port).await });
 
+    if env_vars.enable_tracing_grpc_server {
+        graph::spawn(async move {
+            graph_server_grpc::start(tracing_grpc_port).await.unwrap();
+        });
+    }
+
    graph::spawn(async move {
         metrics_server
             .start(metrics_port)
diff --git a/node/src/opt.rs b/node/src/opt.rs
index e4dc44ba92a..ebe74868cf5 100644
--- a/node/src/opt.rs
+++ b/node/src/opt.rs
@@ -154,6 +154,13 @@ pub struct Opt {
         help = "Port for the Prometheus metrics server"
     )]
     pub metrics_port: u16,
+    #[clap(
+        long,
+        default_value = "8060",
+        value_name = "PORT",
+        help = "Port for the query tracing gRPC server"
+    )]
+    pub grpc_port: u16,
     #[clap(
         long,
         default_value = "default",
diff --git a/server/grpc/Cargo.toml b/server/grpc/Cargo.toml
new file mode 100644
index 00000000000..10e101ffcd9
--- /dev/null
+++ b/server/grpc/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "graph-server-grpc"
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+readme.workspace = true
+homepage.workspace = true
+repository.workspace = true
+license.workspace = true
+
+[dependencies]
+graph = { path = "../../graph" }
+graph-store-postgres = { path = "../../store/postgres" }
+tonic.workspace = true
+tokio.workspace = true
diff --git a/server/grpc/src/lib.rs b/server/grpc/src/lib.rs
new file mode 100644
index 00000000000..69df7a7592f
--- /dev/null
+++ b/server/grpc/src/lib.rs
@@ -0,0 +1,106 @@
+pub struct TracingServer;
+
+use std::pin::Pin;
+
+use graph::{
+    data::query::Trace,
+    futures03::{stream::unfold, Stream, StreamExt as _},
+    grpc::pb::graph::tracing::v1::{
+        stream_server::{Stream as StreamProto, StreamServer},
+        Request, Response as ResponseProto, Trace as TraceProto,
+    },
+};
+use graph_store_postgres::TRACING_CONTROL;
+use tonic::{async_trait, Status};
+
+type ResponseStream = Pin<Box<dyn Stream<Item = Result<ResponseProto, Status>> + Send>>;
+
+#[async_trait]
+impl StreamProto for TracingServer {
+    type QueryTraceStream = ResponseStream;
+
+    async fn query_trace(
+        &self,
+        request: tonic::Request<Request>,
+    ) -> std::result::Result<tonic::Response<Self::QueryTraceStream>, tonic::Status> {
+        let Request { deployment_id } = request.into_inner();
+
+        let rx = TRACING_CONTROL
+            .subscribe(graph::components::store::DeploymentId(deployment_id))
+            .await;
+
+        fn query_traces(deployment_id: i32, trace: Trace, out: &mut Vec<TraceProto>) {
+            match trace {
+                Trace::Root {
+                    query,
+                    variables: _,
+                    query_id: _,
+                    setup: _,
+                    elapsed,
+                    query_parsing: _,
+                    blocks: _,
+                } => out.push(TraceProto {
+                    deployment_id,
+                    query: query.to_string(),
+                    duration_millis: elapsed.as_millis() as u64,
+                    children: 0,
+                    conn_wait_millis: None,
+                    permit_wait_millis: None,
+                    entity_count: None,
+                }),
+                Trace::Query {
+                    query,
+                    elapsed,
+                    conn_wait,
+                    permit_wait,
+                    entity_count,
+                    children,
+                } => {
+                    out.push(TraceProto {
+                        deployment_id,
+                        query: query.to_string(),
+                        duration_millis: elapsed.as_millis() as u64,
+                        children: children.len() as u32,
+                        conn_wait_millis: Some(conn_wait.as_millis() as u64),
+                        permit_wait_millis: Some(permit_wait.as_millis() as u64),
+                        entity_count: Some(entity_count as u64),
+                    });
+
+                    for (_key, child) in children {
+                        query_traces(deployment_id, child, out);
+                    }
+                }
+                _ => return,
+            }
+        }
+
+        let stream: Pin<Box<dyn Stream<Item = Result<ResponseProto, Status>> + Send>> =
+            unfold(rx, move |mut rx| async move {
+                rx.recv().await.and_then(|trace| {
+                    let mut traces = vec![];
+                    query_traces(deployment_id, trace, &mut traces);
+
+                    if traces.is_empty() {
+                        None
+                    } else {
+                        Some((ResponseProto { traces }, rx))
+                    }
+                })
+            })
+            .map(Ok)
+            .boxed();
+
+        Ok(tonic::Response::new(stream as Self::QueryTraceStream))
+    }
+}
+
+pub async fn start(port: u16) -> Result<(), Box<dyn std::error::Error>> {
+    let addr = format!("[::]:{}", port).parse()?;
+    println!("gRPC server listening on {}", addr);
+    tonic::transport::Server::builder()
+        .add_service(StreamServer::new(TracingServer))
+        .serve(addr)
+        .await?;
+
+    Ok(())
+}
diff --git a/store/postgres/src/lib.rs b/store/postgres/src/lib.rs
index 759e8601313..83ccd93a8eb 100644
--- a/store/postgres/src/lib.rs
+++ b/store/postgres/src/lib.rs
@@ -11,6 +11,16 @@ extern crate diesel_migrations;
 #[macro_use]
 extern crate diesel_derive_enum;
 
+use std::sync::Arc;
+
+use graph::components::tracing::TracingControl;
+use graph::data::query::Trace;
+use graph::prelude::lazy_static;
+
+lazy_static! {
+    pub static ref TRACING_CONTROL: Arc<TracingControl<Trace>> = Arc::new(TracingControl::start());
+}
+
 mod advisory_lock;
 mod block_range;
 mod block_store;
diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs
index de7e6895083..6930978cea5 100644
--- a/store/postgres/src/relational.rs
+++ b/store/postgres/src/relational.rs
@@ -79,7 +79,7 @@ use graph::prelude::{
 use crate::block_range::{BoundSide, BLOCK_COLUMN, BLOCK_RANGE_COLUMN};
 pub use crate::catalog::Catalog;
 use crate::connection_pool::ForeignServer;
-use crate::{catalog, deployment};
+use crate::{catalog, deployment, TRACING_CONTROL};
 
 use self::rollup::Rollup;
 
@@ -824,11 +824,12 @@ impl Layout {
         elapsed: Duration,
         entity_count: usize,
         trace: bool,
+        trace_sender: Option<Sender<Trace>>,
     ) -> Trace {
         // 20kB
         const MAXLEN: usize = 20_480;
 
-        if !ENV_VARS.log_sql_timing() && !trace {
+        if !ENV_VARS.log_sql_timing() && !trace && trace_sender.is_none() {
             return Trace::None;
         }
 
@@ -840,6 +841,16 @@ impl Layout {
             Trace::None
         };
 
+        match (&trace, trace_sender) {
+            (Trace::None, Some(sender)) => {
+                let _ = sender.try_send(Trace::query(&text, elapsed, entity_count));
+            }
+            (trace, Some(sender)) => {
+                let _ = sender.try_send(trace.clone());
+            }
+            (_, None) => {}
+        }
+
         if ENV_VARS.log_sql_timing() {
             // If the query + bind variables is more than MAXLEN, truncate it;
             // this will happen when queries have very large bind variables
@@ -910,7 +921,18 @@ impl Layout {
                 )),
             }
         })?;
-        let trace = log_query_timing(logger, &query_clone, start.elapsed(), values.len(), trace);
+
+        let id = self.site.id.into();
+        let trace_sender = TRACING_CONTROL.producer(id);
+
+        let trace = log_query_timing(
+            logger,
+            &query_clone,
+            start.elapsed(),
+            values.len(),
+            trace,
+            trace_sender,
+        );
         let parent_type = filter_collection.parent_type()?.map(ColumnType::from);
 
         values
diff --git a/tests/Cargo.toml b/tests/Cargo.toml
index b79702ceb7f..3f711964bc4 100644
--- a/tests/Cargo.toml
+++ b/tests/Cargo.toml
@@ -9,7 +9,7 @@ assert-json-diff = "2.0.2"
 async-stream = "0.3.5"
 graph = { path = "../graph" }
 graph-chain-ethereum = { path = "../chain/ethereum" }
-graph-chain-substreams= {path = "../chain/substreams"}
+graph-chain-substreams = { path = "../chain/substreams" }
 graph-node = { path = "../node" }
 graph-core = { path = "../core" }
 graph-graphql = { path = "../graphql" }
@@ -18,8 +18,11 @@ graph-server-index-node = { path = "../server/index-node" }
 graph-runtime-wasm = { path = "../runtime/wasm" }
 serde = { workspace = true }
 serde_yaml = { workspace = true }
-slog = { version = "2.7.0", features = ["release_max_level_trace", "max_level_trace"] }
-tokio = { version = "1.38.0", features = ["rt", "macros", "process"] }
+slog = { version = "2.7.0", features = [
+    "release_max_level_trace",
+    "max_level_trace",
+] }
+tokio.workspace = true
 # Once graph upgrades to web3 0.19, we don't need this anymore. The version
 # here needs to be kept in sync with the web3 version that the graph crate
 # uses until then
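
---

A minimal consumer sketch for the new endpoint (illustrative, not part of the patch): it assumes a node started with `GRAPH_NODE_ENABLE_QUERY_TRACING_GRPC=true` and the default `--grpc-port` of 8060, and uses a placeholder deployment id. It is built against the generated `StreamClient` added above.

```rust
use graph::grpc::pb::graph::tracing::v1::{stream_client::StreamClient, Request};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the query tracing gRPC server (port from `--grpc-port`).
    let mut client = StreamClient::connect("http://localhost:8060").await?;

    // Open the server-streaming QueryTrace RPC for one deployment. The server
    // pushes a Response, with the root trace and its children flattened into
    // `traces`, for every traced query against that deployment.
    let mut stream = client
        .query_trace(Request { deployment_id: 42 })
        .await?
        .into_inner();

    while let Some(response) = stream.message().await? {
        for trace in response.traces {
            println!(
                "{} ms ({} children): {}",
                trace.duration_millis, trace.children, trace.query
            );
        }
    }

    Ok(())
}
```

The stream ends when the subscription channel closes, e.g. when another client subscribes to the same deployment and replaces it (see `TracingControl::subscribe`).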