Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce metered container #1516

Merged
merged 15 commits into from
Jan 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions linera-views/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ async-graphql = { workspace = true }
async-lock = { workspace = true }
async-trait = { workspace = true }
bcs = { workspace = true }
convert_case = { workspace = true }
futures = { workspace = true }
generic-array = { workspace = true }
hex = { workspace = true, optional = true }
Expand Down
50 changes: 41 additions & 9 deletions linera-views/src/dynamo_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ use linera_base::ensure;
use std::{collections::HashMap, env, str::FromStr, sync::Arc};
use thiserror::Error;

#[cfg(feature = "metrics")]
use crate::metering::{
MeteredStore, DYNAMO_DB_METRICS, LRU_CACHING_METRICS, VALUE_SPLITTING_METRICS,
};

#[cfg(any(test, feature = "test"))]
use {
crate::lru_caching::TEST_CACHE_SIZE,
Expand Down Expand Up @@ -944,7 +949,17 @@ impl DirectKeyValueStore for DynamoDbStoreInternal {

/// A shared DB client for DynamoDb implementing LruCaching
#[derive(Clone)]
#[allow(clippy::type_complexity)]
pub struct DynamoDbStore {
#[cfg(feature = "metrics")]
store: MeteredStore<
LruCachingStore<
MeteredStore<
ValueSplittingStore<MeteredStore<JournalingKeyValueStore<DynamoDbStoreInternal>>>,
>,
>,
>,
#[cfg(not(feature = "metrics"))]
store: LruCachingStore<ValueSplittingStore<JournalingKeyValueStore<DynamoDbStoreInternal>>>,
}

Expand Down Expand Up @@ -1007,6 +1022,29 @@ impl KeyValueStore for DynamoDbStore {
}

impl DynamoDbStore {
#[cfg(not(feature = "metrics"))]
fn get_complete_store(
store: JournalingKeyValueStore<DynamoDbStoreInternal>,
cache_size: usize,
) -> Self {
let store = ValueSplittingStore::new(store);
let store = LruCachingStore::new(store, cache_size);
Self { store }
}

#[cfg(feature = "metrics")]
fn get_complete_store(
store: JournalingKeyValueStore<DynamoDbStoreInternal>,
cache_size: usize,
) -> Self {
let store = MeteredStore::new(&DYNAMO_DB_METRICS, store);
let store = ValueSplittingStore::new(store);
let store = MeteredStore::new(&VALUE_SPLITTING_METRICS, store);
let store = LruCachingStore::new(store, cache_size);
let store = MeteredStore::new(&LRU_CACHING_METRICS, store);
Self { store }
}

/// Creates a `DynamoDbStore` from scratch with an LRU cache
#[cfg(any(test, feature = "test"))]
pub async fn new_for_testing(
Expand All @@ -1016,9 +1054,7 @@ impl DynamoDbStore {
let (simple_store, table_status) =
DynamoDbStoreInternal::new_for_testing(store_config).await?;
let store = JournalingKeyValueStore::new(simple_store);
let store = ValueSplittingStore::new(store);
let store = LruCachingStore::new(store, cache_size);
let store = Self { store };
let store = Self::get_complete_store(store, cache_size);
Ok((store, table_status))
}

Expand All @@ -1029,9 +1065,7 @@ impl DynamoDbStore {
let cache_size = store_config.common_config.cache_size;
let simple_store = DynamoDbStoreInternal::initialize(store_config).await?;
let store = JournalingKeyValueStore::new(simple_store);
let store = ValueSplittingStore::new(store);
let store = LruCachingStore::new(store, cache_size);
let store = Self { store };
let store = Self::get_complete_store(store, cache_size);
Ok(store)
}

Expand Down Expand Up @@ -1068,9 +1102,7 @@ impl DynamoDbStore {
let cache_size = store_config.common_config.cache_size;
let (simple_store, table_name) = DynamoDbStoreInternal::new(store_config).await?;
let store = JournalingKeyValueStore::new(simple_store);
let store = ValueSplittingStore::new(store);
let store = LruCachingStore::new(store, cache_size);
let store = Self { store };
let store = Self::get_complete_store(store, cache_size);
Ok((store, table_name))
}
}
Expand Down
4 changes: 4 additions & 0 deletions linera-views/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ pub mod common;
/// The code to turn a `DirectKeyValueStore` into a `KeyValueStore` by adding journaling.
pub mod journaling;

/// The code for encapsulating one key_value store into another that does metric
#[cfg(feature = "metrics")]
pub mod metering;

/// The code for handling big values by splitting them into several small ones.
pub mod value_splitting;

Expand Down
215 changes: 215 additions & 0 deletions linera-views/src/metering.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{
batch::Batch,
common::{KeyValueStore, ReadableKeyValueStore, WritableKeyValueStore},
};
use async_trait::async_trait;
use convert_case::{Case, Casing};
use linera_base::sync::Lazy;
use prometheus::{register_histogram_vec, HistogramVec};
use std::{future::Future, time::Instant};

#[derive(Clone)]
/// The implementation of the `KeyValueStoreMetrics` for the `KeyValueStore`.
pub struct KeyValueStoreMetrics {
read_value_bytes: HistogramVec,
contains_key: HistogramVec,
read_multi_values_bytes: HistogramVec,
find_keys_by_prefix: HistogramVec,
find_key_values_by_prefix: HistogramVec,
write_batch: HistogramVec,
clear_journal: HistogramVec,
}

/// The metrics for the "rocks db"
#[cfg(feature = "rocksdb")]
pub static ROCKS_DB_METRICS: Lazy<KeyValueStoreMetrics> =
Lazy::new(|| KeyValueStoreMetrics::new("rocks db internal".to_string()));

/// The metrics for the "dynamo db"
#[cfg(feature = "aws")]
pub static DYNAMO_DB_METRICS: Lazy<KeyValueStoreMetrics> =
Lazy::new(|| KeyValueStoreMetrics::new("dynamo db internal".to_string()));

/// The metrics for the "scylla db"
#[cfg(feature = "scylladb")]
pub static SCYLLA_DB_METRICS: Lazy<KeyValueStoreMetrics> =
Lazy::new(|| KeyValueStoreMetrics::new("scylla db internal".to_string()));

/// The metrics for the "scylla db"
#[cfg(any(feature = "rocksdb", feature = "aws"))]
pub static VALUE_SPLITTING_METRICS: Lazy<KeyValueStoreMetrics> =
Lazy::new(|| KeyValueStoreMetrics::new("value splitting".to_string()));

/// The metrics for the "lru caching"
#[cfg(any(feature = "rocksdb", feature = "aws", feature = "scylladb"))]
pub static LRU_CACHING_METRICS: Lazy<KeyValueStoreMetrics> =
Lazy::new(|| KeyValueStoreMetrics::new("lru caching".to_string()));

impl KeyValueStoreMetrics {
/// Creation of a named Metered counter.
pub fn new(name: String) -> Self {
// name can be "rocks db". Then var_name = "rocks_db" and title_name = "RocksDb"
let var_name = name.replace(' ', "_");
let title_name = name.to_case(Case::Snake);

let read_value1 = format!("{}_read_value_bytes", var_name);
let read_value2 = format!("{} read value bytes", title_name);
let read_value_bytes = register_histogram_vec!(read_value1, read_value2, &[])
.expect("Counter creation should not fail");

let contains_key1 = format!("{}_contains_key", var_name);
let contains_key2 = format!("{} contains key", title_name);
let contains_key = register_histogram_vec!(contains_key1, contains_key2, &[])
.expect("Counter creation should not fail");

let read_multi_values1 = format!("{}_read_multi_value_bytes", var_name);
let read_multi_values2 = format!("{} read multi value bytes", title_name);
let read_multi_values_bytes =
register_histogram_vec!(read_multi_values1, read_multi_values2, &[])
.expect("Counter creation should not fail");

let find_keys1 = format!("{}_find_keys_by_prefix", var_name);
let find_keys2 = format!("{} find keys by prefix", title_name);
let find_keys_by_prefix = register_histogram_vec!(find_keys1, find_keys2, &[])
.expect("Counter creation should not fail");

let find_key_values1 = format!("{}_find_key_values_by_prefix", var_name);
let find_key_values2 = format!("{} find key values by prefix", title_name);
let find_key_values_by_prefix =
register_histogram_vec!(find_key_values1, find_key_values2, &[])
.expect("Counter creation should not fail");

let write_batch1 = format!("{}_write_batch", var_name);
let write_batch2 = format!("{} write batch", title_name);
let write_batch = register_histogram_vec!(write_batch1, write_batch2, &[])
.expect("Counter creation should not fail");

let clear_journal1 = format!("{}_clear_journal", var_name);
let clear_journal2 = format!("{} clear journal", title_name);
let clear_journal = register_histogram_vec!(clear_journal1, clear_journal2, &[])
.expect("Counter creation should not fail");

KeyValueStoreMetrics {
read_value_bytes,
contains_key,
read_multi_values_bytes,
find_keys_by_prefix,
find_key_values_by_prefix,
write_batch,
clear_journal,
}
}
}

/// A metered wrapper that keeps track of every operation
#[derive(Clone)]
pub struct MeteredStore<K> {
/// the metrics being stored
counter: &'static Lazy<KeyValueStoreMetrics>,
/// The underlying store of the metered store
pub store: K,
}

async fn run_with_execution_time_metric<F, O>(f: F, hist: &HistogramVec) -> O
where
F: Future<Output = O>,
{
let start = Instant::now();
let out = f.await;
let duration = start.elapsed();
hist.with_label_values(&[])
.observe(duration.as_micros() as f64);
out
}

#[async_trait]
impl<K, E> ReadableKeyValueStore<E> for MeteredStore<K>
where
K: ReadableKeyValueStore<E> + Send + Sync,
{
const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE;
type Keys = K::Keys;
type KeyValues = K::KeyValues;

fn max_stream_queries(&self) -> usize {
self.store.max_stream_queries()
}

async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, E> {
run_with_execution_time_metric(
self.store.read_value_bytes(key),
&self.counter.read_value_bytes,
)
.await
}

async fn contains_key(&self, key: &[u8]) -> Result<bool, E> {
run_with_execution_time_metric(self.store.contains_key(key), &self.counter.contains_key)
.await
}

async fn read_multi_values_bytes(&self, keys: Vec<Vec<u8>>) -> Result<Vec<Option<Vec<u8>>>, E> {
run_with_execution_time_metric(
self.store.read_multi_values_bytes(keys),
&self.counter.read_multi_values_bytes,
)
.await
}

async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::Keys, E> {
run_with_execution_time_metric(
self.store.find_keys_by_prefix(key_prefix),
&self.counter.find_keys_by_prefix,
)
.await
}

async fn find_key_values_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::KeyValues, E> {
run_with_execution_time_metric(
self.store.find_key_values_by_prefix(key_prefix),
&self.counter.find_key_values_by_prefix,
)
.await
}
}

#[async_trait]
impl<K, E> WritableKeyValueStore<E> for MeteredStore<K>
where
K: WritableKeyValueStore<E> + Send + Sync,
{
const MAX_VALUE_SIZE: usize = K::MAX_VALUE_SIZE;

async fn write_batch(&self, batch: Batch, base_key: &[u8]) -> Result<(), E> {
run_with_execution_time_metric(
self.store.write_batch(batch, base_key),
&self.counter.write_batch,
)
.await
}

async fn clear_journal(&self, base_key: &[u8]) -> Result<(), E> {
run_with_execution_time_metric(
self.store.clear_journal(base_key),
&self.counter.clear_journal,
)
.await
}
}

impl<K> KeyValueStore for MeteredStore<K>
where
K: KeyValueStore + Send + Sync,
{
type Error = K::Error;
}

impl<K> MeteredStore<K> {
/// Creates a new Metered store
pub fn new(counter: &'static Lazy<KeyValueStoreMetrics>, store: K) -> Self {
Self { counter, store }
}
}
Loading