
Commit

Reformatting.
MathieuDutSik committed Jan 17, 2024
1 parent 3405601 commit a3e5810
Showing 4 changed files with 89 additions and 44 deletions.
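
For context, the reformatting below mostly re-wraps the deeply nested wrapper types that each store builds in its get_complete_store constructor. The following is a minimal sketch of that layering under the "metrics" feature, using hypothetical stand-in types rather than the actual linera-views items; it only illustrates the composition order visible in the DynamoDbStore part of the diff.

// Minimal sketch (not the linera-views API): hypothetical stand-ins that
// mirror the wrapper layering of DynamoDbStore::get_complete_store below.
struct Inner;                       // stands in for DynamoDbStoreInternal
struct Journaling<S>(S);            // stands in for JournalingKeyValueStore<S>
struct Metered<S>(&'static str, S); // stands in for MeteredStore<S>
struct ValueSplitting<S>(S);        // stands in for ValueSplittingStore<S>
struct LruCaching<S>(S, usize);     // stands in for LruCachingStore<S>

// With the "metrics" feature enabled, a metered shim wraps each layer,
// producing the nested type spelled out in the struct definition.
fn complete_store(
    store: Journaling<Inner>,
    cache_size: usize,
) -> Metered<LruCaching<Metered<ValueSplitting<Metered<Journaling<Inner>>>>>> {
    let store = Metered("dynamo db internal", store);
    let store = ValueSplitting(store);
    let store = Metered("value splitting", store);
    let store = LruCaching(store, cache_size);
    Metered("lru caching", store)
}

fn main() {
    let Metered(outermost, _) = complete_store(Journaling(Inner), 1000);
    println!("outermost layer: {outermost}");
}

Without the feature, the metered shims are simply omitted, which is why each store declares two cfg-gated variants of both the field and the constructor.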
20 changes: 16 additions & 4 deletions linera-views/src/dynamo_db.rs
@@ -43,7 +43,6 @@ use thiserror::Error;
#[cfg(feature = "metrics")]
use crate::metered_wrapper::MeteredStore;


#[cfg(any(test, feature = "test"))]
use {
crate::lru_caching::TEST_CACHE_SIZE,
@@ -948,9 +947,16 @@ impl DirectKeyValueStore for DynamoDbStoreInternal {

/// A shared DB client for DynamoDb implementing LruCaching
#[derive(Clone)]
#[allow(clippy::type_complexity)]
pub struct DynamoDbStore {
#[cfg(feature = "metrics")]
store: MeteredStore<LruCachingStore<MeteredStore<ValueSplittingStore<MeteredStore<JournalingKeyValueStore<DynamoDbStoreInternal>>>>>>,
store: MeteredStore<
LruCachingStore<
MeteredStore<
ValueSplittingStore<MeteredStore<JournalingKeyValueStore<DynamoDbStoreInternal>>>,
>,
>,
>,
#[cfg(not(feature = "metrics"))]
store: LruCachingStore<ValueSplittingStore<JournalingKeyValueStore<DynamoDbStoreInternal>>>,
}
@@ -1015,14 +1021,20 @@ impl KeyValueStore for DynamoDbStore {

impl DynamoDbStore {
#[cfg(not(feature = "metrics"))]
fn get_complete_store(store: JournalingKeyValueStore<DynamoDbStoreInternal>, cache_size: usize) -> Self {
fn get_complete_store(
store: JournalingKeyValueStore<DynamoDbStoreInternal>,
cache_size: usize,
) -> Self {
let store = ValueSplittingStore::new(store);
let store = LruCachingStore::new(store, cache_size);
Self { store }
}

#[cfg(feature = "metrics")]
fn get_complete_store(store: JournalingKeyValueStore<DynamoDbStoreInternal>, cache_size: usize) -> Self {
fn get_complete_store(
store: JournalingKeyValueStore<DynamoDbStoreInternal>,
cache_size: usize,
) -> Self {
let store = MeteredStore::new("dynamo db internal".to_string(), store);
let store = ValueSplittingStore::new(store);
let store = MeteredStore::new("value splitting".to_string(), store);
86 changes: 55 additions & 31 deletions linera-views/src/metered_wrapper.rs
@@ -3,16 +3,14 @@

//use std::time::{Duration, Instant};
//use linera_base::sync::Lazy;
use convert_case::{Case, Casing};
use prometheus::HistogramVec;
use prometheus::register_histogram_vec;
use crate::common::KeyValueStore;
use crate::{
batch::Batch,
common::{KeyValueStore, ReadableKeyValueStore, WritableKeyValueStore},
};
use async_trait::async_trait;
use std::future::Future;
use std::time::{Instant};
use crate::common::ReadableKeyValueStore;
use crate::common::WritableKeyValueStore;
use crate::batch::Batch;
use convert_case::{Case, Casing};
use prometheus::{register_histogram_vec, HistogramVec};
use std::{future::Future, time::Instant};
//use crate::journaling::DirectWritableKeyValueStore;

#[derive(Clone)]
@@ -29,7 +27,7 @@ struct MeteredCounter {
impl MeteredCounter {
pub fn new(name: String) -> Self {
// name can be "rocks db". Then var_name = "rocks_db" and title_name = "RocksDb"
let var_name = name.replace(" ", "_");
let var_name = name.replace(' ', "_");
let title_name = name.to_case(Case::Snake);

let read_value1 = format!("{}_read_value_bytes", var_name);
@@ -44,8 +42,9 @@ impl MeteredCounter {

let read_multi_values1 = format!("{}_read_multi_value_bytes", var_name);
let read_multi_values2 = format!("{} read multi value bytes", title_name);
let read_multi_values_bytes = register_histogram_vec!(read_multi_values1, read_multi_values2, &[])
.expect("Counter creation should not fail");
let read_multi_values_bytes =
register_histogram_vec!(read_multi_values1, read_multi_values2, &[])
.expect("Counter creation should not fail");

let find_keys1 = format!("{}_find_keys_by_prefix", var_name);
let find_keys2 = format!("{} find keys by prefix", title_name);
@@ -54,8 +53,9 @@ impl MeteredCounter {

let find_key_values1 = format!("{}_find_key_values_by_prefix", var_name);
let find_key_values2 = format!("{} find key values by prefix", title_name);
let find_key_values_by_prefix = register_histogram_vec!(find_key_values1, find_key_values2, &[])
.expect("Counter creation should not fail");
let find_key_values_by_prefix =
register_histogram_vec!(find_key_values1, find_key_values2, &[])
.expect("Counter creation should not fail");

let write_batch1 = format!("{}_write_batch", var_name);
let write_batch2 = format!("{} write batch", title_name);
@@ -67,7 +67,15 @@
let clear_journal = register_histogram_vec!(clear_journal1, clear_journal2, &[])
.expect("Counter creation should not fail");

MeteredCounter { read_value_bytes, contains_key, read_multi_values_bytes, find_keys_by_prefix, find_key_values_by_prefix, write_batch, clear_journal }
MeteredCounter {
read_value_bytes,
contains_key,
read_multi_values_bytes,
find_keys_by_prefix,
find_key_values_by_prefix,
write_batch,
clear_journal,
}
}
}

@@ -92,7 +100,6 @@ where
out
}


#[async_trait]
impl<K, E> ReadableKeyValueStore<E> for MeteredStore<K>
where
@@ -107,29 +114,39 @@ where
}

async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, E> {
prometheus_async(self.store.read_value_bytes(key), &self.counter.read_value_bytes).await
prometheus_async(
self.store.read_value_bytes(key),
&self.counter.read_value_bytes,
)
.await
}

async fn contains_key(&self, key: &[u8]) -> Result<bool, E> {
prometheus_async(self.store.contains_key(key), &self.counter.contains_key).await
}

async fn read_multi_values_bytes(
&self,
keys: Vec<Vec<u8>>,
) -> Result<Vec<Option<Vec<u8>>>, E> {
prometheus_async(self.store.read_multi_values_bytes(keys), &self.counter.read_multi_values_bytes).await
async fn read_multi_values_bytes(&self, keys: Vec<Vec<u8>>) -> Result<Vec<Option<Vec<u8>>>, E> {
prometheus_async(
self.store.read_multi_values_bytes(keys),
&self.counter.read_multi_values_bytes,
)
.await
}

async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::Keys, E> {
prometheus_async(self.store.find_keys_by_prefix(key_prefix), &self.counter.find_keys_by_prefix).await
prometheus_async(
self.store.find_keys_by_prefix(key_prefix),
&self.counter.find_keys_by_prefix,
)
.await
}

async fn find_key_values_by_prefix(
&self,
key_prefix: &[u8],
) -> Result<Self::KeyValues, E> {
prometheus_async(self.store.find_key_values_by_prefix(key_prefix), &self.counter.find_key_values_by_prefix).await
async fn find_key_values_by_prefix(&self, key_prefix: &[u8]) -> Result<Self::KeyValues, E> {
prometheus_async(
self.store.find_key_values_by_prefix(key_prefix),
&self.counter.find_key_values_by_prefix,
)
.await
}
}

@@ -141,11 +158,19 @@ where
const MAX_VALUE_SIZE: usize = K::MAX_VALUE_SIZE;

async fn write_batch(&self, batch: Batch, base_key: &[u8]) -> Result<(), E> {
prometheus_async(self.store.write_batch(batch, base_key), &self.counter.write_batch).await
prometheus_async(
self.store.write_batch(batch, base_key),
&self.counter.write_batch,
)
.await
}

async fn clear_journal(&self, base_key: &[u8]) -> Result<(), E> {
prometheus_async(self.store.clear_journal(base_key), &self.counter.clear_journal).await
prometheus_async(
self.store.clear_journal(base_key),
&self.counter.clear_journal,
)
.await
}
}

@@ -163,4 +188,3 @@ impl<K> MeteredStore<K> {
Self { counter, store }
}
}

8 changes: 5 additions & 3 deletions linera-views/src/rocks_db.rs
@@ -236,7 +236,9 @@ impl KeyValueStore for RocksDbStoreInternal {
#[derive(Clone)]
pub struct RocksDbStore {
#[cfg(feature = "metrics")]
store: MeteredStore<LruCachingStore<MeteredStore<ValueSplittingStore<MeteredStore<RocksDbStoreInternal>>>>>,
store: MeteredStore<
LruCachingStore<MeteredStore<ValueSplittingStore<MeteredStore<RocksDbStoreInternal>>>>,
>,
#[cfg(not(feature = "metrics"))]
store: LruCachingStore<ValueSplittingStore<RocksDbStoreInternal>>,
}
@@ -314,9 +316,9 @@ impl RocksDbStore {
let store = MeteredStore::new("rocks db internal".to_string(), store);
let store = ValueSplittingStore::new(store);
let store = MeteredStore::new("value splitting".to_string(), store);
let store = LruCachingStore::new(store, cache_size);
let store = LruCachingStore::new(store, cache_size);
let store = MeteredStore::new("lru caching".to_string(), store);
Self { store }
Self { store }
}

/// Creates a RocksDB database from a specified path.
19 changes: 13 additions & 6 deletions linera-views/src/scylla_db.rs
@@ -677,7 +677,8 @@ impl ScyllaDbStoreInternal {
#[derive(Clone)]
pub struct ScyllaDbStore {
#[cfg(feature = "metrics")]
store: MeteredStore<LruCachingStore<MeteredStore<JournalingKeyValueStore<ScyllaDbStoreInternal>>>>,
store:
MeteredStore<LruCachingStore<MeteredStore<JournalingKeyValueStore<ScyllaDbStoreInternal>>>>,
#[cfg(not(feature = "metrics"))]
store: LruCachingStore<JournalingKeyValueStore<ScyllaDbStoreInternal>>,
}
@@ -767,16 +768,22 @@ impl ScyllaDbStore {
}

#[cfg(not(feature = "metrics"))]
fn get_complete_store(store: JournalingKeyValueStore<ScyllaDbStoreInternal>, cache_size: usize) -> Self {
let store = LruCachingStore::new(store, cache_size);
fn get_complete_store(
store: JournalingKeyValueStore<ScyllaDbStoreInternal>,
cache_size: usize,
) -> Self {
let store = LruCachingStore::new(store, cache_size);
Self { store }
}

#[cfg(feature = "metrics")]
fn get_complete_store(store: JournalingKeyValueStore<ScyllaDbStoreInternal>, cache_size: usize) -> Self {
let store = MeteredStore::new("scylla db internal".to_string(), store);
fn get_complete_store(
store: JournalingKeyValueStore<ScyllaDbStoreInternal>,
cache_size: usize,
) -> Self {
let store = MeteredStore::new("scylla db internal".to_string(), store);
let store = LruCachingStore::new(store, cache_size);
let store = MeteredStore::new("lru caching".to_string(), store);
let store = MeteredStore::new("lru caching".to_string(), store);
Self { store }
}

