Skip to content

Commit

Permalink
Make storage configs clonable and serializable (#3400)
Browse files Browse the repository at this point in the history
## Motivation

We'd like to simplify the management of database configurations.

## Proposal

The first step is to make storage config objects serializable.

## Test Plan

CI
  • Loading branch information
ma2bd authored Feb 24, 2025
1 parent 55b9e8d commit 2646915
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 50 deletions.
7 changes: 4 additions & 3 deletions linera-client/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ use linera_storage_service::{
common::{ServiceStoreConfig, ServiceStoreInternalConfig},
};
#[cfg(feature = "dynamodb")]
use linera_views::dynamo_db::{get_config, DynamoDbStore, DynamoDbStoreConfig};
use linera_views::dynamo_db::{DynamoDbStore, DynamoDbStoreConfig};
#[cfg(with_storage)]
use linera_views::store::LocalAdminKeyValueStore as _;
use linera_views::{
memory::{MemoryStore, MemoryStoreConfig},
store::CommonStoreConfig,
views::ViewError,
};
use serde::{Deserialize, Serialize};
use tracing::error;
#[cfg(feature = "rocksdb")]
use {
Expand Down Expand Up @@ -63,6 +64,7 @@ util::impl_from_dynamic!(Error:Backend, linera_views::dynamo_db::DynamoDbStoreEr
util::impl_from_dynamic!(Error:Backend, linera_views::scylla_db::ScyllaDbStoreError);

/// The configuration of the key value store in use.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum StoreConfig {
/// The storage service key-value store
#[cfg(feature = "storage-service")]
Expand Down Expand Up @@ -366,8 +368,7 @@ impl StorageConfigNamespace {
}
#[cfg(feature = "dynamodb")]
StorageConfig::DynamoDb { use_localstack } => {
let aws_config = get_config(*use_localstack).await?;
let config = DynamoDbStoreConfig::new(aws_config, common_config);
let config = DynamoDbStoreConfig::new(*use_localstack, common_config);
Ok(StoreConfig::DynamoDb(config, namespace))
}
#[cfg(feature = "scylladb")]
Expand Down
3 changes: 2 additions & 1 deletion linera-storage-service/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use linera_views::{
lru_caching::LruCachingConfig,
store::{CommonStoreInternalConfig, KeyValueStoreError},
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tonic::Status;

Expand Down Expand Up @@ -70,7 +71,7 @@ pub fn storage_service_test_endpoint() -> Result<String, ServiceStoreError> {
Ok(std::env::var("LINERA_STORAGE_SERVICE")?)
}

#[derive(Debug, Clone)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ServiceStoreInternalConfig {
/// The endpoint used by the shared store
pub endpoint: String,
Expand Down
3 changes: 2 additions & 1 deletion linera-views/src/backends/dual.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

//! Implements [`crate::store::KeyValueStore`] by combining two existing stores.
use serde::{Deserialize, Serialize};
use thiserror::Error;

#[cfg(with_testing)]
Expand All @@ -16,7 +17,7 @@ use crate::{
};

/// The initial configuration of the system.
#[derive(Debug)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DualStoreConfig<C1, C2> {
/// The first config.
pub first_config: C1,
Expand Down
61 changes: 26 additions & 35 deletions linera-views/src/backends/dynamo_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use aws_sdk_dynamodb::{
use aws_smithy_types::error::operation::BuildError;
use futures::future::{join_all, FutureExt as _};
use linera_base::ensure;
use serde::{Deserialize, Serialize};
use thiserror::Error;

#[cfg(with_metrics)]
Expand All @@ -57,9 +58,6 @@ use crate::{
/// Name of the environment variable with the address to a LocalStack instance.
const LOCALSTACK_ENDPOINT: &str = "LOCALSTACK_ENDPOINT";

/// The configuration to connect to DynamoDB.
pub type Config = aws_sdk_dynamodb::Config;

/// Gets the AWS configuration from the environment
async fn get_base_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError> {
let base_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest())
Expand Down Expand Up @@ -88,17 +86,6 @@ async fn get_localstack_config() -> Result<aws_sdk_dynamodb::Config, DynamoDbSto
Ok(config)
}

/// Getting a configuration for the system
async fn get_config_internal(
use_localstack: bool,
) -> Result<aws_sdk_dynamodb::Config, DynamoDbStoreInternalError> {
if use_localstack {
get_localstack_config().await
} else {
get_base_config().await
}
}

/// DynamoDB forbids the iteration over the partition keys.
/// Therefore we use a special partition key named `[1]` for storing
/// the root keys. For normal root keys, we simply put a `[0]` in
Expand Down Expand Up @@ -334,15 +321,26 @@ pub struct DynamoDbStoreInternal {
root_key_written: Arc<AtomicBool>,
}

/// The initial configuration of the system
#[derive(Debug)]
/// The initial configuration of the system.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbStoreInternalConfig {
/// The AWS configuration
config: aws_sdk_dynamodb::Config,
/// Whether to use local stack or not.
use_localstack: bool,
/// The common configuration of the key value store
common_config: CommonStoreInternalConfig,
}

impl DynamoDbStoreInternalConfig {
    /// Builds a DynamoDB client from this configuration, targeting either a
    /// LocalStack instance or the real AWS endpoint depending on
    /// `use_localstack`.
    async fn client(&self) -> Result<Client, DynamoDbStoreInternalError> {
        let aws_config = match self.use_localstack {
            true => get_localstack_config().await?,
            false => get_base_config().await?,
        };
        Ok(Client::from_conf(aws_config))
    }
}

impl AdminKeyValueStore for DynamoDbStoreInternal {
type Config = DynamoDbStoreInternalConfig;

Expand All @@ -356,7 +354,7 @@ impl AdminKeyValueStore for DynamoDbStoreInternal {
root_key: &[u8],
) -> Result<Self, DynamoDbStoreInternalError> {
Self::check_namespace(namespace)?;
let client = Client::from_conf(config.config.clone());
let client = config.client().await?;
let semaphore = config
.common_config
.max_concurrent_queries
Expand Down Expand Up @@ -392,7 +390,7 @@ impl AdminKeyValueStore for DynamoDbStoreInternal {
}

async fn list_all(config: &Self::Config) -> Result<Vec<String>, DynamoDbStoreInternalError> {
let client = Client::from_conf(config.config.clone());
let client = config.client().await?;
let mut namespaces = Vec::new();
let mut start_table = None;
loop {
Expand Down Expand Up @@ -432,7 +430,7 @@ impl AdminKeyValueStore for DynamoDbStoreInternal {
}

async fn delete_all(config: &Self::Config) -> Result<(), DynamoDbStoreInternalError> {
let client = Client::from_conf(config.config.clone());
let client = config.client().await?;
let tables = Self::list_all(config).await?;
for table in tables {
client
Expand All @@ -450,7 +448,7 @@ impl AdminKeyValueStore for DynamoDbStoreInternal {
namespace: &str,
) -> Result<bool, DynamoDbStoreInternalError> {
Self::check_namespace(namespace)?;
let client = Client::from_conf(config.config.clone());
let client = config.client().await?;
let key_db = build_key(EMPTY_ROOT_KEY, DB_KEY.to_vec());
let response = client
.get_item()
Expand Down Expand Up @@ -484,8 +482,8 @@ impl AdminKeyValueStore for DynamoDbStoreInternal {
namespace: &str,
) -> Result<(), DynamoDbStoreInternalError> {
Self::check_namespace(namespace)?;
let client = Client::from_conf(config.config.clone());
let _result = client
let client = config.client().await?;
client
.create_table()
.table_name(namespace)
.attribute_definitions(
Expand Down Expand Up @@ -529,7 +527,7 @@ impl AdminKeyValueStore for DynamoDbStoreInternal {
namespace: &str,
) -> Result<(), DynamoDbStoreInternalError> {
Self::check_namespace(namespace)?;
let client = Client::from_conf(config.config.clone());
let client = config.client().await?;
client
.delete_table()
.table_name(namespace)
Expand Down Expand Up @@ -1166,10 +1164,8 @@ impl TestKeyValueStore for JournalingKeyValueStore<DynamoDbStoreInternal> {
max_concurrent_queries: Some(TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES),
max_stream_queries: TEST_DYNAMO_DB_MAX_STREAM_QUERIES,
};
let use_localstack = true;
let config = get_config_internal(use_localstack).await?;
Ok(DynamoDbStoreInternalConfig {
config,
use_localstack: true,
common_config,
})
}
Expand All @@ -1196,19 +1192,14 @@ pub type DynamoDbStoreError = ValueSplittingError<DynamoDbStoreInternalError>;
/// The config type for [`DynamoDbStore`]
pub type DynamoDbStoreConfig = LruCachingConfig<DynamoDbStoreInternalConfig>;

/// Getting a configuration for the system
pub async fn get_config(use_localstack: bool) -> Result<Config, DynamoDbStoreError> {
Ok(get_config_internal(use_localstack).await?)
}

impl DynamoDbStoreConfig {
/// Creates a `DynamoDbStoreConfig` from the input.
pub fn new(
config: Config,
use_localstack: bool,
common_config: crate::store::CommonStoreConfig,
) -> DynamoDbStoreConfig {
let inner_config = DynamoDbStoreInternalConfig {
config,
use_localstack,
common_config: common_config.reduced(),
};
DynamoDbStoreConfig {
Expand Down
3 changes: 2 additions & 1 deletion linera-views/src/backends/indexed_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::rc::Rc;

use futures::future;
use indexed_db_futures::{js_sys, prelude::*, web_sys};
use serde::{Deserialize, Serialize};
use thiserror::Error;

use crate::{
Expand All @@ -19,7 +20,7 @@ use crate::{
};

/// The initial configuration of the system
#[derive(Debug)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexedDbStoreConfig {
/// The common configuration of the key value store
pub common_config: CommonStoreConfig,
Expand Down
2 changes: 2 additions & 0 deletions linera-views/src/backends/lru_caching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::{
};

use linked_hash_map::LinkedHashMap;
use serde::{Deserialize, Serialize};
#[cfg(with_metrics)]
use {linera_base::prometheus_util::register_int_counter_vec, prometheus::IntCounterVec};

Expand Down Expand Up @@ -275,6 +276,7 @@ where
}

/// The configuration type for the `LruCachingStore`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LruCachingConfig<C> {
/// The inner configuration of the `LruCachingStore`.
pub inner_config: C,
Expand Down
3 changes: 2 additions & 1 deletion linera-views/src/backends/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
sync::{Arc, LazyLock, Mutex, RwLock},
};

use serde::{Deserialize, Serialize};
use thiserror::Error;

#[cfg(with_testing)]
Expand All @@ -24,7 +25,7 @@ use crate::{
};

/// The initial configuration of the system
#[derive(Debug)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct MemoryStoreConfig {
/// The common configuration of the key value store
pub common_config: CommonStoreInternalConfig,
Expand Down
10 changes: 6 additions & 4 deletions linera-views/src/backends/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::{
};

use linera_base::ensure;
use serde::{Deserialize, Serialize};
use tempfile::TempDir;
use thiserror::Error;

Expand Down Expand Up @@ -59,7 +60,7 @@ type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
/// One way to select that is to select BlockInPlace when
/// `tokio::runtime::Handle::current().metrics().num_workers() > 1`
/// `BlockInPlace` is documented in <https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html>
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum RocksDbSpawnMode {
/// This uses the `spawn_blocking` function of Tokio.
SpawnBlocking,
Expand Down Expand Up @@ -253,7 +254,7 @@ pub struct RocksDbStoreInternal {
}

/// The initial configuration of the system
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RocksDbStoreInternalConfig {
/// The path to the storage containing the namespaces
path_with_guard: PathWithGuard,
Expand Down Expand Up @@ -577,11 +578,12 @@ pub enum RocksDbStoreInternalError {
}

/// A path and the guard for the temporary directory if needed
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PathWithGuard {
/// The path to the data
pub path_buf: PathBuf,
/// The guard for the directory if one is needed
#[serde(skip)]
_dir: Option<Arc<TempDir>>,
}

Expand All @@ -596,7 +598,7 @@ impl PathWithGuard {

/// Returns the test path for RocksDB without common config.
#[cfg(with_testing)]
pub fn new_testing() -> PathWithGuard {
fn new_testing() -> PathWithGuard {
let dir = TempDir::new().unwrap();
let path_buf = dir.path().to_path_buf();
let _dir = Some(Arc::new(dir));
Expand Down
3 changes: 2 additions & 1 deletion linera-views/src/backends/scylla_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use scylla::{
transport::errors::{DbError, QueryError},
Session, SessionBuilder,
};
use serde::{Deserialize, Serialize};
use thiserror::Error;

#[cfg(with_metrics)]
Expand Down Expand Up @@ -625,7 +626,7 @@ fn get_big_root_key(root_key: &[u8]) -> Vec<u8> {
}

/// The type for building a new ScyllaDB Key Value Store
#[derive(Debug)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ScyllaDbStoreInternalConfig {
/// The URL to which the requests have to be sent
pub uri: String,
Expand Down
6 changes: 3 additions & 3 deletions linera-views/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
use std::{fmt::Debug, future::Future};

use serde::de::DeserializeOwned;
use serde::{de::DeserializeOwned, Deserialize, Serialize};

#[cfg(with_testing)]
use crate::random::generate_test_namespace;
use crate::{batch::Batch, common::from_bytes_option, views::ViewError};

/// The common initialization parameters for the `KeyValueStore`
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommonStoreInternalConfig {
/// The maximum number of concurrent queries to a database
pub max_concurrent_queries: Option<usize>,
Expand All @@ -21,7 +21,7 @@ pub struct CommonStoreInternalConfig {
}

/// The common initialization parameters for the `KeyValueStore`
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommonStoreConfig {
/// The maximum number of concurrent queries to a database
pub max_concurrent_queries: Option<usize>,
Expand Down

1 comment on commit 2646915

@fandile

This comment was marked as spam.

Please sign in to comment.