Skip to content

Commit

Permalink
feat: Add LayeredFsStore and improve store implementations (#130)
Browse files Browse the repository at this point in the history
* feat: Add LayeredFsStore and improve store implementations

- Add LayeredFsStore to support layered storage with a write store and base store
- Add getters to store implementations using getset derive macro
- Add flush capability to MemoryBufferStore to persist buffered data
- Add IPLD reference handling for DagCbor blocks during flush
- Add IpldReferences implementation for Ipld type
- Improve documentation for store traits and implementations
- Add tests for MemoryBufferStore flush functionality

The main addition is the LayeredFsStore which provides a two-layer storage
approach with a writable layer on top of a read-only base layer. This enables
scenarios like overlaying temporary changes on a permanent store.

The MemoryBufferStore improvements allow proper flushing of buffered data
while preserving IPLD references, which is important for maintaining data
integrity when moving blocks between stores.

* refactor(store): improve LayeredFsStore and MemoryBufferStore implementations

- Rename LayeredStore to LayeredFsStore for clarity
- Add comprehensive documentation for LayeredFsStore explaining the two-layer
  architecture and read/write behaviors
- Enhance MemoryBufferStore documentation with detailed write-through caching
  pattern explanation
- Rename internal variables in MemoryBufferStore for better clarity
  (backup_store -> underlying_store)
- Add extensive test coverage for MemoryBufferStore node handling
- Create TestNode helper struct for improved testing scenarios
  • Loading branch information
appcypher authored Feb 6, 2025
1 parent 6ea0719 commit bf7faf0
Show file tree
Hide file tree
Showing 12 changed files with 525 additions and 153 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions monofs/lib/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::{
error::Error,
fmt::{self, Display},
io,
convert::Infallible, error::Error, fmt::{self, Display}, io
};

use monoutils_store::ipld::cid::Cid;
Expand Down Expand Up @@ -187,6 +185,10 @@ pub enum FsError {
/// An error that occurred when a migration error occurred
#[error("migration error: {0}")]
MigrationError(#[from] sqlx::migrate::MigrateError),

/// An error that occurred when a CBOR decode error occurred
#[error("CBOR decode error: {0}")]
CborDecodeError(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
}

/// An error that can represent any error.
Expand Down
14 changes: 11 additions & 3 deletions monofs/lib/store/flatfsstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{collections::HashSet, path::PathBuf, pin::Pin};
use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use getset::Getters;
use monoutils::SeekableReader;
use monoutils_store::ipld::codec::Links;
use monoutils_store::{ipld::cid::Cid, FastCDCChunker, FixedSizeChunker};
Expand Down Expand Up @@ -104,23 +105,30 @@ pub enum DirLevels {
/// The store uses a configurable chunking strategy to split data into smaller blocks. The chunker
/// is configurable via the `chunker` field. The layout strategy is configurable via the `layout`
/// field.
#[derive(Debug, Clone, TypedBuilder)]
#[derive(Debug, Clone, TypedBuilder, Getters)]
#[getset(get = "pub with_prefix")]
pub struct FlatFsStoreImpl<C = FastCDCChunker, L = FlatLayout>
where
C: Chunker + Default,
L: Layout + Default,
{
/// The root path for the store.
#[builder(setter(into))]
path: PathBuf,

/// The number of directory levels to use for organizing blocks.
#[builder(default)]
dir_levels: DirLevels,

#[builder(default = Arc::new(C::default()))]
/// The chunking algorithm used to split data into chunks.
#[builder(default)]
chunker: Arc<C>,

#[builder(default = Arc::new(L::default()))]
/// The layout strategy used to store chunked data.
#[builder(default)]
layout: Arc<L>,

/// Whether to enable reference counting for garbage collection.
#[builder(default = true)]
enable_refcount: bool,
}
Expand Down
149 changes: 149 additions & 0 deletions monofs/lib/store/layeredfsstore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use std::{collections::HashSet, path::PathBuf, pin::Pin};

use async_trait::async_trait;
use bytes::Bytes;
use monoutils_store::{
ipld::cid::Cid, Codec, DualStore, DualStoreConfig, IpldReferences, IpldStore, RawStore,
StoreResult,
};
use serde::{de::DeserializeOwned, Serialize};
use tokio::io::AsyncRead;

use super::FlatFsStore;

//--------------------------------------------------------------------------------------------------
// Types
//--------------------------------------------------------------------------------------------------

/// An [`IpldStore`] implementation that provides a layered filesystem storage approach with a
/// write layer on top of a read-only base layer.
///
/// ## Architecture
/// The store consists of two layers:
/// 1. Write Layer: a mutable [`FlatFsStore`] where all new writes are directed, with reference
///    counting enabled (the [`FlatFsStore`] default).
/// 2. Base Layer: an immutable [`FlatFsStore`] that serves as a read-only foundation, constructed
///    with reference counting explicitly disabled.
///
/// ## Read Behavior
/// When reading data (via `get_node`, `get_bytes`, etc.), the store:
/// 1. First checks the write layer for the requested data
/// 2. If not found, falls back to checking the base layer
/// 3. Returns the data from whichever layer it was found in first
///
/// ## Write Behavior
/// All writes are directed exclusively to the write layer; the base layer is never written to.
/// Each layer maintains its own state independently — only the write layer tracks reference
/// counts.
///
/// ## Example
/// ```ignore
/// let store = LayeredFsStore::new(
///     "path/to/write/layer", // Where new writes go (reference counting enabled)
///     "path/to/base/layer",  // Read-only foundation (reference counting disabled)
/// );
///
/// // All writes go to the write layer
/// let cid = store.put_node(&my_data).await?;
///
/// // Reads check write layer first, then base layer
/// let data = store.get_node(&cid).await?;
/// ```
///
/// ## Implementation Notes
/// Both layers use the same [`FlatFsStore`] implementation, differing only in configuration;
/// the layered read/write routing itself is delegated to [`DualStore`].
#[derive(Debug, Clone)]
pub struct LayeredFsStore {
    // Wraps a `DualStore` whose first store is the mutable write layer and whose
    // second store is the immutable base layer.
    inner: DualStore<
        // Write store - mutable layer for new writes
        FlatFsStore,
        // Base store - immutable foundation layer
        FlatFsStore,
    >,
}

//--------------------------------------------------------------------------------------------------
// Methods
//--------------------------------------------------------------------------------------------------

impl LayeredFsStore {
    /// Creates a new `LayeredFsStore` with separate paths for the write and base layers.
    ///
    /// The write layer is created with the default `FlatFsStore` configuration, while the
    /// base layer is built with reference counting disabled, keeping it a read-only
    /// foundation.
    pub fn new(write_store_path: impl Into<PathBuf>, base_store_path: impl Into<PathBuf>) -> Self {
        // Mutable layer: receives all new writes.
        let write_layer = FlatFsStore::new(write_store_path);

        // Immutable layer: read-only foundation, so refcounting is turned off.
        let base_layer = FlatFsStore::builder()
            .path(base_store_path)
            .enable_refcount(false)
            .build();

        let inner = DualStore::new(write_layer, base_layer, DualStoreConfig::default());
        Self { inner }
    }
}

//--------------------------------------------------------------------------------------------------
// Trait Implementations
//--------------------------------------------------------------------------------------------------

#[async_trait]
impl IpldStore for LayeredFsStore {
    // Every method below delegates directly to the inner `DualStore`, which
    // routes writes to the write layer and reads through both layers.

    // Stores a serializable node; delegated to the inner dual store.
    async fn put_node<T>(&self, data: &T) -> StoreResult<Cid>
    where
        T: Serialize + IpldReferences + Sync,
    {
        self.inner.put_node(data).await
    }

    // Stores raw bytes from an async reader; delegated to the inner dual store.
    async fn put_bytes(&self, reader: impl AsyncRead + Send + Sync) -> StoreResult<Cid> {
        self.inner.put_bytes(reader).await
    }

    // Retrieves and deserializes a node by CID; delegated to the inner dual store.
    async fn get_node<T>(&self, cid: &Cid) -> StoreResult<T>
    where
        T: DeserializeOwned + Send,
    {
        self.inner.get_node(cid).await
    }

    // Retrieves stored bytes as an async reader; delegated to the inner dual store.
    async fn get_bytes(&self, cid: &Cid) -> StoreResult<Pin<Box<dyn AsyncRead + Send>>> {
        self.inner.get_bytes(cid).await
    }

    // Returns the size in bytes of the data addressed by `cid`.
    async fn get_bytes_size(&self, cid: &Cid) -> StoreResult<u64> {
        self.inner.get_bytes_size(cid).await
    }

    // Returns whether either layer contains the given CID.
    async fn has(&self, cid: &Cid) -> bool {
        self.inner.has(cid).await
    }

    // Reports which codecs the underlying stores support.
    async fn get_supported_codecs(&self) -> HashSet<Codec> {
        self.inner.get_supported_codecs().await
    }

    // Reports the maximum node block size, if the underlying stores enforce one.
    async fn get_max_node_block_size(&self) -> StoreResult<Option<u64>> {
        self.inner.get_max_node_block_size().await
    }

    // Returns the number of blocks tracked by the underlying stores.
    async fn get_block_count(&self) -> StoreResult<u64> {
        self.inner.get_block_count().await
    }
}

#[async_trait]
impl RawStore for LayeredFsStore {
    // Raw (unchunked) block operations, delegated directly to the inner `DualStore`.

    // Stores a raw block of bytes and returns its CID.
    async fn put_raw_block(&self, bytes: impl Into<Bytes> + Send) -> StoreResult<Cid> {
        self.inner.put_raw_block(bytes).await
    }

    // Retrieves a raw block by CID.
    async fn get_raw_block(&self, cid: &Cid) -> StoreResult<Bytes> {
        self.inner.get_raw_block(cid).await
    }

    // Reports the maximum raw block size, if the underlying stores enforce one.
    async fn get_max_raw_block_size(&self) -> StoreResult<Option<u64>> {
        self.inner.get_max_raw_block_size().await
    }
}
120 changes: 0 additions & 120 deletions monofs/lib/store/layeredstore.rs

This file was deleted.

Loading

0 comments on commit bf7faf0

Please sign in to comment.