From b986e7dd9f0f3cadc2b9fc7ca406fd1530da4e86 Mon Sep 17 00:00:00 2001 From: Mathieu Baudet <1105398+ma2bd@users.noreply.github.com> Date: Mon, 23 Sep 2024 23:40:41 +0800 Subject: [PATCH] Allow combining different stores (#2489) * Create skeleton of a combinator to use different stores based on the root key. * Implement the remaining functions --- linera-storage/src/db_storage.rs | 21 + linera-storage/src/lib.rs | 2 +- linera-views/src/backends/dual.rs | 496 ++++++++++++++++++++++++++++++ linera-views/src/backends/mod.rs | 2 + 4 files changed, 520 insertions(+), 1 deletion(-) create mode 100644 linera-views/src/backends/dual.rs diff --git a/linera-storage/src/db_storage.rs b/linera-storage/src/db_storage.rs index fc6367858413..fc6cfe755c39 100644 --- a/linera-storage/src/db_storage.rs +++ b/linera-storage/src/db_storage.rs @@ -21,6 +21,7 @@ use linera_execution::{ WasmRuntime, }; use linera_views::{ + backends::dual::{DualStoreRootKeyAssignment, StoreInUse}, batch::Batch, common::KeyValueStore, context::ViewContext, @@ -227,6 +228,26 @@ enum BaseKey { BlobState(BlobId), } +/// An implementation of [`DualStoreRootKeyAssignment`] that stores the +/// chain states into the first store. +/// Every other root key, including the empty one, is assigned to the second store. +pub struct ChainStatesFirstAssignment; + +impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment { + fn assigned_store(root_key: &[u8]) -> Result { + // An empty root key cannot be decoded as a `BaseKey`. It does not designate a + // chain state, so it is assigned to the second store instead of failing. + if root_key.is_empty() { + return Ok(StoreInUse::Second); + } + let store = match bcs::from_bytes(root_key)? { + BaseKey::ChainState(_) => StoreInUse::First, + _ => StoreInUse::Second, + }; + Ok(store) + } +} + /// A `Clock` implementation using the system clock. 
#[derive(Clone)] pub struct WallClock; diff --git a/linera-storage/src/lib.rs b/linera-storage/src/lib.rs index d6a24bd48d49..f35a0270a3bb 100644 --- a/linera-storage/src/lib.rs +++ b/linera-storage/src/lib.rs @@ -37,7 +37,7 @@ use linera_views::{ #[cfg(with_testing)] pub use crate::db_storage::TestClock; -pub use crate::db_storage::{DbStorage, WallClock}; +pub use crate::db_storage::{ChainStatesFirstAssignment, DbStorage, WallClock}; #[cfg(with_metrics)] pub use crate::db_storage::{ READ_CERTIFICATE_COUNTER, READ_HASHED_CERTIFICATE_VALUE_COUNTER, WRITE_CERTIFICATE_COUNTER, diff --git a/linera-views/src/backends/dual.rs b/linera-views/src/backends/dual.rs new file mode 100644 index 000000000000..5aeed66e52e0 --- /dev/null +++ b/linera-views/src/backends/dual.rs @@ -0,0 +1,496 @@ +// Copyright (c) Zefchain Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//! Implements [`crate::common::KeyValueStore`] by combining two existing stores. + +use thiserror::Error; + +use crate::{ + batch::Batch, + common::{ + AdminKeyValueStore, KeyIterable, KeyValueIterable, KeyValueStoreError, + ReadableKeyValueStore, WithError, WritableKeyValueStore, + }, +}; + +/// The configuration of a [`DualStore`]: one configuration for each underlying store. +#[derive(Debug)] +pub struct DualStoreConfig { + /// The configuration of the first store. + pub first_config: C1, + /// The configuration of the second store. + pub second_config: C2, +} + +/// The store in use. +#[derive(Debug, Clone, Copy)] +pub enum StoreInUse { + /// The first store. + First, + /// The second store. + Second, +} + +/// The trait for a (static) root key assignment. +pub trait DualStoreRootKeyAssignment { + /// Obtains the store assigned to this root key. + fn assigned_store(root_key: &[u8]) -> Result; +} + +/// A store made of two existing stores. +#[derive(Clone)] +pub struct DualStore { + /// The first underlying store. + first_store: S1, + /// The second underlying store. + second_store: S2, + /// Which store is currently in use given the root key. 
(The root key in the other store will be set arbitrarily.) + store_in_use: StoreInUse, + /// Marker for the static root key assignment. + _marker: std::marker::PhantomData, +} + +impl WithError for DualStore +where + S1: WithError, + S2: WithError, +{ + type Error = DualStoreError; +} + +impl ReadableKeyValueStore for DualStore +where + S1: ReadableKeyValueStore + Send + Sync, + S2: ReadableKeyValueStore + Send + Sync, + A: Send + Sync, +{ + // TODO(#2524): consider changing MAX_KEY_SIZE into a function. + const MAX_KEY_SIZE: usize = if S1::MAX_KEY_SIZE < S2::MAX_KEY_SIZE { + S1::MAX_KEY_SIZE + } else { + S2::MAX_KEY_SIZE + }; + + type Keys = DualStoreKeys; + type KeyValues = DualStoreKeyValues; + + fn max_stream_queries(&self) -> usize { + match self.store_in_use { + StoreInUse::First => self.first_store.max_stream_queries(), + StoreInUse::Second => self.second_store.max_stream_queries(), + } + } + + async fn read_value_bytes(&self, key: &[u8]) -> Result>, Self::Error> { + let result = match self.store_in_use { + StoreInUse::First => self + .first_store + .read_value_bytes(key) + .await + .map_err(DualStoreError::First)?, + StoreInUse::Second => self + .second_store + .read_value_bytes(key) + .await + .map_err(DualStoreError::Second)?, + }; + Ok(result) + } + + async fn contains_key(&self, key: &[u8]) -> Result { + let result = match self.store_in_use { + StoreInUse::First => self + .first_store + .contains_key(key) + .await + .map_err(DualStoreError::First)?, + StoreInUse::Second => self + .second_store + .contains_key(key) + .await + .map_err(DualStoreError::Second)?, + }; + Ok(result) + } + + async fn contains_keys(&self, keys: Vec>) -> Result, Self::Error> { + let result = match self.store_in_use { + StoreInUse::First => self + .first_store + .contains_keys(keys) + .await + .map_err(DualStoreError::First)?, + StoreInUse::Second => self + .second_store + .contains_keys(keys) + .await + .map_err(DualStoreError::Second)?, + }; + Ok(result) + } + + async fn 
read_multi_values_bytes( + &self, + keys: Vec>, + ) -> Result>>, Self::Error> { + let result = match self.store_in_use { + StoreInUse::First => self + .first_store + .read_multi_values_bytes(keys) + .await + .map_err(DualStoreError::First)?, + StoreInUse::Second => self + .second_store + .read_multi_values_bytes(keys) + .await + .map_err(DualStoreError::Second)?, + }; + Ok(result) + } + + async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result { + let result = match self.store_in_use { + StoreInUse::First => DualStoreKeys::First( + self.first_store + .find_keys_by_prefix(key_prefix) + .await + .map_err(DualStoreError::First)?, + ), + StoreInUse::Second => DualStoreKeys::Second( + self.second_store + .find_keys_by_prefix(key_prefix) + .await + .map_err(DualStoreError::Second)?, + ), + }; + Ok(result) + } + + async fn find_key_values_by_prefix( + &self, + key_prefix: &[u8], + ) -> Result { + let result = match self.store_in_use { + StoreInUse::First => DualStoreKeyValues::First( + self.first_store + .find_key_values_by_prefix(key_prefix) + .await + .map_err(DualStoreError::First)?, + ), + StoreInUse::Second => DualStoreKeyValues::Second( + self.second_store + .find_key_values_by_prefix(key_prefix) + .await + .map_err(DualStoreError::Second)?, + ), + }; + Ok(result) + } +} + +impl WritableKeyValueStore for DualStore +where + S1: WritableKeyValueStore + WithError + Send + Sync, + S2: WritableKeyValueStore + WithError + Send + Sync, + A: Send + Sync, +{ + const MAX_VALUE_SIZE: usize = usize::MAX; + + async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> { + match self.store_in_use { + StoreInUse::First => self + .first_store + .write_batch(batch) + .await + .map_err(DualStoreError::First)?, + StoreInUse::Second => self + .second_store + .write_batch(batch) + .await + .map_err(DualStoreError::Second)?, + } + Ok(()) + } + + async fn clear_journal(&self) -> Result<(), Self::Error> { + match self.store_in_use { + StoreInUse::First => self + 
.first_store + .clear_journal() + .await + .map_err(DualStoreError::First)?, + StoreInUse::Second => self + .second_store + .clear_journal() + .await + .map_err(DualStoreError::Second)?, + } + Ok(()) + } +} + +impl AdminKeyValueStore for DualStore +where + S1: AdminKeyValueStore + Send + Sync, + S2: AdminKeyValueStore + Send + Sync, + A: DualStoreRootKeyAssignment + Send + Sync, +{ + type Config = DualStoreConfig; + + async fn new_test_config() -> Result { + let first_config = S1::new_test_config().await.map_err(DualStoreError::First)?; + let second_config = S2::new_test_config() + .await + .map_err(DualStoreError::Second)?; + Ok(DualStoreConfig { + first_config, + second_config, + }) + } + + async fn connect( + config: &Self::Config, + namespace: &str, + root_key: &[u8], + ) -> Result { + let first_store = S1::connect(&config.first_config, namespace, root_key) + .await + .map_err(DualStoreError::First)?; + let second_store = S2::connect(&config.second_config, namespace, root_key) + .await + .map_err(DualStoreError::Second)?; + let store_in_use = A::assigned_store(root_key)?; + Ok(Self { + first_store, + second_store, + store_in_use, + _marker: std::marker::PhantomData, + }) + } + + fn clone_with_root_key(&self, root_key: &[u8]) -> Result { + let first_store = self + .first_store + .clone_with_root_key(root_key) + .map_err(DualStoreError::First)?; + let second_store = self + .second_store + .clone_with_root_key(root_key) + .map_err(DualStoreError::Second)?; + let store_in_use = A::assigned_store(root_key)?; + Ok(Self { + first_store, + second_store, + store_in_use, + _marker: std::marker::PhantomData, + }) + } + + async fn list_all(config: &Self::Config) -> Result, Self::Error> { + let namespaces1 = S1::list_all(&config.first_config) + .await + .map_err(DualStoreError::First)?; + let mut namespaces = Vec::new(); + for namespace in namespaces1 { + if S2::exists(&config.second_config, &namespace) + .await + .map_err(DualStoreError::Second)? 
+ { + namespaces.push(namespace); + } else { + tracing::warn!("Namespace {} only exists in the first store", namespace); + } + } + Ok(namespaces) + } + + async fn exists(config: &Self::Config, namespace: &str) -> Result { + Ok(S1::exists(&config.first_config, namespace) + .await + .map_err(DualStoreError::First)? + && S2::exists(&config.second_config, namespace) + .await + .map_err(DualStoreError::Second)?) + } + + async fn create(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> { + S1::create(&config.first_config, namespace) + .await + .map_err(DualStoreError::First)?; + S2::create(&config.second_config, namespace) + .await + .map_err(DualStoreError::Second)?; + Ok(()) + } + + async fn delete(config: &Self::Config, namespace: &str) -> Result<(), Self::Error> { + S1::delete(&config.first_config, namespace) + .await + .map_err(DualStoreError::First)?; + S2::delete(&config.second_config, namespace) + .await + .map_err(DualStoreError::Second)?; + Ok(()) + } +} + +/// The error type for [`DualStore`]. +#[derive(Error, Debug)] +pub enum DualStoreError { + /// Serialization error with BCS. + #[error("BCS error: {0}")] + Bcs(#[from] bcs::Error), + + /// An error reported by the first store. + #[error("Error in first store: {0}")] + First(E1), + + /// An error reported by the second store. + #[error("Error in second store: {0}")] + Second(E2), +} + +impl KeyValueStoreError for DualStoreError +where + E1: KeyValueStoreError, + E2: KeyValueStoreError, +{ + const BACKEND: &'static str = "dual_store"; +} + +/// A set of keys returned by [`DualStore::find_keys_by_prefix`]. +pub enum DualStoreKeys { + /// A set of keys from the first store. + First(K1), + /// A set of keys from the second store. + Second(K2), +} + +/// An iterator over the keys in [`DualStoreKeys`]. +pub enum DualStoreKeyIterator { + /// Iterating over keys from the first store. + First(I1), + /// Iterating over keys from the second store. + Second(I2), +} + +/// A set of key-values returned by [`DualStore::find_key_values_by_prefix`]. 
+pub enum DualStoreKeyValues { + /// A set of key-values from the first store. + First(K1), + /// A set of key-values from the second store. + Second(K2), +} + +/// An iterator over the key-values in [`DualStoreKeyValues`]. +pub enum DualStoreKeyValueIterator { + /// Iterating over key-values from the first store. + First(I1), + /// Iterating over key-values from the second store. + Second(I2), +} + +/// An owning iterator over the key-values in [`DualStoreKeyValues`]. +pub enum DualStoreKeyValueIteratorOwned { + /// Iterating over key-values from the first store. + First(I1), + /// Iterating over key-values from the second store. + Second(I2), +} + +impl KeyIterable> for DualStoreKeys +where + K1: KeyIterable, + K2: KeyIterable, +{ + type Iterator<'a> = DualStoreKeyIterator, K2::Iterator<'a>> where K1: 'a, K2: 'a; + + fn iterator(&self) -> Self::Iterator<'_> { + match self { + Self::First(keys) => DualStoreKeyIterator::First(keys.iterator()), + Self::Second(keys) => DualStoreKeyIterator::Second(keys.iterator()), + } + } +} + +impl<'a, I1, I2, E1, E2> Iterator for DualStoreKeyIterator +where + I1: Iterator>, + I2: Iterator>, +{ + type Item = Result<&'a [u8], DualStoreError>; + + fn next(&mut self) -> Option { + match self { + Self::First(iter) => iter + .next() + .map(|result| result.map_err(DualStoreError::First)), + Self::Second(iter) => iter + .next() + .map(|result| result.map_err(DualStoreError::Second)), + } + } +} + +impl KeyValueIterable> for DualStoreKeyValues +where + K1: KeyValueIterable, + K2: KeyValueIterable, +{ + type Iterator<'a> = DualStoreKeyValueIterator, K2::Iterator<'a>> where K1: 'a, K2: 'a; + type IteratorOwned = DualStoreKeyValueIteratorOwned; + + fn iterator(&self) -> Self::Iterator<'_> { + match self { + Self::First(keys) => DualStoreKeyValueIterator::First(keys.iterator()), + Self::Second(keys) => DualStoreKeyValueIterator::Second(keys.iterator()), + } + } + + fn into_iterator_owned(self) -> Self::IteratorOwned { + match self { + 
Self::First(keys) => DualStoreKeyValueIteratorOwned::First(keys.into_iterator_owned()), + Self::Second(keys) => { + DualStoreKeyValueIteratorOwned::Second(keys.into_iterator_owned()) + } + } + } +} + +impl<'a, I1, I2, E1, E2> Iterator for DualStoreKeyValueIterator +where + I1: Iterator>, + I2: Iterator>, +{ + type Item = Result<(&'a [u8], &'a [u8]), DualStoreError>; + + fn next(&mut self) -> Option { + match self { + Self::First(iter) => iter + .next() + .map(|result| result.map_err(DualStoreError::First)), + Self::Second(iter) => iter + .next() + .map(|result| result.map_err(DualStoreError::Second)), + } + } +} + +impl Iterator for DualStoreKeyValueIteratorOwned +where + I1: Iterator, Vec), E1>>, + I2: Iterator, Vec), E2>>, +{ + type Item = Result<(Vec, Vec), DualStoreError>; + + fn next(&mut self) -> Option { + match self { + Self::First(iter) => iter + .next() + .map(|result| result.map_err(DualStoreError::First)), + Self::Second(iter) => iter + .next() + .map(|result| result.map_err(DualStoreError::Second)), + } + } +} diff --git a/linera-views/src/backends/mod.rs b/linera-views/src/backends/mod.rs index 55a19481b5b3..02b3a31b25b7 100644 --- a/linera-views/src/backends/mod.rs +++ b/linera-views/src/backends/mod.rs @@ -12,6 +12,8 @@ pub mod memory; pub mod lru_caching; +pub mod dual; + #[cfg(with_scylladb)] pub mod scylla_db;