Skip to content

Commit

Permalink
Revamped OpenTelemetry layer
Browse files Browse the repository at this point in the history
  • Loading branch information
mzabaluev committed Oct 15, 2024
1 parent 69c5323 commit 2031bcd
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 144 deletions.
4 changes: 1 addition & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,12 @@ pub async fn basic_coin_transfers(

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
movement_tracing::init_tracing_subscriber();
let tracing_config = movement_tracing::Config::from_env()?;
movement_tracing::init_tracing_subscriber(
env!("CARGO_BIN_NAME"),
env!("CARGO_PKG_VERSION"),
&tracing_config,
)?;

// get the lead dot movement from the environment
let dot_movement = DotMovement::try_from_env()?;
Expand Down
1 change: 0 additions & 1 deletion networks/suzuka/suzuka-full-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ tonic = { workspace = true }
movement-types = { workspace = true }
movement-rest = { workspace = true }
movement-tracing = { workspace = true }
opentelemetry = { workspace = true }
suzuka-config = { workspace = true }
dot-movement = { workspace = true }
godfig = { workspace = true }
Expand Down
9 changes: 4 additions & 5 deletions networks/suzuka/suzuka-full-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ use std::process::ExitCode;

#[tokio::main]
async fn main() -> Result<ExitCode, anyhow::Error> {
movement_tracing::init_tracing_subscriber();
let tracing_config = movement_tracing::telemetry::Config::from_env()?;
movement_tracing::telemetry::init_tracer_provider(
env!("CARGO_PKG_NAME"),
let tracing_config = movement_tracing::Config::from_env()?;
movement_tracing::init_tracing_subscriber(
env!("CARGO_BIN_NAME"),
env!("CARGO_PKG_VERSION"),
tracing_config,
&tracing_config,
)?;

// get the config file
Expand Down
21 changes: 5 additions & 16 deletions networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,14 @@ use maptos_dof_execution::{
SignatureVerifiedTransaction, SignedTransaction, Transaction,
};
use mcr_settlement_manager::{CommitmentEventStream, McrSettlementManagerOperations};
use movement_tracing::telemetry;
use movement_types::block::{Block, BlockCommitment, BlockCommitmentEvent};

use anyhow::Context as _;
use anyhow::Context;
use futures::{future::Either, stream};
use opentelemetry::trace::{FutureExt as _, TraceContextExt as _, Tracer as _};
use opentelemetry::{Context as OtelContext, KeyValue};
use suzuka_config::execution_extension;
use tokio::select;
use tokio_stream::{Stream, StreamExt};
use tracing::{debug, error, info};
use tracing::{debug, error, info, info_span, Instrument};

pub struct Task<E, S> {
executor: E,
Expand Down Expand Up @@ -143,17 +140,9 @@ where

// get the transactions count before the block is consumed
let transactions_count = block.transactions().len();

// execute the block
let tracer = telemetry::tracer();
let span = tracer
.span_builder("execute_block")
.with_attributes([KeyValue::new("id", block_id.to_string())])
.start(&tracer);
let commitment = self
.execute_block_with_retries(block, block_timestamp)
.with_context(OtelContext::current_with_span(span))
.await?;
let span = info_span!(target: "movement_telemetry", "execute_block", id = %block_id);
let commitment =
self.execute_block_with_retries(block, block_timestamp).instrument(span).await?;

// decrement the number of transactions in flight on the executor
self.executor.decrement_transactions_in_flight(transactions_count as u64);
Expand Down
56 changes: 20 additions & 36 deletions networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@
use m1_da_light_node_client::{BatchWriteRequest, BlobWrite, LightNodeServiceClient};
use m1_da_light_node_util::config::Config as LightNodeConfig;
use maptos_dof_execution::SignedTransaction;
use movement_tracing::telemetry;

use opentelemetry::trace::{FutureExt as _, TraceContextExt as _, Tracer as _};
use opentelemetry::{Context as OtelContext, KeyValue};
use tokio::sync::mpsc;
use tracing::warn;
use tracing::{info, info_span, warn, Instrument};

use std::ops::ControlFlow;
use std::sync::atomic::{self, AtomicU64};
Expand All @@ -32,21 +29,17 @@ impl Task {
}

pub async fn run(mut self) -> anyhow::Result<()> {
let tracer = telemetry::tracer();
loop {
let batch_id = LOGGING_UID.fetch_add(1, atomic::Ordering::Relaxed);
let span = tracer
.span_builder("build_batch")
.with_attributes([KeyValue::new("batch_id", batch_id as i64)])
.start(&tracer);
if let ControlFlow::Break(()) = self
.spawn_write_next_transaction_batch()
.with_context(OtelContext::current_with_span(span))
.await?
let span =
info_span!(target: "movement_telemetry", "write_batch", batch_id = %batch_id);
if let ControlFlow::Break(()) =
self.spawn_write_next_transaction_batch().instrument(span).await?
{
break Ok(());
break;
}
}
Ok(())
}

/// Constructs a batch of transactions then spawns the write request to the DA in the background.
Expand Down Expand Up @@ -79,17 +72,12 @@ impl Task {
{
Ok(transaction) => match transaction {
Some(transaction) => {
let otel_cx = OtelContext::current();
otel_cx.span().add_event(
info!(
target: "movement_telemetry",
tx_hash = %transaction.committed_hash(),
sender = %transaction.sender(),
sequence_number = transaction.sequence_number(),
"received_transaction",
vec![
KeyValue::new("tx_hash", transaction.committed_hash().to_string()),
KeyValue::new("sender", transaction.sender().to_string()),
KeyValue::new(
"sequence_number",
transaction.sequence_number() as i64,
),
],
);
let serialized_aptos_transaction = serde_json::to_vec(&transaction)?;
let movement_transaction = movement_types::transaction::Transaction::new(
Expand All @@ -111,23 +99,19 @@ impl Task {
}

if transactions.len() > 0 {
let otel_cx = OtelContext::current();
otel_cx.span().add_event(
"built_batch_write",
vec![KeyValue::new("transaction_count", transactions.len() as i64)],
info!(
target: "movement_telemetry",
transaction_count = transactions.len(),
"built_batch_write"
);
let batch_write = BatchWriteRequest { blobs: transactions };
// spawn the actual batch write request in the background
let write_span = telemetry::tracer().start_with_context("batch_write", &otel_cx);
let mut da_light_node_client = self.da_light_node_client.clone();
tokio::spawn(
async move {
if let Err(e) = da_light_node_client.batch_write(batch_write).await {
warn!("failed to write batch to DA: {:?}", e);
}
tokio::spawn(async move {
if let Err(e) = da_light_node_client.batch_write(batch_write).await {
warn!("failed to write batch to DA: {:?}", e);
}
.with_context(otel_cx.with_span(write_span)),
);
});
}

Ok(Continue(()))
Expand Down
9 changes: 4 additions & 5 deletions protocol-units/da/m1/light-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ use m1_da_light_node::v1::{LightNodeV1, Manager};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
movement_tracing::init_tracing_subscriber();
let tracing_config = movement_tracing::telemetry::Config::from_env()?;
movement_tracing::telemetry::init_tracer_provider(
env!("CARGO_PKG_NAME"),
let tracing_config = movement_tracing::Config::from_env()?;
movement_tracing::init_tracing_subscriber(
env!("CARGO_BIN_NAME"),
env!("CARGO_PKG_VERSION"),
tracing_config,
&tracing_config,
)?;

let dot_movement = dot_movement::DotMovement::try_from_env()?;
Expand Down
2 changes: 0 additions & 2 deletions protocol-units/execution/opt-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ poem = { workspace = true }
poem-openapi = { workspace = true }
derive_more = { workspace = true, default-features = true }
lazy_static = "1.4.0"
opentelemetry = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
rand = { workspace = true }
Expand Down Expand Up @@ -67,7 +66,6 @@ aptos-protos = { workspace = true }
aptos-logger = { workspace = true }
maptos-execution-util = { workspace = true }
movement-rest = { workspace = true }
movement-tracing = { workspace = true }
movement-types = { workspace = true }

[dev-dependencies]
Expand Down
40 changes: 17 additions & 23 deletions protocol-units/execution/opt-executor/src/transaction_pipe.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//! Task processing incoming transactions for the opt API.
use movement_tracing::telemetry;
use aptos_config::config::NodeConfig;
use aptos_mempool::core_mempool::CoreMempool;
Expand All @@ -13,11 +12,9 @@ use aptos_vm_validator::vm_validator::{self, TransactionValidation, VMValidator}

use futures::channel::mpsc as futures_mpsc;
use futures::StreamExt;
use opentelemetry::trace::{FutureExt as _, TraceContextExt as _, Tracer as _};
use opentelemetry::{Context as OtelContext, KeyValue};
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::{debug, warn};
use tracing::{debug, info, info_span, warn, Instrument};

use std::sync::{atomic::AtomicU64, Arc};
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -91,19 +88,14 @@ impl TransactionPipe {
if let Some(request) = next {
match request {
MempoolClientRequest::SubmitTransaction(transaction, callback) => {
let tracer = telemetry::tracer();
let span = tracer
.span_builder("submit_transaction")
.with_attributes([
KeyValue::new("tx_hash", transaction.committed_hash().to_string()),
KeyValue::new("sender", transaction.sender().to_string()),
KeyValue::new("sequence_number", transaction.sequence_number() as i64),
])
.start(&tracer);
let status = self
.submit_transaction(transaction)
.with_context(OtelContext::current_with_span(span))
.await?;
let span = info_span!(
target: "movement_telemetry",
"submit_transaction",
tx_hash = %transaction.committed_hash(),
sender = %transaction.sender(),
sequence_number = transaction.sequence_number(),
);
let status = self.submit_transaction(transaction).instrument(span).await?;
callback.send(Ok(status)).unwrap_or_else(|_| {
debug!("SubmitTransaction request canceled");
});
Expand Down Expand Up @@ -131,14 +123,16 @@ impl TransactionPipe {
) -> Result<SubmissionStatus, Error> {
// For now, we are going to consider a transaction in flight until it exits the mempool and is sent to the DA as is indicated by WriteBatch.
let in_flight = self.transactions_in_flight.load(std::sync::atomic::Ordering::Relaxed);
let otel_cx = OtelContext::current();
let otel_span = otel_cx.span();
otel_span.add_event(
"transactions_in_flight",
vec![KeyValue::new("in_flight", in_flight as i64)],
info!(
target: "movement_telemetry",
in_flight = %in_flight,
"transactions_in_flight"
);
if in_flight > self.in_flight_limit {
otel_span.add_event("shedding_load", vec![]);
info!(
target: "movement_telemetry",
"shedding_load"
);
let status = MempoolStatus::new(MempoolStatusCode::MempoolIsFull);
return Ok((status, None));
}
Expand Down
3 changes: 2 additions & 1 deletion util/tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ rust-version.workspace = true

[dependencies]
anyhow = { workspace = true }
tracing-subscriber = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
opentelemetry-otlp = { workspace = true }
opentelemetry-semantic-conventions = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
#console-subscriber = { workspace = true }

[lints]
Expand Down
29 changes: 29 additions & 0 deletions util/tracing/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use anyhow::anyhow;
use std::env;

const OTLP_TRACING_ENV: &str = "MOVEMENT_OTLP";

/// Options for tracing configuration.
///
/// The default value has no collector endpoint configured, which means
/// telemetry is not exported.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Config {
    /// URL of the OpenTelemetry collector endpoint using the OTLP gRPC protocol.
    /// If the value is `None`, telemetry is not exported.
    pub otlp_grpc_url: Option<String>,
}

impl Config {
    /// Get the tracing configuration from well-known environment variables.
    ///
    /// The collector endpoint is read from the `MOVEMENT_OTLP` environment
    /// variable. If the variable is not present, or is set to an empty or
    /// whitespace-only value, `otlp_grpc_url` is `None`.
    ///
    /// # Errors
    ///
    /// Returns an error if the value of the variable is not valid UTF-8.
    pub fn from_env() -> Result<Self, anyhow::Error> {
        let otlp_grpc_url = match env::var(OTLP_TRACING_ENV) {
            // A variable that is set but blank carries no endpoint; treat it
            // the same as an unset variable instead of producing `Some("")`.
            Ok(url) if url.trim().is_empty() => None,
            Ok(url) => Some(url),
            Err(env::VarError::NotPresent) => None,
            Err(env::VarError::NotUnicode(s)) => {
                return Err(anyhow!(
                    "value of environment variable {OTLP_TRACING_ENV} is not valid UTF-8: {}",
                    s.to_string_lossy()
                ));
            }
        };
        Ok(Self { otlp_grpc_url })
    }
}
11 changes: 10 additions & 1 deletion util/tracing/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
pub mod telemetry;
//! Tracing setup for Movement services.
//!
//! Tracing data can optionally be exported via [OpenTelemetry]: spans and
//! events are exported when their target is set to "movement_telemetry".
//!
//! [OpenTelemetry]: https://opentelemetry.io/
mod config;
mod telemetry;
mod tracing;

pub use config::Config;
pub use tracing::init_tracing_subscriber;
Loading

0 comments on commit 2031bcd

Please sign in to comment.