Switch to timeline LRU cache
jamesmunns committed Aug 22, 2022
1 parent e93727f commit 70d2019
Showing 4 changed files with 126 additions and 16 deletions.
24 changes: 8 additions & 16 deletions tracing-modality/src/common/ingest.rs
@@ -2,7 +2,7 @@ pub use modality_ingest_client::types::TimelineId;

use crate::{
layer::{RecordMap, TracingValue},
Options,
Options, timeline_lru::TimelineLru,
};
use anyhow::Context;
use modality_ingest_client::{
@@ -165,9 +165,7 @@ pub(crate) struct ModalityIngest {
timeline_keys: HashMap<String, AttrKey>,
span_names: HashMap<NonZeroU64, String>,
run_id: Uuid,

// TODO(AJM): Replace me with something that does LRU cache eviction!
timeline_map: HashMap<u64, TimelineId>,
timeline_map: TimelineLru,

#[cfg(feature = "blocking")]
rt: Option<Runtime>,
@@ -229,7 +227,7 @@ impl ModalityIngest {
timeline_keys: HashMap::new(),
span_names: HashMap::new(),
run_id,
timeline_map: HashMap::new(),
timeline_map: TimelineLru::with_capacity(options.lru_cache_size),
#[cfg(feature = "blocking")]
rt: None,
})
@@ -298,12 +296,6 @@ impl ModalityIngest {
let _ = self.client.flush().await;
}

#[inline]
fn generate_timeline_id(&self, user_id: u64) -> TimelineId {
// We deterministically generate timeline IDs using UUIDv5, with
uuid::Uuid::new_v5(&self.run_id, &user_id.to_ne_bytes()).into()
}

/// Ensures the reported user timeline is the currently open client timeline.
///
/// * If this is a new`*` timeline, we will first register its metadata,
@@ -315,10 +307,10 @@
/// `*`: In some cases, we MAY end up re-registering the metadata for a timeline,
/// if the timeline has not been recently used.
async fn bind_user_timeline(&mut self, name: String, user_id: u64) -> Result<(), IngestError> {
let timeline_id = match self.timeline_map.get(&user_id).copied() {
Some(tid) => tid,
None => {
let timeline_id = self.generate_timeline_id(user_id);
let timeline_id = match self.timeline_map.query(user_id) {
Ok(tid) => tid,
Err(token) => {
let timeline_id = uuid::Uuid::new_v5(&self.run_id, &user_id.to_ne_bytes()).into();

// Register the metadata of the new* timeline ID
let mut timeline_metadata = self.global_metadata.clone();
@@ -337,7 +329,7 @@
}

// Success, now add to the map
self.timeline_map.insert(user_id, timeline_id);
self.timeline_map.insert(user_id, timeline_id, token);

// And return the timeline
timeline_id
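Note on the re-registration footnote in `bind_user_timeline` above: because the LRU cache can evict an entry, a timeline's metadata may be registered more than once, so the timeline ID must be reproducible from the same inputs. A minimal sketch of that property using the `uuid` crate directly (v4/v5 features assumed enabled); the helper name is illustrative and not part of this crate:

```rust
use uuid::Uuid;

// Illustrative helper, not part of tracing-modality's API.
fn deterministic_timeline_id(run_id: &Uuid, user_id: u64) -> Uuid {
    // UUIDv5 is a pure function of (namespace, name), so the same
    // (run_id, user_id) pair always yields the same timeline ID,
    // even after the cache entry for it has been evicted and re-created.
    Uuid::new_v5(run_id, &user_id.to_ne_bytes())
}

fn main() {
    let run_id = Uuid::new_v4();
    assert_eq!(
        deterministic_timeline_id(&run_id, 42),
        deterministic_timeline_id(&run_id, 42),
    );
}
```
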
1 change: 1 addition & 0 deletions tracing-modality/src/common/mod.rs
@@ -1,6 +1,7 @@
pub(crate) mod ingest;
pub(crate) mod layer;
pub(crate) mod options;
pub(crate) mod timeline_lru;

#[cfg(doc)]
use crate::Options;
18 changes: 18 additions & 0 deletions tracing-modality/src/common/options.rs
@@ -54,6 +54,7 @@ pub struct Options {
pub(crate) metadata: Vec<(String, AttrVal)>,
pub(crate) server_addr: SocketAddr,
pub(crate) timeline_identifier: fn() -> UserTimelineInfo,
pub(crate) lru_cache_size: usize,
}

impl Options {
@@ -65,6 +66,7 @@ impl Options {
metadata: Vec::new(),
server_addr,
timeline_identifier: thread_timeline,
lru_cache_size: 64,
}
}

@@ -112,6 +114,22 @@ impl Options {
self
}

/// How many modality timelines should be cached?
///
/// Setting this to a higher number increases the amount of memory used to remember
/// which `TimelineId`s have been registered with modalityd.
///
/// Setting this to a lower number increases the likelihood that timeline metadata
/// will need to be re-sent to modalityd (and increases the amount of network traffic sent).
pub fn set_lru_cache_size(&mut self, size: usize) {
self.lru_cache_size = size;
}

/// A chainable version of [set_lru_cache_size](Self::set_lru_cache_size).
pub fn with_lru_cache_size(mut self, size: usize) -> Self {
self.lru_cache_size = size;
self
}

/// Set the name for the root timeline. By default this will be the name of the main thread as
/// provided by the OS.
pub fn set_name<S: AsRef<str>>(&mut self, name: S) {
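A quick sketch of how the new option might be used; construction of `Options` is not shown in this diff, so the surrounding function (and the assumption that `Options` is exported at the crate root) is hypothetical:

```rust
use tracing_modality::Options;

// Hypothetical helper: bumps the timeline LRU cache size on an
// already-constructed `Options` value.
fn tune(options: Options) -> Options {
    // A larger cache uses more memory but makes it less likely that
    // timeline metadata must be re-sent to modalityd after eviction.
    options.with_lru_cache_size(128)
}
```
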
99 changes: 99 additions & 0 deletions tracing-modality/src/common/timeline_lru.rs
@@ -0,0 +1,99 @@
use std::{time::Instant, mem::swap};
use modality_ingest_client::types::TimelineId;

/// A Least Recently Used (LRU) cache of timeline information
pub struct TimelineLru {
// TODO(AJM): At the moment, the contents of the LRU cache are unsorted.
//
// Sorting them by time would be BAD, because there would be lots of churn.
// However, sorting them by `user_id` might be good, because then we could
// use a binary search for the fast path of query, but then we'd need to
// still do a linear search to determine the oldest item.
//
// For now, a linear search of a relatively small number (64-128) of items
// is likely to be more than fast enough. If it later turns out that users
// need a much larger LRU cache size (e.g. 1024+), we might consider making this
// optimization at that time.
data: Vec<LruItem>,
}

impl TimelineLru {
/// Create a timeline LRU cache with a maximum size of `cap`.
pub fn with_capacity(cap: usize) -> Self {
Self {
data: Vec::with_capacity(cap),
}
}

/// Check to see if the given `user_id` is known by the Lru
pub fn query(&mut self, user_id: u64) -> Result<TimelineId, LruToken> {
let full = self.data.len() >= self.data.capacity();

if !full {
// If we're not full, we don't need to keep track of the oldest item.
// Just find it or don't.
match self.data.iter_mut().find(|li| li.user_id == user_id) {
Some(li) => {
li.last_use = Instant::now();
Ok(li.timeline_id)
},
None => Err(LruToken { val: LruError::DoesntExistNotFull }),
}
} else {
// Traverse the list, remembering the oldest slot we've seen. We'll either:
//
// * Find the item, bailing out early
// * Reach the end, in which case we've already found the oldest item
let mut oldest_time = Instant::now();
let mut oldest_slot = 0;
for (idx, li) in self.data.iter_mut().enumerate() {
if li.user_id == user_id {
li.last_use = Instant::now();
return Ok(li.timeline_id);
}

if oldest_time >= li.last_use {
oldest_time = li.last_use;
oldest_slot = idx;
}
}

// We reached the end, it doesn't exist.
Err(LruToken { val: LruError::DoesntExistReplace(oldest_slot) })
}
}

pub fn insert(&mut self, user_id: u64, timeline_id: TimelineId, token: LruToken) {
let mut item = LruItem {
user_id,
timeline_id,
last_use: Instant::now(),
};

match token.val {
LruError::DoesntExistNotFull => self.data.push(item),
LruError::DoesntExistReplace(idx) => {
if let Some(old) = self.data.get_mut(idx) {
swap(old, &mut item)
}
}
}
}
}

struct LruItem {
user_id: u64,
timeline_id: TimelineId,
last_use: Instant,
}

pub struct LruToken {
// Note: This is a struct with a private field to ensure that
// `query` must be called before `insert`.
val: LruError
}

enum LruError {
DoesntExistNotFull,
DoesntExistReplace(usize),
}
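
Taken together, `query` and `insert` form a lookup-or-register protocol: a miss returns a token that records where the new entry belongs (a push while the cache is under capacity, otherwise the index of the least recently used slot to overwrite). A minimal in-crate sketch of that flow; the function and closure names are illustrative rather than part of this commit:

```rust
use modality_ingest_client::types::TimelineId;

use crate::timeline_lru::TimelineLru;

// Illustrative in-crate helper showing the intended call pattern.
fn lookup_or_register(
    lru: &mut TimelineLru,
    user_id: u64,
    register: impl FnOnce(u64) -> TimelineId,
) -> TimelineId {
    match lru.query(user_id) {
        // Hit: `query` already refreshed the entry's `last_use` timestamp.
        Ok(tid) => tid,
        // Miss: `token` remembers whether to push or to replace the
        // oldest slot once registration has succeeded.
        Err(token) => {
            let tid = register(user_id);
            lru.insert(user_id, tid, token);
            tid
        }
    }
}
```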
