From 759683e78f7daa891a03ee6e0fab873bc4fe71b9 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Sun, 24 Nov 2024 15:46:59 -0500 Subject: [PATCH] updated put_block and get_block to async --- Cargo.toml | 2 +- src/blockstore.rs | 9 ++++--- src/kvstore.rs | 60 ++++++++++++++++++++++++++++++----------------- 3 files changed, 46 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b0b6f1f..66bf014 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wnfsutils" -version = "1.1.7" +version = "1.1.8" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/src/blockstore.rs b/src/blockstore.rs index b7b82aa..08e6c8c 100644 --- a/src/blockstore.rs +++ b/src/blockstore.rs @@ -5,9 +5,10 @@ use bytes::Bytes; use libipld::Cid; use wnfs::common::{BlockStore, BlockStoreError}; +#[async_trait(?Send)] pub trait FFIStore<'a>: FFIStoreClone<'a> { - fn get_block(&self, cid: Vec) -> Result>; - fn put_block(&self, cid: Vec, bytes: Vec) -> Result<()>; + async fn get_block(&self, cid: Vec) -> Result>; + async fn put_block(&self, cid: Vec, bytes: Vec) -> Result<()>; } pub trait FFIStoreClone<'a> { @@ -52,6 +53,7 @@ impl<'a> BlockStore for FFIFriendlyBlockStore<'a> { let bytes = self .ffi_store .get_block(cid.to_bytes()) + .await // Await the async method .map_err(|_| BlockStoreError::CIDNotFound(*cid))?; Ok(Bytes::copy_from_slice(&bytes)) } @@ -67,7 +69,8 @@ impl<'a> BlockStore for FFIFriendlyBlockStore<'a> { let cid = cid_res.unwrap(); let result = self .ffi_store - .put_block(cid.to_owned().to_bytes(), data.to_vec()); + .put_block(cid.to_owned().to_bytes(), data.to_vec()) + .await; // Await the async method match result { Ok(_) => Ok(cid.to_owned()), Err(e) => Err(e), diff --git a/src/kvstore.rs b/src/kvstore.rs index 849c9de..d711103 100644 --- a/src/kvstore.rs +++ b/src/kvstore.rs @@ -1,11 +1,9 @@ use kv::*; - use anyhow::Result; use libipld::Cid; - use wnfs::common::BlockStoreError; - use crate::blockstore::FFIStore; +use async_trait::async_trait; #[derive(Clone)] pub struct KVBlockStore { @@ -29,29 +27,49 @@ impl KVBlockStore { } } +#[async_trait(?Send)] impl<'a> FFIStore<'a> for KVBlockStore { /// Retrieves an array of bytes from the block store with given CID. - fn get_block(&self, cid: Vec) -> Result> { - // A Bucket provides typed access to a section of the key/value store - let bucket = self.store.bucket::(Some("default"))?; - - let bytes = bucket - .get(&Raw::from(cid.to_owned())) - .map_err(|_| BlockStoreError::CIDNotFound(Cid::try_from(cid).unwrap()))? - .unwrap() - .to_vec(); - Ok(bytes) + async fn get_block(&self, cid: Vec) -> Result> { + // Offload the blocking operation to a separate thread + let store = self.store.clone(); + let cid_clone = cid.clone(); // Clone cid for use in the closure + + let result = tokio::task::spawn_blocking(move || { + // Perform the blocking operation + let bucket = store.bucket::(Some("default"))?; + let bytes = bucket + .get(&Raw::from(cid_clone.clone())) // Clone cid_clone here + .map_err(|_| BlockStoreError::CIDNotFound(Cid::try_from(cid_clone.clone()).unwrap()))? + .ok_or_else(|| BlockStoreError::CIDNotFound(Cid::try_from(cid_clone.clone()).unwrap()))? + .to_vec(); + Ok::, anyhow::Error>(bytes) + }) + .await; + + // Handle errors from spawn_blocking and return the result + result.map_err(|e| anyhow::Error::msg(format!("Failed to retrieve block: {:?}", e)))? } /// Stores an array of bytes in the block store. - fn put_block(&self, cid: Vec, bytes: Vec) -> Result<()> { - let key = Raw::from(cid.to_owned()); - let value = Raw::from(bytes); + async fn put_block(&self, cid: Vec, bytes: Vec) -> Result<()> { + // Offload the blocking operation to a separate thread + let store = self.store.clone(); + let cid_clone = cid.clone(); // Clone cid for use in the closure + let bytes_clone = bytes.clone(); // Clone bytes for use in the closure + + let result = tokio::task::spawn_blocking(move || { + // Perform the blocking operation + let bucket = store.bucket::(Some("default"))?; + let key = Raw::from(cid_clone); + let value = Raw::from(bytes_clone); - // A Bucket provides typed access to a section of the key/value store - let bucket = self.store.bucket::(Some("default"))?; + bucket.set(&key, &value)?; + Ok::<(), anyhow::Error>(()) + }) + .await; - bucket.set(&key, &value)?; - Ok(()) + // Handle errors from spawn_blocking and return the result + result.map_err(|e| anyhow::Error::msg(format!("Failed to store block: {:?}", e)))? } -} +} \ No newline at end of file