From 70d2019d63b237180153e924d37b6bfe2e62cf75 Mon Sep 17 00:00:00 2001 From: James Munns Date: Mon, 22 Aug 2022 23:05:17 +0200 Subject: [PATCH] Switch to timeline LRU cache --- tracing-modality/src/common/ingest.rs | 24 ++--- tracing-modality/src/common/mod.rs | 1 + tracing-modality/src/common/options.rs | 18 ++++ tracing-modality/src/common/timeline_lru.rs | 99 +++++++++++++++++++++ 4 files changed, 126 insertions(+), 16 deletions(-) create mode 100644 tracing-modality/src/common/timeline_lru.rs diff --git a/tracing-modality/src/common/ingest.rs b/tracing-modality/src/common/ingest.rs index cb01db1..acd47c8 100644 --- a/tracing-modality/src/common/ingest.rs +++ b/tracing-modality/src/common/ingest.rs @@ -2,7 +2,7 @@ pub use modality_ingest_client::types::TimelineId; use crate::{ layer::{RecordMap, TracingValue}, - Options, + Options, timeline_lru::TimelineLru, }; use anyhow::Context; use modality_ingest_client::{ @@ -165,9 +165,7 @@ pub(crate) struct ModalityIngest { timeline_keys: HashMap, span_names: HashMap, run_id: Uuid, - - // TODO(AJM): Replace me with something that does LRU cache eviction! - timeline_map: HashMap, + timeline_map: TimelineLru, #[cfg(feature = "blocking")] rt: Option, @@ -229,7 +227,7 @@ impl ModalityIngest { timeline_keys: HashMap::new(), span_names: HashMap::new(), run_id, - timeline_map: HashMap::new(), + timeline_map: TimelineLru::with_capacity(options.lru_cache_size), #[cfg(feature = "blocking")] rt: None, }) @@ -298,12 +296,6 @@ impl ModalityIngest { let _ = self.client.flush().await; } - #[inline] - fn generate_timeline_id(&self, user_id: u64) -> TimelineId { - // We deterministically generate timeline IDs using UUIDv5, with - uuid::Uuid::new_v5(&self.run_id, &user_id.to_ne_bytes()).into() - } - /// Ensures the reported user timeline is the currently open client timeline. 
/// /// * If this is a new`*` timeline, we will first register its metadata, @@ -315,10 +307,10 @@ impl ModalityIngest { /// `*`: In some cases, we MAY end up re-registering the metadata for a timeline, /// if the timeline has not been recently used. async fn bind_user_timeline(&mut self, name: String, user_id: u64) -> Result<(), IngestError> { - let timeline_id = match self.timeline_map.get(&user_id).copied() { - Some(tid) => tid, - None => { - let timeline_id = self.generate_timeline_id(user_id); + let timeline_id = match self.timeline_map.query(user_id) { + Ok(tid) => tid, + Err(token) => { + let timeline_id = uuid::Uuid::new_v5(&self.run_id, &user_id.to_ne_bytes()).into(); // Register the metadata of the new* timeline ID let mut timeline_metadata = self.global_metadata.clone(); @@ -337,7 +329,7 @@ impl ModalityIngest { } // Success, now add to the map - self.timeline_map.insert(user_id, timeline_id); + self.timeline_map.insert(user_id, timeline_id, token); // And return the timeline timeline_id diff --git a/tracing-modality/src/common/mod.rs b/tracing-modality/src/common/mod.rs index 365829c..ebd13f8 100644 --- a/tracing-modality/src/common/mod.rs +++ b/tracing-modality/src/common/mod.rs @@ -1,6 +1,7 @@ pub(crate) mod ingest; pub(crate) mod layer; pub(crate) mod options; +pub(crate) mod timeline_lru; #[cfg(doc)] use crate::Options; diff --git a/tracing-modality/src/common/options.rs b/tracing-modality/src/common/options.rs index 82f42da..add4c4d 100644 --- a/tracing-modality/src/common/options.rs +++ b/tracing-modality/src/common/options.rs @@ -54,6 +54,7 @@ pub struct Options { pub(crate) metadata: Vec<(String, AttrVal)>, pub(crate) server_addr: SocketAddr, pub(crate) timeline_identifier: fn() -> UserTimelineInfo, + pub(crate) lru_cache_size: usize, } impl Options { @@ -65,6 +66,7 @@ impl Options { metadata: Vec::new(), server_addr, timeline_identifier: thread_timeline, + lru_cache_size: 64, } } @@ -112,6 +114,22 @@ impl Options { self } + /// How many 
modality timelines should be cached? + /// + /// Setting this to a higher number increases the amount of memory used to remember + /// which `TimelineId`s have been registered with modalityd. + /// + /// Setting this to a lower number increases the likelihood that timeline metadata + /// will need to be re-sent to modalityd (and the amount of network traffic sent) + pub fn set_lru_cache_size(&mut self, size: usize) { + self.lru_cache_size = size; + } + /// A chainable version of [set_lru_cache_size](Self::set_lru_cache_size). + pub fn with_lru_cache_size(mut self, size: usize) -> Self { + self.lru_cache_size = size; + self + } + /// Set the name for the root timeline. By default this will be the name of the main thread as /// provided by the OS. pub fn set_name>(&mut self, name: S) { diff --git a/tracing-modality/src/common/timeline_lru.rs b/tracing-modality/src/common/timeline_lru.rs new file mode 100644 index 0000000..29ae382 --- /dev/null +++ b/tracing-modality/src/common/timeline_lru.rs @@ -0,0 +1,99 @@ +use std::{time::Instant, mem::swap}; +use modality_ingest_client::types::TimelineId; + +/// A Least Recently Used (LRU) cache of timeline information +pub struct TimelineLru { + // TODO(AJM): At the moment, the contents of the LRU cache are unsorted. + // + // Sorting them by time would be BAD, because there would be lots of churn. + // However, sorting them by `user_id` might be good, because then we could + // use a binary search for the fast path of query, but then we'd need to + // still do a linear search to determine the oldest item. + // + // For now, a linear search of a relatively small number (64-128) of items + // is likely to be more than fast enough. If it later turns out that users + // need a much larger LRU cache size (e.g. 1024+), we might consider making this + // optimization at that time. + data: Vec, +} + +impl TimelineLru { + /// Create a timeline LRU cache with a maximum size of `cap`. 
+ pub fn with_capacity(cap: usize) -> Self { + Self { + data: Vec::with_capacity(cap), + } + } + + /// Check to see if the given `user_id` is known by the Lru + pub fn query(&mut self, user_id: u64) -> Result { + let full = self.data.len() >= self.data.capacity(); + + if !full { + // If we're not full, we don't need to keep track of the oldest item. + // Just find it or don't. + match self.data.iter_mut().find(|li| li.user_id == user_id) { + Some(li) => { + li.last_use = Instant::now(); + Ok(li.timeline_id) + }, + None => Err(LruToken { val: LruError::DoesntExistNotFull }), + } + } else { + // Traverse the list, remembering the oldest slot we've seen. We'll either: + // + // * Find the item, bailing out early + // * Reach the end, in which case we've already found the oldest item + let mut oldest_time = Instant::now(); + let mut oldest_slot = 0; + for (idx, li) in self.data.iter_mut().enumerate() { + if li.user_id == user_id { + li.last_use = Instant::now(); + return Ok(li.timeline_id); + } + + if oldest_time >= li.last_use { + oldest_time = li.last_use; + oldest_slot = idx; + } + } + + // We reached the end, it doesn't exist. + Err(LruToken { val: LruError::DoesntExistReplace(oldest_slot) }) + } + } + + pub fn insert(&mut self, user_id: u64, timeline_id: TimelineId, token: LruToken) { + let mut item = LruItem { + user_id, + timeline_id, + last_use: Instant::now(), + }; + + match token.val { + LruError::DoesntExistNotFull => self.data.push(item), + LruError::DoesntExistReplace(idx) => { + if let Some(old) = self.data.get_mut(idx) { + swap(old, &mut item) + } + } + } + } +} + +struct LruItem { + user_id: u64, + timeline_id: TimelineId, + last_use: Instant, +} + +pub struct LruToken { + // Note: This is a struct with a private field to ensure that + // `query` must be called before `insert`. + val: LruError +} + +enum LruError { + DoesntExistNotFull, + DoesntExistReplace(usize), +}