diff --git a/rust/noosphere-storage/Cargo.toml b/rust/noosphere-storage/Cargo.toml index 8655649e2..7236f6643 100644 --- a/rust/noosphere-storage/Cargo.toml +++ b/rust/noosphere-storage/Cargo.toml @@ -32,9 +32,9 @@ libipld-cbor = { workspace = true } serde = { workspace = true } base64 = "=0.21.2" url = { version = "^2" } +witty-phrase-generator = "~0.2" [dev-dependencies] -witty-phrase-generator = "~0.2" wasm-bindgen-test = { workspace = true } rand = { workspace = true } # examples/bench @@ -42,11 +42,9 @@ noosphere-core = { version = "0.15.2", path = "../noosphere-core", features = [" # examples/bench noosphere-common = { version = "0.1.0", path = "../noosphere-common", features = ["helpers"] } -[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] -tempfile = { workspace = true } - [target.'cfg(not(target_arch = "wasm32"))'.dependencies] sled = "~0.34" +tempfile = { workspace = true } tokio = { workspace = true, features = ["full"] } rocksdb = { version = "0.21.0", optional = true } diff --git a/rust/noosphere-storage/examples/bench/main.rs b/rust/noosphere-storage/examples/bench/main.rs index 419e69a80..334f44437 100644 --- a/rust/noosphere-storage/examples/bench/main.rs +++ b/rust/noosphere-storage/examples/bench/main.rs @@ -128,9 +128,7 @@ impl BenchmarkStorage { ))] let (storage, storage_name) = { ( - noosphere_storage::SledStorage::new(noosphere_storage::SledStorageInit::Path( - storage_path.into(), - ))?, + noosphere_storage::SledStorage::new(&storage_path)?, "SledDbStorage", ) }; @@ -138,7 +136,7 @@ impl BenchmarkStorage { #[cfg(all(not(target_arch = "wasm32"), feature = "rocksdb"))] let (storage, storage_name) = { ( - noosphere_storage::RocksDbStorage::new(storage_path.into())?, + noosphere_storage::RocksDbStorage::new(&storage_path)?, "RocksDbStorage", ) }; diff --git a/rust/noosphere-storage/src/extra.rs b/rust/noosphere-storage/src/extra.rs new file mode 100644 index 000000000..aa856ba4a --- /dev/null +++ b/rust/noosphere-storage/src/extra.rs @@ -0,0 +1,94 @@ +use crate::storage::Storage; +use anyhow::Result; +use async_trait::async_trait; +use noosphere_common::ConditionalSend; +use std::{fs, path::Path}; + +/// [Storage] that can be opened via [Path] reference. +/// [FsBackedStorage] types get a blanket implementation. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait OpenStorage: Storage + Sized { + async fn open + ConditionalSend>(path: P) -> Result; +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl OpenStorage for T +where + T: FsBackedStorage, +{ + async fn open + ConditionalSend>(path: P) -> Result { + FsBackedStorage::open(path).await + } +} + +/// [Storage] that can be deleted via [Path] reference. +/// [FsBackedStorage] types get a blanket implementation. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait RemoveStorage: Storage + Sized { + async fn remove + ConditionalSend>(path: P) -> Result<()>; +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl RemoveStorage for T +where + T: FsBackedStorage, +{ + async fn remove + ConditionalSend>(path: P) -> Result<()> { + ::remove(path).await + } +} + +/// [Storage] that can be moved/renamed via [Path] reference. +/// [FsBackedStorage] types get a blanket implementation. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait RenameStorage: Storage + Sized { + async fn rename + ConditionalSend, Q: AsRef + ConditionalSend>( + from: P, + to: Q, + ) -> Result<()>; +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl RenameStorage for T +where + T: FsBackedStorage, +{ + async fn rename + ConditionalSend, Q: AsRef + ConditionalSend>( + from: P, + to: Q, + ) -> Result<()> { + ::rename(from, to).await + } +} + +/// [Storage] that is based on a file system. Implementing [FsBackedStorage] +/// provides blanket implementations for other trait-based [Storage] operations. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait FsBackedStorage: Storage + Sized { + /// Opens the storage at `path`. + async fn open + ConditionalSend>(path: P) -> Result; + + /// Deletes the storage located at `path` directory. Returns `Ok(())` if + /// the directory is successfully removed, or if it already does not exist. + async fn remove + ConditionalSend>(path: P) -> Result<()> { + match fs::metadata(path.as_ref()) { + Ok(_) => fs::remove_dir_all(path.as_ref()).map_err(|e| e.into()), + Err(_) => Ok(()), + } + } + + /// Moves the storage located at `from` to the `to` location. + async fn rename + ConditionalSend, Q: AsRef + ConditionalSend>( + from: P, + to: Q, + ) -> Result<()> { + fs::rename(from, to).map_err(|e| e.into()) + } +} diff --git a/rust/noosphere-storage/src/helpers.rs b/rust/noosphere-storage/src/helpers.rs deleted file mode 100644 index 5c531ef3c..000000000 --- a/rust/noosphere-storage/src/helpers.rs +++ /dev/null @@ -1,34 +0,0 @@ -use crate::Storage; -use anyhow::Result; - -#[cfg(not(target_arch = "wasm32"))] -use crate::{SledStorage, SledStorageInit, SledStore}; - -#[cfg(not(target_arch = "wasm32"))] -pub async fn make_disposable_store() -> Result { - let temp_dir = std::env::temp_dir(); - let temp_name: String = witty_phrase_generator::WPGen::new() - .with_words(3) - .unwrap() - .into_iter() - .map(String::from) - .collect(); - let provider = SledStorage::new(SledStorageInit::Path(temp_dir.join(temp_name)))?; - provider.get_block_store("foo").await -} - -#[cfg(target_arch = "wasm32")] -use crate::{IndexedDbStorage, IndexedDbStore}; - -#[cfg(target_arch = "wasm32")] -pub async fn make_disposable_store() -> Result { - let temp_name: String = witty_phrase_generator::WPGen::new() - .with_words(3) - .unwrap() - .into_iter() - .map(|word| String::from(word)) - .collect(); - - let provider = IndexedDbStorage::new(&temp_name).await?; - provider.get_block_store(crate::db::BLOCK_STORE).await -} diff --git a/rust/noosphere-storage/src/implementation/indexed_db.rs b/rust/noosphere-storage/src/implementation/indexed_db.rs index 6327108da..9c23703ab 100644 --- a/rust/noosphere-storage/src/implementation/indexed_db.rs +++ b/rust/noosphere-storage/src/implementation/indexed_db.rs @@ -1,12 +1,14 @@ use crate::store::Store; use crate::{db::SPHERE_DB_STORE_NAMES, storage::Storage}; use anyhow::{anyhow, Error, Result}; +use async_stream::try_stream; use async_trait::async_trait; use js_sys::Uint8Array; +use noosphere_common::ConditionalSend; use rexie::{ KeyRange, ObjectStore, Rexie, RexieBuilder, Store as IdbStore, Transaction, TransactionMode, }; -use std::{fmt::Debug, rc::Rc}; +use std::{fmt::Debug, path::Path, rc::Rc}; use wasm_bindgen::{JsCast, JsValue}; pub const INDEXEDDB_STORAGE_VERSION: u32 = 1; @@ -69,7 +71,12 @@ impl IndexedDbStorage { let db = Rc::into_inner(self.db) .ok_or_else(|| anyhow!("Could not unwrap inner during database clear."))?; db.close(); - Rexie::delete(&name) + Self::delete(&name).await + } + + /// Deletes database with key `db_name` from origin storage. + pub async fn delete(db_name: &str) -> Result<()> { + Rexie::delete(db_name) .await .map_err(|error| anyhow!("{:?}", error)) } @@ -90,6 +97,30 @@ impl Storage for IndexedDbStorage { } } +#[async_trait(?Send)] +impl crate::extra::OpenStorage for IndexedDbStorage { + async fn open + ConditionalSend>(path: P) -> Result { + IndexedDbStorage::new( + path.as_ref() + .to_str() + .ok_or_else(|| anyhow!("Could not stringify path."))?, + ) + .await + } +} + +#[async_trait(?Send)] +impl crate::extra::RemoveStorage for IndexedDbStorage { + async fn remove + ConditionalSend>(path: P) -> Result<()> { + Self::delete( + path.as_ref() + .to_str() + .ok_or_else(|| anyhow!("Could not stringify path."))?, + ) + .await + } +} + #[derive(Clone)] pub struct IndexedDbStore { db: Rc, @@ -114,87 +145,104 @@ impl IndexedDbStore { Ok(()) } - fn bytes_to_typed_array(bytes: &[u8]) -> Result { - let array = Uint8Array::new_with_length(bytes.len() as u32); - array.copy_from(&bytes); - Ok(JsValue::from(array)) - } - - async fn contains(key: &JsValue, store: &IdbStore) -> Result { + async fn contains(key: &[u8], store: &IdbStore) -> Result { + let key_js = bytes_to_typed_array(key)?; let count = store .count(Some( - &KeyRange::only(key).map_err(|error| anyhow!("{:?}", error))?, + &KeyRange::only(&key_js).map_err(|error| anyhow!("{:?}", error))?, )) .await .map_err(|error| anyhow!("{:?}", error))?; Ok(count > 0) } - async fn read(key: &JsValue, store: &IdbStore) -> Result>> { + async fn read(key: &[u8], store: &IdbStore) -> Result>> { + let key_js = bytes_to_typed_array(key)?; Ok(match IndexedDbStore::contains(&key, &store).await? { - true => Some( + true => Some(typed_array_to_bytes( store - .get(&key) + .get(&key_js) .await - .map_err(|error| anyhow!("{:?}", error))? - .dyn_into::() - .map_err(|error| anyhow!("{:?}", error))? - .to_vec(), - ), + .map_err(|error| anyhow!("{:?}", error))?, + )?), false => None, }) } + + async fn put(key: &[u8], value: &[u8], store: &IdbStore) -> Result<()> { + let key_js = bytes_to_typed_array(key)?; + let value_js = bytes_to_typed_array(value)?; + store + .put(&value_js, Some(&key_js)) + .await + .map_err(|error| anyhow!("{:?}", error))?; + Ok(()) + } + + async fn delete(key: &[u8], store: &IdbStore) -> Result<()> { + let key_js = bytes_to_typed_array(key)?; + store + .delete(&key_js) + .await + .map_err(|error| anyhow!("{:?}", error))?; + Ok(()) + } } #[async_trait(?Send)] impl Store for IndexedDbStore { async fn read(&self, key: &[u8]) -> Result>> { let (store, tx) = self.start_transaction(TransactionMode::ReadOnly)?; - let key = IndexedDbStore::bytes_to_typed_array(key)?; - - let maybe_dag = IndexedDbStore::read(&key, &store).await?; - + let maybe_dag = IndexedDbStore::read(key, &store).await?; IndexedDbStore::finish_transaction(tx).await?; - Ok(maybe_dag) } async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?; - - let key = IndexedDbStore::bytes_to_typed_array(key)?; - let value = IndexedDbStore::bytes_to_typed_array(bytes)?; - let old_bytes = IndexedDbStore::read(&key, &store).await?; - - store - .put(&value, Some(&key)) - .await - .map_err(|error| anyhow!("{:?}", error))?; - + IndexedDbStore::put(key, bytes, &store).await?; IndexedDbStore::finish_transaction(tx).await?; - Ok(old_bytes) } async fn remove(&mut self, key: &[u8]) -> Result>> { let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?; - - let key = IndexedDbStore::bytes_to_typed_array(key)?; - - let old_value = IndexedDbStore::read(&key, &store).await?; - - store - .delete(&key) - .await - .map_err(|error| anyhow!("{:?}", error))?; - + let old_value = IndexedDbStore::read(key, &store).await?; + IndexedDbStore::delete(key, &store).await?; IndexedDbStore::finish_transaction(tx).await?; - Ok(old_value) } } +impl crate::IterableStore for IndexedDbStore { + fn get_all_entries(&self) -> crate::IterableStoreStream<'_> { + Box::pin(try_stream! { + let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?; + let limit = 100; + let mut offset = 0; + loop { + let results = store.get_all(None, Some(limit), Some(offset), None).await + .map_err(|error| anyhow!("{:?}", error))?; + let count = results.len(); + if count == 0 { + IndexedDbStore::finish_transaction(tx).await?; + break; + } + + offset += count as u32; + + for (key_js, value_js) in results { + yield ( + typed_array_to_bytes(JsValue::from(Uint8Array::new(&key_js)))?, + Some(typed_array_to_bytes(value_js)?) + ); + } + } + }) + } +} + #[cfg(feature = "performance")] struct SpaceUsageError(Error); @@ -263,3 +311,16 @@ impl crate::Space for IndexedDbStorage { } } } + +fn bytes_to_typed_array(bytes: &[u8]) -> Result { + let array = Uint8Array::new_with_length(bytes.len() as u32); + array.copy_from(&bytes); + Ok(JsValue::from(array)) +} + +fn typed_array_to_bytes(js_value: JsValue) -> Result> { + Ok(js_value + .dyn_into::() + .map_err(|error| anyhow!("{:?}", error))? + .to_vec()) +} diff --git a/rust/noosphere-storage/src/implementation/memory.rs b/rust/noosphere-storage/src/implementation/memory.rs index 8fd92e692..5c933db04 100644 --- a/rust/noosphere-storage/src/implementation/memory.rs +++ b/rust/noosphere-storage/src/implementation/memory.rs @@ -1,6 +1,8 @@ use anyhow::{anyhow, Result}; +use async_stream::try_stream; use async_trait::async_trait; use cid::Cid; +use noosphere_common::ConditionalSend; use std::{collections::HashMap, sync::Arc}; use tokio::sync::Mutex; @@ -57,6 +59,14 @@ impl Storage for MemoryStorage { } } +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl crate::extra::OpenStorage for MemoryStorage { + async fn open + ConditionalSend>(_: P) -> Result { + Ok(MemoryStorage::default()) + } +} + #[derive(Clone, Default, Debug)] pub struct MemoryStore { pub entries: Arc, Vec>>>, @@ -131,6 +141,19 @@ impl Store for MemoryStore { } } +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl crate::IterableStore for MemoryStore { + fn get_all_entries(&self) -> crate::IterableStoreStream<'_> { + Box::pin(try_stream! { + let dags = self.entries.lock().await; + for key in dags.keys() { + yield (key.to_owned(), dags.get(key).cloned()); + } + }) + } +} + #[cfg(feature = "performance")] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] diff --git a/rust/noosphere-storage/src/implementation/rocks_db.rs b/rust/noosphere-storage/src/implementation/rocks_db.rs index 2a787e66f..26a1d097a 100644 --- a/rust/noosphere-storage/src/implementation/rocks_db.rs +++ b/rust/noosphere-storage/src/implementation/rocks_db.rs @@ -1,6 +1,7 @@ use crate::{storage::Storage, store::Store, SPHERE_DB_STORE_NAMES}; use anyhow::{anyhow, Result}; use async_trait::async_trait; +use noosphere_common::ConditionalSend; use rocksdb::{ColumnFamilyDescriptor, DBWithThreadMode, Options}; use std::{ path::{Path, PathBuf}, @@ -29,9 +30,9 @@ pub struct RocksDbStorage { } impl RocksDbStorage { - pub fn new(path: PathBuf) -> Result { - std::fs::create_dir_all(&path)?; - let canonicalized = path.canonicalize()?; + pub fn new>(path: P) -> Result { + std::fs::create_dir_all(path.as_ref())?; + let canonicalized = path.as_ref().canonicalize()?; let db = Arc::new(RocksDbStorage::init_db(canonicalized.clone())?); Ok(RocksDbStorage { db, @@ -83,6 +84,13 @@ impl Storage for RocksDbStorage { } } +#[async_trait] +impl crate::extra::FsBackedStorage for RocksDbStorage { + async fn open + ConditionalSend>(path: P) -> Result { + RocksDbStorage::new(path) + } +} + #[derive(Clone)] pub struct RocksDbStore { name: String, diff --git a/rust/noosphere-storage/src/implementation/sled.rs b/rust/noosphere-storage/src/implementation/sled.rs index f1a116bf8..b37aead65 100644 --- a/rust/noosphere-storage/src/implementation/sled.rs +++ b/rust/noosphere-storage/src/implementation/sled.rs @@ -1,35 +1,26 @@ -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use crate::storage::Storage; use crate::store::Store; use anyhow::Result; +use async_stream::try_stream; use async_trait::async_trait; +use noosphere_common::ConditionalSend; use sled::{Db, Tree}; -pub enum SledStorageInit { - Path(PathBuf), - Db(Db), -} - #[derive(Clone, Debug)] pub struct SledStorage { db: Db, #[allow(unused)] - path: Option, + path: PathBuf, } impl SledStorage { - pub fn new(init: SledStorageInit) -> Result { - let mut db_path = None; - let db: Db = match init { - SledStorageInit::Path(path) => { - std::fs::create_dir_all(&path)?; - db_path = Some(path.clone().canonicalize()?); - sled::open(path)? - } - SledStorageInit::Db(db) => db, - }; + pub fn new>(path: P) -> Result { + std::fs::create_dir_all(path.as_ref())?; + let db_path = path.as_ref().canonicalize()?; + let db = sled::open(&db_path)?; Ok(SledStorage { db, path: db_path }) } @@ -54,6 +45,13 @@ impl Storage for SledStorage { } } +#[async_trait] +impl crate::extra::FsBackedStorage for SledStorage { + async fn open + ConditionalSend>(path: P) -> Result { + SledStorage::new(path) + } +} + #[derive(Clone)] pub struct SledStore { db: Tree, @@ -104,16 +102,21 @@ impl Drop for SledStorage { } } +impl crate::IterableStore for SledStore { + fn get_all_entries(&self) -> crate::IterableStoreStream<'_> { + Box::pin(try_stream! { + for entry in self.db.iter() { + let (key, value) = entry?; + yield (Vec::from(key.as_ref()), Some(Vec::from(value.as_ref()))); + } + }) + } +} + #[cfg(feature = "performance")] #[async_trait] impl crate::Space for SledStorage { async fn get_space_usage(&self) -> Result { - if let Some(path) = &self.path { - crate::get_dir_size(path).await - } else { - Err(anyhow::anyhow!( - "Could not calculate storage space, requires usage of a path constructor." - )) - } + crate::get_dir_size(&self.path).await } } diff --git a/rust/noosphere-storage/src/lib.rs b/rust/noosphere-storage/src/lib.rs index 8d357a87a..00343f3f3 100644 --- a/rust/noosphere-storage/src/lib.rs +++ b/rust/noosphere-storage/src/lib.rs @@ -12,34 +12,37 @@ mod key_value; mod db; mod encoding; +mod extra; +mod migration; mod retry; mod storage; mod store; mod tap; +mod temp; mod ucan; pub use crate::ucan::*; pub use block::*; pub use db::*; pub use encoding::*; +pub use extra::*; pub use implementation::*; pub use key_value::*; +pub use migration::*; pub use retry::*; pub use storage::*; pub use store::*; pub use tap::*; +pub use temp::*; #[cfg(feature = "performance")] mod space; #[cfg(feature = "performance")] pub use space::*; -#[cfg(test)] -pub mod helpers; - #[cfg(test)] mod tests { - use crate::{block::BlockStore, helpers::make_disposable_store}; + use crate::{block::BlockStore, Storage, TempStorage}; use libipld_cbor::DagCborCodec; #[cfg(target_arch = "wasm32")] @@ -49,13 +52,19 @@ mod tests { #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] - async fn it_can_store_and_retrieve_bytes() { - let mut storage = make_disposable_store().await.unwrap(); + async fn it_can_store_and_retrieve_bytes() -> anyhow::Result<()> { + #[cfg(target_arch = "wasm32")] + type TempStorageType = crate::IndexedDbStorage; + #[cfg(not(target_arch = "wasm32"))] + type TempStorageType = crate::SledStorage; + let storage = TempStorage::::new().await?; + let mut store = storage.get_block_store("foo").await?; let bytes = b"I love every kind of cat"; - let cid = storage.save::(bytes).await.unwrap(); - let retrieved = storage.load::>(&cid).await.unwrap(); + let cid = store.save::(bytes).await?; + let retrieved = store.load::>(&cid).await?; assert_eq!(retrieved, bytes); + Ok(()) } } diff --git a/rust/noosphere-storage/src/migration.rs b/rust/noosphere-storage/src/migration.rs new file mode 100644 index 000000000..a38427b2f --- /dev/null +++ b/rust/noosphere-storage/src/migration.rs @@ -0,0 +1,234 @@ +use crate::{OpenStorage, RemoveStorage, RenameStorage, Storage, Store, SPHERE_DB_STORE_NAMES}; +use anyhow::Result; +use async_trait::async_trait; +use noosphere_common::ConditionalSync; +use std::pin::Pin; +use tokio_stream::{Stream, StreamExt}; + +#[cfg(target_arch = "wasm32")] +pub type IterableStoreStream<'a> = + Pin, Option>)>> + 'a>>; +#[cfg(not(target_arch = "wasm32"))] +pub type IterableStoreStream<'a> = + Pin, Option>)>> + Send + 'a>>; + +/// For stores that can iterate over all of their entries. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait IterableStore { + fn get_all_entries(&self) -> IterableStoreStream<'_>; +} + +/// [ExportStorage] [Storage] can be imported by an [ImportStorage]. A [Storage] +/// is [ExportStorage] if its `KeyValueStore` also implements [IterableStore]. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait ExportStorage +where + Self: Storage, + ::KeyValueStore: IterableStore, +{ + async fn get_all_store_names(&self) -> Result> { + let mut names = vec![]; + names.extend(SPHERE_DB_STORE_NAMES.iter().map(|name| String::from(*name))); + Ok(names) + } +} + +impl ExportStorage for S +where + S: Storage, + S::KeyValueStore: IterableStore, +{ +} + +/// A blanket implementation for [Storage]s to import +/// an [ExportStorage] storage. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait ImportStorage<'a, E> +where + Self: Storage, + Self::KeyValueStore: Store, + E: ExportStorage + ConditionalSync + 'a, + ::KeyValueStore: IterableStore, +{ + /// Copy all stores' entries from `exportable` into this [Storage]. + async fn import(&'a mut self, exportable: &E) -> Result<()> { + for store_name in exportable.get_all_store_names().await? { + let mut store = self.get_key_value_store(&store_name).await?; + let export_store = exportable.get_key_value_store(&store_name).await?; + let mut stream = export_store.get_all_entries(); + while let Some((key, value)) = stream.try_next().await? { + if let Some(value) = value { + Store::write(&mut store, key.as_ref(), value.as_ref()).await?; + } + } + } + Ok(()) + } +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl<'a, T, E> ImportStorage<'a, E> for T +where + T: Storage, + T::KeyValueStore: Store, + E: ExportStorage + ConditionalSync + 'a, + ::KeyValueStore: IterableStore, +{ +} + +/// A [Storage] that can be used as the source in [migrate_storage]. +pub trait MigratableSource +where + Self: OpenStorage + RemoveStorage, + ::KeyValueStore: IterableStore, +{ +} +impl MigratableSource for T +where + Self: OpenStorage + RemoveStorage, + ::KeyValueStore: IterableStore, +{ +} + +/// A [Storage] that can be used as the destination target in [migrate_storage]. +pub trait MigratableTarget<'a, S> +where + S: MigratableSource + ConditionalSync + 'a, + ::KeyValueStore: IterableStore, + Self: ImportStorage<'a, S> + Storage + OpenStorage + RemoveStorage + RenameStorage, + ::KeyValueStore: Store, +{ +} + +impl<'a, S, T> MigratableTarget<'a, S> for T +where + S: MigratableSource + ConditionalSync + 'a, + ::KeyValueStore: IterableStore, + Self: ImportStorage<'a, S> + Storage + OpenStorage + RemoveStorage + RenameStorage, + ::KeyValueStore: Store, +{ +} + +#[cfg(not(target_arch = "wasm32"))] +/// Opens the [Storage] at `path` as storage type `S`, creates a new [Storage] +/// of type `T`, copies over all data, and moves the new storage to `path` upon +/// success. +pub async fn migrate_storage(path: impl AsRef) -> Result +where + for<'a> T: MigratableTarget<'a, S>, + ::KeyValueStore: Store, + S: MigratableSource + ConditionalSync, + ::KeyValueStore: IterableStore, +{ + let temp_dir = tempfile::TempDir::new()?; + let temp_path = temp_dir.path(); + { + let mut to_storage = T::open(temp_path).await?; + let from_storage = S::open(path.as_ref()).await?; + to_storage.import(&from_storage).await?; + } + S::remove(path.as_ref()).await?; + T::rename(temp_path, path.as_ref()).await?; + T::open(path.as_ref()).await +} + +#[cfg(test)] +mod test { + use crate::TempStorage; + + use super::*; + + #[cfg(target_arch = "wasm32")] + use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; + #[cfg(target_arch = "wasm32")] + wasm_bindgen_test_configure!(run_in_browser); + + /// wasm32: IndexedDbStorage -> MemoryStorage + /// native: SledStorage -> MemoryStorage + /// native+rocks: SledStorage -> RocksDbStorage + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + pub async fn it_can_import_export_storages() -> Result<()> { + noosphere_core::tracing::initialize_tracing(None); + + #[cfg(target_arch = "wasm32")] + type FromStorage = crate::IndexedDbStorage; + #[cfg(not(target_arch = "wasm32"))] + type FromStorage = crate::SledStorage; + + #[cfg(target_arch = "wasm32")] + type ToStorage = crate::MemoryStorage; + #[cfg(all(feature = "rocksdb", not(target_arch = "wasm32")))] + type ToStorage = crate::RocksDbStorage; + #[cfg(all(not(feature = "rocksdb"), not(target_arch = "wasm32")))] + type ToStorage = crate::MemoryStorage; + + let from_storage = TempStorage::::new().await?; + let mut to_storage = TempStorage::::new().await?; + { + let mut store = from_storage.get_key_value_store("links").await?; + for n in 0..500u32 { + let slug = format!("slug-{}", n); + let bytes = vec![n as u8; 10]; + store.write(slug.as_ref(), bytes.as_ref()).await?; + } + } + + to_storage.import(from_storage.as_ref()).await?; + + { + let store = to_storage.get_key_value_store("links").await?; + for n in 0..500u32 { + let slug = format!("slug-{}", n); + let expected_bytes = vec![n as u8; 10]; + + if let Some(bytes) = store.read(slug.as_ref()).await? { + assert_eq!(bytes, expected_bytes); + } else { + panic!("Expected key `{n}` to exist in new db"); + } + } + } + Ok(()) + } + + #[cfg(all(not(target_arch = "wasm32"), feature = "rocksdb"))] + #[tokio::test] + pub async fn it_can_migrate_native_dbs() -> Result<()> { + noosphere_core::tracing::initialize_tracing(None); + let temp_dir = tempfile::TempDir::new()?; + let storage_path = temp_dir.path(); + + { + let from_storage = crate::SledStorage::new(storage_path)?; + let mut store = from_storage.get_key_value_store("links").await?; + for n in 0..500u32 { + let slug = format!("slug-{}", n); + let bytes = vec![n as u8; 10]; + store.write(slug.as_ref(), bytes.as_ref()).await?; + } + } + + let to_storage: crate::RocksDbStorage = + migrate_storage::(storage_path).await?; + + { + let store = to_storage.get_key_value_store("links").await?; + for n in 0..500u32 { + let slug = format!("slug-{}", n); + let expected_bytes = vec![n as u8; 10]; + + if let Some(bytes) = store.read(slug.as_ref()).await? { + assert_eq!(bytes, expected_bytes); + } else { + panic!("Expected key `{n}` to exist in new db"); + } + } + } + Ok(()) + } +} diff --git a/rust/noosphere-storage/src/temp.rs b/rust/noosphere-storage/src/temp.rs new file mode 100644 index 000000000..eb88619b7 --- /dev/null +++ b/rust/noosphere-storage/src/temp.rs @@ -0,0 +1,78 @@ +use crate::{extra::OpenStorage, storage::Storage}; +use anyhow::Result; +use std::ops::{Deref, DerefMut}; + +/// An ephemeral [Storage] that does not persist after dropping. +/// Currently, native builds create a temp dir syncing lifetimes, and web +/// builds use a randomly generated database name. +/// In the future, we may have web builds that use +/// a file-system backed Storage, or native builds that do not use +/// the file-system (currently the case with [MemoryStorage]), where +/// a more complex configuration is needed. Mostly used in tests. +pub struct TempStorage +where + S: Storage + OpenStorage, +{ + inner: S, + #[cfg(not(target_arch = "wasm32"))] + _temp_dir: tempfile::TempDir, + #[cfg(target_arch = "wasm32")] + key: String, +} + +impl TempStorage +where + S: Storage + OpenStorage, +{ + pub async fn new() -> Result { + #[cfg(target_arch = "wasm32")] + let key: String = witty_phrase_generator::WPGen::new() + .with_words(3) + .unwrap() + .into_iter() + .map(|word| String::from(word)) + .collect(); + #[cfg(target_arch = "wasm32")] + let inner = S::open(&key).await?; + #[cfg(target_arch = "wasm32")] + let out = Self { inner, key }; + + #[cfg(not(target_arch = "wasm32"))] + let _temp_dir = tempfile::TempDir::new()?; + #[cfg(not(target_arch = "wasm32"))] + let inner = S::open(_temp_dir.path()).await?; + #[cfg(not(target_arch = "wasm32"))] + let out = Self { _temp_dir, inner }; + + Ok(out) + } +} + +impl Deref for TempStorage +where + S: Storage + OpenStorage, +{ + type Target = S; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for TempStorage +where + S: Storage + OpenStorage, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl AsRef for TempStorage +where + S: Storage + OpenStorage, +{ + fn as_ref(&self) -> &S { + &self.inner + } +} diff --git a/rust/noosphere/src/storage.rs b/rust/noosphere/src/storage.rs index 0caacef17..81155d43e 100644 --- a/rust/noosphere/src/storage.rs +++ b/rust/noosphere/src/storage.rs @@ -45,15 +45,11 @@ impl From for PathBuf { impl StorageLayout { pub async fn to_storage(&self) -> Result { #[cfg(sled)] - { - noosphere_storage::SledStorage::new(noosphere_storage::SledStorageInit::Path( - PathBuf::from(self), - )) - } + let storage = noosphere_storage::SledStorage::new(PathBuf::from(self)); #[cfg(rocksdb)] - { - noosphere_storage::RocksDbStorage::new(PathBuf::from(self)) - } + let storage = noosphere_storage::RocksDbStorage::new(PathBuf::from(self)); + + storage } }