From 79c1bbdd39380b8fc4079e79719bd16735f835ce Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Tue, 8 Oct 2024 16:52:56 +0300 Subject: [PATCH 01/10] feat: tracing with OpenTelemetry OTLP Replace the "movement_timing" tracing target and the logging layer it targeted with an optionally installed OpenTelemetry OTLP exporter. The name of the tracing target matched to send OpenTelemetry events is "movement_telemetry". --- Cargo.lock | 298 +++++++++++++++--- Cargo.toml | 5 +- .../src/bin/e2e/followers_consistent.rs | 8 +- networks/suzuka/suzuka-full-node/src/main.rs | 8 +- .../src/tasks/execute_settle.rs | 2 +- .../src/tasks/transaction_ingress.rs | 4 +- protocol-units/da/m1/light-node/src/main.rs | 9 +- .../da/m1/light-node/src/v1/passthrough.rs | 2 +- .../da/m1/light-node/src/v1/sequencer.rs | 22 +- .../opt-executor/src/executor/mod.rs | 2 +- .../opt-executor/src/transaction_pipe.rs | 6 +- util/tracing/Cargo.toml | 8 +- util/tracing/src/lib.rs | 97 +++--- 13 files changed, 326 insertions(+), 145 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 324e5c320..6452b9711 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -402,7 +402,7 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", - "tower", + "tower 0.4.13", "tracing", ] @@ -447,7 +447,7 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -646,7 +646,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -660,7 +660,7 @@ dependencies = [ "alloy-transport", "reqwest 0.12.8", "serde_json", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -3532,7 +3532,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.4", "bitflags 1.3.2", "bytes 1.7.2", "futures-util", @@ -3548,7 +3548,34 @@ dependencies = [ "rustversion", "serde", "sync_wrapper 0.1.2", - "tower", + "tower 0.4.13", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes 1.7.2", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 1.0.1", + "tower 0.5.1", "tower-layer", "tower-service", ] @@ -3570,6 +3597,26 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes 1.7.2", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.1", + "tower-layer", + "tower-service", +] + [[package]] name = "az" version = "1.2.1" @@ -5193,7 +5240,7 @@ dependencies = [ "bitflags 1.3.2", "crossterm_winapi", "libc", - "mio 0.8.11", + "mio", "parking_lot", "signal-hook", "signal-hook-mio", @@ -5209,7 +5256,7 @@ dependencies = [ "bitflags 1.3.2", "crossterm_winapi", "libc", - "mio 0.8.11", + "mio", "parking_lot", "signal-hook", "signal-hook-mio", @@ -6722,7 +6769,7 @@ dependencies = [ "tokio", "tokio-retry", "tonic 0.9.2", - "tower", + "tower 0.4.13", "tracing", ] @@ -7271,6 +7318,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -7324,6 +7372,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-timeout" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" +dependencies = [ + "hyper 1.4.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -7866,7 +7927,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -8908,18 +8969,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "mio" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" -dependencies = [ - "hermit-abi 0.3.9", - "libc", - "wasi 0.11.0+wasi-snapshot-preview1", - "windows-sys 0.52.0", -] - [[package]] name = "mirai-annotations" version = "1.12.0" @@ -9626,7 +9675,11 @@ dependencies = [ name = "movement-tracing" version = "0.0.2" dependencies = [ - "tracing-appender", + "anyhow", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", + "tracing-opentelemetry", "tracing-subscriber 0.3.18", ] @@ -10121,6 +10174,71 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "803801d3d3b71cd026851a53f974ea03df3d179cb758b260136a6c9e22e196af" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "596b1719b3cab83addb20bcbffdf21575279d9436d9ccccfe651a3bf0ab5ab06" +dependencies = [ + "async-trait", + "futures-core", + "http 1.1.0", + "opentelemetry", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost 0.13.3", + "thiserror", + "tokio", + "tonic 0.12.3", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c43620e8f93359eb7e627a3b16ee92d8585774986f24f2ab010817426c5ce61" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost 0.13.3", + "tonic 0.12.3", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0da0d6b47a3dbc6e9c9e36a0520e25cf943e046843818faaa3f87365a548c82" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry", + "percent-encoding", + "rand 0.8.5", + "serde_json", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -11235,6 +11353,16 @@ dependencies = [ "prost-derive 0.12.6", ] +[[package]] +name = "prost" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" +dependencies = [ + "bytes 1.7.2", + "prost-derive 0.13.3", +] + [[package]] name = "prost-build" version = "0.12.6" @@ -11282,6 +11410,19 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "prost-derive" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" +dependencies = [ + "anyhow", + "itertools 0.13.0", + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "prost-types" version = "0.11.9" @@ -12775,7 +12916,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34db1a06d485c9142248b7a054f034b349b212551f3dfd19c94d45a754a217cd" dependencies = [ "libc", - "mio 0.8.11", + "mio", "signal-hook", ] @@ -13791,21 +13932,22 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.40.0" +version = "1.38.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" +checksum = "eb2caba9f80616f438e09748d5acda951967e1ea58508ef53d9c6402485a46df" dependencies = [ "backtrace", "bytes 1.7.2", "libc", - "mio 1.0.2", + "mio", + "num_cpus", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2 0.5.7", "tokio-macros", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.48.0", ] [[package]] @@ -13820,9 +13962,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.4.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", @@ -14049,7 +14191,7 @@ checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.21.7", "bytes 1.7.2", "flate2", @@ -14059,7 +14201,7 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.30", - "hyper-timeout", + "hyper-timeout 0.4.1", "percent-encoding", "pin-project 1.1.5", "prost 0.11.9", @@ -14067,7 +14209,7 @@ dependencies = [ "tokio", "tokio-rustls 0.24.1", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -14082,7 +14224,7 @@ checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.21.7", "bytes 1.7.2", "flate2", @@ -14090,7 +14232,7 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.30", - "hyper-timeout", + "hyper-timeout 0.4.1", "percent-encoding", "pin-project 1.1.5", "prost 0.12.6", @@ -14100,13 +14242,43 @@ dependencies = [ "tokio", "tokio-rustls 0.25.0", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", "zstd 0.12.4", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum 0.7.7", + "base64 0.22.1", + "bytes 1.7.2", + "h2 0.4.6", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-timeout 0.5.1", + "hyper-util", + "percent-encoding", + "pin-project 1.1.5", + "prost 0.13.3", + "socket2 0.5.7", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tonic-build" version = "0.11.0" @@ -14173,6 +14345,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-http" version = "0.4.4" @@ -14215,18 +14401,6 @@ dependencies = [ "tracing-core", ] -[[package]] -name = "tracing-appender" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" -dependencies = [ - "crossbeam-channel", - "thiserror", - "time", - "tracing-subscriber 0.3.18", -] - [[package]] name = "tracing-attributes" version = "0.1.27" @@ -14259,6 +14433,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eabc56d23707ad55ba2a0750fc24767125d5a0f51993ba41ad2c441cc7b8dea" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber 0.3.18", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.1.3" @@ -14948,6 +15140,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.23.1" diff --git a/Cargo.toml b/Cargo.toml index 21ac63f45..e0610c224 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -246,6 +246,9 @@ move-vm-ext = { path = "types/move-vm-ext" } num-derive = "0.4.2" num-traits = "0.2.14" once_cell = "1.8.0" +opentelemetry = { version = "0.25" } +opentelemetry_sdk = { version = "0.25", features = ["rt-tokio"] } +opentelemetry-otlp = { version = "0.25" } parking_lot = { version = "0.12.1" } poem = { version = "=1.3.59", features = ["anyhow", "rustls"] } poem-openapi = { version = "=2.0.11", features = ["swagger-ui", "url"] } @@ -283,7 +286,7 @@ tonic-reflection = "0.11" tonic-web = "0.11" ### To try (experimental) std support, add `features = [ "std" ]` to risc0-zkvm tracing = "0.1.40" -tracing-appender = "0.2" +tracing-opentelemetry = { version = "0.26" } tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-test = "0.2.5" trie-db = "0.28.0" diff --git a/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs b/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs index 3b9353e05..b16d9c170 100644 --- a/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs +++ b/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs @@ -12,8 +12,6 @@ use tokio::sync::RwLock; use tracing::info; use url::Url; -const TIMING_LOG_ENV: &str = "SUZUKA_TIMING_LOG"; - pub fn get_suzuka_config( dot_movement: &DotMovement, ) -> Result { @@ -247,10 +245,8 @@ pub async fn basic_coin_transfers( #[tokio::main] async fn main() -> Result<(), anyhow::Error> { - let tracing_config = movement_tracing::Config { - timing_log_path: std::env::var_os(TIMING_LOG_ENV).map(Into::into), - }; - let _guard = movement_tracing::init_tracing_subscriber(tracing_config); + let tracing_config = movement_tracing::Config::from_env()?; + movement_tracing::init_tracing_subscriber(tracing_config)?; // get the lead dot movement from the environment let dot_movement = DotMovement::try_from_env()?; diff --git a/networks/suzuka/suzuka-full-node/src/main.rs b/networks/suzuka/suzuka-full-node/src/main.rs index eea463a88..29f71faa9 100644 --- a/networks/suzuka/suzuka-full-node/src/main.rs +++ b/networks/suzuka/suzuka-full-node/src/main.rs @@ -1,15 +1,11 @@ use suzuka_full_node::manager::Manager; -use std::env; use std::process::ExitCode; -const TIMING_LOG_ENV: &str = "SUZUKA_TIMING_LOG"; - #[tokio::main] async fn main() -> Result { - let tracing_config = - movement_tracing::Config { timing_log_path: env::var_os(TIMING_LOG_ENV).map(Into::into) }; - let _guard = movement_tracing::init_tracing_subscriber(tracing_config); + let tracing_config = movement_tracing::Config::from_env()?; + movement_tracing::init_tracing_subscriber(tracing_config)?; // get the config file let dot_movement = dot_movement::DotMovement::try_from_env()?; diff --git a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs index e8a00da48..c278f6641 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs @@ -140,7 +140,7 @@ where // get the transactions let transactions_count = block.transactions().len(); - let span = info_span!(target: "movement_timing", "execute_block", id = %block_id); + let span = info_span!(target: "movement_telemetry", "execute_block", id = %block_id); let commitment = self.execute_block_with_retries(block, block_timestamp).instrument(span).await?; diff --git a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs index a02108f53..b47d20a6f 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs @@ -65,7 +65,7 @@ impl Task { Ok(transaction) => match transaction { Some(transaction) => { info!( - target : "movement_timing", + target : "movement_telemetry", batch_id = %batch_id, tx_hash = %transaction.committed_hash(), sender = %transaction.sender(), @@ -93,7 +93,7 @@ impl Task { if transactions.len() > 0 { info!( - target: "movement_timing", + target: "movement_telemetry", batch_id = %batch_id, transaction_count = transactions.len(), "built_batch_write" diff --git a/protocol-units/da/m1/light-node/src/main.rs b/protocol-units/da/m1/light-node/src/main.rs index e9a8cb7f5..f72cf65b1 100644 --- a/protocol-units/da/m1/light-node/src/main.rs +++ b/protocol-units/da/m1/light-node/src/main.rs @@ -1,14 +1,9 @@ use m1_da_light_node::v1::{LightNodeV1, Manager}; -use std::env; - -const TIMING_LOG_ENV: &str = "M1_DA_LIGHT_NODE_TIMING_LOG"; - #[tokio::main] async fn main() -> Result<(), Box> { - let tracing_config = - movement_tracing::Config { timing_log_path: env::var_os(TIMING_LOG_ENV).map(Into::into) }; - let _guard = movement_tracing::init_tracing_subscriber(tracing_config); + let tracing_config = movement_tracing::Config::from_env()?; + movement_tracing::init_tracing_subscriber(tracing_config)?; let dot_movement = dot_movement::DotMovement::try_from_env()?; let config_path = dot_movement.get_config_json_path(); diff --git a/protocol-units/da/m1/light-node/src/v1/passthrough.rs b/protocol-units/da/m1/light-node/src/v1/passthrough.rs index c8d1cc3e9..1db727988 100644 --- a/protocol-units/da/m1/light-node/src/v1/passthrough.rs +++ b/protocol-units/da/m1/light-node/src/v1/passthrough.rs @@ -146,7 +146,7 @@ impl LightNodeV1 { Ok(verified_blobs) } - #[tracing::instrument(target = "movement_timing", level = "debug")] + #[tracing::instrument(target = "movement_telemetry", level = "debug")] async fn get_blobs_at_height(&self, height: u64) -> Result, anyhow::Error> { let celestia_blobs = self.get_celestia_blobs_at_height(height).await?; let mut blobs = Vec::new(); diff --git a/protocol-units/da/m1/light-node/src/v1/sequencer.rs b/protocol-units/da/m1/light-node/src/v1/sequencer.rs index 55f03bd7c..294b6a9e6 100644 --- a/protocol-units/da/m1/light-node/src/v1/sequencer.rs +++ b/protocol-units/da/m1/light-node/src/v1/sequencer.rs @@ -79,17 +79,17 @@ impl LightNodeV1 { // this has an internal timeout based on its building time // so in the worst case scenario we will roughly double the internal timeout let uid = LOGGING_UID.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - debug!(target: "movement_timing", uid = %uid, "waiting_for_next_block",); + debug!(target: "movement_telemetry", uid = %uid, "waiting_for_next_block",); let block = memseq.wait_for_next_block().await?; match block { Some(block) => { - info!(target: "movement_timing", block_id = %block.id(), uid = %uid, transaction_count = block.transactions().len(), "received_block"); + info!(target: "movement_telemetry", block_id = %block.id(), uid = %uid, transaction_count = block.transactions().len(), "received_block"); sender.send(block).await?; Ok(()) } None => { // no transactions to include - debug!(target: "movement_timing", uid = %uid, "no_transactions_to_include"); + debug!(target: "movement_telemetry", uid = %uid, "no_transactions_to_include"); Ok(()) } } @@ -97,7 +97,7 @@ impl LightNodeV1 { async fn submit_blocks(&self, blocks: &Vec) -> Result<(), anyhow::Error> { for block in blocks { - info!(target: "movement_timing", block_id = %block.block.id(), "inner_submitting_block"); + info!(target: "movement_telemetry", block_id = %block.block.id(), "inner_submitting_block"); } // get references to celestia blobs in the wrapped blocks let block_blobs = blocks @@ -108,14 +108,14 @@ impl LightNodeV1 { // use deref on the wrapped block to get the blob self.pass_through.submit_celestia_blobs(&block_blobs).await?; for block in blocks { - info!(target: "movement_timing", block_id = %block.block.id(), "inner_submitted_block"); + info!(target: "movement_telemetry", block_id = %block.block.id(), "inner_submitted_block"); } Ok(()) } pub async fn submit_with_heuristic(&self, blocks: Vec) -> Result<(), anyhow::Error> { for block in &blocks { - info!(target: "movement_timing", block_id = %block.id(), "submitting_block"); + info!(target: "movement_telemetry", block_id = %block.id(), "submitting_block"); } // wrap the blocks in a struct that can be split and compressed @@ -168,7 +168,7 @@ impl LightNodeV1 { info!("block group results: {:?}", block_group_results); for block_group_result in &block_group_results { - info!(target: "movement_timing", block_group_result = ?block_group_result, "block_group_result"); + info!(target: "movement_telemetry", block_group_result = ?block_group_result, "block_group_result"); } Ok(()) @@ -204,7 +204,7 @@ impl LightNodeV1 { Err(_) => { // The operation timed out debug!( - target: "movement_timing", + target: "movement_telemetry", batch_size = blocks.len(), "timed_out_building_block" ); @@ -213,7 +213,7 @@ impl LightNodeV1 { } } - info!(target: "movement_timing", block_count = blocks.len(), "read_blocks"); + info!(target: "movement_telemetry", block_count = blocks.len(), "read_blocks"); Ok(blocks) } @@ -232,11 +232,11 @@ impl LightNodeV1 { // submit the blobs, resizing as needed for block_id in &ids { - info!(target: "movement_timing", %block_id, "submitting_block_batch"); + info!(target: "movement_telemetry", %block_id, "submitting_block_batch"); } self.submit_with_heuristic(blocks).await?; for block_id in &ids { - info!(target: "movement_timing", %block_id, "submitted_block_batch"); + info!(target: "movement_telemetry", %block_id, "submitted_block_batch"); } Ok(()) diff --git a/protocol-units/execution/opt-executor/src/executor/mod.rs b/protocol-units/execution/opt-executor/src/executor/mod.rs index 05dae3bfe..ab14a262e 100644 --- a/protocol-units/execution/opt-executor/src/executor/mod.rs +++ b/protocol-units/execution/opt-executor/src/executor/mod.rs @@ -46,7 +46,7 @@ impl Executor { self.transactions_in_flight .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { info!( - target: "movement_timing", + target: "movement_telemetry", count, current, "decrementing_transactions_in_flight", diff --git a/protocol-units/execution/opt-executor/src/transaction_pipe.rs b/protocol-units/execution/opt-executor/src/transaction_pipe.rs index 163157697..03abf4473 100644 --- a/protocol-units/execution/opt-executor/src/transaction_pipe.rs +++ b/protocol-units/execution/opt-executor/src/transaction_pipe.rs @@ -89,7 +89,7 @@ impl TransactionPipe { match request { MempoolClientRequest::SubmitTransaction(transaction, callback) => { let span = info_span!( - target: "movement_timing", + target: "movement_telemetry", "submit_transaction", tx_hash = %transaction.committed_hash(), sender = %transaction.sender(), @@ -124,13 +124,13 @@ impl TransactionPipe { // For now, we are going to consider a transaction in flight until it exits the mempool and is sent to the DA as is indicated by WriteBatch. let in_flight = self.transactions_in_flight.load(std::sync::atomic::Ordering::Relaxed); info!( - target: "movement_timing", + target: "movement_telemetry", in_flight = %in_flight, "transactions_in_flight" ); if in_flight > self.in_flight_limit { info!( - target: "movement_timing", + target: "movement_telemetry", "shedding_load" ); let status = MempoolStatus::new(MempoolStatusCode::MempoolIsFull); diff --git a/util/tracing/Cargo.toml b/util/tracing/Cargo.toml index eaf030410..dfb008c69 100644 --- a/util/tracing/Cargo.toml +++ b/util/tracing/Cargo.toml @@ -11,8 +11,12 @@ publish.workspace = true rust-version.workspace = true [dependencies] -tracing-appender = { workspace = true } -tracing-subscriber = { workspace = true, features = ["json"] } +anyhow = { workspace = true } +tracing-subscriber = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } +opentelemetry-otlp = { workspace = true } +tracing-opentelemetry = { workspace = true } #console-subscriber = { workspace = true } [lints] diff --git a/util/tracing/src/lib.rs b/util/tracing/src/lib.rs index f018f7e20..b920012d8 100644 --- a/util/tracing/src/lib.rs +++ b/util/tracing/src/lib.rs @@ -1,78 +1,63 @@ -use tracing_appender::non_blocking::WorkerGuard as AppenderGuard; +use anyhow::anyhow; +use opentelemetry::trace::TracerProvider as _; +use opentelemetry_otlp::WithExportConfig as _; +use opentelemetry_sdk::runtime; use tracing_subscriber::filter::{self, EnvFilter, LevelFilter}; -use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::prelude::*; -use std::{env, fs::File, path::PathBuf}; +use std::env; -const TIMING_ENV: &str = "MOVEMENT_TIMING"; - -/// The default path name for the timing log file. -/// If the path not specified in [`Config`] and the `MOVEMENT_TIMING` -/// environment variable is set, the log file with this name will be created. -pub const DEFAULT_TIMING_LOG_FILE: &str = "movement-timing.log"; - -/// A guard for background log appender(s) returned by `init_tracing_subscriber`. -pub struct WorkerGuard { - _drop_me: Option, -} +const OTLP_TRACING_ENV: &str = "MOVEMENT_OTLP"; /// Options for the tracing subscriber. #[derive(Default)] pub struct Config { - /// Custom name for the timing log file. - pub timing_log_path: Option, + /// URL of the collector endpoint using the OTLP gRPC protocol. + pub otlp_grpc_url: Option, +} + +impl Config { + /// Get the tracing configuration from well-known environment variables. + pub fn from_env() -> Result { + let otlp_grpc_url = match env::var(OTLP_TRACING_ENV) { + Ok(url) => Some(url), + Err(env::VarError::NotPresent) => None, + Err(env::VarError::NotUnicode(s)) => { + return Err(anyhow!( + "value of environment variable {OTLP_TRACING_ENV} is not valid UTF-8: {}", + s.to_string_lossy() + )); + } + }; + Ok(Self { otlp_grpc_url }) + } } /// Sets up the tracing subscribers for a Movement process. This should be /// called at the beginning of a process' `main` function. -/// Returns a guard object that should be dropped at the end of the process' -/// `main`` function scope. -/// -/// This function may output encounted errors to the standard error stream, -/// as this is the only facility -pub fn init_tracing_subscriber(config: Config) -> WorkerGuard { +pub fn init_tracing_subscriber(config: Config) -> Result<(), anyhow::Error> { // TODO: compose console_subscriber as a layer let env_filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) .from_env_lossy(); let log_layer = tracing_subscriber::fmt::layer().with_filter(env_filter); - let (timing_layer, timing_writer_guard) = match env::var(TIMING_ENV) { - Err(env::VarError::NotPresent) => { - // Disable timing - (None, None) - } - Ok(timing_directives) => { - let env_filter = EnvFilter::new(timing_directives); - let timing_log_path = config - .timing_log_path - .as_deref() - .unwrap_or_else(|| DEFAULT_TIMING_LOG_FILE.as_ref()); - match File::create(timing_log_path) { - Ok(file) => { - let (writer, guard) = tracing_appender::non_blocking(file); - let layer = tracing_subscriber::fmt::layer() - .with_writer(writer) - .json() - .with_span_events(FmtSpan::CLOSE) - .with_filter(env_filter) - .with_filter(filter::filter_fn(|meta| meta.target() == "movement_timing")); - (Some(layer), Some(guard)) - } - Err(e) => { - eprintln!("can't create `{}`: {}", timing_log_path.display(), e); - (None, None) - } - } - } - Err(e) => { - eprintln!("invalid {TIMING_ENV}: {e}"); - (None, None) - } + let telemetry_layer = if let Some(endpoint) = config.otlp_grpc_url { + let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(endpoint); + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(exporter) + .install_batch(runtime::Tokio)? + .tracer("movement_tracing"); + let layer = tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter(filter::filter_fn(|meta| meta.target() == "movement_telemetry")); + Some(layer) + } else { + None }; - tracing_subscriber::registry().with(log_layer).with(timing_layer).init(); + tracing_subscriber::registry().with(log_layer).with(telemetry_layer).init(); - WorkerGuard { _drop_me: timing_writer_guard } + Ok(()) } From e9c804ddb53ce76dd24212220322aa732532e241 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Tue, 8 Oct 2024 17:01:22 +0300 Subject: [PATCH 02/10] feat(process-compose):telemetry overlay --- .../suzuka-full-node/process-compose.telemetry.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 process-compose/suzuka-full-node/process-compose.telemetry.yml diff --git a/process-compose/suzuka-full-node/process-compose.telemetry.yml b/process-compose/suzuka-full-node/process-compose.telemetry.yml new file mode 100644 index 000000000..f7e9b4bb8 --- /dev/null +++ b/process-compose/suzuka-full-node/process-compose.telemetry.yml @@ -0,0 +1,11 @@ +version: "3" + +environment: + +processes: + suzuka-full-node: + env: + MOVEMENT_OTLP: http://localhost:4317 + m1-da-light-node: + env: + MOVEMENT_OTLP: http://localhost:4317 From d9509c8e0f519014702a6bcc5baf79aec749a6fc Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 10 Oct 2024 13:59:32 +0300 Subject: [PATCH 03/10] feat(tracing)!: break out telemetry Provide telemetry API as separate from tracing rather than a globally installed layer. Installing an OpenTelemetry layer into the global tracing subscriber raises nasty reentrancy issues because the OTLP exporter stack also uses tracing under the hood. --- Cargo.lock | 27 ++++--- Cargo.toml | 9 ++- .../src/bin/e2e/followers_consistent.rs | 3 +- networks/suzuka/suzuka-full-node/src/main.rs | 9 ++- protocol-units/da/m1/light-node/src/main.rs | 9 ++- util/tracing/Cargo.toml | 1 + util/tracing/src/lib.rs | 65 +--------------- util/tracing/src/telemetry.rs | 75 +++++++++++++++++++ util/tracing/src/tracing.rs | 11 +++ 9 files changed, 127 insertions(+), 82 deletions(-) create mode 100644 util/tracing/src/telemetry.rs create mode 100644 util/tracing/src/tracing.rs diff --git a/Cargo.lock b/Cargo.lock index 6452b9711..1d044a1d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9678,6 +9678,7 @@ dependencies = [ "anyhow", "opentelemetry", "opentelemetry-otlp", + "opentelemetry-semantic-conventions", "opentelemetry_sdk", "tracing-opentelemetry", "tracing-subscriber 0.3.18", @@ -10176,9 +10177,9 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.25.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "803801d3d3b71cd026851a53f974ea03df3d179cb758b260136a6c9e22e196af" +checksum = "570074cc999d1a58184080966e5bd3bf3a9a4af650c3b05047c2621e7405cd17" dependencies = [ "futures-core", "futures-sink", @@ -10190,9 +10191,9 @@ dependencies = [ [[package]] name = "opentelemetry-otlp" -version = "0.25.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "596b1719b3cab83addb20bcbffdf21575279d9436d9ccccfe651a3bf0ab5ab06" +checksum = "29e1f9c8b032d4f635c730c0efcf731d5e2530ea13fa8bef7939ddc8420696bd" dependencies = [ "async-trait", "futures-core", @@ -10208,9 +10209,9 @@ dependencies = [ [[package]] name = "opentelemetry-proto" -version = "0.25.0" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c43620e8f93359eb7e627a3b16ee92d8585774986f24f2ab010817426c5ce61" +checksum = "c9d3968ce3aefdcca5c27e3c4ea4391b37547726a70893aab52d3de95d5f8b34" dependencies = [ "opentelemetry", "opentelemetry_sdk", @@ -10218,11 +10219,17 @@ dependencies = [ "tonic 0.12.3", ] +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db945c1eaea8ac6a9677185357480d215bb6999faa9f691d0c4d4d641eab7a09" + [[package]] name = "opentelemetry_sdk" -version = "0.25.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0da0d6b47a3dbc6e9c9e36a0520e25cf943e046843818faaa3f87365a548c82" +checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3" dependencies = [ "async-trait", "futures-channel", @@ -14435,9 +14442,9 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.26.0" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eabc56d23707ad55ba2a0750fc24767125d5a0f51993ba41ad2c441cc7b8dea" +checksum = "dc58af5d3f6c5811462cabb3289aec0093f7338e367e5a33d28c0433b3c7360b" dependencies = [ "js-sys", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index e0610c224..8f6974251 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -246,9 +246,10 @@ move-vm-ext = { path = "types/move-vm-ext" } num-derive = "0.4.2" num-traits = "0.2.14" once_cell = "1.8.0" -opentelemetry = { version = "0.25" } -opentelemetry_sdk = { version = "0.25", features = ["rt-tokio"] } -opentelemetry-otlp = { version = "0.25" } +opentelemetry = { version = "0.26" } +opentelemetry-otlp = { version = "0.26" } +opentelemetry_sdk = { version = "0.26", features = ["rt-tokio"] } +opentelemetry-semantic-conventions = { version = "0.26" } parking_lot = { version = "0.12.1" } poem = { version = "=1.3.59", features = ["anyhow", "rustls"] } poem-openapi = { version = "=2.0.11", features = ["swagger-ui", "url"] } @@ -286,7 +287,7 @@ tonic-reflection = "0.11" tonic-web = "0.11" ### To try (experimental) std support, add `features = [ "std" ]` to risc0-zkvm tracing = "0.1.40" -tracing-opentelemetry = { version = "0.26" } +tracing-opentelemetry = { version = "0.27" } tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-test = "0.2.5" trie-db = "0.28.0" diff --git a/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs b/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs index b16d9c170..b532120ab 100644 --- a/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs +++ b/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs @@ -245,8 +245,7 @@ pub async fn basic_coin_transfers( #[tokio::main] async fn main() -> Result<(), anyhow::Error> { - let tracing_config = movement_tracing::Config::from_env()?; - movement_tracing::init_tracing_subscriber(tracing_config)?; + movement_tracing::init_tracing_subscriber(); // get the lead dot movement from the environment let dot_movement = DotMovement::try_from_env()?; diff --git a/networks/suzuka/suzuka-full-node/src/main.rs b/networks/suzuka/suzuka-full-node/src/main.rs index 29f71faa9..f7d639da2 100644 --- a/networks/suzuka/suzuka-full-node/src/main.rs +++ b/networks/suzuka/suzuka-full-node/src/main.rs @@ -4,8 +4,13 @@ use std::process::ExitCode; #[tokio::main] async fn main() -> Result { - let tracing_config = movement_tracing::Config::from_env()?; - movement_tracing::init_tracing_subscriber(tracing_config)?; + movement_tracing::init_tracing_subscriber(); + let tracing_config = movement_tracing::telemetry::Config::from_env()?; + movement_tracing::telemetry::init_tracer_provider( + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_VERSION"), + tracing_config, + )?; // get the config file let dot_movement = dot_movement::DotMovement::try_from_env()?; diff --git a/protocol-units/da/m1/light-node/src/main.rs b/protocol-units/da/m1/light-node/src/main.rs index f72cf65b1..7a57a4081 100644 --- a/protocol-units/da/m1/light-node/src/main.rs +++ b/protocol-units/da/m1/light-node/src/main.rs @@ -2,8 +2,13 @@ use m1_da_light_node::v1::{LightNodeV1, Manager}; #[tokio::main] async fn main() -> Result<(), Box> { - let tracing_config = movement_tracing::Config::from_env()?; - movement_tracing::init_tracing_subscriber(tracing_config)?; + movement_tracing::init_tracing_subscriber(); + let tracing_config = movement_tracing::telemetry::Config::from_env()?; + movement_tracing::telemetry::init_tracer_provider( + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_VERSION"), + tracing_config, + )?; let dot_movement = dot_movement::DotMovement::try_from_env()?; let config_path = dot_movement.get_config_json_path(); diff --git a/util/tracing/Cargo.toml b/util/tracing/Cargo.toml index dfb008c69..42b38fb7e 100644 --- a/util/tracing/Cargo.toml +++ b/util/tracing/Cargo.toml @@ -16,6 +16,7 @@ tracing-subscriber = { workspace = true } opentelemetry = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } opentelemetry-otlp = { workspace = true } +opentelemetry-semantic-conventions = { workspace = true } tracing-opentelemetry = { workspace = true } #console-subscriber = { workspace = true } diff --git a/util/tracing/src/lib.rs b/util/tracing/src/lib.rs index b920012d8..2e2ea0613 100644 --- a/util/tracing/src/lib.rs +++ b/util/tracing/src/lib.rs @@ -1,63 +1,4 @@ -use anyhow::anyhow; -use opentelemetry::trace::TracerProvider as _; -use opentelemetry_otlp::WithExportConfig as _; -use opentelemetry_sdk::runtime; -use tracing_subscriber::filter::{self, EnvFilter, LevelFilter}; -use tracing_subscriber::prelude::*; +pub mod telemetry; +mod tracing; -use std::env; - -const OTLP_TRACING_ENV: &str = "MOVEMENT_OTLP"; - -/// Options for the tracing subscriber. -#[derive(Default)] -pub struct Config { - /// URL of the collector endpoint using the OTLP gRPC protocol. - pub otlp_grpc_url: Option, -} - -impl Config { - /// Get the tracing configuration from well-known environment variables. - pub fn from_env() -> Result { - let otlp_grpc_url = match env::var(OTLP_TRACING_ENV) { - Ok(url) => Some(url), - Err(env::VarError::NotPresent) => None, - Err(env::VarError::NotUnicode(s)) => { - return Err(anyhow!( - "value of environment variable {OTLP_TRACING_ENV} is not valid UTF-8: {}", - s.to_string_lossy() - )); - } - }; - Ok(Self { otlp_grpc_url }) - } -} - -/// Sets up the tracing subscribers for a Movement process. This should be -/// called at the beginning of a process' `main` function. -pub fn init_tracing_subscriber(config: Config) -> Result<(), anyhow::Error> { - // TODO: compose console_subscriber as a layer - let env_filter = EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .from_env_lossy(); - let log_layer = tracing_subscriber::fmt::layer().with_filter(env_filter); - - let telemetry_layer = if let Some(endpoint) = config.otlp_grpc_url { - let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(endpoint); - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(exporter) - .install_batch(runtime::Tokio)? - .tracer("movement_tracing"); - let layer = tracing_opentelemetry::layer() - .with_tracer(tracer) - .with_filter(filter::filter_fn(|meta| meta.target() == "movement_telemetry")); - Some(layer) - } else { - None - }; - - tracing_subscriber::registry().with(log_layer).with(telemetry_layer).init(); - - Ok(()) -} +pub use tracing::init_tracing_subscriber; diff --git a/util/tracing/src/telemetry.rs b/util/tracing/src/telemetry.rs new file mode 100644 index 000000000..254fb00eb --- /dev/null +++ b/util/tracing/src/telemetry.rs @@ -0,0 +1,75 @@ +//! OpenTelemetry support for Movement services. +//! +//! Telemetry is currently being exported to components as an API distinct +//! from the tracing framework, due to [issues][tracing-opentelemetry#159] +//! with integrating OpenTelemetry as a tracing subscriber. +//! +//! [tracing-opentelemetry#159]: https://github.com/tokio-rs/tracing-opentelemetry/issues/159 + +use anyhow::anyhow; +use opentelemetry::global::{self, BoxedTracer}; +use opentelemetry::trace::noop::NoopTracerProvider; +use opentelemetry::KeyValue; +use opentelemetry_otlp::WithExportConfig as _; +use opentelemetry_sdk::{runtime, trace::Config as TraceConfig, Resource}; +use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION}; + +use std::env; + +const OTLP_TRACING_ENV: &str = "MOVEMENT_OTLP"; + +/// Options for telemetry configuration. +#[derive(Default)] +pub struct Config { + /// URL of the collector endpoint using the OTLP gRPC protocol. + pub otlp_grpc_url: Option, +} + +impl Config { + /// Get the tracing configuration from well-known environment variables. + pub fn from_env() -> Result { + let otlp_grpc_url = match env::var(OTLP_TRACING_ENV) { + Ok(url) => Some(url), + Err(env::VarError::NotPresent) => None, + Err(env::VarError::NotUnicode(s)) => { + return Err(anyhow!( + "value of environment variable {OTLP_TRACING_ENV} is not valid UTF-8: {}", + s.to_string_lossy() + )); + } + }; + Ok(Self { otlp_grpc_url }) + } +} + +/// Global initialization of the OpenTelemetry tracer provider. +/// +/// This function should be called at the start of the program before any +/// threads are able to use OpenTelemetry tracers. The function will panic +/// if not called within a Tokio runtime. +pub fn init_tracer_provider( + service_name: &'static str, + service_version: &'static str, + config: Config, +) -> Result<(), anyhow::Error> { + if let Some(endpoint) = config.otlp_grpc_url { + let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(endpoint); + let provider = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(exporter) + .with_trace_config(TraceConfig::default().with_resource(Resource::new([ + KeyValue::new(SERVICE_NAME, service_name), + KeyValue::new(SERVICE_VERSION, service_version), + ]))) + .install_batch(runtime::Tokio)?; + global::set_tracer_provider(provider); + } else { + global::set_tracer_provider(NoopTracerProvider::new()); + } + Ok(()) +} + +/// Get the tracer configured for the process. +pub fn tracer() -> BoxedTracer { + global::tracer("movement") +} diff --git a/util/tracing/src/tracing.rs b/util/tracing/src/tracing.rs new file mode 100644 index 000000000..0c6514e30 --- /dev/null +++ b/util/tracing/src/tracing.rs @@ -0,0 +1,11 @@ +use tracing_subscriber::filter::{EnvFilter, LevelFilter}; + +/// Sets up the tracing subscribers for a Movement process. This should be +/// called at the beginning of a process' `main` function. +pub fn init_tracing_subscriber() { + // TODO: compose console_subscriber as a layer + let env_filter = EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(); + tracing_subscriber::fmt().with_env_filter(env_filter).init(); +} From ea573485e132f3efdea523e6247275ff161f53e6 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 10 Oct 2024 15:07:53 +0300 Subject: [PATCH 04/10] feat: produce some telemetry spans and events Replace the "movement_telemetry" traces with tracing with OpenTelemetry API, using the tracer provided by movement-tracing. --- Cargo.lock | 4 +- networks/suzuka/suzuka-full-node/Cargo.toml | 1 + .../src/tasks/execute_settle.rs | 23 +++++++--- .../execution/opt-executor/Cargo.toml | 7 +-- .../opt-executor/src/transaction_pipe.rs | 43 +++++++++++-------- 5 files changed, 51 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1d044a1d9..6abf95e99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8676,7 +8676,9 @@ dependencies = [ "lazy_static", "maptos-execution-util", "movement-rest", + "movement-tracing", "movement-types", + "opentelemetry", "poem", "poem-openapi", "rand 0.7.3", @@ -8687,7 +8689,6 @@ dependencies = [ "tempfile", "thiserror", "tokio", - "tonic 0.11.0", "tracing", "tracing-test", ] @@ -13461,6 +13462,7 @@ dependencies = [ "movement-rest", "movement-tracing", "movement-types", + "opentelemetry", "rocksdb", "serde_json", "sha2 0.10.8", diff --git a/networks/suzuka/suzuka-full-node/Cargo.toml b/networks/suzuka/suzuka-full-node/Cargo.toml index 27ce49fa2..5ed3cf198 100644 --- a/networks/suzuka/suzuka-full-node/Cargo.toml +++ b/networks/suzuka/suzuka-full-node/Cargo.toml @@ -27,6 +27,7 @@ tonic = { workspace = true } movement-types = { workspace = true } movement-rest = { workspace = true } movement-tracing = { workspace = true } +opentelemetry = { workspace = true } suzuka-config = { workspace = true } dot-movement = { workspace = true } godfig = { workspace = true } diff --git a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs index c278f6641..6ca37cdb8 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs @@ -11,14 +11,17 @@ use maptos_dof_execution::{ SignatureVerifiedTransaction, SignedTransaction, Transaction, }; use mcr_settlement_manager::{CommitmentEventStream, McrSettlementManagerOperations}; +use movement_tracing::telemetry; use movement_types::block::{Block, BlockCommitment, BlockCommitmentEvent}; -use anyhow::Context; +use anyhow::Context as _; use futures::{future::Either, stream}; +use opentelemetry::trace::{FutureExt as _, TraceContextExt as _, Tracer as _}; +use opentelemetry::{Context as OtelContext, KeyValue}; use suzuka_config::execution_extension; use tokio::select; use tokio_stream::{Stream, StreamExt}; -use tracing::{debug, error, info, info_span, Instrument}; +use tracing::{debug, error, info}; pub struct Task { executor: E, @@ -138,11 +141,19 @@ where }) .await??; - // get the transactions + // get the transactions count before the block is consumed let transactions_count = block.transactions().len(); - let span = info_span!(target: "movement_telemetry", "execute_block", id = %block_id); - let commitment = - self.execute_block_with_retries(block, block_timestamp).instrument(span).await?; + + // execute the block + let tracer = telemetry::tracer(); + let span = tracer + .span_builder("execute_block") + .with_attributes([KeyValue::new("id", block_id.to_string())]) + .start(&tracer); + let commitment = self + .execute_block_with_retries(block, block_timestamp) + .with_context(OtelContext::current_with_span(span)) + .await?; // decrement the number of transactions in flight on the executor self.executor.decrement_transactions_in_flight(transactions_count as u64); diff --git a/protocol-units/execution/opt-executor/Cargo.toml b/protocol-units/execution/opt-executor/Cargo.toml index e084c33a2..155c317fa 100644 --- a/protocol-units/execution/opt-executor/Cargo.toml +++ b/protocol-units/execution/opt-executor/Cargo.toml @@ -28,6 +28,7 @@ poem = { workspace = true } poem-openapi = { workspace = true } derive_more = { workspace = true, default-features = true } lazy_static = "1.4.0" +opentelemetry = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } rand = { workspace = true } @@ -59,15 +60,15 @@ aptos-mempool = { workspace = true } aptos-temppath = { workspace = true } aptos-faucet-core = { workspace = true } aptos-cached-packages = { workspace = true } -maptos-execution-util = { workspace = true } -movement-types = { workspace = true } aptos-indexer-grpc-fullnode = { workspace = true } aptos-indexer-grpc-table-info = { workspace = true } aptos-indexer = { workspace = true } aptos-protos = { workspace = true } aptos-logger = { workspace = true } -tonic = { workspace = true } +maptos-execution-util = { workspace = true } movement-rest = { workspace = true } +movement-tracing = { workspace = true } +movement-types = { workspace = true } [dev-dependencies] dirs = { workspace = true } diff --git a/protocol-units/execution/opt-executor/src/transaction_pipe.rs b/protocol-units/execution/opt-executor/src/transaction_pipe.rs index 03abf4473..6bc711755 100644 --- a/protocol-units/execution/opt-executor/src/transaction_pipe.rs +++ b/protocol-units/execution/opt-executor/src/transaction_pipe.rs @@ -1,4 +1,5 @@ //! Task processing incoming transactions for the opt API. +use movement_tracing::telemetry; use aptos_config::config::NodeConfig; use aptos_mempool::core_mempool::CoreMempool; @@ -12,9 +13,11 @@ use aptos_vm_validator::vm_validator::{self, TransactionValidation, VMValidator} use futures::channel::mpsc as futures_mpsc; use futures::StreamExt; +use opentelemetry::trace::{FutureExt as _, TraceContextExt as _, Tracer as _}; +use opentelemetry::{Context as OtelContext, KeyValue}; use thiserror::Error; use tokio::sync::mpsc; -use tracing::{debug, info, info_span, warn, Instrument}; +use tracing::{debug, info, warn}; use std::sync::{atomic::AtomicU64, Arc}; use std::time::{Duration, Instant}; @@ -88,14 +91,22 @@ impl TransactionPipe { if let Some(request) = next { match request { MempoolClientRequest::SubmitTransaction(transaction, callback) => { - let span = info_span!( - target: "movement_telemetry", - "submit_transaction", - tx_hash = %transaction.committed_hash(), - sender = %transaction.sender(), - sequence_number = transaction.sequence_number(), - ); - let status = self.submit_transaction(transaction).instrument(span).await?; + let tracer = telemetry::tracer(); + let span = tracer + .span_builder("submit_transaction") + .with_attributes([ + KeyValue::new("tx_hash", transaction.committed_hash().to_string()), + KeyValue::new("sender", transaction.sender().to_string()), + KeyValue::new( + "sequence_number", + transaction.sequence_number().to_string(), + ), + ]) + .start(&tracer); + let status = self + .submit_transaction(transaction) + .with_context(OtelContext::current_with_span(span)) + .await?; callback.send(Ok(status)).unwrap_or_else(|_| { debug!("SubmitTransaction request canceled"); }); @@ -123,16 +134,14 @@ impl TransactionPipe { ) -> Result { // For now, we are going to consider a transaction in flight until it exits the mempool and is sent to the DA as is indicated by WriteBatch. let in_flight = self.transactions_in_flight.load(std::sync::atomic::Ordering::Relaxed); - info!( - target: "movement_telemetry", - in_flight = %in_flight, - "transactions_in_flight" + let otel_cx = OtelContext::current(); + let otel_span = otel_cx.span(); + otel_span.add_event( + "transactions_in_flight", + vec![KeyValue::new("in_flight", in_flight.to_string())], ); if in_flight > self.in_flight_limit { - info!( - target: "movement_telemetry", - "shedding_load" - ); + otel_span.add_event("shedding_load", vec![]); let status = MempoolStatus::new(MempoolStatusCode::MempoolIsFull); return Ok((status, None)); } From e27304a9aa6a1032a6ae50ab6f1ebb0be1048c3e Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 10 Oct 2024 15:49:33 +0300 Subject: [PATCH 05/10] feat(process-compose): add OTLP collector In the telemetry overlay for suzuka-full-node, add an OTLP collector start job running a docker container. --- .../suzuka-full-node/process-compose.telemetry.yml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/process-compose/suzuka-full-node/process-compose.telemetry.yml b/process-compose/suzuka-full-node/process-compose.telemetry.yml index f7e9b4bb8..1bcefdc69 100644 --- a/process-compose/suzuka-full-node/process-compose.telemetry.yml +++ b/process-compose/suzuka-full-node/process-compose.telemetry.yml @@ -3,9 +3,22 @@ version: "3" environment: processes: + otlp-collector: + is_daemon: true + command: | + docker run -d --rm --name suzuka-otlp-collector -p16686:16686 -p4317:4317 -e COLLECTOR_OTLP_ENABLED=true jaegertracing/all-in-one:latest + shutdown: + command: | + docker stop suzuka-otlp-collector suzuka-full-node: + depends_on: + otlp-collector: + condition: process_started env: MOVEMENT_OTLP: http://localhost:4317 m1-da-light-node: + depends_on: + otlp-collector: + condition: process_started env: MOVEMENT_OTLP: http://localhost:4317 From 3baea3ec38532778c3b38f5e746d286544147d52 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 10 Oct 2024 17:00:17 +0300 Subject: [PATCH 06/10] fix(process-compose): correctly specify enviroment --- .../suzuka-full-node/process-compose.telemetry.yml | 8 ++++---- process-compose/suzuka-full-node/process-compose.yml | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/process-compose/suzuka-full-node/process-compose.telemetry.yml b/process-compose/suzuka-full-node/process-compose.telemetry.yml index 1bcefdc69..88180844b 100644 --- a/process-compose/suzuka-full-node/process-compose.telemetry.yml +++ b/process-compose/suzuka-full-node/process-compose.telemetry.yml @@ -14,11 +14,11 @@ processes: depends_on: otlp-collector: condition: process_started - env: - MOVEMENT_OTLP: http://localhost:4317 + environment: + - MOVEMENT_OTLP=http://localhost:4317 m1-da-light-node: depends_on: otlp-collector: condition: process_started - env: - MOVEMENT_OTLP: http://localhost:4317 + environment: + - MOVEMENT_OTLP=http://localhost:4317 diff --git a/process-compose/suzuka-full-node/process-compose.yml b/process-compose/suzuka-full-node/process-compose.yml index cfb8a1365..52381e599 100644 --- a/process-compose/suzuka-full-node/process-compose.yml +++ b/process-compose/suzuka-full-node/process-compose.yml @@ -44,8 +44,8 @@ processes: suzuka-full-node: command: | suzuka-full-node - env: - RUST_LOG: info,aptos-indexer=debug + environment: + - RUST_LOG=info depends_on: m1-da-light-node: condition: process_healthy From a249da59cf46d334c39b42b47dc76bf83598d445 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 10 Oct 2024 17:01:18 +0300 Subject: [PATCH 07/10] feat(tracing): Debug for telemetry::Config --- util/tracing/src/telemetry.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/util/tracing/src/telemetry.rs b/util/tracing/src/telemetry.rs index 254fb00eb..921fd3382 100644 --- a/util/tracing/src/telemetry.rs +++ b/util/tracing/src/telemetry.rs @@ -19,7 +19,7 @@ use std::env; const OTLP_TRACING_ENV: &str = "MOVEMENT_OTLP"; /// Options for telemetry configuration. -#[derive(Default)] +#[derive(Debug, Default)] pub struct Config { /// URL of the collector endpoint using the OTLP gRPC protocol. pub otlp_grpc_url: Option, @@ -53,6 +53,7 @@ pub fn init_tracer_provider( config: Config, ) -> Result<(), anyhow::Error> { if let Some(endpoint) = config.otlp_grpc_url { + dbg!(&endpoint); let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(endpoint); let provider = opentelemetry_otlp::new_pipeline() .tracing() @@ -62,6 +63,7 @@ pub fn init_tracer_provider( KeyValue::new(SERVICE_VERSION, service_version), ]))) .install_batch(runtime::Tokio)?; + dbg!(&provider); global::set_tracer_provider(provider); } else { global::set_tracer_provider(NoopTracerProvider::new()); From 1ff9351608d983aee5e61fcbe1924358244d20f6 Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Thu, 10 Oct 2024 23:39:30 +0300 Subject: [PATCH 08/10] chore(tracing): remove debug printouts --- util/tracing/src/telemetry.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/util/tracing/src/telemetry.rs b/util/tracing/src/telemetry.rs index 921fd3382..960da7f2e 100644 --- a/util/tracing/src/telemetry.rs +++ b/util/tracing/src/telemetry.rs @@ -53,7 +53,6 @@ pub fn init_tracer_provider( config: Config, ) -> Result<(), anyhow::Error> { if let Some(endpoint) = config.otlp_grpc_url { - dbg!(&endpoint); let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(endpoint); let provider = opentelemetry_otlp::new_pipeline() .tracing() @@ -63,7 +62,6 @@ pub fn init_tracer_provider( KeyValue::new(SERVICE_VERSION, service_version), ]))) .install_batch(runtime::Tokio)?; - dbg!(&provider); global::set_tracer_provider(provider); } else { global::set_tracer_provider(NoopTracerProvider::new()); From cc08d111ac307e41c5a6c7926fe8951d95190b3a Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Fri, 11 Oct 2024 00:18:03 +0300 Subject: [PATCH 09/10] feat(suzuka-full-node): telemetry in tx ingress Converted "movement_telemetry" tracing to OpenTelemetry in the transaction_ingress task module. --- .../src/tasks/transaction_ingress.rs | 64 +++++++++++++------ 1 file changed, 43 insertions(+), 21 deletions(-) diff --git a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs index b47d20a6f..f56345e1b 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs @@ -3,12 +3,15 @@ use m1_da_light_node_client::{BatchWriteRequest, BlobWrite, LightNodeServiceClient}; use m1_da_light_node_util::config::Config as LightNodeConfig; use maptos_dof_execution::SignedTransaction; +use movement_tracing::telemetry; +use opentelemetry::trace::{FutureExt as _, TraceContextExt as _, Tracer as _}; +use opentelemetry::{Context as OtelContext, KeyValue}; use tokio::sync::mpsc; -use tracing::{info, warn}; +use tracing::warn; use std::ops::ControlFlow; -use std::sync::atomic::AtomicU64; +use std::sync::atomic::{self, AtomicU64}; use std::time::{Duration, Instant}; const LOGGING_UID: AtomicU64 = AtomicU64::new(0); @@ -29,8 +32,21 @@ impl Task { } pub async fn run(mut self) -> anyhow::Result<()> { - while let ControlFlow::Continue(()) = self.spawn_write_next_transaction_batch().await? {} - Ok(()) + let tracer = telemetry::tracer(); + loop { + let batch_id = LOGGING_UID.fetch_add(1, atomic::Ordering::Relaxed); + let span = tracer + .span_builder("build_batch") + .with_attributes([KeyValue::new("batch_id", batch_id.to_string())]) + .start(&tracer); + if let ControlFlow::Break(()) = self + .spawn_write_next_transaction_batch() + .with_context(OtelContext::current_with_span(span)) + .await? + { + break Ok(()); + } + } } /// Constructs a batch of transactions then spawns the write request to the DA in the background. @@ -45,7 +61,6 @@ impl Task { let mut transactions = Vec::new(); - let batch_id = LOGGING_UID.fetch_add(1, std::sync::atomic::Ordering::SeqCst); loop { let remaining = match half_building_time.checked_sub(start.elapsed().as_millis() as u64) { @@ -64,13 +79,17 @@ impl Task { { Ok(transaction) => match transaction { Some(transaction) => { - info!( - target : "movement_telemetry", - batch_id = %batch_id, - tx_hash = %transaction.committed_hash(), - sender = %transaction.sender(), - sequence_number = transaction.sequence_number(), - "received transaction", + let otel_cx = OtelContext::current(); + otel_cx.span().add_event( + "received_transaction", + vec![ + KeyValue::new("tx_hash", transaction.committed_hash().to_string()), + KeyValue::new("sender", transaction.sender().to_string()), + KeyValue::new( + "sequence_number", + transaction.sequence_number().to_string(), + ), + ], ); let serialized_aptos_transaction = serde_json::to_vec(&transaction)?; let movement_transaction = movement_types::transaction::Transaction::new( @@ -92,20 +111,23 @@ impl Task { } if transactions.len() > 0 { - info!( - target: "movement_telemetry", - batch_id = %batch_id, - transaction_count = transactions.len(), - "built_batch_write" + let otel_cx = OtelContext::current(); + otel_cx.span().add_event( + "built_batch_write", + vec![KeyValue::new("transaction_count", transactions.len().to_string())], ); let batch_write = BatchWriteRequest { blobs: transactions }; // spawn the actual batch write request in the background + let write_span = telemetry::tracer().start_with_context("batch_write", &otel_cx); let mut da_light_node_client = self.da_light_node_client.clone(); - tokio::spawn(async move { - if let Err(e) = da_light_node_client.batch_write(batch_write).await { - warn!("failed to write batch to DA: {:?}", e); + tokio::spawn( + async move { + if let Err(e) = da_light_node_client.batch_write(batch_write).await { + warn!("failed to write batch to DA: {:?}", e); + } } - }); + .with_context(otel_cx.with_span(write_span)), + ); } Ok(Continue(())) From 2d52cc89a80385122bffa4742e3b37377cc6765b Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Fri, 11 Oct 2024 10:43:50 +0300 Subject: [PATCH 10/10] feat: send numeric telemetry values The integer values are better represented as such in telemetry data rather than converted to strings. --- .../suzuka/suzuka-full-node/src/tasks/execute_settle.rs | 2 +- .../suzuka-full-node/src/tasks/transaction_ingress.rs | 6 +++--- .../execution/opt-executor/src/transaction_pipe.rs | 9 +++------ 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs index 6ca37cdb8..0c09e20f3 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs @@ -163,7 +163,7 @@ where self.da_db.set_synced_height(da_height - 1).await?; // set the block as executed - self.da_db.add_executed_block(block_id.to_string()).await?; + self.da_db.add_executed_block(block_id.clone()).await?; // todo: this needs defaults if self.settlement_enabled() { diff --git a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs index f56345e1b..ab85118f7 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs @@ -37,7 +37,7 @@ impl Task { let batch_id = LOGGING_UID.fetch_add(1, atomic::Ordering::Relaxed); let span = tracer .span_builder("build_batch") - .with_attributes([KeyValue::new("batch_id", batch_id.to_string())]) + .with_attributes([KeyValue::new("batch_id", batch_id as i64)]) .start(&tracer); if let ControlFlow::Break(()) = self .spawn_write_next_transaction_batch() @@ -87,7 +87,7 @@ impl Task { KeyValue::new("sender", transaction.sender().to_string()), KeyValue::new( "sequence_number", - transaction.sequence_number().to_string(), + transaction.sequence_number() as i64, ), ], ); @@ -114,7 +114,7 @@ impl Task { let otel_cx = OtelContext::current(); otel_cx.span().add_event( "built_batch_write", - vec![KeyValue::new("transaction_count", transactions.len().to_string())], + vec![KeyValue::new("transaction_count", transactions.len() as i64)], ); let batch_write = BatchWriteRequest { blobs: transactions }; // spawn the actual batch write request in the background diff --git a/protocol-units/execution/opt-executor/src/transaction_pipe.rs b/protocol-units/execution/opt-executor/src/transaction_pipe.rs index 6bc711755..7fdf0fd33 100644 --- a/protocol-units/execution/opt-executor/src/transaction_pipe.rs +++ b/protocol-units/execution/opt-executor/src/transaction_pipe.rs @@ -17,7 +17,7 @@ use opentelemetry::trace::{FutureExt as _, TraceContextExt as _, Tracer as _}; use opentelemetry::{Context as OtelContext, KeyValue}; use thiserror::Error; use tokio::sync::mpsc; -use tracing::{debug, info, warn}; +use tracing::{debug, warn}; use std::sync::{atomic::AtomicU64, Arc}; use std::time::{Duration, Instant}; @@ -97,10 +97,7 @@ impl TransactionPipe { .with_attributes([ KeyValue::new("tx_hash", transaction.committed_hash().to_string()), KeyValue::new("sender", transaction.sender().to_string()), - KeyValue::new( - "sequence_number", - transaction.sequence_number().to_string(), - ), + KeyValue::new("sequence_number", transaction.sequence_number() as i64), ]) .start(&tracer); let status = self @@ -138,7 +135,7 @@ impl TransactionPipe { let otel_span = otel_cx.span(); otel_span.add_event( "transactions_in_flight", - vec![KeyValue::new("in_flight", in_flight.to_string())], + vec![KeyValue::new("in_flight", in_flight as i64)], ); if in_flight > self.in_flight_limit { otel_span.add_event("shedding_load", vec![]);