From 695ec9ad282374fe180c18c4999446d0da71cc65 Mon Sep 17 00:00:00 2001 From: luofucong Date: Fri, 17 Jan 2025 11:42:44 +0800 Subject: [PATCH 1/5] fix: `Handle`'s sender got dropped early --- src/producer/fetch.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/producer/fetch.rs b/src/producer/fetch.rs index 5b77522..775e48d 100644 --- a/src/producer/fetch.rs +++ b/src/producer/fetch.rs @@ -78,7 +78,6 @@ impl ResultFetcher { }; fetcher.inner.normal.store(false, Ordering::Relaxed); - fetcher.inner.subscriptions.clear(); sleep(fetcher.inner.retry_interval).await; } }); From b4ea828bbb29b45061e6f01ace26f86bfc3f2834 Mon Sep 17 00:00:00 2001 From: luofucong Date: Fri, 17 Jan 2025 16:39:28 +0800 Subject: [PATCH 2/5] decode the raw data bytes early to free the space in the underlying ringbuf --- examples/consumer.rs | 24 +++++++++++++++++++----- src/consumer.rs | 18 ++++++++++++++++-- src/consumer/process.rs | 9 ++++++++- src/error.rs | 1 + tests/common.rs | 27 ++++++++++++++++++++------- 5 files changed, 64 insertions(+), 15 deletions(-) diff --git a/examples/consumer.rs b/examples/consumer.rs index 2563cbb..3bb4523 100644 --- a/examples/consumer.rs +++ b/examples/consumer.rs @@ -1,4 +1,4 @@ -use std::str::from_utf8; +use std::fmt::{Display, Formatter}; use std::time::Duration; use shm_ringbuf::consumer::process::{DataProcess, ResultSender}; @@ -23,8 +23,15 @@ async fn main() { pub struct StringPrint; impl DataProcess for StringPrint { - async fn process(&self, data: &[u8], result_sender: ResultSender) { - if let Err(e) = self.do_process(data).await { + type Message = String; + type Error = Error; + + fn decode(&self, data: &[u8]) -> Result { + String::from_utf8(data.to_vec()).map_err(|_| Error::DecodeError) + } + + async fn process(&self, msg: Self::Message, result_sender: ResultSender) { + if let Err(e) = self.do_process(&msg).await { result_sender.push_result(e).await; } else { result_sender.push_ok().await; @@ -33,8 +40,7 @@ impl DataProcess for StringPrint { } impl StringPrint { - async fn do_process(&self, data: &[u8]) -> Result<(), Error> { - let msg = from_utf8(data).map_err(|_| Error::DecodeError)?; + async fn do_process(&self, msg: &str) -> Result<(), Error> { info!("receive: {}", msg); Ok(()) } @@ -70,3 +76,11 @@ impl From for DataProcessResult { } } } + +impl Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.message()) + } +} + +impl std::error::Error for Error {} diff --git a/src/consumer.rs b/src/consumer.rs index 3ab839a..41d64f1 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -19,8 +19,8 @@ use tokio::time::sleep; use tokio_util::sync::CancellationToken; use tracing::warn; -use crate::error::DataProcessResult; use crate::error::CHECKSUM_MISMATCH; +use crate::error::{DataProcessResult, DECODE_ERROR}; use crate::fd_pass::FdRecvServer; use crate::grpc::proto::shm_control_server::ShmControlServer; use crate::grpc::server::ShmCtlHandler; @@ -231,8 +231,22 @@ where session: session.clone(), }; - processor.process(data_slice, result_sender).await; + let result = processor.decode(data_slice); unsafe { ringbuf.advance_consume_offset(data_block.total_len()) } + drop(data_block); + + match result { + Ok(message) => { + processor.process(message, result_sender).await; + } + Err(e) => { + let result = DataProcessResult { + status_code: DECODE_ERROR, + message: e.to_string(), + }; + result_sender.push_result(result).await; + } + }; } } diff --git a/src/consumer/process.rs b/src/consumer/process.rs index 378c38b..93dc1a8 100644 --- a/src/consumer/process.rs +++ b/src/consumer/process.rs @@ -1,3 +1,4 @@ +use std::error::Error; use std::fmt::Debug; use std::future::Future; @@ -6,9 +7,15 @@ use crate::error::DataProcessResult; use super::session_manager::SessionRef; pub trait DataProcess: Send + Sync { + type Message: Debug; + + type Error: Error; + + fn decode(&self, data: &[u8]) -> Result; + fn process( &self, - data: &[u8], + message: Self::Message, result_sender: ResultSender, ) -> impl Future; } diff --git a/src/error.rs b/src/error.rs index 94a78b8..371b51b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -194,3 +194,4 @@ impl DataProcessResult { /// 100000 - 200000 are error codes used internally by shm-ringbuf and should /// not be used as business codes. pub const CHECKSUM_MISMATCH: u32 = 100000; +pub const DECODE_ERROR: u32 = 10001; diff --git a/tests/common.rs b/tests/common.rs index 0a57193..9407565 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -1,10 +1,10 @@ -use std::{str::from_utf8, sync::Arc, time::Duration}; - use shm_ringbuf::{ consumer::process::{DataProcess, ResultSender}, error::DataProcessResult, producer::{prealloc::PreAlloc, RingbufProducer}, }; +use std::fmt::{Display, Formatter}; +use std::{sync::Arc, time::Duration}; use tokio::{sync::mpsc::Sender, time::sleep}; use tracing::{error, warn}; @@ -13,8 +13,15 @@ pub struct MsgForward { } impl DataProcess for MsgForward { - async fn process(&self, data: &[u8], result_sender: ResultSender) { - if let Err(e) = self.do_process(data).await { + type Message = String; + type Error = Error; + + fn decode(&self, data: &[u8]) -> Result { + String::from_utf8(data.to_vec()).map_err(|_| Error::DecodeError) + } + + async fn process(&self, msg: Self::Message, result_sender: ResultSender) { + if let Err(e) = self.do_process(&msg).await { result_sender.push_result(e).await; } else { result_sender.push_ok().await; @@ -23,9 +30,7 @@ impl DataProcess for MsgForward { } impl MsgForward { - async fn do_process(&self, data: &[u8]) -> Result<(), Error> { - let msg = from_utf8(data).map_err(|_| Error::DecodeError)?; - + async fn do_process(&self, msg: &str) -> Result<(), Error> { let _ = self.sender.send(msg.to_string()).await; Ok(()) @@ -63,6 +68,14 @@ impl From for DataProcessResult { } } +impl Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.message()) + } +} + +impl std::error::Error for Error {} + pub fn msg_num() -> usize { std::env::var("MSG_NUM") .unwrap_or_else(|_| "10000".to_string()) From b53254307e262a545b7287645e28b2f33ef6fc4b Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Mon, 17 Feb 2025 19:27:43 +0800 Subject: [PATCH 3/5] Update src/error.rs Co-authored-by: fys <40801205+fengys1996@users.noreply.github.com> --- src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/error.rs b/src/error.rs index 371b51b..d7621fa 100644 --- a/src/error.rs +++ b/src/error.rs @@ -194,4 +194,4 @@ impl DataProcessResult { /// 100000 - 200000 are error codes used internally by shm-ringbuf and should /// not be used as business codes. pub const CHECKSUM_MISMATCH: u32 = 100000; -pub const DECODE_ERROR: u32 = 10001; +pub const DECODE_ERROR: u32 = 100001; From 3aca69f329019e3dbb7f976aa71d8fcfe72df1da Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Mon, 17 Feb 2025 19:31:23 +0800 Subject: [PATCH 4/5] Update src/consumer/process.rs --- src/consumer/process.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/consumer/process.rs b/src/consumer/process.rs index 93dc1a8..abc6c00 100644 --- a/src/consumer/process.rs +++ b/src/consumer/process.rs @@ -7,7 +7,7 @@ use crate::error::DataProcessResult; use super::session_manager::SessionRef; pub trait DataProcess: Send + Sync { - type Message: Debug; + type Message; type Error: Error; From f5709c5c72139361e2f3986e5af8cf521c6f5e10 Mon Sep 17 00:00:00 2001 From: luofucong Date: Tue, 18 Feb 2025 09:07:17 +0800 Subject: [PATCH 5/5] revert --- src/producer/fetch.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/producer/fetch.rs b/src/producer/fetch.rs index 775e48d..5b77522 100644 --- a/src/producer/fetch.rs +++ b/src/producer/fetch.rs @@ -78,6 +78,7 @@ impl ResultFetcher { }; fetcher.inner.normal.store(false, Ordering::Relaxed); + fetcher.inner.subscriptions.clear(); sleep(fetcher.inner.retry_interval).await; } });