Skip to content

Commit

Permalink
updated put_block and get_block to async
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Nov 24, 2024
1 parent ea20556 commit 759683e
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wnfsutils"
version = "1.1.7"
version = "1.1.8"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
9 changes: 6 additions & 3 deletions src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use bytes::Bytes;
use libipld::Cid;
use wnfs::common::{BlockStore, BlockStoreError};

#[async_trait(?Send)]
pub trait FFIStore<'a>: FFIStoreClone<'a> {
fn get_block(&self, cid: Vec<u8>) -> Result<Vec<u8>>;
fn put_block(&self, cid: Vec<u8>, bytes: Vec<u8>) -> Result<()>;
async fn get_block(&self, cid: Vec<u8>) -> Result<Vec<u8>>;
async fn put_block(&self, cid: Vec<u8>, bytes: Vec<u8>) -> Result<()>;
}

pub trait FFIStoreClone<'a> {
Expand Down Expand Up @@ -52,6 +53,7 @@ impl<'a> BlockStore for FFIFriendlyBlockStore<'a> {
let bytes = self
.ffi_store
.get_block(cid.to_bytes())
.await // Await the async method
.map_err(|_| BlockStoreError::CIDNotFound(*cid))?;
Ok(Bytes::copy_from_slice(&bytes))
}
Expand All @@ -67,7 +69,8 @@ impl<'a> BlockStore for FFIFriendlyBlockStore<'a> {
let cid = cid_res.unwrap();
let result = self
.ffi_store
.put_block(cid.to_owned().to_bytes(), data.to_vec());
.put_block(cid.to_owned().to_bytes(), data.to_vec())
.await; // Await the async method
match result {
Ok(_) => Ok(cid.to_owned()),
Err(e) => Err(e),
Expand Down
60 changes: 39 additions & 21 deletions src/kvstore.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use kv::*;

use anyhow::Result;
use libipld::Cid;

use wnfs::common::BlockStoreError;

use crate::blockstore::FFIStore;
use async_trait::async_trait;

#[derive(Clone)]
pub struct KVBlockStore {
Expand All @@ -29,29 +27,49 @@ impl KVBlockStore {
}
}

#[async_trait(?Send)]
impl<'a> FFIStore<'a> for KVBlockStore {
/// Retrieves an array of bytes from the block store with given CID.
fn get_block(&self, cid: Vec<u8>) -> Result<Vec<u8>> {
// A Bucket provides typed access to a section of the key/value store
let bucket = self.store.bucket::<Raw, Raw>(Some("default"))?;

let bytes = bucket
.get(&Raw::from(cid.to_owned()))
.map_err(|_| BlockStoreError::CIDNotFound(Cid::try_from(cid).unwrap()))?
.unwrap()
.to_vec();
Ok(bytes)
async fn get_block(&self, cid: Vec<u8>) -> Result<Vec<u8>> {
// Offload the blocking operation to a separate thread
let store = self.store.clone();
let cid_clone = cid.clone(); // Clone cid for use in the closure

let result = tokio::task::spawn_blocking(move || {
// Perform the blocking operation
let bucket = store.bucket::<Raw, Raw>(Some("default"))?;
let bytes = bucket
.get(&Raw::from(cid_clone.clone())) // Clone cid_clone here
.map_err(|_| BlockStoreError::CIDNotFound(Cid::try_from(cid_clone.clone()).unwrap()))?
.ok_or_else(|| BlockStoreError::CIDNotFound(Cid::try_from(cid_clone.clone()).unwrap()))?
.to_vec();
Ok::<Vec<u8>, anyhow::Error>(bytes)
})
.await;

// Handle errors from spawn_blocking and return the result
result.map_err(|e| anyhow::Error::msg(format!("Failed to retrieve block: {:?}", e)))?
}

/// Stores an array of bytes in the block store.
fn put_block(&self, cid: Vec<u8>, bytes: Vec<u8>) -> Result<()> {
let key = Raw::from(cid.to_owned());
let value = Raw::from(bytes);
async fn put_block(&self, cid: Vec<u8>, bytes: Vec<u8>) -> Result<()> {
// Offload the blocking operation to a separate thread
let store = self.store.clone();
let cid_clone = cid.clone(); // Clone cid for use in the closure
let bytes_clone = bytes.clone(); // Clone bytes for use in the closure

let result = tokio::task::spawn_blocking(move || {
// Perform the blocking operation
let bucket = store.bucket::<Raw, Raw>(Some("default"))?;
let key = Raw::from(cid_clone);
let value = Raw::from(bytes_clone);

// A Bucket provides typed access to a section of the key/value store
let bucket = self.store.bucket::<Raw, Raw>(Some("default"))?;
bucket.set(&key, &value)?;
Ok::<(), anyhow::Error>(())
})
.await;

bucket.set(&key, &value)?;
Ok(())
// Handle errors from spawn_blocking and return the result
result.map_err(|e| anyhow::Error::msg(format!("Failed to store block: {:?}", e)))?
}
}
}

0 comments on commit 759683e

Please sign in to comment.