From cc039d7b84115245ab6adff07653da0cb54860bf Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Mon, 2 Sep 2024 21:56:44 +0200 Subject: [PATCH] Remove the handling of `root_key` in `RocksDb` via separate instances. (#2441) * Remove the existing implementation of `root_key` in RocksDb and replace with a simpler more reliable one. --- linera-views/src/backends/rocks_db.rs | 148 ++++++++++---------------- 1 file changed, 57 insertions(+), 91 deletions(-) diff --git a/linera-views/src/backends/rocks_db.rs b/linera-views/src/backends/rocks_db.rs index fefdf9c8c1fc..1e8f0443e848 100644 --- a/linera-views/src/backends/rocks_db.rs +++ b/linera-views/src/backends/rocks_db.rs @@ -4,15 +4,14 @@ //! Implements [`crate::common::KeyValueStore`] for the RocksDB database. use std::{ - collections::{btree_map::Entry, BTreeMap}, ffi::OsString, ops::{Bound, Bound::Excluded}, path::PathBuf, - sync::{Arc, LazyLock, Mutex}, + sync::Arc, }; use futures::future::join_all; -use linera_base::{ensure, hex}; +use linera_base::ensure; use tempfile::TempDir; use thiserror::Error; @@ -50,8 +49,8 @@ pub type DB = rocksdb::DBWithThreadMode; #[derive(Clone)] struct RocksDbStoreInternal { db: Arc, - path_with_guard: PathWithGuard, - namespace: String, + _path_with_guard: PathWithGuard, + root_key: Vec, max_stream_queries: usize, cache_size: usize, } @@ -65,15 +64,6 @@ pub struct RocksDbStoreConfig { pub common_config: CommonStoreConfig, } -#[derive(Default)] -struct RocksDbStores { - stores: BTreeMap<(String, Vec), RocksDbStoreInternal>, -} - -/// The global variables of the RocksDB stores -static ROCKSDB_STORES: LazyLock> = - LazyLock::new(|| Mutex::new(RocksDbStores::default())); - impl RocksDbStoreInternal { fn check_namespace(namespace: &str) -> Result<(), RocksDbStoreError> { if !namespace @@ -87,56 +77,26 @@ impl RocksDbStoreInternal { fn build( path_with_guard: PathWithGuard, - namespace: &str, max_stream_queries: usize, cache_size: usize, root_key: &[u8], ) -> Result { - let mut full_path_buf = path_with_guard.path_buf.clone(); - full_path_buf.push(root_key_as_string(root_key)); - if !std::path::Path::exists(&full_path_buf) { - std::fs::create_dir(full_path_buf.clone())?; + let path = path_with_guard.path_buf.clone(); + if !std::path::Path::exists(&path) { + std::fs::create_dir(path.clone())?; } let mut options = rocksdb::Options::default(); options.create_if_missing(true); - let db = DB::open(&options, full_path_buf)?; - let namespace = namespace.to_string(); + let db = DB::open(&options, path)?; + let root_key = root_key.to_vec(); Ok(RocksDbStoreInternal { db: Arc::new(db), - path_with_guard, - namespace, + _path_with_guard: path_with_guard, + root_key, max_stream_queries, cache_size, }) } - - fn connect_from_path( - path_with_guard: PathWithGuard, - namespace: &str, - max_stream_queries: usize, - cache_size: usize, - root_key: &[u8], - ) -> Result { - let mut rocksdb_stores = ROCKSDB_STORES.lock().unwrap(); - let pair = (namespace.to_string(), root_key.to_vec()); - match rocksdb_stores.stores.entry(pair) { - Entry::Occupied(entry) => { - let entry = entry.into_mut(); - Ok(entry.clone()) - } - Entry::Vacant(entry) => { - let store = Self::build( - path_with_guard, - namespace, - max_stream_queries, - cache_size, - root_key, - )?; - entry.insert(store.clone()); - Ok(store) - } - } - } } impl WithError for RocksDbStoreInternal { @@ -155,17 +115,18 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { async fn read_value_bytes(&self, key: &[u8]) -> Result>, RocksDbStoreError> { ensure!(key.len() <= MAX_KEY_SIZE, RocksDbStoreError::KeyTooLong); let client = self.clone(); - let key = key.to_vec(); - Ok(tokio::task::spawn_blocking(move || client.db.get(&key)).await??) + let mut full_key = self.root_key.to_vec(); + full_key.extend(key); + Ok(tokio::task::spawn_blocking(move || client.db.get(&full_key)).await??) } async fn contains_key(&self, key: &[u8]) -> Result { ensure!(key.len() <= MAX_KEY_SIZE, RocksDbStoreError::KeyTooLong); let client = self.clone(); - let key_may_exist = { - let key = key.to_vec(); - tokio::task::spawn_blocking(move || client.db.key_may_exist(key)).await? - }; + let mut full_key = self.root_key.to_vec(); + full_key.extend(key); + let key_may_exist = + tokio::task::spawn_blocking(move || client.db.key_may_exist(full_key)).await?; if !key_may_exist { return Ok(false); } @@ -176,10 +137,12 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { let size = keys.len(); let mut results = vec![false; size]; let mut handles = Vec::new(); - for key in keys.clone() { + for key in &keys { ensure!(key.len() <= MAX_KEY_SIZE, RocksDbStoreError::KeyTooLong); + let mut full_key = self.root_key.to_vec(); + full_key.extend(key); let client = self.clone(); - let handle = tokio::task::spawn_blocking(move || client.db.key_may_exist(key)); + let handle = tokio::task::spawn_blocking(move || client.db.key_may_exist(full_key)); handles.push(handle); } let may_results: Vec<_> = join_all(handles) @@ -209,7 +172,15 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { ensure!(key.len() <= MAX_KEY_SIZE, RocksDbStoreError::KeyTooLong); } let client = self.clone(); - let entries = tokio::task::spawn_blocking(move || client.db.multi_get(&keys)).await?; + let full_keys = keys + .into_iter() + .map(|key| { + let mut full_key = self.root_key.to_vec(); + full_key.extend(key); + full_key + }) + .collect::>(); + let entries = tokio::task::spawn_blocking(move || client.db.multi_get(&full_keys)).await?; Ok(entries.into_iter().collect::>()?) } @@ -222,8 +193,9 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { RocksDbStoreError::KeyTooLong ); let client = self.clone(); - let prefix = key_prefix.to_vec(); - let len = key_prefix.len(); + let mut prefix = self.root_key.clone(); + prefix.extend(key_prefix); + let len = prefix.len(); let keys = tokio::task::spawn_blocking(move || { let mut iter = client.db.raw_iterator(); let mut keys = Vec::new(); @@ -252,8 +224,9 @@ impl ReadableKeyValueStore for RocksDbStoreInternal { RocksDbStoreError::KeyTooLong ); let client = self.clone(); - let prefix = key_prefix.to_vec(); - let len = key_prefix.len(); + let mut prefix = self.root_key.clone(); + prefix.extend(key_prefix); + let len = prefix.len(); let key_values = tokio::task::spawn_blocking(move || { let mut iter = client.db.raw_iterator(); let mut key_values = Vec::new(); @@ -292,9 +265,10 @@ impl WritableKeyValueStore for RocksDbStoreInternal { if let WriteOperation::DeletePrefix { key_prefix } = op { if get_upper_bound(key_prefix) == Bound::Unbounded { for short_key in self.find_keys_by_prefix(key_prefix).await? { - let mut key = key_prefix.clone(); - key.extend(short_key); - keys.push(key); + let mut full_key = self.root_key.clone(); + full_key.extend(key_prefix); + full_key.extend(short_key); + keys.push(full_key); } } } @@ -302,17 +276,22 @@ impl WritableKeyValueStore for RocksDbStoreInternal { for key in keys { batch.operations.push(WriteOperation::Delete { key }); } + let root_key = self.root_key.to_vec(); tokio::task::spawn_blocking(move || -> Result<(), RocksDbStoreError> { let mut inner_batch = rocksdb::WriteBatchWithTransaction::default(); for operation in batch.operations { match operation { WriteOperation::Delete { key } => { ensure!(key.len() <= MAX_KEY_SIZE, RocksDbStoreError::KeyTooLong); - inner_batch.delete(&key) + let mut full_key = root_key.clone(); + full_key.extend(key); + inner_batch.delete(&full_key) } WriteOperation::Put { key, value } => { ensure!(key.len() <= MAX_KEY_SIZE, RocksDbStoreError::KeyTooLong); - inner_batch.put(&key, value) + let mut full_key = root_key.clone(); + full_key.extend(key); + inner_batch.put(&full_key, value) } WriteOperation::DeletePrefix { key_prefix } => { ensure!( @@ -320,7 +299,11 @@ impl WritableKeyValueStore for RocksDbStoreInternal { RocksDbStoreError::KeyTooLong ); if let Excluded(upper_bound) = get_upper_bound(&key_prefix) { - inner_batch.delete_range(&key_prefix, &upper_bound); + let mut full_key1 = root_key.clone(); + full_key1.extend(key_prefix); + let mut full_key2 = root_key.clone(); + full_key2.extend(upper_bound); + inner_batch.delete_range(&full_key1, &full_key2); } } } @@ -337,10 +320,6 @@ impl WritableKeyValueStore for RocksDbStoreInternal { } } -fn root_key_as_string(root_key: &[u8]) -> String { - format!("ROOT_KEY_{}", hex::encode(root_key)) -} - impl AdminKeyValueStore for RocksDbStoreInternal { type Config = RocksDbStoreConfig; @@ -365,26 +344,13 @@ impl AdminKeyValueStore for RocksDbStoreInternal { path_with_guard.path_buf = path_buf; let max_stream_queries = config.common_config.max_stream_queries; let cache_size = config.common_config.cache_size; - RocksDbStoreInternal::connect_from_path( - path_with_guard, - namespace, - max_stream_queries, - cache_size, - root_key, - ) + RocksDbStoreInternal::build(path_with_guard, max_stream_queries, cache_size, root_key) } fn clone_with_root_key(&self, root_key: &[u8]) -> Result { - let path_with_guard = self.path_with_guard.clone(); - let max_stream_queries = self.max_stream_queries; - let cache_size = self.cache_size; - RocksDbStoreInternal::connect_from_path( - path_with_guard, - &self.namespace, - max_stream_queries, - cache_size, - root_key, - ) + let mut store = self.clone(); + store.root_key = root_key.to_vec(); + Ok(store) } async fn list_all(config: &Self::Config) -> Result, RocksDbStoreError> {