Skip to content

Commit

Permalink
Remove the handling of root_key in RocksDb via separate instances. (
Browse files Browse the repository at this point in the history
#2441)

* Remove the existing implementation of `root_key` in RocksDb and replace with a simpler more reliable one.
  • Loading branch information
MathieuDutSik authored Sep 2, 2024
1 parent 761bb2e commit cc039d7
Showing 1 changed file with 57 additions and 91 deletions.
148 changes: 57 additions & 91 deletions linera-views/src/backends/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
//! Implements [`crate::common::KeyValueStore`] for the RocksDB database.
use std::{
collections::{btree_map::Entry, BTreeMap},
ffi::OsString,
ops::{Bound, Bound::Excluded},
path::PathBuf,
sync::{Arc, LazyLock, Mutex},
sync::Arc,
};

use futures::future::join_all;
use linera_base::{ensure, hex};
use linera_base::ensure;
use tempfile::TempDir;
use thiserror::Error;

Expand Down Expand Up @@ -50,8 +49,8 @@ pub type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
#[derive(Clone)]
struct RocksDbStoreInternal {
db: Arc<DB>,
path_with_guard: PathWithGuard,
namespace: String,
_path_with_guard: PathWithGuard,
root_key: Vec<u8>,
max_stream_queries: usize,
cache_size: usize,
}
Expand All @@ -65,15 +64,6 @@ pub struct RocksDbStoreConfig {
pub common_config: CommonStoreConfig,
}

#[derive(Default)]
struct RocksDbStores {
stores: BTreeMap<(String, Vec<u8>), RocksDbStoreInternal>,
}

/// The global variables of the RocksDB stores
static ROCKSDB_STORES: LazyLock<Mutex<RocksDbStores>> =
LazyLock::new(|| Mutex::new(RocksDbStores::default()));

impl RocksDbStoreInternal {
fn check_namespace(namespace: &str) -> Result<(), RocksDbStoreError> {
if !namespace
Expand All @@ -87,56 +77,26 @@ impl RocksDbStoreInternal {

fn build(
path_with_guard: PathWithGuard,
namespace: &str,
max_stream_queries: usize,
cache_size: usize,
root_key: &[u8],
) -> Result<RocksDbStoreInternal, RocksDbStoreError> {
let mut full_path_buf = path_with_guard.path_buf.clone();
full_path_buf.push(root_key_as_string(root_key));
if !std::path::Path::exists(&full_path_buf) {
std::fs::create_dir(full_path_buf.clone())?;
let path = path_with_guard.path_buf.clone();
if !std::path::Path::exists(&path) {
std::fs::create_dir(path.clone())?;
}
let mut options = rocksdb::Options::default();
options.create_if_missing(true);
let db = DB::open(&options, full_path_buf)?;
let namespace = namespace.to_string();
let db = DB::open(&options, path)?;
let root_key = root_key.to_vec();
Ok(RocksDbStoreInternal {
db: Arc::new(db),
path_with_guard,
namespace,
_path_with_guard: path_with_guard,
root_key,
max_stream_queries,
cache_size,
})
}

fn connect_from_path(
path_with_guard: PathWithGuard,
namespace: &str,
max_stream_queries: usize,
cache_size: usize,
root_key: &[u8],
) -> Result<RocksDbStoreInternal, RocksDbStoreError> {
let mut rocksdb_stores = ROCKSDB_STORES.lock().unwrap();
let pair = (namespace.to_string(), root_key.to_vec());
match rocksdb_stores.stores.entry(pair) {
Entry::Occupied(entry) => {
let entry = entry.into_mut();
Ok(entry.clone())
}
Entry::Vacant(entry) => {
let store = Self::build(
path_with_guard,
namespace,
max_stream_queries,
cache_size,
root_key,
)?;
entry.insert(store.clone());
Ok(store)
}
}
}
}

impl WithError for RocksDbStoreInternal {
Expand All @@ -155,17 +115,18 @@ impl ReadableKeyValueStore for RocksDbStoreInternal {
async fn read_value_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>, RocksDbStoreError> {
ensure!(key.len() <= MAX_KEY_SIZE, RocksDbStoreError::KeyTooLong);
let client = self.clone();
let key = key.to_vec();
Ok(tokio::task::spawn_blocking(move || client.db.get(&key)).await??)
let mut full_key = self.root_key.to_vec();
full_key.extend(key);
Ok(tokio::task::spawn_blocking(move || client.db.get(&full_key)).await??)
}

async fn contains_key(&self, key: &[u8]) -> Result<bool, RocksDbStoreError> {
ensure!(key.len() <= MAX_KEY_SIZE, RocksDbStoreError::KeyTooLong);
let client = self.clone();
let key_may_exist = {
let key = key.to_vec();
tokio::task::spawn_blocking(move || client.db.key_may_exist(key)).await?
};
let mut full_key = self.root_key.to_vec();
full_key.extend(key);
let key_may_exist =
tokio::task::spawn_blocking(move || client.db.key_may_exist(full_key)).await?;
if !key_may_exist {
return Ok(false);
}
Expand All @@ -176,10 +137,12 @@ impl ReadableKeyValueStore for RocksDbStoreInternal {
let size = keys.len();
let mut results = vec![false; size];
let mut handles = Vec::new();
for key in keys.clone() {
for key in &keys {
ensure!(key.len() <= MAX_KEY_SIZE, RocksDbStoreError::KeyTooLong);
let mut full_key = self.root_key.to_vec();
full_key.extend(key);
let client = self.clone();
let handle = tokio::task::spawn_blocking(move || client.db.key_may_exist(key));
let handle = tokio::task::spawn_blocking(move || client.db.key_may_exist(full_key));
handles.push(handle);
}
let may_results: Vec<_> = join_all(handles)
Expand Down Expand Up @@ -209,7 +172,15 @@ impl ReadableKeyValueStore for RocksDbStoreInternal {
ensure!(key.len() <= MAX_KEY_SIZE, RocksDbStoreError::KeyTooLong);
}
let client = self.clone();
let entries = tokio::task::spawn_blocking(move || client.db.multi_get(&keys)).await?;
let full_keys = keys
.into_iter()
.map(|key| {
let mut full_key = self.root_key.to_vec();
full_key.extend(key);
full_key
})
.collect::<Vec<_>>();
let entries = tokio::task::spawn_blocking(move || client.db.multi_get(&full_keys)).await?;
Ok(entries.into_iter().collect::<Result<_, _>>()?)
}

Expand All @@ -222,8 +193,9 @@ impl ReadableKeyValueStore for RocksDbStoreInternal {
RocksDbStoreError::KeyTooLong
);
let client = self.clone();
let prefix = key_prefix.to_vec();
let len = key_prefix.len();
let mut prefix = self.root_key.clone();
prefix.extend(key_prefix);
let len = prefix.len();
let keys = tokio::task::spawn_blocking(move || {
let mut iter = client.db.raw_iterator();
let mut keys = Vec::new();
Expand Down Expand Up @@ -252,8 +224,9 @@ impl ReadableKeyValueStore for RocksDbStoreInternal {
RocksDbStoreError::KeyTooLong
);
let client = self.clone();
let prefix = key_prefix.to_vec();
let len = key_prefix.len();
let mut prefix = self.root_key.clone();
prefix.extend(key_prefix);
let len = prefix.len();
let key_values = tokio::task::spawn_blocking(move || {
let mut iter = client.db.raw_iterator();
let mut key_values = Vec::new();
Expand Down Expand Up @@ -292,35 +265,45 @@ impl WritableKeyValueStore for RocksDbStoreInternal {
if let WriteOperation::DeletePrefix { key_prefix } = op {
if get_upper_bound(key_prefix) == Bound::Unbounded {
for short_key in self.find_keys_by_prefix(key_prefix).await? {
let mut key = key_prefix.clone();
key.extend(short_key);
keys.push(key);
let mut full_key = self.root_key.clone();
full_key.extend(key_prefix);
full_key.extend(short_key);
keys.push(full_key);
}
}
}
}
for key in keys {
batch.operations.push(WriteOperation::Delete { key });
}
let root_key = self.root_key.to_vec();
tokio::task::spawn_blocking(move || -> Result<(), RocksDbStoreError> {
let mut inner_batch = rocksdb::WriteBatchWithTransaction::default();
for operation in batch.operations {
match operation {
WriteOperation::Delete { key } => {
ensure!(key.len() <= MAX_KEY_SIZE, RocksDbStoreError::KeyTooLong);
inner_batch.delete(&key)
let mut full_key = root_key.clone();
full_key.extend(key);
inner_batch.delete(&full_key)
}
WriteOperation::Put { key, value } => {
ensure!(key.len() <= MAX_KEY_SIZE, RocksDbStoreError::KeyTooLong);
inner_batch.put(&key, value)
let mut full_key = root_key.clone();
full_key.extend(key);
inner_batch.put(&full_key, value)
}
WriteOperation::DeletePrefix { key_prefix } => {
ensure!(
key_prefix.len() <= MAX_KEY_SIZE,
RocksDbStoreError::KeyTooLong
);
if let Excluded(upper_bound) = get_upper_bound(&key_prefix) {
inner_batch.delete_range(&key_prefix, &upper_bound);
let mut full_key1 = root_key.clone();
full_key1.extend(key_prefix);
let mut full_key2 = root_key.clone();
full_key2.extend(upper_bound);
inner_batch.delete_range(&full_key1, &full_key2);
}
}
}
Expand All @@ -337,10 +320,6 @@ impl WritableKeyValueStore for RocksDbStoreInternal {
}
}

fn root_key_as_string(root_key: &[u8]) -> String {
format!("ROOT_KEY_{}", hex::encode(root_key))
}

impl AdminKeyValueStore for RocksDbStoreInternal {
type Config = RocksDbStoreConfig;

Expand All @@ -365,26 +344,13 @@ impl AdminKeyValueStore for RocksDbStoreInternal {
path_with_guard.path_buf = path_buf;
let max_stream_queries = config.common_config.max_stream_queries;
let cache_size = config.common_config.cache_size;
RocksDbStoreInternal::connect_from_path(
path_with_guard,
namespace,
max_stream_queries,
cache_size,
root_key,
)
RocksDbStoreInternal::build(path_with_guard, max_stream_queries, cache_size, root_key)
}

fn clone_with_root_key(&self, root_key: &[u8]) -> Result<Self, RocksDbStoreError> {
let path_with_guard = self.path_with_guard.clone();
let max_stream_queries = self.max_stream_queries;
let cache_size = self.cache_size;
RocksDbStoreInternal::connect_from_path(
path_with_guard,
&self.namespace,
max_stream_queries,
cache_size,
root_key,
)
let mut store = self.clone();
store.root_key = root_key.to_vec();
Ok(store)
}

async fn list_all(config: &Self::Config) -> Result<Vec<String>, RocksDbStoreError> {
Expand Down

0 comments on commit cc039d7

Please sign in to comment.