Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow user to provide timeline callback #23

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,8 @@ members = [
"tracing-serde-modality-ingest",
"tracing-serde-subscriber",
]

# TODO(AJM): Work around mismatched UUID version
[patch.crates-io.modality-ingest-client]
git = "https://github.com/auxoncorp/modality-sdk"
rev = "4a3b9e3cca08e8225e084f9e8409e818646589ec"
Comment on lines +8 to +12
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, this should no longer be needed.

2 changes: 1 addition & 1 deletion tracing-modality/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ tracing = "0.1"
tracing-core = "0.1"
tracing-subscriber = "0.3"
url = "2"
uuid = { version = "1", features = ["v4"] }
uuid = { version = "1", features = ["v4", "v5"] }

[dev-dependencies]
# used for some examples
Expand Down
35 changes: 19 additions & 16 deletions tracing-modality/examples/monitored_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::{
use std::time::{Duration, Instant};
use std::{fmt, thread};
use tracing_core::Dispatch;
use tracing_modality::blocking::{timeline_id, ModalityLayer, TimelineId};
use tracing_modality::{blocking::ModalityLayer, timeline_id};
use tracing_subscriber::{fmt::Layer, layer::SubscriberExt, Registry};

fn main() {
Expand Down Expand Up @@ -134,13 +134,13 @@ pub struct MessageMetadata {
/// When was this message from?
timestamp: NanosecondsSinceUnixEpoch,
/// Which tracing timeline was this from?
timeline_id: TimelineId,
timeline_id: u64,
/// A correlation nonce for precisely matching
/// the source event related to this message.
nonce: Option<i64>,
}

#[derive(Copy, Clone, PartialEq, Debug)]
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
#[repr(transparent)]
pub struct NanosecondsSinceUnixEpoch(pub u64);
impl NanosecondsSinceUnixEpoch {
Expand Down Expand Up @@ -231,7 +231,7 @@ mod producer {
fn send_measurement(
sample: i8,
consumer_tx: &SyncSender<MeasurementMessage>,
timeline_id: TimelineId,
timeline_id: u64,
nonce: i64,
) {
// The measurement sample value must be in the range [-50, 50]
Expand Down Expand Up @@ -307,17 +307,19 @@ mod consumer {
Ok(msg) => {
if let Some(nonce) = msg.meta.nonce {
tracing::info!(
sample = msg.sample,
interaction.remote_timeline_id = %msg.meta.timeline_id.get_raw(),
interaction.remote_timestamp = msg.meta.timestamp.0,
interaction.remote_nonce=nonce,
"Received measurement message");
sample = msg.sample,
interaction.remote_timeline_id = msg.meta.timeline_id,
interaction.remote_timestamp = msg.meta.timestamp.0,
interaction.remote_nonce = nonce,
"Received measurement message"
);
} else {
tracing::info!(
sample = msg.sample,
interaction.remote_timeline_id = %msg.meta.timeline_id.get_raw(),
interaction.remote_timestamp = msg.meta.timestamp.0,
"Received measurement message");
sample = msg.sample,
interaction.remote_timeline_id = msg.meta.timeline_id,
interaction.remote_timestamp = msg.meta.timestamp.0,
"Received measurement message"
);
}

expensive_task(msg.sample, &is_shutdown_requested);
Expand Down Expand Up @@ -380,9 +382,10 @@ mod monitor {
Ok(msg) => {
tracing::info!(
source = msg.source.name(),
interaction.remote_timeline_id = %msg.meta.timeline_id.get_raw(),
interaction.remote_timeline_id = msg.meta.timeline_id,
interaction.remote_timestamp = msg.meta.timestamp.0,
"Received heartbeat message");
"Received heartbeat message"
);
let prev = component_to_last_rx.insert(msg.source, Instant::now());
if prev.is_none() {
tracing::info!(
Expand Down Expand Up @@ -421,7 +424,7 @@ fn send_heartbeat(
source: Component,
// What is the timeline id of the component sending the heartbeat?
// Instead of passing this around, for a small cost, one could use the timeline_id() fn
timeline_id: TimelineId,
timeline_id: u64,
) {
let timestamp = match NanosecondsSinceUnixEpoch::now() {
Ok(timestamp) => timestamp,
Expand Down
20 changes: 9 additions & 11 deletions tracing-modality/examples/simple_multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use rand::{thread_rng, Rng};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{Builder, JoinHandle};
use tracing::{info, info_span};
use tracing_modality::blocking::{timeline_id, TimelineId, TracingModality};
use tracing_modality::{blocking::TracingModality, timeline_id};

const THREADS: usize = 2;

Expand All @@ -13,7 +13,7 @@ enum Message {
struct Job {
nonce: u32,
num: u32,
timeline_id: TimelineId,
timeline_id: u64,
}

fn main() {
Expand Down Expand Up @@ -41,8 +41,8 @@ fn main() {
match msg {
Message::Data(job) => {
info!(
interaction.remote_nonce=job.nonce,
interaction.remote_timeline_id=?job.timeline_id.get_raw(),
interaction.remote_nonce = job.nonce,
interaction.remote_timeline_id = job.timeline_id,
job.num,
"received",
);
Expand All @@ -53,12 +53,10 @@ fn main() {
foo = "bar"
);

let result = comp_span.in_scope(|| {
job.num * 2
});
let result = comp_span.in_scope(|| job.num * 2);
//let nonce = job.nonce + THREADS as u32;
let nonce = job.nonce;
info!(nonce = nonce, source = ?timeline_id.get_raw(), result, "sending");
info!(nonce = nonce, source = timeline_id, result, "sending");
term_tx
.send(Message::Data(Job {
nonce,
Expand All @@ -85,7 +83,7 @@ fn main() {
nonce = i,
worker = target,
input = start,
source = ?timeline_id.get_raw(),
source = timeline_id,
"sending",
);
tx_chans[target]
Expand All @@ -102,8 +100,8 @@ fn main() {
match result {
Message::Data(job) => {
info!(
interaction.remote_nonce=job.nonce,
interaction.remote_timeline_id=?job.timeline_id.get_raw(),
interaction.remote_nonce = job.nonce,
interaction.remote_timeline_id = job.timeline_id,
job.num,
"result",
);
Expand Down
32 changes: 6 additions & 26 deletions tracing-modality/src/async/layer.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
use crate::common::options::Options;
use crate::InitError;
use crate::{InitError, TIMELINE_IDENTIFIER};

use crate::common::layer::{LayerHandler, LocalMetadata};
use crate::ingest;
use crate::common::layer::LayerHandler;
use crate::ingest::{ModalityIngest, ModalityIngestTaskHandle, WrappedMessage};

use anyhow::Context as _;
use once_cell::sync::Lazy;
use std::{cell::Cell, thread::LocalKey, thread_local};
use tokio::sync::mpsc::{self, UnboundedSender};
use tracing_core::Subscriber;
use tracing_subscriber::{layer::SubscriberExt, Registry};
use uuid::Uuid;

/// A `tracing` `Layer` that can be used to record trace events and stream them to modality in real
/// time.
Expand All @@ -22,26 +18,18 @@ pub struct ModalityLayer {
}

impl ModalityLayer {
thread_local! {
static LOCAL_METADATA: Lazy<LocalMetadata> = Lazy::new(|| {
LocalMetadata {
thread_timeline: ingest::current_timeline(),
}
});
static THREAD_TIMELINE_INITIALIZED: Cell<bool> = Cell::new(false);
}

/// Initialize a new `ModalityLayer`, with default options.
pub async fn init() -> Result<(Self, ModalityIngestTaskHandle), InitError> {
Self::init_with_options(Default::default()).await
}

/// Initialize a new `ModalityLayer`, with specified options.
pub async fn init_with_options(
mut opts: Options,
opts: Options,
) -> Result<(Self, ModalityIngestTaskHandle), InitError> {
let run_id = Uuid::new_v4();
opts.add_metadata("run_id", run_id.to_string());
TIMELINE_IDENTIFIER
.set(opts.timeline_identifier)
.map_err(|_| InitError::InitializedTwice)?;

let ingest = ModalityIngest::async_connect(opts)
.await
Expand All @@ -63,12 +51,4 @@ impl LayerHandler for ModalityLayer {
fn send(&self, msg: WrappedMessage) -> Result<(), mpsc::error::SendError<WrappedMessage>> {
self.sender.send(msg)
}

fn local_metadata(&self) -> &'static LocalKey<Lazy<LocalMetadata>> {
&Self::LOCAL_METADATA
}

fn thread_timeline_initialized(&self) -> &'static LocalKey<Cell<bool>> {
&Self::THREAD_TIMELINE_INITIALIZED
}
}
2 changes: 1 addition & 1 deletion tracing-modality/src/async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod layer;
use crate::common::ingest;
pub use crate::options::Options;
use crate::InitError;
pub use ingest::{ModalityIngestTaskHandle, TimelineId};
pub use ingest::ModalityIngestTaskHandle;
pub use layer::ModalityLayer;

use anyhow::Context as _;
Expand Down
32 changes: 6 additions & 26 deletions tracing-modality/src/blocking/layer.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
use crate::common::options::Options;
use crate::InitError;
use crate::{InitError, TIMELINE_IDENTIFIER};

use crate::common::layer::{LayerHandler, LocalMetadata};
use crate::ingest;
use crate::common::layer::LayerHandler;
use crate::ingest::{ModalityIngest, ModalityIngestThreadHandle, WrappedMessage};

use anyhow::Context as _;
use once_cell::sync::Lazy;
use std::{cell::Cell, thread::LocalKey, thread_local};
use tokio::sync::mpsc::{self, UnboundedSender};
use tracing_core::Subscriber;
use tracing_subscriber::{layer::SubscriberExt, Registry};
use uuid::Uuid;

/// A `tracing` `Layer` that can be used to record trace events and stream them to modality in real
/// time.
Expand All @@ -22,26 +18,18 @@ pub struct ModalityLayer {
}

impl ModalityLayer {
thread_local! {
static LOCAL_METADATA: Lazy<LocalMetadata> = Lazy::new(|| {
LocalMetadata {
thread_timeline: ingest::current_timeline(),
}
});
static THREAD_TIMELINE_INITIALIZED: Cell<bool> = Cell::new(false);
}

/// Initialize a new `ModalityLayer`, with default options.
pub fn init() -> Result<(Self, ModalityIngestThreadHandle), InitError> {
Self::init_with_options(Default::default())
}

/// Initialize a new `ModalityLayer`, with specified options.
pub fn init_with_options(
mut opts: Options,
opts: Options,
) -> Result<(Self, ModalityIngestThreadHandle), InitError> {
let run_id = Uuid::new_v4();
opts.add_metadata("run_id", run_id.to_string());
TIMELINE_IDENTIFIER
.set(opts.timeline_identifier)
.map_err(|_| InitError::InitializedTwice)?;

let ingest = ModalityIngest::connect(opts).context("connect to modality")?;
let ingest_handle = ingest.spawn_thread();
Expand All @@ -61,12 +49,4 @@ impl LayerHandler for ModalityLayer {
fn send(&self, msg: WrappedMessage) -> Result<(), mpsc::error::SendError<WrappedMessage>> {
self.sender.send(msg)
}

fn local_metadata(&self) -> &'static LocalKey<Lazy<LocalMetadata>> {
&Self::LOCAL_METADATA
}

fn thread_timeline_initialized(&self) -> &'static LocalKey<Cell<bool>> {
&Self::THREAD_TIMELINE_INITIALIZED
}
}
2 changes: 1 addition & 1 deletion tracing-modality/src/blocking/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod layer;

pub use crate::ingest::ModalityIngestThreadHandle;
pub use crate::{timeline_id, InitError, Options, TimelineId};
pub use crate::{InitError, Options};
pub use layer::ModalityLayer;

use anyhow::Context as _;
Expand Down
Loading