diff --git a/justfile b/justfile index 138015a..64a3cc9 100644 --- a/justfile +++ b/justfile @@ -11,3 +11,9 @@ fmt: # Run clippy fix and rustfmt afterwards fix *args: && fmt cd {{invocation_directory()}}; cargo clippy --fix --all-targets --all-features {{args}} + +# run cargo clippy, denying warnings +lint: + cd {{invocation_directory()}}; cargo clippy --all-targets --all-features -- -D warnings + + diff --git a/src/ingress.rs b/src/ingress.rs index f4a704f..4ba9a8a 100644 --- a/src/ingress.rs +++ b/src/ingress.rs @@ -214,7 +214,7 @@ where In: Send + Sync + fmt::Debug + 'static, Out: Send + Sync + fmt::Debug + 'static, { - fn init(&self, join_set: &mut JoinSet<()>) -> UnboundedSender> { + fn init(&mut self, join_set: &mut JoinSet<()>) -> UnboundedSender> { self.init_notification_processor_with_handle(join_set) } @@ -261,7 +261,7 @@ mod tests { let mut join_set = JoinSet::new(); let notification_manager: NotificationManager = - NotificationManager::new(&[&nw_adapter], &mut join_set); + NotificationManager::new(vec![Box::new(nw_adapter)], &mut join_set); let notification_tx = notification_manager.init(&mut join_set); let unknown_packet = OutPacket(b"unknown_packet".to_vec()); @@ -295,7 +295,7 @@ mod tests { let mut join_set = JoinSet::new(); let notification_manager: NotificationManager = - NotificationManager::new(&[&nw_adapter], &mut join_set); + NotificationManager::new(vec![Box::new(nw_adapter)], &mut join_set); let _notification_tx = notification_manager.init(&mut join_set); // An unknown packet should be unrouteable diff --git a/src/manager.rs b/src/manager.rs index 0577150..3c4ec3b 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -249,13 +249,8 @@ where .with_tick_rate(self.tick_rate.unwrap_or(timeout::DEFAULT_TICK_RATE)); self.notification_processors.push(Box::new(timeout_manager)); } - let processors: Vec<&dyn NotificationProcessor> = self - .notification_processors - .iter() - .map(|processor| processor.as_ref()) - .collect(); - let notification_manager = NotificationManager::new(processors.as_slice(), join_set); + let notification_manager = NotificationManager::new(self.notification_processors, join_set); let notification_queue: UnboundedSender> = notification_manager.init(join_set); @@ -297,10 +292,17 @@ where signal_queue: Arc>, notification_queue: UnboundedSender>, ) -> Self { + let sm_count = state_machines.len(); let state_machines: HashMap> = state_machines .into_iter() .map(|sm| (sm.get_kind(), sm)) .collect(); + assert_eq!( + sm_count, + state_machines.len(), + "multiple state machines using the same kind, SMs: {sm_count}, Kinds: {}", + state_machines.len(), + ); Self { signal_queue, notification_queue, diff --git a/src/notification.rs b/src/notification.rs index 9a6928f..ec86887 100644 --- a/src/notification.rs +++ b/src/notification.rs @@ -91,10 +91,13 @@ impl NotificationManager where M: RexMessage, { - pub fn new(processors: &[&dyn NotificationProcessor], join_set: &mut JoinSet<()>) -> Self { + pub fn new( + processors: Vec>>, + join_set: &mut JoinSet<()>, + ) -> Self { let processors: HashMap>>> = processors - .iter() - .fold(HashMap::new(), |mut subscribers, processor| { + .into_iter() + .fold(HashMap::new(), |mut subscribers, mut processor| { let subscriber_tx = processor.init(join_set); for topic in processor.get_topics() { subscribers @@ -142,7 +145,7 @@ pub trait NotificationProcessor: Send + Sync where M: RexMessage, { - fn init(&self, join_set: &mut JoinSet<()>) -> UnboundedSender>; + fn init(&mut self, join_set: &mut JoinSet<()>) -> UnboundedSender>; fn get_topics(&self) -> &[M::Topic]; } @@ -228,10 +231,14 @@ mod tests { use crate::timeout::*; let timeout_manager = TimeoutManager::test_default(); + let sq1 = timeout_manager.signal_queue.clone(); let timeout_manager_two = TimeoutManager::test_default(); + let sq2 = timeout_manager_two.signal_queue.clone(); let mut join_set = JoinSet::new(); - let notification_manager: NotificationManager = - NotificationManager::new(&[&timeout_manager, &timeout_manager_two], &mut join_set); + let notification_manager: NotificationManager = NotificationManager::new( + vec![Box::new(timeout_manager), Box::new(timeout_manager_two)], + &mut join_set, + ); let notification_tx = notification_manager.init(&mut join_set); let test_id = StateId::new_with_u128(TestKind, 1); @@ -243,14 +250,8 @@ mod tests { tokio::time::sleep(Duration::from_millis(10)).await; - let timeout_one = timeout_manager - .signal_queue - .pop_front() - .expect("timeout one"); - let timeout_two = timeout_manager_two - .signal_queue - .pop_front() - .expect("timeout two"); + let timeout_one = sq1.pop_front().expect("timeout one"); + let timeout_two = sq2.pop_front().expect("timeout two"); assert_eq!(timeout_one.id, timeout_two.id); } } diff --git a/src/timeout.rs b/src/timeout.rs index 9c19388..d899e32 100644 --- a/src/timeout.rs +++ b/src/timeout.rs @@ -7,6 +7,7 @@ use std::{ time::Duration, }; +use bigerror::attachment::DisplayDuration; use parking_lot::Mutex; use tokio::{ sync::{mpsc, mpsc::UnboundedSender}, @@ -29,19 +30,6 @@ pub trait TimeoutMessage: RexMessage + From> { pub const DEFAULT_TICK_RATE: Duration = Duration::from_millis(5); const SHORT_TIMEOUT: Duration = Duration::from_secs(10); -pub struct DisplayDuration(pub Duration); -impl std::fmt::Display for DisplayDuration { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", hms_string(self.0)) - } -} - -impl From for DisplayDuration { - fn from(duration: Duration) -> Self { - Self(duration) - } -} - /// convert a [`Duration`] into a "0H00m00s" string fn hms_string(duration: Duration) -> String { if duration.is_zero() { @@ -348,7 +336,7 @@ where K::Message: TryInto>, >>::Error: Send, { - fn init(&self, join_set: &mut JoinSet<()>) -> UnboundedSender> { + fn init(&mut self, join_set: &mut JoinSet<()>) -> UnboundedSender> { self.init_inner_with_handle(join_set) } @@ -382,7 +370,7 @@ mod tests { #[tokio::test] async fn timeout_to_signal() { - let timeout_manager = TimeoutManager::test_default(); + let mut timeout_manager = TimeoutManager::test_default(); let mut join_set = JoinSet::new(); let timeout_tx: UnboundedSender> = @@ -415,7 +403,7 @@ mod tests { #[tokio::test] async fn timeout_cancellation() { - let timeout_manager = TimeoutManager::test_default(); + let mut timeout_manager = TimeoutManager::test_default(); let mut join_set = JoinSet::new(); let timeout_tx: UnboundedSender> = @@ -448,7 +436,7 @@ mod tests { #[tokio::test] #[tracing_test::traced_test] async fn partial_timeout_cancellation() { - let timeout_manager = TimeoutManager::test_default(); + let mut timeout_manager = TimeoutManager::test_default(); let mut join_set = JoinSet::new(); let timeout_tx: UnboundedSender> =