diff --git a/Cargo.toml b/Cargo.toml index 799fa61..1cb0cc1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,3 +5,8 @@ members = [ "tracing-serde-modality-ingest", "tracing-serde-subscriber", ] + +# TODO(AJM): Work around mismatched UUID version +[patch.crates-io.modality-ingest-client] +git = "https://github.com/auxoncorp/modality-sdk" +rev = "4a3b9e3cca08e8225e084f9e8409e818646589ec" diff --git a/tracing-modality/src/async/layer.rs b/tracing-modality/src/async/layer.rs index 0fae002..efd35ce 100644 --- a/tracing-modality/src/async/layer.rs +++ b/tracing-modality/src/async/layer.rs @@ -1,5 +1,5 @@ use crate::common::options::Options; -use crate::{InitError, TIMELINE_IDENTIFIER, RUN_ID}; +use crate::{InitError, TIMELINE_IDENTIFIER}; use crate::common::layer::LayerHandler; use crate::ingest::{ModalityIngest, ModalityIngestTaskHandle, WrappedMessage}; @@ -8,7 +8,6 @@ use anyhow::Context as _; use tokio::sync::mpsc::{self, UnboundedSender}; use tracing_core::Subscriber; use tracing_subscriber::{layer::SubscriberExt, Registry}; -use uuid::Uuid; /// A `tracing` `Layer` that can be used to record trace events and stream them to modality in real /// time. @@ -26,15 +25,8 @@ impl ModalityLayer { /// Initialize a new `ModalityLayer`, with specified options. pub async fn init_with_options( - mut opts: Options, + opts: Options, ) -> Result<(Self, ModalityIngestTaskHandle), InitError> { - let run_id = Uuid::new_v4(); - opts.add_metadata("run_id", run_id.to_string()); - - RUN_ID - .set(run_id) - .map_err(|_| InitError::InitializedTwice)?; - TIMELINE_IDENTIFIER .set(opts.timeline_identifier) .map_err(|_| InitError::InitializedTwice)?; diff --git a/tracing-modality/src/blocking/layer.rs b/tracing-modality/src/blocking/layer.rs index a221b81..23e7ffc 100644 --- a/tracing-modality/src/blocking/layer.rs +++ b/tracing-modality/src/blocking/layer.rs @@ -1,5 +1,5 @@ use crate::common::options::Options; -use crate::{InitError, RUN_ID, TIMELINE_IDENTIFIER}; +use crate::{InitError, TIMELINE_IDENTIFIER}; use crate::common::layer::LayerHandler; use crate::ingest::{ModalityIngest, ModalityIngestThreadHandle, WrappedMessage}; @@ -8,7 +8,6 @@ use anyhow::Context as _; use tokio::sync::mpsc::{self, UnboundedSender}; use tracing_core::Subscriber; use tracing_subscriber::{layer::SubscriberExt, Registry}; -use uuid::Uuid; /// A `tracing` `Layer` that can be used to record trace events and stream them to modality in real /// time. @@ -26,15 +25,8 @@ impl ModalityLayer { /// Initialize a new `ModalityLayer`, with specified options. pub fn init_with_options( - mut opts: Options, + opts: Options, ) -> Result<(Self, ModalityIngestThreadHandle), InitError> { - let run_id = Uuid::new_v4(); - opts.add_metadata("run_id", run_id.to_string()); - - RUN_ID - .set(run_id) - .map_err(|_| InitError::InitializedTwice)?; - TIMELINE_IDENTIFIER .set(opts.timeline_identifier) .map_err(|_| InitError::InitializedTwice)?; diff --git a/tracing-modality/src/common/ingest.rs b/tracing-modality/src/common/ingest.rs index f379862..31b34af 100644 --- a/tracing-modality/src/common/ingest.rs +++ b/tracing-modality/src/common/ingest.rs @@ -60,9 +60,6 @@ pub(crate) struct WrappedMessage { #[derive(Debug)] pub(crate) enum Message { - NewTimeline { - name: String, - }, NewSpan { id: SpanId, metadata: &'static Metadata<'static>, @@ -164,8 +161,13 @@ pub(crate) struct ModalityIngest { client: IngestClient, global_metadata: Vec<(String, AttrVal)>, event_keys: HashMap, + // TODO(AJM): Should these be per-timeline? Or are all attrs shared with all timelines? timeline_keys: HashMap, span_names: HashMap, + run_id: Uuid, + + // TODO(AJM): Replace me with something that does LRU cache eviction! + timeline_map: HashMap, #[cfg(feature = "blocking")] rt: Option, @@ -187,12 +189,16 @@ impl ModalityIngest { }) } - pub(crate) async fn async_connect(options: Options) -> Result { + pub(crate) async fn async_connect(mut options: Options) -> Result { let url = url::Url::parse(&format!("modality-ingest://{}/", options.server_addr)).unwrap(); let unauth_client = IngestClient::connect(&url, false) .await .context("init ingest client")?; + // TODO(AJM): Does this NEED to be in the global metadata? The old code did this. + let run_id = Uuid::new_v4(); + options.add_metadata("run_id", run_id.to_string()); + let auth_key = options.auth.ok_or(ConnectError::AuthRequired)?; let client = unauth_client .authenticate(auth_key) @@ -202,13 +208,18 @@ impl ModalityIngest { // open a timeline for the current thread because we need to open something to make the // types work // - // TODO(AJM): Should this be thread_timeline? Or the user callback? - let client = todo!("Figure out how to get timeline at this point. Do we have a run_id yet?"); - // let timeline_id = thread_timeline(); - // let client = client - // .open_timeline(timeline_id.id) - // .await - // .context("open new timeline")?; + // TODO(AJM): Grumble grumble, this is partially duplicating the work of `bind_user_timeline()`, + // and also not actually registering the metadata for this timeline. Since this is our worker + // thread, and we will never produce tracing information that is sent to modality, it is probably + // fine. If we do at a later time, it will then be registered via the actual call to + // `bind_user_timeline()`. + let user_id = (options.timeline_identifier)().user_id; + let timeline_id = uuid::Uuid::new_v5(&run_id, &user_id.to_ne_bytes()).into(); + + let client = client + .open_timeline(timeline_id) + .await + .context("open new timeline")?; #[allow(unreachable_code)] Ok(Self { @@ -217,6 +228,8 @@ impl ModalityIngest { event_keys: HashMap::new(), timeline_keys: HashMap::new(), span_names: HashMap::new(), + run_id, + timeline_map: HashMap::new(), #[cfg(feature = "blocking")] rt: None, }) @@ -285,230 +298,266 @@ impl ModalityIngest { let _ = self.client.flush().await; } + #[inline] + fn generate_timeline_id(&self, user_id: u64) -> TimelineId { + // We deterministically generate timeline IDs using UUIDv5, with + uuid::Uuid::new_v5(&self.run_id, &user_id.to_ne_bytes()).into() + } + + /// Ensures the reported user timeline is the currently open client timeline. + /// + /// * If this is a new`*` timeline, we will first register its metadata, + /// and open the timeline. + /// * If this is NOT a new timeline, but not the current timeline, + /// we will open it. + /// * If this is already the current timeline, nothing will be done. + /// + /// `*`: In some cases, we MAY end up re-registering the metadata for a timeline, + /// if the timeline has not been recently used. + async fn bind_user_timeline(&mut self, name: String, user_id: u64) -> Result<(), IngestError> { + let timeline_id = match self.timeline_map.get(&user_id).copied() { + Some(tid) => tid, + None => { + let timeline_id = self.generate_timeline_id(user_id); + + // Register the metadata of the new* timeline ID + let mut timeline_metadata = self.global_metadata.clone(); + timeline_metadata.push(("timeline.name".to_string(), name.into())); + + for (key, value) in timeline_metadata { + let timeline_key_name = self + .get_or_create_timeline_attr_key(key) + .await + .context("get or define timeline attr key")?; + + self.client + .timeline_metadata([(timeline_key_name, value)]) + .await + .context("apply timeline metadata")?; + } + + // Success, now add to the map + self.timeline_map.insert(user_id, timeline_id); + + // And return the timeline + timeline_id + }, + }; + + // TODO(AJM): DO LRU EVICTION HERE! + + if self.client.bound_timeline() != timeline_id { + self.client + .open_timeline(timeline_id) + .await + .context("open new timeline")?; + } + + Ok(()) + } + async fn handle_packet(&mut self, message: WrappedMessage) -> Result<(), IngestError> { - todo!("Rework processing!") - - // let WrappedMessage { - // message, - // tick, - // timeline, - // } = message; - - // if self.client.bound_timeline() != timeline { - // self.client - // .open_timeline(timeline) - // .await - // .context("open new timeline")?; - // } - - // match message { - // Message::NewTimeline { name } => { - // let mut timeline_metadata = self.global_metadata.clone(); - - // if !timeline_metadata.iter().any(|(k, _v)| k == "name") { - // timeline_metadata.push(("timeline.name".to_string(), name.into())); - // } - - // for (key, value) in timeline_metadata { - // let timeline_key_name = self - // .get_or_create_timeline_attr_key(key) - // .await - // .context("get or define timeline attr key")?; - - // self.client - // .timeline_metadata([(timeline_key_name, value)]) - // .await - // .context("apply timeline metadata")?; - // } - // } - // Message::NewSpan { - // id, - // metadata, - // mut records, - // } => { - // let name = { - // // store name for future use - // let name = records - // .get("name") - // .or_else(|| records.get("message")) - // .map(|n| format!("{:?}", n)) - // .unwrap_or_else(|| metadata.name().to_string()); - - // self.span_names.insert(id, name.clone()); - - // name - // }; - - // let mut packed_attrs = Vec::new(); - - // packed_attrs.push(( - // self.get_or_create_event_attr_key("event.name".to_string()) - // .await?, - // AttrVal::String(name), - // )); - - // let kind = records - // .remove("modality.kind") - // .map(tracing_value_to_attr_val) - // .unwrap_or_else(|| "span:defined".into()); - // packed_attrs.push(( - // self.get_or_create_event_attr_key("event.internal.rs.kind".to_string()) - // .await?, - // kind, - // )); - - // let span_id = records - // .remove("modality.span_id") - // .map(tracing_value_to_attr_val) - // .unwrap_or_else(|| BigInt::new_attr_val(u64::from(id) as i128)); - // packed_attrs.push(( - // self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string()) - // .await?, - // span_id, - // )); - - // self.pack_common_attrs(&mut packed_attrs, metadata, records, tick) - // .await?; - - // self.client - // .event(tick.as_nanos(), packed_attrs) - // .await - // .context("send packed event")?; - // } - // Message::Record { span, records } => { - // // TODO: span events can't be added to after being sent, impl this once we can use - // // timelines to represent spans - - // let _ = span; - // let _ = records; - // } - // Message::RecordFollowsFrom { span, follows } => { - // // TODO: span events can't be added to after being sent, impl this once we can use - // // timelines to represent spans - - // let _ = span; - // let _ = follows; - // } - // Message::Event { - // metadata, - // mut records, - // } => { - // let mut packed_attrs = Vec::new(); - - // let kind = records - // .remove("modality.kind") - // .map(tracing_value_to_attr_val) - // .unwrap_or_else(|| "event".into()); - // packed_attrs.push(( - // self.get_or_create_event_attr_key("event.internal.rs.kind".to_string()) - // .await?, - // kind, - // )); - - // self.pack_common_attrs(&mut packed_attrs, metadata, records, tick) - // .await?; - - // self.client - // .event(tick.as_nanos(), packed_attrs) - // .await - // .context("send packed event")?; - // } - // Message::Enter { span } => { - // let mut packed_attrs = Vec::new(); - - // { - // // get stored span name - // let name = self.span_names.get(&span).map(|n| format!("enter: {}", n)); - - // if let Some(name) = name { - // packed_attrs.push(( - // self.get_or_create_event_attr_key("event.name".to_string()) - // .await?, - // AttrVal::String(name), - // )); - // } - // }; - - // packed_attrs.push(( - // self.get_or_create_event_attr_key("event.internal.rs.kind".to_string()) - // .await?, - // AttrVal::String("span:enter".to_string()), - // )); - - // packed_attrs.push(( - // self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string()) - // .await?, - // BigInt::new_attr_val(u64::from(span).into()), - // )); - - // // only record tick directly during the first ~5.8 centuries this program is running - // if let Ok(tick) = TryInto::::try_into(tick.as_nanos()) { - // packed_attrs.push(( - // self.get_or_create_event_attr_key("event.internal.rs.tick".to_string()) - // .await?, - // AttrVal::LogicalTime(LogicalTime::unary(tick)), - // )); - // } - - // self.client - // .event(tick.as_nanos(), packed_attrs) - // .await - // .context("send packed event")?; - // } - // Message::Exit { span } => { - // let mut packed_attrs = Vec::new(); - - // { - // // get stored span name - // let name = self.span_names.get(&span).map(|n| format!("exit: {}", n)); - - // if let Some(name) = name { - // packed_attrs.push(( - // self.get_or_create_event_attr_key("event.name".to_string()) - // .await?, - // AttrVal::String(name), - // )); - // } - // }; - - // packed_attrs.push(( - // self.get_or_create_event_attr_key("event.internal.rs.kind".to_string()) - // .await?, - // AttrVal::String("span:exit".to_string()), - // )); - - // packed_attrs.push(( - // self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string()) - // .await?, - // BigInt::new_attr_val(u64::from(span).into()), - // )); - - // // only record tick directly during the first ~5.8 centuries this program is running - // if let Ok(tick) = TryInto::::try_into(tick.as_nanos()) { - // packed_attrs.push(( - // self.get_or_create_event_attr_key("event.internal.rs.tick".to_string()) - // .await?, - // AttrVal::LogicalTime(LogicalTime::unary(tick)), - // )); - // } - - // self.client - // .event(tick.as_nanos(), packed_attrs) - // .await - // .context("send packed event")?; - // } - // Message::Close { span } => { - // self.span_names.remove(&span); - // } - // Message::IdChange { old, new } => { - // let name = self.span_names.get(&old).cloned(); - // if let Some(name) = name { - // self.span_names.insert(new, name); - // } - // } - // } - - // Ok(()) + let WrappedMessage { + message, + tick, + timeline_name, + user_timeline_id, + } = message; + + // Ensure that the user reported timeline ID is active. + self.bind_user_timeline(timeline_name, user_timeline_id).await?; + + match message { + Message::NewSpan { + id, + metadata, + mut records, + } => { + let name = { + // store name for future use + let name = records + .get("name") + .or_else(|| records.get("message")) + .map(|n| format!("{:?}", n)) + .unwrap_or_else(|| metadata.name().to_string()); + + self.span_names.insert(id, name.clone()); + + name + }; + + let mut packed_attrs = Vec::new(); + + packed_attrs.push(( + self.get_or_create_event_attr_key("event.name".to_string()) + .await?, + AttrVal::String(name), + )); + + let kind = records + .remove("modality.kind") + .map(tracing_value_to_attr_val) + .unwrap_or_else(|| "span:defined".into()); + packed_attrs.push(( + self.get_or_create_event_attr_key("event.internal.rs.kind".to_string()) + .await?, + kind, + )); + + let span_id = records + .remove("modality.span_id") + .map(tracing_value_to_attr_val) + .unwrap_or_else(|| BigInt::new_attr_val(u64::from(id) as i128)); + packed_attrs.push(( + self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string()) + .await?, + span_id, + )); + + self.pack_common_attrs(&mut packed_attrs, metadata, records, tick) + .await?; + + self.client + .event(tick.as_nanos(), packed_attrs) + .await + .context("send packed event")?; + } + Message::Record { span, records } => { + // TODO: span events can't be added to after being sent, impl this once we can use + // timelines to represent spans + + let _ = span; + let _ = records; + } + Message::RecordFollowsFrom { span, follows } => { + // TODO: span events can't be added to after being sent, impl this once we can use + // timelines to represent spans + + let _ = span; + let _ = follows; + } + Message::Event { + metadata, + mut records, + } => { + let mut packed_attrs = Vec::new(); + + let kind = records + .remove("modality.kind") + .map(tracing_value_to_attr_val) + .unwrap_or_else(|| "event".into()); + packed_attrs.push(( + self.get_or_create_event_attr_key("event.internal.rs.kind".to_string()) + .await?, + kind, + )); + + self.pack_common_attrs(&mut packed_attrs, metadata, records, tick) + .await?; + + self.client + .event(tick.as_nanos(), packed_attrs) + .await + .context("send packed event")?; + } + Message::Enter { span } => { + let mut packed_attrs = Vec::new(); + + { + // get stored span name + let name = self.span_names.get(&span).map(|n| format!("enter: {}", n)); + + if let Some(name) = name { + packed_attrs.push(( + self.get_or_create_event_attr_key("event.name".to_string()) + .await?, + AttrVal::String(name), + )); + } + }; + + packed_attrs.push(( + self.get_or_create_event_attr_key("event.internal.rs.kind".to_string()) + .await?, + AttrVal::String("span:enter".to_string()), + )); + + packed_attrs.push(( + self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string()) + .await?, + BigInt::new_attr_val(u64::from(span).into()), + )); + + // only record tick directly during the first ~5.8 centuries this program is running + if let Ok(tick) = TryInto::::try_into(tick.as_nanos()) { + packed_attrs.push(( + self.get_or_create_event_attr_key("event.internal.rs.tick".to_string()) + .await?, + AttrVal::LogicalTime(LogicalTime::unary(tick)), + )); + } + + self.client + .event(tick.as_nanos(), packed_attrs) + .await + .context("send packed event")?; + } + Message::Exit { span } => { + let mut packed_attrs = Vec::new(); + + { + // get stored span name + let name = self.span_names.get(&span).map(|n| format!("exit: {}", n)); + + if let Some(name) = name { + packed_attrs.push(( + self.get_or_create_event_attr_key("event.name".to_string()) + .await?, + AttrVal::String(name), + )); + } + }; + + packed_attrs.push(( + self.get_or_create_event_attr_key("event.internal.rs.kind".to_string()) + .await?, + AttrVal::String("span:exit".to_string()), + )); + + packed_attrs.push(( + self.get_or_create_event_attr_key("event.internal.rs.span_id".to_string()) + .await?, + BigInt::new_attr_val(u64::from(span).into()), + )); + + // only record tick directly during the first ~5.8 centuries this program is running + if let Ok(tick) = TryInto::::try_into(tick.as_nanos()) { + packed_attrs.push(( + self.get_or_create_event_attr_key("event.internal.rs.tick".to_string()) + .await?, + AttrVal::LogicalTime(LogicalTime::unary(tick)), + )); + } + + self.client + .event(tick.as_nanos(), packed_attrs) + .await + .context("send packed event")?; + } + Message::Close { span } => { + self.span_names.remove(&span); + } + Message::IdChange { old, new } => { + let name = self.span_names.get(&old).cloned(); + if let Some(name) = name { + self.span_names.insert(new, name); + } + } + } + + Ok(()) + + // todo!("Rework processing!"); } async fn get_or_create_timeline_attr_key(