diff --git a/Cargo.lock b/Cargo.lock index 381329605..2cf8a1a69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8677,9 +8677,7 @@ dependencies = [ "lazy_static", "maptos-execution-util", "movement-rest", - "movement-tracing", "movement-types", - "opentelemetry", "poem", "poem-openapi", "rand 0.7.3", @@ -9682,6 +9680,7 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry-semantic-conventions", "opentelemetry_sdk", + "tracing", "tracing-opentelemetry", "tracing-subscriber 0.3.18", ] @@ -13463,7 +13462,6 @@ dependencies = [ "movement-rest", "movement-tracing", "movement-types", - "opentelemetry", "rocksdb", "serde_json", "sha2 0.10.8", diff --git a/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs b/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs index b532120ab..7dd2481b5 100644 --- a/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs +++ b/networks/suzuka/suzuka-client/src/bin/e2e/followers_consistent.rs @@ -245,7 +245,12 @@ pub async fn basic_coin_transfers( #[tokio::main] async fn main() -> Result<(), anyhow::Error> { - movement_tracing::init_tracing_subscriber(); + let tracing_config = movement_tracing::Config::from_env()?; + movement_tracing::init_tracing_subscriber( + env!("CARGO_BIN_NAME"), + env!("CARGO_PKG_VERSION"), + &tracing_config, + )?; // get the lead dot movement from the environment let dot_movement = DotMovement::try_from_env()?; diff --git a/networks/suzuka/suzuka-full-node/Cargo.toml b/networks/suzuka/suzuka-full-node/Cargo.toml index 5ed3cf198..27ce49fa2 100644 --- a/networks/suzuka/suzuka-full-node/Cargo.toml +++ b/networks/suzuka/suzuka-full-node/Cargo.toml @@ -27,7 +27,6 @@ tonic = { workspace = true } movement-types = { workspace = true } movement-rest = { workspace = true } movement-tracing = { workspace = true } -opentelemetry = { workspace = true } suzuka-config = { workspace = true } dot-movement = { workspace = true } godfig = { workspace = true } diff --git a/networks/suzuka/suzuka-full-node/src/main.rs b/networks/suzuka/suzuka-full-node/src/main.rs index f7d639da2..3fbddf9b1 100644 --- a/networks/suzuka/suzuka-full-node/src/main.rs +++ b/networks/suzuka/suzuka-full-node/src/main.rs @@ -4,12 +4,11 @@ use std::process::ExitCode; #[tokio::main] async fn main() -> Result { - movement_tracing::init_tracing_subscriber(); - let tracing_config = movement_tracing::telemetry::Config::from_env()?; - movement_tracing::telemetry::init_tracer_provider( - env!("CARGO_PKG_NAME"), + let tracing_config = movement_tracing::Config::from_env()?; + movement_tracing::init_tracing_subscriber( + env!("CARGO_BIN_NAME"), env!("CARGO_PKG_VERSION"), - tracing_config, + &tracing_config, )?; // get the config file diff --git a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs index 0c09e20f3..e47297b1f 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs @@ -11,17 +11,14 @@ use maptos_dof_execution::{ SignatureVerifiedTransaction, SignedTransaction, Transaction, }; use mcr_settlement_manager::{CommitmentEventStream, McrSettlementManagerOperations}; -use movement_tracing::telemetry; use movement_types::block::{Block, BlockCommitment, BlockCommitmentEvent}; -use anyhow::Context as _; +use anyhow::Context; use futures::{future::Either, stream}; -use opentelemetry::trace::{FutureExt as _, TraceContextExt as _, Tracer as _}; -use opentelemetry::{Context as OtelContext, KeyValue}; use suzuka_config::execution_extension; use tokio::select; use tokio_stream::{Stream, StreamExt}; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, info_span, Instrument}; pub struct Task { executor: E, @@ -143,17 +140,9 @@ where // get the transactions count before the block is consumed let transactions_count = block.transactions().len(); - - // execute the block - let tracer = telemetry::tracer(); - let span = tracer - .span_builder("execute_block") - .with_attributes([KeyValue::new("id", block_id.to_string())]) - .start(&tracer); - let commitment = self - .execute_block_with_retries(block, block_timestamp) - .with_context(OtelContext::current_with_span(span)) - .await?; + let span = info_span!(target: "movement_telemetry", "execute_block", id = %block_id); + let commitment = + self.execute_block_with_retries(block, block_timestamp).instrument(span).await?; // decrement the number of transactions in flight on the executor self.executor.decrement_transactions_in_flight(transactions_count as u64); diff --git a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs index ab85118f7..140511995 100644 --- a/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs +++ b/networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs @@ -3,12 +3,9 @@ use m1_da_light_node_client::{BatchWriteRequest, BlobWrite, LightNodeServiceClient}; use m1_da_light_node_util::config::Config as LightNodeConfig; use maptos_dof_execution::SignedTransaction; -use movement_tracing::telemetry; -use opentelemetry::trace::{FutureExt as _, TraceContextExt as _, Tracer as _}; -use opentelemetry::{Context as OtelContext, KeyValue}; use tokio::sync::mpsc; -use tracing::warn; +use tracing::{info, info_span, warn, Instrument}; use std::ops::ControlFlow; use std::sync::atomic::{self, AtomicU64}; @@ -32,21 +29,17 @@ impl Task { } pub async fn run(mut self) -> anyhow::Result<()> { - let tracer = telemetry::tracer(); loop { let batch_id = LOGGING_UID.fetch_add(1, atomic::Ordering::Relaxed); - let span = tracer - .span_builder("build_batch") - .with_attributes([KeyValue::new("batch_id", batch_id as i64)]) - .start(&tracer); - if let ControlFlow::Break(()) = self - .spawn_write_next_transaction_batch() - .with_context(OtelContext::current_with_span(span)) - .await? + let span = + info_span!(target: "movement_telemetry", "write_batch", batch_id = %batch_id); + if let ControlFlow::Break(()) = + self.spawn_write_next_transaction_batch().instrument(span).await? { - break Ok(()); + break; } } + Ok(()) } /// Constructs a batch of transactions then spawns the write request to the DA in the background. @@ -79,17 +72,12 @@ impl Task { { Ok(transaction) => match transaction { Some(transaction) => { - let otel_cx = OtelContext::current(); - otel_cx.span().add_event( + info!( + target: "movement_telemetry", + tx_hash = %transaction.committed_hash(), + sender = %transaction.sender(), + sequence_number = transaction.sequence_number(), "received_transaction", - vec![ - KeyValue::new("tx_hash", transaction.committed_hash().to_string()), - KeyValue::new("sender", transaction.sender().to_string()), - KeyValue::new( - "sequence_number", - transaction.sequence_number() as i64, - ), - ], ); let serialized_aptos_transaction = serde_json::to_vec(&transaction)?; let movement_transaction = movement_types::transaction::Transaction::new( @@ -111,23 +99,19 @@ impl Task { } if transactions.len() > 0 { - let otel_cx = OtelContext::current(); - otel_cx.span().add_event( - "built_batch_write", - vec![KeyValue::new("transaction_count", transactions.len() as i64)], + info!( + target: "movement_telemetry", + transaction_count = transactions.len(), + "built_batch_write" ); let batch_write = BatchWriteRequest { blobs: transactions }; // spawn the actual batch write request in the background - let write_span = telemetry::tracer().start_with_context("batch_write", &otel_cx); let mut da_light_node_client = self.da_light_node_client.clone(); - tokio::spawn( - async move { - if let Err(e) = da_light_node_client.batch_write(batch_write).await { - warn!("failed to write batch to DA: {:?}", e); - } + tokio::spawn(async move { + if let Err(e) = da_light_node_client.batch_write(batch_write).await { + warn!("failed to write batch to DA: {:?}", e); } - .with_context(otel_cx.with_span(write_span)), - ); + }); } Ok(Continue(())) diff --git a/protocol-units/da/m1/light-node/src/main.rs b/protocol-units/da/m1/light-node/src/main.rs index 7a57a4081..3bd572087 100644 --- a/protocol-units/da/m1/light-node/src/main.rs +++ b/protocol-units/da/m1/light-node/src/main.rs @@ -2,12 +2,11 @@ use m1_da_light_node::v1::{LightNodeV1, Manager}; #[tokio::main] async fn main() -> Result<(), Box> { - movement_tracing::init_tracing_subscriber(); - let tracing_config = movement_tracing::telemetry::Config::from_env()?; - movement_tracing::telemetry::init_tracer_provider( - env!("CARGO_PKG_NAME"), + let tracing_config = movement_tracing::Config::from_env()?; + movement_tracing::init_tracing_subscriber( + env!("CARGO_BIN_NAME"), env!("CARGO_PKG_VERSION"), - tracing_config, + &tracing_config, )?; let dot_movement = dot_movement::DotMovement::try_from_env()?; diff --git a/protocol-units/execution/opt-executor/Cargo.toml b/protocol-units/execution/opt-executor/Cargo.toml index 155c317fa..54c1bb0ab 100644 --- a/protocol-units/execution/opt-executor/Cargo.toml +++ b/protocol-units/execution/opt-executor/Cargo.toml @@ -28,7 +28,6 @@ poem = { workspace = true } poem-openapi = { workspace = true } derive_more = { workspace = true, default-features = true } lazy_static = "1.4.0" -opentelemetry = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } rand = { workspace = true } @@ -67,7 +66,6 @@ aptos-protos = { workspace = true } aptos-logger = { workspace = true } maptos-execution-util = { workspace = true } movement-rest = { workspace = true } -movement-tracing = { workspace = true } movement-types = { workspace = true } [dev-dependencies] diff --git a/protocol-units/execution/opt-executor/src/transaction_pipe.rs b/protocol-units/execution/opt-executor/src/transaction_pipe.rs index 7fdf0fd33..03abf4473 100644 --- a/protocol-units/execution/opt-executor/src/transaction_pipe.rs +++ b/protocol-units/execution/opt-executor/src/transaction_pipe.rs @@ -1,5 +1,4 @@ //! Task processing incoming transactions for the opt API. -use movement_tracing::telemetry; use aptos_config::config::NodeConfig; use aptos_mempool::core_mempool::CoreMempool; @@ -13,11 +12,9 @@ use aptos_vm_validator::vm_validator::{self, TransactionValidation, VMValidator} use futures::channel::mpsc as futures_mpsc; use futures::StreamExt; -use opentelemetry::trace::{FutureExt as _, TraceContextExt as _, Tracer as _}; -use opentelemetry::{Context as OtelContext, KeyValue}; use thiserror::Error; use tokio::sync::mpsc; -use tracing::{debug, warn}; +use tracing::{debug, info, info_span, warn, Instrument}; use std::sync::{atomic::AtomicU64, Arc}; use std::time::{Duration, Instant}; @@ -91,19 +88,14 @@ impl TransactionPipe { if let Some(request) = next { match request { MempoolClientRequest::SubmitTransaction(transaction, callback) => { - let tracer = telemetry::tracer(); - let span = tracer - .span_builder("submit_transaction") - .with_attributes([ - KeyValue::new("tx_hash", transaction.committed_hash().to_string()), - KeyValue::new("sender", transaction.sender().to_string()), - KeyValue::new("sequence_number", transaction.sequence_number() as i64), - ]) - .start(&tracer); - let status = self - .submit_transaction(transaction) - .with_context(OtelContext::current_with_span(span)) - .await?; + let span = info_span!( + target: "movement_telemetry", + "submit_transaction", + tx_hash = %transaction.committed_hash(), + sender = %transaction.sender(), + sequence_number = transaction.sequence_number(), + ); + let status = self.submit_transaction(transaction).instrument(span).await?; callback.send(Ok(status)).unwrap_or_else(|_| { debug!("SubmitTransaction request canceled"); }); @@ -131,14 +123,16 @@ impl TransactionPipe { ) -> Result { // For now, we are going to consider a transaction in flight until it exits the mempool and is sent to the DA as is indicated by WriteBatch. let in_flight = self.transactions_in_flight.load(std::sync::atomic::Ordering::Relaxed); - let otel_cx = OtelContext::current(); - let otel_span = otel_cx.span(); - otel_span.add_event( - "transactions_in_flight", - vec![KeyValue::new("in_flight", in_flight as i64)], + info!( + target: "movement_telemetry", + in_flight = %in_flight, + "transactions_in_flight" ); if in_flight > self.in_flight_limit { - otel_span.add_event("shedding_load", vec![]); + info!( + target: "movement_telemetry", + "shedding_load" + ); let status = MempoolStatus::new(MempoolStatusCode::MempoolIsFull); return Ok((status, None)); } diff --git a/util/tracing/Cargo.toml b/util/tracing/Cargo.toml index 42b38fb7e..6c9fad7f8 100644 --- a/util/tracing/Cargo.toml +++ b/util/tracing/Cargo.toml @@ -12,12 +12,13 @@ rust-version.workspace = true [dependencies] anyhow = { workspace = true } -tracing-subscriber = { workspace = true } opentelemetry = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } opentelemetry-otlp = { workspace = true } opentelemetry-semantic-conventions = { workspace = true } +tracing = { workspace = true } tracing-opentelemetry = { workspace = true } +tracing-subscriber = { workspace = true } #console-subscriber = { workspace = true } [lints] diff --git a/util/tracing/src/config.rs b/util/tracing/src/config.rs new file mode 100644 index 000000000..561bcb991 --- /dev/null +++ b/util/tracing/src/config.rs @@ -0,0 +1,29 @@ +use anyhow::anyhow; +use std::env; + +const OTLP_TRACING_ENV: &str = "MOVEMENT_OTLP"; + +/// Options for tracing configuration. +#[derive(Debug, Default)] +pub struct Config { + /// URL of the OpenTelemetry collector endpoint using the OTLP gRPC protocol. + /// If the value is `None`, telemetry is not exported. + pub otlp_grpc_url: Option, +} + +impl Config { + /// Get the tracing configuration from well-known environment variables. + pub fn from_env() -> Result { + let otlp_grpc_url = match env::var(OTLP_TRACING_ENV) { + Ok(url) => Some(url), + Err(env::VarError::NotPresent) => None, + Err(env::VarError::NotUnicode(s)) => { + return Err(anyhow!( + "value of environment variable {OTLP_TRACING_ENV} is not valid UTF-8: {}", + s.to_string_lossy() + )); + } + }; + Ok(Self { otlp_grpc_url }) + } +} diff --git a/util/tracing/src/lib.rs b/util/tracing/src/lib.rs index 2e2ea0613..a39f69333 100644 --- a/util/tracing/src/lib.rs +++ b/util/tracing/src/lib.rs @@ -1,4 +1,13 @@ -pub mod telemetry; +//! Tracing setup for Movement services. +//! +//! Exporting of tracing data via [OpenTelemetry] is optionally supported +//! by setting "movement_telemetry" as the target in tracing spans and events. +//! +//! [OpenTelemetry]: https://opentelemetry.io/ + +mod config; +mod telemetry; mod tracing; +pub use config::Config; pub use tracing::init_tracing_subscriber; diff --git a/util/tracing/src/telemetry.rs b/util/tracing/src/telemetry.rs index 921fd3382..46674dcf1 100644 --- a/util/tracing/src/telemetry.rs +++ b/util/tracing/src/telemetry.rs @@ -6,54 +6,52 @@ //! //! [tracing-opentelemetry#159]: https://github.com/tokio-rs/tracing-opentelemetry/issues/159 -use anyhow::anyhow; -use opentelemetry::global::{self, BoxedTracer}; -use opentelemetry::trace::noop::NoopTracerProvider; -use opentelemetry::KeyValue; +use crate::Config; + +use opentelemetry::{trace::TracerProvider as _, KeyValue}; use opentelemetry_otlp::WithExportConfig as _; -use opentelemetry_sdk::{runtime, trace::Config as TraceConfig, Resource}; +use opentelemetry_sdk::trace::{Config as TraceConfig, TracerProvider}; +use opentelemetry_sdk::{runtime, Resource}; use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION}; +use tracing::{error, Level, Subscriber}; +use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::filter; +use tracing_subscriber::prelude::*; +use tracing_subscriber::registry::LookupSpan; -use std::env; - -const OTLP_TRACING_ENV: &str = "MOVEMENT_OTLP"; - -/// Options for telemetry configuration. -#[derive(Debug, Default)] -pub struct Config { - /// URL of the collector endpoint using the OTLP gRPC protocol. - pub otlp_grpc_url: Option, -} +/// The scope guard object for the background tasks of the tracing subsystem. +/// +/// This object needs to be kept alive for the duration of the program. +#[derive(Debug)] +pub struct ScopeGuard(Option); -impl Config { - /// Get the tracing configuration from well-known environment variables. - pub fn from_env() -> Result { - let otlp_grpc_url = match env::var(OTLP_TRACING_ENV) { - Ok(url) => Some(url), - Err(env::VarError::NotPresent) => None, - Err(env::VarError::NotUnicode(s)) => { - return Err(anyhow!( - "value of environment variable {OTLP_TRACING_ENV} is not valid UTF-8: {}", - s.to_string_lossy() - )); +impl Drop for ScopeGuard { + fn drop(&mut self) { + if let Some(tracer_provider) = &self.0 { + // Make sure all batched traces are exported. + if let Err(e) = tracer_provider.shutdown() { + error!("OpenTelemetry tracer provider shutdown failed: {e}"); } - }; - Ok(Self { otlp_grpc_url }) + } } } -/// Global initialization of the OpenTelemetry tracer provider. +/// Adds an optional OpenTelemetry tracing layer to the provided subscriber. /// /// This function should be called at the start of the program before any /// threads are able to use OpenTelemetry tracers. The function will panic /// if not called within a Tokio runtime. -pub fn init_tracer_provider( +pub(crate) fn init_tracing_layer( + subscriber: S, service_name: &'static str, service_version: &'static str, - config: Config, -) -> Result<(), anyhow::Error> { - if let Some(endpoint) = config.otlp_grpc_url { - dbg!(&endpoint); + config: &Config, +) -> Result<(ScopeGuard, impl Subscriber), anyhow::Error> +where + S: Subscriber, + for<'span> S: LookupSpan<'span>, +{ + let (tracer_provider, layer) = if let Some(endpoint) = &config.otlp_grpc_url { let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(endpoint); let provider = opentelemetry_otlp::new_pipeline() .tracing() @@ -63,15 +61,11 @@ pub fn init_tracer_provider( KeyValue::new(SERVICE_VERSION, service_version), ]))) .install_batch(runtime::Tokio)?; - dbg!(&provider); - global::set_tracer_provider(provider); + let layer = OpenTelemetryLayer::new(provider.tracer("movement")) + .with_filter(filter::Targets::new().with_target("movement_telemetry", Level::INFO)); + (Some(provider), Some(layer)) } else { - global::set_tracer_provider(NoopTracerProvider::new()); - } - Ok(()) -} - -/// Get the tracer configured for the process. -pub fn tracer() -> BoxedTracer { - global::tracer("movement") + (None, None) + }; + Ok((ScopeGuard(tracer_provider), subscriber.with(layer))) } diff --git a/util/tracing/src/tracing.rs b/util/tracing/src/tracing.rs index 0c6514e30..88c333d61 100644 --- a/util/tracing/src/tracing.rs +++ b/util/tracing/src/tracing.rs @@ -1,11 +1,23 @@ -use tracing_subscriber::filter::{EnvFilter, LevelFilter}; +use crate::telemetry::{self, ScopeGuard}; +use crate::Config; +use tracing_subscriber::prelude::*; +use tracing_subscriber::{fmt, EnvFilter, Registry}; /// Sets up the tracing subscribers for a Movement process. This should be /// called at the beginning of a process' `main` function. -pub fn init_tracing_subscriber() { +/// +/// If successful, returns a guard object that should be dropped at the end +/// of the process' `main` function scope. +pub fn init_tracing_subscriber( + service_name: &'static str, + service_version: &'static str, + config: &Config, +) -> Result { // TODO: compose console_subscriber as a layer - let env_filter = EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .from_env_lossy(); - tracing_subscriber::fmt().with_env_filter(env_filter).init(); + let fmt_layer = fmt::layer().with_filter(EnvFilter::from_default_env()); + let registry = Registry::default().with(fmt_layer); + let (scope_guard, subscriber) = + telemetry::init_tracing_layer(registry, service_name, service_version, config)?; + subscriber.init(); + Ok(scope_guard) }