diff --git a/Cargo.lock b/Cargo.lock index a2c1cf5ca..f1b15f598 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2966,7 +2966,7 @@ dependencies = [ "aptos-metrics-core", "cfg-if", "once_cell", - "procfs", + "procfs 0.14.2", "prometheus", "sysinfo", ] @@ -12278,7 +12278,12 @@ dependencies = [ name = "movement-tracing" version = "0.0.2" dependencies = [ - "tracing-appender", + "opentelemetry", + "opentelemetry-prometheus", + "opentelemetry_sdk", + "prometheus", + "tokio", + "tracing", "tracing-subscriber 0.3.18", ] @@ -12809,6 +12814,55 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "236e667b670a5cdf90c258f5a55794ec5ac5027e960c224bff8367a59e1e6426" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.7", + "tracing", +] + +[[package]] +name = "opentelemetry-prometheus" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765a76ba13ec77043903322f85dc5434d7d01a37e75536d0f871ed7b9b5bbf0d" +dependencies = [ + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "prometheus", + "protobuf", + "tracing", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84dfad6042089c7fc1f6118b7040dc2eb4ab520abbf410b79dc481032af39570" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "opentelemetry", + "percent-encoding", + "rand 0.8.5", + "serde_json", + "thiserror 2.0.7", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -14029,6 +14083,29 @@ dependencies = [ "rustix 0.36.17", ] +[[package]] +name = "procfs" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "731e0d9356b0c25f16f33b5be79b1c57b562f141ebfcdb0ad8ac2c13a24293b4" +dependencies = [ + "bitflags 2.6.0", + "hex", + "lazy_static", + "procfs-core", + "rustix 0.38.40", +] + +[[package]] +name = "procfs-core" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d3554923a69f4ce04c4a754260c338f505ce22642d3830e049a399fc2059a29" +dependencies = [ + "bitflags 2.6.0", + "hex", +] + [[package]] name = "prometheus" version = "0.13.4" @@ -14038,8 +14115,11 @@ dependencies = [ "cfg-if", "fnv", "lazy_static", + "libc", "memchr", "parking_lot 0.12.3", + "procfs 0.16.0", + "protobuf", "thiserror 1.0.69", ] @@ -17436,18 +17516,6 @@ dependencies = [ "tracing-core", ] -[[package]] -name = "tracing-appender" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" -dependencies = [ - "crossbeam-channel", - "thiserror 1.0.69", - "time", - "tracing-subscriber 0.3.18", -] - [[package]] name = "tracing-attributes" version = "0.1.27" diff --git a/docker/compose/movement-full-node/docker-compose.telemetry.yml b/docker/compose/movement-full-node/docker-compose.telemetry.yml new file mode 100644 index 000000000..e1e85c9eb --- /dev/null +++ b/docker/compose/movement-full-node/docker-compose.telemetry.yml @@ -0,0 +1,78 @@ +services: + prometheus: + image: prom/prometheus:v2.49.1 + container_name: movement-prometheus + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--storage.tsdb.path=/prometheus' + - '--web.console.libraries=/usr/share/prometheus/console_libraries' + - '--web.console.templates=/usr/share/prometheus/consoles' + ports: + - "9090:9090" + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml + - prometheus_data:/prometheus + healthcheck: + test: wget --no-verbose --tries=1 --spider http://localhost:9090/-/healthy || exit 1 + interval: 5s + timeout: 3s + retries: 3 + + grafana: + image: grafana/grafana:10.2.3 + container_name: movement-grafana + ports: + - "3000:3000" + volumes: + - ./grafana/provisioning:/etc/grafana/provisioning + - ./grafana/dashboards:/var/lib/grafana/dashboards + environment: + - GF_SECURITY_ADMIN_PASSWORD=admin + - GF_USERS_ALLOW_SIGN_UP=false + depends_on: + - prometheus + + movement-full-node: + environment: + - MOVEMENT_METRICS_ADDR=0.0.0.0:9464 + - MOVEMENT_OTLP=http://otel-collector:4317 + ports: + - "9464:9464" + depends_on: + - prometheus: + condition: service_healthy + - otel-collector: + condition: service_healthy + + movement-celestia-da-light-node: + environment: + - MOVEMENT_METRICS_ADDR=0.0.0.0:9464 + - MOVEMENT_OTLP=http://otel-collector:4317 + ports: + - "9465:9464" + depends_on: + - prometheus: + condition: service_healthy + - otel-collector: + condition: service_healthy + + otel-collector: + image: otel/opentelemetry-collector:0.96.0 + container_name: movement-otel-collector + command: ["--config=/etc/otel-collector-config.yaml"] + volumes: + - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml + ports: + - "4317:4317" # OTLP gRPC + - "4318:4318" # OTLP HTTP + - "8888:8888" # Prometheus metrics exposed by the collector + - "8889:8889" # Prometheus exporter metrics + - "13133:13133" # Health check extension + healthcheck: + test: wget --no-verbose --tries=1 --spider http://localhost:13133 || exit 1 + interval: 5s + timeout: 3s + retries: 3 + +volumes: + prometheus_data: \ No newline at end of file diff --git a/docker/compose/movement-full-node/otel-collector-config.yaml b/docker/compose/movement-full-node/otel-collector-config.yaml new file mode 100644 index 000000000..c97867394 --- /dev/null +++ b/docker/compose/movement-full-node/otel-collector-config.yaml @@ -0,0 +1,37 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +processors: + batch: + timeout: 1s + send_batch_size: 1024 + +exporters: + prometheus: + endpoint: "0.0.0.0:8889" + namespace: "movement" + const_labels: + label1: value1 + logging: + loglevel: debug + +extensions: + health_check: + endpoint: 0.0.0.0:13133 + +service: + extensions: [health_check] + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [logging] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [prometheus, logging] \ No newline at end of file diff --git a/docker/compose/movement-full-node/prometheus.yml b/docker/compose/movement-full-node/prometheus.yml new file mode 100644 index 000000000..a5abe3baf --- /dev/null +++ b/docker/compose/movement-full-node/prometheus.yml @@ -0,0 +1,21 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + +scrape_configs: + - job_name: 'movement-full-node' + static_configs: + - targets: ['movement-full-node:9464'] + metrics_path: / + scheme: http + + - job_name: 'movement-celestia-da-light-node' + static_configs: + - targets: ['movement-celestia-da-light-node:9464'] + metrics_path: / + scheme: http + + - job_name: 'otel-collector' + static_configs: + - targets: ['otel-collector:8889'] + metrics_path: /metrics \ No newline at end of file diff --git a/networks/movement/movement-client/src/bin/e2e/followers_consistent.rs b/networks/movement/movement-client/src/bin/e2e/followers_consistent.rs index 492982669..365a2fc2c 100644 --- a/networks/movement/movement-client/src/bin/e2e/followers_consistent.rs +++ b/networks/movement/movement-client/src/bin/e2e/followers_consistent.rs @@ -12,8 +12,6 @@ use tokio::sync::RwLock; use tracing::info; use url::Url; -const TIMING_LOG_ENV: &str = "SUZUKA_TIMING_LOG"; - pub fn get_movement_config( dot_movement: &DotMovement, ) -> Result { @@ -393,22 +391,14 @@ pub async fn basic_coin_transfers( #[tokio::main] async fn main() -> Result<(), anyhow::Error> { - let tracing_config = movement_tracing::Config { - timing_log_path: std::env::var_os(TIMING_LOG_ENV).map(Into::into), - }; + // Initialize metrics with a unique port for the test + let tracing_config = movement_tracing::Config::with_metrics_addr("0.0.0.0:9466"); let _guard = movement_tracing::init_tracing_subscriber(tracing_config); - // get the lead dot movement from the environment let dot_movement = DotMovement::try_from_env()?; + let config = get_movement_config(&dot_movement)?; - // get the follower count from the first argument - let follower_count = std::env::args() - .nth(1) - .ok_or_else(|| anyhow::anyhow!("Expected follower count as first argument"))?; - let follower_count = u8::from_str(follower_count.as_str())?; - - // run basic coin transfers - basic_coin_transfers(&dot_movement, follower_count).await?; + basic_coin_transfers(&dot_movement, 2).await?; Ok(()) } diff --git a/networks/movement/movement-full-node/src/main.rs b/networks/movement/movement-full-node/src/main.rs index 97129dfca..e5c373454 100644 --- a/networks/movement/movement-full-node/src/main.rs +++ b/networks/movement/movement-full-node/src/main.rs @@ -2,18 +2,15 @@ use clap::*; use movement_full_node::MovementFullNode; -const TIMING_LOG_ENV: &str = "SUZUKA_TIMING_LOG"; -use std::env; #[tokio::main] async fn main() -> Result<(), anyhow::Error> { - let tracing_config = - movement_tracing::Config { timing_log_path: env::var_os(TIMING_LOG_ENV).map(Into::into) }; - let _guard = movement_tracing::init_tracing_subscriber(tracing_config); + let tracing_config = movement_tracing::Config::with_metrics_addr("0.0.0.0:9464"); + let _guard = movement_tracing::init_tracing_subscriber(tracing_config); - let suzuka_util = MovementFullNode::parse(); + let suzuka_util = MovementFullNode::parse(); + suzuka_util.execute().await?; - suzuka_util.execute().await?; - - Ok(()) + Ok(()) } + diff --git a/process-compose/movement-full-node/process-compose.telemetry.yml b/process-compose/movement-full-node/process-compose.telemetry.yml new file mode 100644 index 000000000..1c6e7a921 --- /dev/null +++ b/process-compose/movement-full-node/process-compose.telemetry.yml @@ -0,0 +1,33 @@ +version: "3" + +environment: + +processes: + prometheus: + is_daemon: true + command: | + docker run -d --rm --name movement-prometheus \ + -p 9090:9090 \ + -v ${PWD}/docker/compose/movement-full-node/prometheus.yml:/etc/prometheus/prometheus.yml \ + prom/prometheus:v2.49.1 + shutdown: + command: | + docker stop movement-prometheus + readiness_probe: + initial_delay_seconds: 3 + exec: + command: wget --no-verbose --tries=1 --spider http://localhost:9090/-/healthy || exit 1 + + movement-full-node: + depends_on: + prometheus: + condition: process_started + environment: + - MOVEMENT_METRICS_ADDR=0.0.0.0:9464 + + movement-celestia-da-light-node: + depends_on: + prometheus: + condition: process_started + environment: + - MOVEMENT_METRICS_ADDR=0.0.0.0:9465 \ No newline at end of file diff --git a/protocol-units/da/movement/protocol/light-node/src/main.rs b/protocol-units/da/movement/protocol/light-node/src/main.rs index 309df3e47..c5f9c5c0d 100644 --- a/protocol-units/da/movement/protocol/light-node/src/main.rs +++ b/protocol-units/da/movement/protocol/light-node/src/main.rs @@ -5,14 +5,9 @@ use movement_da_light_node_verifier::signed::InKnownSignersVerifier; use movement_signer::cryptography::secp256k1::Secp256k1; use movement_signer_loader::LoadedSigner; -use std::env; - -const TIMING_LOG_ENV: &str = "MOVEMENT_DA_LIGHT_NODE_TIMING_LOG"; - #[tokio::main] async fn main() -> Result<(), Box> { - let tracing_config = - movement_tracing::Config { timing_log_path: env::var_os(TIMING_LOG_ENV).map(Into::into) }; + let tracing_config = movement_tracing::Config::with_metrics_addr("0.0.0.0:9464"); let _guard = movement_tracing::init_tracing_subscriber(tracing_config); let dot_movement = dot_movement::DotMovement::try_from_env()?; diff --git a/util/tracing/Cargo.toml b/util/tracing/Cargo.toml index eaf030410..165257782 100644 --- a/util/tracing/Cargo.toml +++ b/util/tracing/Cargo.toml @@ -11,9 +11,13 @@ publish.workspace = true rust-version.workspace = true [dependencies] -tracing-appender = { workspace = true } +tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["json"] } -#console-subscriber = { workspace = true } +opentelemetry = { version = "0.28", features = ["metrics"] } +opentelemetry_sdk = { version = "0.28", features = ["metrics", "rt-tokio"] } +opentelemetry-prometheus = { version = "0.28" } +prometheus = { version = "0.13.4", features = ["process"] } +tokio = { workspace = true, features = ["full"] } [lints] workspace = true diff --git a/util/tracing/src/lib.rs b/util/tracing/src/lib.rs index f018f7e20..5ad3debc5 100644 --- a/util/tracing/src/lib.rs +++ b/util/tracing/src/lib.rs @@ -1,78 +1,125 @@ -use tracing_appender::non_blocking::WorkerGuard as AppenderGuard; -use tracing_subscriber::filter::{self, EnvFilter, LevelFilter}; -use tracing_subscriber::fmt::format::FmtSpan; -use tracing_subscriber::prelude::*; +use opentelemetry::{ + global, + metrics::MeterProvider, + KeyValue, +}; +use opentelemetry_sdk::metrics::SdkMeterProvider; +use prometheus::{Encoder, Registry, TextEncoder}; +use std::{env, net::SocketAddr}; +use tokio::net::TcpListener; +use tracing_subscriber::{ + fmt, + prelude::*, + filter::{EnvFilter, LevelFilter}, + Layer, +}; -use std::{env, fs::File, path::PathBuf}; +const METRICS_ADDR_ENV: &str = "MOVEMENT_METRICS_ADDR"; +const DEFAULT_METRICS_ADDR: &str = "0.0.0.0:9464"; -const TIMING_ENV: &str = "MOVEMENT_TIMING"; +#[derive(Default)] +pub struct Config { + pub metrics_addr: Option, +} -/// The default path name for the timing log file. -/// If the path not specified in [`Config`] and the `MOVEMENT_TIMING` -/// environment variable is set, the log file with this name will be created. -pub const DEFAULT_TIMING_LOG_FILE: &str = "movement-timing.log"; +impl Config { + pub fn new() -> Self { + Self::default() + } -/// A guard for background log appender(s) returned by `init_tracing_subscriber`. -pub struct WorkerGuard { - _drop_me: Option, + pub fn with_metrics_addr(addr: impl Into) -> Self { + Self { + metrics_addr: Some(addr.into()), + } + } } -/// Options for the tracing subscriber. -#[derive(Default)] -pub struct Config { - /// Custom name for the timing log file. - pub timing_log_path: Option, +pub struct TelemetryGuard { + _meter_provider: SdkMeterProvider, + metrics_server: Option>, +} + +pub type WorkerGuard = TelemetryGuard; + +impl Drop for TelemetryGuard { + fn drop(&mut self) { + if let Some(server) = self.metrics_server.take() { + server.abort(); + } + } } -/// Sets up the tracing subscribers for a Movement process. This should be -/// called at the beginning of a process' `main` function. -/// Returns a guard object that should be dropped at the end of the process' -/// `main`` function scope. -/// -/// This function may output encounted errors to the standard error stream, -/// as this is the only facility pub fn init_tracing_subscriber(config: Config) -> WorkerGuard { - // TODO: compose console_subscriber as a layer + tokio::runtime::Runtime::new() + .unwrap() + .block_on(init_telemetry(config)) +} + +pub async fn init_telemetry(config: Config) -> TelemetryGuard { + let registry = Registry::new(); + let exporter = opentelemetry_prometheus::exporter() + .with_registry(registry.clone()) + .build() + .unwrap(); + + let meter_provider = SdkMeterProvider::builder() + .with_reader(exporter) + .build(); + + let meter = meter_provider.meter("movement"); + + let uptime_counter = meter + .u64_counter("movement.uptime.seconds") + .with_description("Service uptime in seconds") + .build(); + + let requests_histogram = meter + .u64_histogram("movement.requests.duration") + .with_description("Request duration in milliseconds") + .build(); + + uptime_counter.add(0, &[KeyValue::new("service", "movement")]); + requests_histogram.record(0, &[KeyValue::new("service", "movement")]); + + global::set_meter_provider(meter_provider.clone()); + let env_filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) .from_env_lossy(); - let log_layer = tracing_subscriber::fmt::layer().with_filter(env_filter); - let (timing_layer, timing_writer_guard) = match env::var(TIMING_ENV) { - Err(env::VarError::NotPresent) => { - // Disable timing - (None, None) - } - Ok(timing_directives) => { - let env_filter = EnvFilter::new(timing_directives); - let timing_log_path = config - .timing_log_path - .as_deref() - .unwrap_or_else(|| DEFAULT_TIMING_LOG_FILE.as_ref()); - match File::create(timing_log_path) { - Ok(file) => { - let (writer, guard) = tracing_appender::non_blocking(file); - let layer = tracing_subscriber::fmt::layer() - .with_writer(writer) - .json() - .with_span_events(FmtSpan::CLOSE) - .with_filter(env_filter) - .with_filter(filter::filter_fn(|meta| meta.target() == "movement_timing")); - (Some(layer), Some(guard)) - } - Err(e) => { - eprintln!("can't create `{}`: {}", timing_log_path.display(), e); - (None, None) - } - } - } - Err(e) => { - eprintln!("invalid {TIMING_ENV}: {e}"); - (None, None) - } - }; + let subscriber = tracing_subscriber::registry() + .with(fmt::layer().with_filter(env_filter)); + + tracing::subscriber::set_global_default(subscriber) + .expect("Failed to set tracing subscriber"); - tracing_subscriber::registry().with(log_layer).with(timing_layer).init(); + let metrics_addr = config.metrics_addr + .or_else(|| env::var(METRICS_ADDR_ENV).ok()) + .unwrap_or_else(|| DEFAULT_METRICS_ADDR.to_string()); - WorkerGuard { _drop_me: timing_writer_guard } + let metrics_server = tokio::spawn(serve_metrics(metrics_addr, registry)); + + TelemetryGuard { + _meter_provider: meter_provider, + metrics_server: Some(metrics_server), + } +} + +async fn serve_metrics(addr: String, registry: Registry) { + let addr: SocketAddr = addr.parse().expect("Invalid metrics address"); + let listener = TcpListener::bind(addr).await.expect("Failed to bind metrics server"); + println!("Metrics server listening on {}", addr); + + loop { + if let Ok((mut stream, _)) = listener.accept().await { + let registry = registry.clone(); + let encoder = TextEncoder::new(); + tokio::spawn(async move { + let metrics = registry.gather(); + let mut buffer = vec![]; + encoder.encode(&metrics, &mut buffer).unwrap(); + tokio::io::copy(&mut &buffer[..], &mut stream).await.ok(); + }); + } + } }