diff --git a/monofs/lib/store/flatfsstore.rs b/monofs/lib/store/flatfsstore.rs index c7dee2b..d8db06d 100644 --- a/monofs/lib/store/flatfsstore.rs +++ b/monofs/lib/store/flatfsstore.rs @@ -299,7 +299,7 @@ where } // Create CID and store the block - let cid = monoutils_store::make_cid(Codec::DagCbor, &bytes); + let cid = monoutils_store::generate_cid(Codec::DagCbor, &bytes); let block_path = self.get_block_path(&cid); if !block_path.exists() { @@ -570,7 +570,7 @@ where } } - let cid = monoutils_store::make_cid(Codec::Raw, bytes.as_ref()); + let cid = monoutils_store::generate_cid(Codec::Raw, bytes.as_ref()); let block_path = self.get_block_path(&cid); if !block_path.exists() { diff --git a/monofs/lib/store/layeredstore.rs b/monofs/lib/store/layeredstore.rs new file mode 100644 index 0000000..deef418 --- /dev/null +++ b/monofs/lib/store/layeredstore.rs @@ -0,0 +1,120 @@ +use std::{collections::HashSet, pin::Pin}; + +use async_trait::async_trait; +use bytes::Bytes; +use monoutils_store::{ + ipld::cid::Cid, Codec, DualStore, DualStoreConfig, IpldReferences, IpldStore, MemoryStore, + RawStore, StoreResult, +}; +use serde::{de::DeserializeOwned, Serialize}; +use tokio::io::AsyncRead; + +//-------------------------------------------------------------------------------------------------- +// Types: LayeredStore +//-------------------------------------------------------------------------------------------------- + +#[derive(Debug, Clone)] +pub struct LayeredStore +where + S: IpldStore, +{ + inner: LayeredStoreInner, +} + +struct LayeredStoreInner +where + S: IpldStore, +{ + write_store: S, + base_store: S, +} + +//-------------------------------------------------------------------------------------------------- +// Methods: MemoryBufferStore +//-------------------------------------------------------------------------------------------------- + +impl MemoryBufferStore +where + S: IpldStore, +{ + /// Creates a new `MemoryBufferStore` with the given backup store. + pub fn new(backup_store: S) -> Self { + Self { + inner: DualStore::new( + MemoryStore::default(), + backup_store, + DualStoreConfig::default(), + ), + } + } +} + +// //-------------------------------------------------------------------------------------------------- +// // Trait Implementations +// //-------------------------------------------------------------------------------------------------- + +// #[async_trait] +// impl IpldStore for MemoryBufferStore +// where +// S: IpldStore + Sync, +// { +// async fn put_node(&self, data: &T) -> StoreResult +// where +// T: Serialize + IpldReferences + Sync, +// { +// self.inner.put_node(data).await +// } + +// async fn put_bytes(&self, reader: impl AsyncRead + Send + Sync) -> StoreResult { +// self.inner.put_bytes(reader).await +// } + +// async fn get_node(&self, cid: &Cid) -> StoreResult +// where +// T: DeserializeOwned + Send, +// { +// self.inner.get_node(cid).await +// } + +// async fn get_bytes(&self, cid: &Cid) -> StoreResult>> { +// self.inner.get_bytes(cid).await +// } + +// async fn get_bytes_size(&self, cid: &Cid) -> StoreResult { +// self.inner.get_bytes_size(cid).await +// } + +// async fn has(&self, cid: &Cid) -> bool { +// self.inner.has(cid).await +// } + +// async fn get_supported_codecs(&self) -> HashSet { +// self.inner.get_supported_codecs().await +// } + +// async fn get_max_node_block_size(&self) -> StoreResult> { +// self.inner.get_max_node_block_size().await +// } + +// async fn get_block_count(&self) -> StoreResult { +// self.inner.get_block_count().await +// } +// } + +// #[async_trait] +// impl RawStore for MemoryBufferStore +// where +// S: IpldStore + Sync, +// { +// async fn put_raw_block(&self, bytes: impl Into + Send) -> StoreResult { +// self.inner.put_raw_block(bytes).await +// } + +// async fn get_raw_block(&self, cid: &Cid) -> StoreResult { +// self.inner.get_raw_block(cid).await +// } + +// async fn get_max_raw_block_size(&self) -> StoreResult> { +// self.inner.get_max_raw_block_size().await +// } +// } diff --git a/monoutils-store/lib/implementations/stores/dualstore.rs b/monoutils-store/lib/implementations/stores/dualstore.rs index 1243aaf..d3fab83 100644 --- a/monoutils-store/lib/implementations/stores/dualstore.rs +++ b/monoutils-store/lib/implementations/stores/dualstore.rs @@ -12,7 +12,32 @@ use crate::{Codec, IpldReferences, IpldStore, RawStore, StoreError, StoreResult} // Types //-------------------------------------------------------------------------------------------------- -/// A dual store that stores blocks on two different stores. +/// A store that combines two IPLD stores, allowing configurable read and write operations between them. +/// +/// `DualStore` provides a way to use two different IPLD stores together, with configurable policies for +/// reading and writing data. This can be useful in several scenarios: +/// +/// - Implementing a caching layer where writes go to a fast store and reads check the cache first +/// - Migrating data between stores where writes go to the new store but reads check both +/// - Creating a hybrid store that combines the benefits of two different storage backends +/// +/// ## Examples +/// +/// ``` +/// use monoutils_store::{MemoryStore, DualStore, DualStoreConfig, Choice}; +/// +/// // Create two stores +/// let store_a = MemoryStore::default(); +/// let store_b = MemoryStore::default(); +/// +/// // Configure to read from A first, write to B +/// let config = DualStoreConfig { +/// read_from: Choice::A, +/// write_to: Choice::B, +/// }; +/// +/// let dual_store = DualStore::new(store_a, store_b, config); +/// ``` #[derive(Debug, Clone)] pub struct DualStore where @@ -24,20 +49,82 @@ where config: DualStoreConfig, } -/// Choices for selecting which store to use for a given operation. +/// Specifies which store to use for operations in a `DualStore`. +/// +/// This enum is used to configure the read and write behavior of a `DualStore`. +/// It allows for flexible routing of operations between the two underlying stores. +/// +/// ## Examples +/// +/// ``` +/// use monoutils_store::{DualStoreConfig, Choice}; +/// +/// // Configure to read from store A and write to store B +/// let config = DualStoreConfig { +/// read_from: Choice::A, +/// write_to: Choice::B, +/// }; +/// +/// // Configure to use store A for both reads and writes +/// let config = DualStoreConfig { +/// read_from: Choice::A, +/// write_to: Choice::A, +/// }; +/// ``` #[derive(Clone, Copy, Debug, PartialEq)] pub enum Choice { - /// Use the first store. + /// Use the first store (store A) A, - /// Use the second store. + /// Use the second store (store B) B, } -/// Configuration for a dual store. +/// Configuration for a `DualStore` that specifies which store to use for reads and writes. +/// +/// This configuration allows for flexible routing of operations between the two stores: +/// - `read_from`: Determines which store to check first when reading data +/// - `write_to`: Determines which store new data should be written to +/// +/// When reading data, if the data is not found in the primary store specified by `read_from`, +/// the other store will be checked as a fallback. +/// +/// By default, both `read_from` and `write_to` are set to `Choice::A`, meaning all operations +/// will primarily use the first store. This provides a simple single-store behavior out of the box, +/// while still allowing for fallback reads from the second store. +/// +/// ## Examples +/// +/// ``` +/// use monoutils_store::{DualStoreConfig, Choice}; +/// +/// // Default configuration - use store A for both reads and writes +/// let default_config = DualStoreConfig::default(); +/// assert_eq!(default_config.read_from, Choice::A); +/// assert_eq!(default_config.write_to, Choice::A); +/// +/// // Create a cache-like configuration where: +/// // - Reads check the fast store (A) first, then fallback to the slow store (B) +/// // - Writes go to the fast store (A) +/// let cache_config = DualStoreConfig { +/// read_from: Choice::A, +/// write_to: Choice::A, +/// }; +/// +/// // Create a migration configuration where: +/// // - Reads check both stores (starting with A) +/// // - Writes go only to the new store (B) +/// let migration_config = DualStoreConfig { +/// read_from: Choice::A, +/// write_to: Choice::B, +/// }; +/// ``` #[derive(Debug, Clone)] pub struct DualStoreConfig { - /// The default store to use. - pub default: Choice, + /// The store to write to. + pub write_to: Choice, + + /// The store to read from first. + pub read_from: Choice, } //-------------------------------------------------------------------------------------------------- @@ -59,7 +146,7 @@ where } /// Gets the type stored as an IPLD data from a chosen store by its `Cid`. - pub async fn get_node_from(&self, cid: &Cid, choice: Choice) -> StoreResult + async fn get_node_in(&self, cid: &Cid, choice: Choice) -> StoreResult where D: DeserializeOwned + Send, { @@ -70,7 +157,7 @@ where } /// Gets the bytes stored in a chosen store as raw bytes by its `Cid`. - pub async fn get_bytes_from( + async fn get_bytes_in( &self, cid: &Cid, choice: Choice, @@ -82,7 +169,7 @@ where } /// Gets the size of all the blocks associated with the given `Cid` in bytes. - pub async fn get_bytes_size_from(&self, cid: &Cid, choice: Choice) -> StoreResult { + async fn get_bytes_size_from(&self, cid: &Cid, choice: Choice) -> StoreResult { match choice { Choice::A => self.store_a.get_bytes_size(cid).await, Choice::B => self.store_b.get_bytes_size(cid).await, @@ -90,7 +177,7 @@ where } /// Gets raw bytes from a chosen store as a single block by its `Cid`. - pub async fn get_raw_block_from(&self, cid: &Cid, choice: Choice) -> StoreResult { + async fn get_raw_block_from(&self, cid: &Cid, choice: Choice) -> StoreResult { match choice { Choice::A => self.store_a.get_raw_block(cid).await, Choice::B => self.store_b.get_raw_block(cid).await, @@ -98,7 +185,7 @@ where } /// Saves a serializable type to a chosen store and returns the `Cid` to it. - pub async fn put_node_into(&self, data: &T, choice: Choice) -> StoreResult + async fn put_node_into(&self, data: &T, choice: Choice) -> StoreResult where T: Serialize + IpldReferences + Sync, { @@ -109,7 +196,7 @@ where } /// Saves raw bytes to a chosen store and returns the `Cid` to it. - pub async fn put_bytes_into( + async fn put_bytes_into( &self, bytes: impl AsyncRead + Send + Sync, choice: Choice, @@ -121,7 +208,7 @@ where } /// Saves raw bytes as a single block to a chosen store and returns the `Cid` to it. - pub async fn put_raw_block_into( + async fn put_raw_block_into( &self, bytes: impl Into + Send, choice: Choice, @@ -133,7 +220,7 @@ where } /// Checks if a block exists in a chosen store by its `Cid`. - pub async fn has_from(&self, cid: &Cid, choice: Choice) -> bool { + async fn has_in(&self, cid: &Cid, choice: Choice) -> bool { match choice { Choice::A => self.store_a.has(cid).await, Choice::B => self.store_b.has(cid).await, @@ -161,50 +248,50 @@ where A: IpldStore + Sync, B: IpldStore + Sync, { - async fn put_node(&self, data: &T) -> StoreResult + async fn put_node(&self, node: &T) -> StoreResult where T: Serialize + IpldReferences + Sync, { - self.put_node_into(data, self.config.default).await + self.put_node_into(node, self.config.write_to).await } async fn put_bytes(&self, bytes: impl AsyncRead + Send + Sync) -> StoreResult { - self.put_bytes_into(bytes, self.config.default).await + self.put_bytes_into(bytes, self.config.write_to).await } async fn get_node(&self, cid: &Cid) -> StoreResult where D: DeserializeOwned + Send, { - match self.get_node_from(cid, self.config.default).await { + match self.get_node_in(cid, self.config.read_from).await { Ok(data) => Ok(data), Err(StoreError::BlockNotFound(_)) => { - let choice = self.config.default.other(); - self.get_node_from(cid, choice).await + let choice = self.config.read_from.other(); + self.get_node_in(cid, choice).await } Err(err) => Err(err), } } async fn get_bytes(&self, cid: &Cid) -> StoreResult>> { - match self.get_bytes_from(cid, self.config.default).await { + match self.get_bytes_in(cid, self.config.read_from).await { Ok(bytes) => Ok(bytes), Err(StoreError::BlockNotFound(_)) => { - let choice = self.config.default.other(); - self.get_bytes_from(cid, choice).await + let choice = self.config.read_from.other(); + self.get_bytes_in(cid, choice).await } Err(err) => Err(err), } } async fn get_bytes_size(&self, cid: &Cid) -> StoreResult { - self.get_bytes_size_from(cid, self.config.default).await + self.get_bytes_size_from(cid, self.config.read_from).await } async fn has(&self, cid: &Cid) -> bool { - match self.has_from(cid, self.config.default).await { + match self.has_in(cid, self.config.read_from).await { true => true, - false => self.has_from(cid, self.config.default.other()).await, + false => self.has_in(cid, self.config.read_from.other()).await, } } @@ -236,14 +323,14 @@ where B: IpldStore + Sync, { async fn put_raw_block(&self, bytes: impl Into + Send) -> StoreResult { - self.put_raw_block_into(bytes, self.config.default).await + self.put_raw_block_into(bytes, self.config.write_to).await } async fn get_raw_block(&self, cid: &Cid) -> StoreResult { - match self.get_raw_block_from(cid, self.config.default).await { + match self.get_raw_block_from(cid, self.config.read_from).await { Ok(bytes) => Ok(bytes), Err(StoreError::BlockNotFound(_)) => { - let choice = self.config.default.other(); + let choice = self.config.read_from.other(); self.get_raw_block_from(cid, choice).await } Err(err) => Err(err), @@ -259,7 +346,10 @@ where impl Default for DualStoreConfig { fn default() -> Self { - Self { default: Choice::A } + Self { + write_to: Choice::A, + read_from: Choice::A, + } } } @@ -269,25 +359,167 @@ impl Default for DualStoreConfig { #[cfg(test)] mod tests { + use tokio::io::AsyncReadExt; + use crate::MemoryStore; use super::*; #[tokio::test] - async fn test_dual_store_put_and_get() -> anyhow::Result<()> { + async fn test_dual_store_default_config() -> anyhow::Result<()> { + let store_a = MemoryStore::default(); + let store_b = MemoryStore::default(); + let dual_store = DualStore::new(store_a.clone(), store_b.clone(), Default::default()); + + // Test that data is written to store A by default + let cid = dual_store.put_node(&"test data").await?; + assert!(store_a.has(&cid).await); + assert!(!store_b.has(&cid).await); + + // Test that data is read from store A by default + assert_eq!(dual_store.get_node::(&cid).await?, "test data"); + + Ok(()) + } + + #[tokio::test] + async fn test_dual_store_basic_operations() -> anyhow::Result<()> { + let store_a = MemoryStore::default(); + let store_b = MemoryStore::default(); + let dual_store = DualStore::new(store_a.clone(), store_b.clone(), Default::default()); + + // Test putting and getting data + let cid = dual_store.put_node(&"test data").await?; + assert_eq!(dual_store.get_node::(&cid).await?, "test data"); + + // Verify data is in store A (default write_to) but not in store B + assert!(store_a.has(&cid).await); + assert!(!store_b.has(&cid).await); + + Ok(()) + } + + #[tokio::test] + async fn test_dual_store_read_fallback() -> anyhow::Result<()> { + let store_a = MemoryStore::default(); + let store_b = MemoryStore::default(); + + // Configure to read from A first, write to B + let config = DualStoreConfig { + read_from: Choice::A, + write_to: Choice::B, + }; + let dual_store = DualStore::new(store_a.clone(), store_b.clone(), config); + + // Write data - should go to store B + let cid = dual_store.put_node(&"fallback test").await?; + + // Data should be readable even though it's not in the primary read store + assert_eq!(dual_store.get_node::(&cid).await?, "fallback test"); + + Ok(()) + } + + #[tokio::test] + async fn test_dual_store_different_configurations() -> anyhow::Result<()> { let store_a = MemoryStore::default(); let store_b = MemoryStore::default(); - let dual_store = DualStore::new(store_a, store_b, Default::default()); - let cid_0 = dual_store.put_node_into(&"hello", Choice::A).await?; - let cid_1 = dual_store.put_node_into(&250, Choice::B).await?; - let cid_2 = dual_store.put_node_into(&"world", Choice::A).await?; - let cid_3 = dual_store.put_node_into(&500, Choice::B).await?; + // Test writing to A, reading from B + let config_1 = DualStoreConfig { + read_from: Choice::B, + write_to: Choice::A, + }; + let dual_store_1 = DualStore::new(store_a.clone(), store_b.clone(), config_1); + + // Test writing to B, reading from A + let config_2 = DualStoreConfig { + read_from: Choice::A, + write_to: Choice::B, + }; + let dual_store_2 = DualStore::new(store_a.clone(), store_b.clone(), config_2); + + // Write data using both configurations + let cid_1 = dual_store_1.put_node(&"data 1").await?; + let cid_2 = dual_store_2.put_node(&"data 2").await?; + + // Verify data location + assert!(store_a.has(&cid_1).await); + assert!(store_b.has(&cid_2).await); + + // Both should be readable from either dual store + assert_eq!(dual_store_1.get_node::(&cid_1).await?, "data 1"); + assert_eq!(dual_store_1.get_node::(&cid_2).await?, "data 2"); + assert_eq!(dual_store_2.get_node::(&cid_1).await?, "data 1"); + assert_eq!(dual_store_2.get_node::(&cid_2).await?, "data 2"); + + Ok(()) + } + + #[tokio::test] + async fn test_dual_store_raw_operations() -> anyhow::Result<()> { + let store_a = MemoryStore::default(); + let store_b = MemoryStore::default(); + let dual_store = DualStore::new(store_a.clone(), store_b.clone(), Default::default()); + + // Test raw block operations + let data = Bytes::from("raw test data"); + let cid = dual_store.put_raw_block(data.clone()).await?; + + let retrieved = dual_store.get_raw_block(&cid).await?; + assert_eq!(retrieved, data); + + Ok(()) + } + + #[tokio::test] + async fn test_dual_store_bytes_operations() -> anyhow::Result<()> { + let store_a = MemoryStore::default(); + let store_b = MemoryStore::default(); + let dual_store = DualStore::new(store_a.clone(), store_b.clone(), Default::default()); + + // Create test data + let test_data = b"test bytes data".to_vec(); + let reader = std::io::Cursor::new(test_data.clone()); + + // Store the data + let cid = dual_store.put_bytes(reader).await?; + + // Read the data back + let mut retrieved_data = Vec::new(); + let mut reader = dual_store.get_bytes(&cid).await?; + reader.read_to_end(&mut retrieved_data).await?; + + assert_eq!(retrieved_data, test_data); + + // Check size + let size = dual_store.get_bytes_size(&cid).await?; + assert_eq!(size as usize, test_data.len()); + + Ok(()) + } + + #[tokio::test] + async fn test_dual_store_metadata() -> anyhow::Result<()> { + let store_a = MemoryStore::default(); + let store_b = MemoryStore::default(); + let dual_store = DualStore::new(store_a.clone(), store_b.clone(), Default::default()); + + // Test initial state + assert!(dual_store.is_empty().await?); + assert_eq!(dual_store.get_block_count().await?, 0); + + // Add some data + dual_store.put_node(&"test data 1").await?; + dual_store.put_node(&"test data 2").await?; + + // Check updated state + assert!(!dual_store.is_empty().await?); + assert_eq!(dual_store.get_block_count().await?, 2); - assert_eq!(dual_store.get_node::(&cid_0).await?, "hello"); - assert_eq!(dual_store.get_node::(&cid_1).await?, 250); - assert_eq!(dual_store.get_node::(&cid_2).await?, "world"); - assert_eq!(dual_store.get_node::(&cid_3).await?, 500); + // Check supported codecs + let codecs = dual_store.get_supported_codecs().await; + assert!(!codecs.is_empty()); Ok(()) } diff --git a/monoutils-store/lib/implementations/stores/memstore.rs b/monoutils-store/lib/implementations/stores/memstore.rs index 0007fba..1442fc0 100644 --- a/monoutils-store/lib/implementations/stores/memstore.rs +++ b/monoutils-store/lib/implementations/stores/memstore.rs @@ -129,7 +129,7 @@ where /// Stores raw bytes in the store without any size checks. /// Returns a tuple of (Cid, bool) where the bool indicates if the data already existed in the store. async fn store_raw(&self, bytes: Bytes, codec: Codec) -> (Cid, bool) { - let cid = utils::make_cid(codec, &bytes); + let cid = utils::generate_cid(codec, &bytes); let mut blocks = self.blocks.write().await; let existed = blocks.contains_key(&cid); if !existed { @@ -149,12 +149,12 @@ where C: Chunker + Clone + Send + Sync + 'static, L: Layout + Clone + Send + Sync + 'static, { - async fn put_node(&self, data: &T) -> StoreResult + async fn put_node(&self, node: &T) -> StoreResult where T: Serialize + IpldReferences + Sync, { // Serialize the data to bytes. - let bytes = Bytes::from(serde_ipld_dagcbor::to_vec(&data).map_err(StoreError::custom)?); + let bytes = Bytes::from(serde_ipld_dagcbor::to_vec(&node).map_err(StoreError::custom)?); // Check if the data exceeds the node maximum block size. if let Some(max_size) = self.get_max_node_block_size().await? { @@ -167,7 +167,7 @@ where // Only increment reference counts if this is a new entry if !existed { - self.increment_reference_counts(data.get_references()).await; + self.increment_reference_counts(node.get_references()).await; } Ok(cid) diff --git a/monoutils-store/lib/store.rs b/monoutils-store/lib/store.rs index b5ed396..489d082 100644 --- a/monoutils-store/lib/store.rs +++ b/monoutils-store/lib/store.rs @@ -60,7 +60,7 @@ pub trait IpldStore: RawStore + Clone { /// /// ## Arguments /// - /// * `data` - The object to store + /// * `node` - The object to store /// /// ## Returns /// @@ -69,7 +69,7 @@ pub trait IpldStore: RawStore + Clone { /// ## Errors /// /// Returns `StoreError::NodeBlockTooLarge` if the serialized data exceeds the store's maximum node block size. - async fn put_node(&self, data: &T) -> StoreResult + async fn put_node(&self, node: &T) -> StoreResult where T: Serialize + IpldReferences + Sync; @@ -321,6 +321,15 @@ pub trait StoreSwitchable { fn change_store(self, new_store: U) -> Self::WithStore; } +/// A trait for stores that can be configured. +pub trait StoreConfig { + /// The type of the configuration. + type Config: Serialize + DeserializeOwned; + + /// Returns the configuration for the store. + fn get_config(&self) -> Self::Config; +} + //-------------------------------------------------------------------------------------------------- // Trait Implementations //-------------------------------------------------------------------------------------------------- @@ -334,7 +343,7 @@ impl TryFrom for Codec { 0x71 => Ok(Codec::DagCbor), 0x0129 => Ok(Codec::DagJson), 0x70 => Ok(Codec::DagPb), - _ => Err(StoreError::UnsupportedCodec(value)), + v => Ok(Codec::Unknown(v)), } } } diff --git a/monoutils-store/lib/utils.rs b/monoutils-store/lib/utils.rs index 9328371..9671520 100644 --- a/monoutils-store/lib/utils.rs +++ b/monoutils-store/lib/utils.rs @@ -5,7 +5,6 @@ use multihash_codetable::{Code, MultihashDigest}; use crate::Codec; - //-------------------------------------------------------------------------------------------------- // Functions //-------------------------------------------------------------------------------------------------- @@ -13,7 +12,7 @@ use crate::Codec; /// Hashes data with [Blake3-256][blake] and returns a new [`Cid`] to it. /// /// [blake]: https://en.wikipedia.org/wiki/BLAKE_(hash_function) -pub fn make_cid(codec: Codec, data: &[u8]) -> Cid { +pub fn generate_cid(codec: Codec, data: &[u8]) -> Cid { let digest = Code::Blake3_256.digest(data); Cid::new_v1(codec.into(), digest) }