Skip to content

Commit

Permalink
added must_use decorations
Browse files Browse the repository at this point in the history
  • Loading branch information
mkatychev committed Aug 20, 2024
1 parent 8078301 commit a4510dc
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 41 deletions.
2 changes: 1 addition & 1 deletion src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct BuilderContext<K: Rex> {
}

impl<K: Rex> RexBuilder<K, (), ()> {
pub fn new() -> Self {
#[must_use] pub fn new() -> Self {
Self::default()
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ impl<K, In> PacketRouter<K, In>
where
for<'a> K: HashKind + TryFrom<&'a In, Error = Report<ConversionError>>,
{
#[must_use]
pub fn new(state_routers: Vec<BoxedStateRouter<K, In>>) -> Self {
let mut router_map: HashMap<K, BoxedStateRouter<K, In>> = HashMap::new();
for router in state_routers {
Expand All @@ -58,7 +59,7 @@ where

fn get_id(&self, packet: &In) -> Result<Option<StateId<K>>, Report<RexError>> {
let kind = K::try_from(packet);
let kind = kind.map_err(|e| e.into_ctx())?;
let kind = kind.map_err(IntoContext::into_ctx)?;
let Some(router) = self.0.get(&kind) else {
return Ok(None);
};
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub trait State: fmt::Debug + Send + PartialEq + Copy {

/// `&dyn Kind<State = Self>` cannot do direct partial comparison
/// due to type opacity
/// so State::new_state(self) is called to allow a vtable lookup
/// so `State::new_state(self)` is called to allow a vtable lookup
fn kind_eq(&self, kind: &dyn Kind<State = Self>) -> bool
where
Self: Sized,
Expand Down
14 changes: 7 additions & 7 deletions src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use crate::{
node::{Insert, Node, Update},
notification::{Notification, NotificationQueue},
queue::StreamableDeque,
storage::*,
storage::{StateStore, Tree},
timeout::TimeoutInput,
Kind, Rex, State, StateId,
};
Expand Down Expand Up @@ -105,7 +105,7 @@ where

pub type SignalQueue<K> = Arc<StreamableDeque<Signal<K>>>;

/// [SignalExt] calls [`Signal::state_change`] to consume a [`Kind::State`] and emit
/// [`SignalExt`] calls [`Signal::state_change`] to consume a [`Kind::State`] and emit
/// a state change [`Signal`] with a valid [`StateMachine::Input`]
pub trait SignalExt<K>
where
Expand Down Expand Up @@ -179,7 +179,7 @@ impl<K> StateMachineManager<K>
where
K: Rex,
{
pub fn context(&self) -> SmContext<K> {
#[must_use] pub fn context(&self) -> SmContext<K> {
SmContext {
signal_queue: self.signal_queue.clone(),
notification_queue: self.notification_queue.clone(),
Expand Down Expand Up @@ -288,7 +288,7 @@ where
/// NOTE [`StateMachineExt::new`] is created without a hierarchy
fn create_tree(&self, ctx: &SmContext<K>, id: StateId<K>) {
ctx.state_store
.insert_ref(id, Arc::new(FairMutex::new(Node::new(id))))
.insert_ref(id, Arc::new(FairMutex::new(Node::new(id))));
}

fn has_state(&self, ctx: &SmContext<K>, id: StateId<K>) -> bool {
Expand Down Expand Up @@ -341,12 +341,12 @@ where

fn set_timeout(&self, ctx: &SmContext<K>, id: StateId<K>, duration: Duration) {
ctx.notification_queue
.priority_send(Notification(TimeoutInput::set_timeout(id, duration).into()))
.priority_send(Notification(TimeoutInput::set_timeout(id, duration).into()));
}

fn cancel_timeout(&self, ctx: &SmContext<K>, id: StateId<K>) {
ctx.notification_queue
.priority_send(Notification(TimeoutInput::cancel_timeout(id).into()))
.priority_send(Notification(TimeoutInput::cancel_timeout(id).into()));
}

fn get_parent_id(&self, ctx: &SmContext<K>, id: StateId<K>) -> Option<StateId<K>> {
Expand Down Expand Up @@ -765,7 +765,7 @@ mod tests {
who_sleeps,
}) => {
if msg == 0 {
self.update(&ctx, id, ComponentState::Pong(PongState::Responding))
self.update(&ctx, id, ComponentState::Pong(PongState::Responding));
}
msg += 5;

Expand Down
17 changes: 14 additions & 3 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ where
S: State,
Id: Copy + Eq + PartialEq + Hash + fmt::Display + Kind<State = S> + fmt::Debug,
{
#[must_use]
pub fn new(id: Id) -> Self {
Self {
id,
Expand All @@ -52,6 +53,7 @@ where
}
}

#[must_use]
pub fn zipper(self) -> Zipper<Id, S> {
Zipper {
node: self,
Expand All @@ -60,6 +62,7 @@ where
}
}

#[must_use]
pub fn get(&self, id: Id) -> Option<&Node<Id, S>> {
if self.id == id {
return Some(self);
Expand All @@ -75,17 +78,20 @@ where
Some(node)
}

#[must_use]
pub fn get_state(&self, id: Id) -> Option<&S> {
self.get(id).map(|n| &n.state)
}

#[must_use]
pub fn child(&self, id: Id) -> Option<&Node<Id, S>> {
self.children
.iter()
.find(|node| node.id == id || node.descendant_keys.contains(&id))
}

// get array index by of node with Id in self.descendant_keys
#[must_use]
pub fn child_idx(&self, id: Id) -> Option<usize> {
self.children
.iter()
Expand All @@ -109,6 +115,7 @@ where
}

/// inserts a new node using self by value
#[must_use]
pub fn into_insert(self, Insert { parent_id, id }: Insert<Id>) -> Node<Id, S> {
// inserts at this point should be guaranteed Some(id)
// ince a parent_id.is_none() should be handled by the node
Expand All @@ -121,6 +128,7 @@ where
.finish_insert(id)
}

#[must_use]
pub fn get_parent_id(&self, id: Id) -> Option<Id> {
// root_node edge case
if !self.descendant_keys.contains(&id) {
Expand Down Expand Up @@ -180,6 +188,7 @@ where
std::mem::swap(&mut swap_node, self);
}

#[must_use]
pub fn into_update(self, Update { id, state }: Update<Id, S>) -> Node<Id, S> {
self.zipper().by_id(id).set_state(state).finish_update()
}
Expand Down Expand Up @@ -214,9 +223,10 @@ where
self = self.child(idx);
contains_id = self.node.descendant_keys.contains(&id);
}
if self.node.id != id {
panic!("id[{id}] should be in the node, this is a bug");
}
assert!(
!(self.node.id != id),
"id[{id}] should be in the node, this is a bug"
);
self
}

Expand Down Expand Up @@ -283,6 +293,7 @@ where
self.node
}

#[must_use]
pub fn finish_update(mut self) -> Node<Id, S> {
while self.parent.is_some() {
self = self.parent();
Expand Down
8 changes: 4 additions & 4 deletions src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,18 @@ where
pub struct NotificationQueue<M: RexMessage>(pub(crate) Arc<StreamableDeque<Notification<M>>>);

impl<M: RexMessage> NotificationQueue<M> {
pub fn new() -> Self {
#[must_use] pub fn new() -> Self {
Self(Arc::new(StreamableDeque::new()))
}
pub fn send(&self, notif: Notification<M>) {
self.0.push_back(notif)
self.0.push_back(notif);
}

pub fn priority_send(&self, notif: Notification<M>) {
self.0.push_front(notif)
self.0.push_front(notif);
}

pub fn stream(&self) -> StreamReceiver<Notification<M>> {
#[must_use] pub fn stream(&self) -> StreamReceiver<Notification<M>> {
self.0.stream()
}
}
Expand Down
38 changes: 17 additions & 21 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl<T> RawDeque<T> {
}
}

/// This type acts similarly to std::collections::VecDeque but
/// This type acts similarly to `std::collections::VecDeque` but
/// modifying queue is async
pub struct StreamableDeque<T> {
inner: Mutex<RawDeque<T>>,
Expand All @@ -85,7 +85,7 @@ impl<T> Default for StreamableDeque<T> {
}

impl<T> StreamableDeque<T> {
pub fn new() -> Self {
#[must_use] pub fn new() -> Self {
Self::default()
}

Expand All @@ -105,8 +105,8 @@ impl<T> StreamableDeque<T> {
inner.notify_rx();
}

/// Returns a stream of items using pop_front()
/// This opens us up to handle a back_stream() as well
/// Returns a stream of items using `pop_front()`
/// This opens us up to handle a `back_stream()` as well
pub fn stream(&self) -> StreamReceiver<T> {
StreamReceiver {
queue: self,
Expand Down Expand Up @@ -149,23 +149,19 @@ impl<'a, T> Stream for StreamReceiver<'a, T> {
.pop_front()
.or_else(|| inner.back_values.pop_front());

match value {
Some(v) => {
self.awake = None;
Poll::Ready(Some(v))
}
// if queue has no entries
None => {
// TODO avoid allocation of a new AtomicBool if possible
let awake = Arc::new(AtomicBool::new(false));
// push stream's waker onto buffer
inner.rx_notifiers.push_back(ReceiverNotifier {
handle: ctx.waker().clone(),
awake: awake.clone(),
});
self.awake = Some(awake);
Poll::Pending
}
if let Some(v) = value {
self.awake = None;
Poll::Ready(Some(v))
} else {
// TODO avoid allocation of a new AtomicBool if possible
let awake = Arc::new(AtomicBool::new(false));
// push stream's waker onto buffer
inner.rx_notifiers.push_back(ReceiverNotifier {
handle: ctx.waker().clone(),
awake: awake.clone(),
});
self.awake = Some(awake);
Poll::Pending
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ where
pub(crate) type Tree<K> = Arc<FairMutex<Node<StateId<K>, <K as Kind>::State>>>;

impl<K: Rex> StateStore<StateId<K>, K::State> {
pub fn new() -> Self {
#[must_use] pub fn new() -> Self {
Self {
trees: DashMap::new(),
}
Expand Down
9 changes: 7 additions & 2 deletions src/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::{
collections::{BTreeMap, HashMap, HashSet},
fmt,
iter::IntoIterator,
sync::Arc,
time::Duration,
};
Expand Down Expand Up @@ -57,7 +58,7 @@ fn hms_string(duration: Duration) -> String {
hms
}

/// TimeoutLedger` contains a [`BTreeMap`] that uses [`Instant`]s to time out
/// `TimeoutLedger` contains a [`BTreeMap`] that uses [`Instant`]s to time out
/// specific [`StateId`]s and a [`HashMap`] that indexes `Instant`s by [`StateId`].
///
/// This double indexing allows [`Operation::Cancel`]s to go
Expand Down Expand Up @@ -153,10 +154,12 @@ impl std::fmt::Display for Operation {
}

impl Operation {
#[must_use]
pub fn from_duration(duration: Duration) -> Self {
Self::Set(Instant::now() + duration)
}

#[must_use]
pub fn from_millis(millis: u64) -> Self {
Self::Set(Instant::now() + Duration::from_millis(millis))
}
Expand Down Expand Up @@ -221,6 +224,7 @@ where
K::Message: TryInto<TimeoutInput<K>>,
<K::Message as TryInto<TimeoutInput<K>>>::Error: Send,
{
#[must_use]
pub fn new(
signal_queue: SignalQueue<K>,
topic: impl Into<<K::Message as RexMessage>::Topic>,
Expand All @@ -233,6 +237,7 @@ where
}
}

#[must_use]
pub fn with_tick_rate(self, tick_rate: Duration) -> Self {
Self { tick_rate, ..self }
}
Expand Down Expand Up @@ -309,7 +314,7 @@ where
for id in expired
.iter()
.filter_map(|t| ledger.timers.remove(t))
.flat_map(|set| set.into_iter())
.flat_map(IntoIterator::into_iter)
.collect::<Vec<_>>()
{
warn!(%id, "timed out");
Expand Down

0 comments on commit a4510dc

Please sign in to comment.