Skip to content

Commit

Permalink
Added TimeoutManager item retention (#6)
Browse files Browse the repository at this point in the history
Approved by: @jsgoyette, @jkleinknox
============
`TimeoutManager` can now hold a `Copy` item (to be returned to sender) for a set amount of time:
https://github.com/knox-networks/rex-sm/blob/efb8b508209a02692a6bc4fd5d6064f54e45851c/src/timeout.rs#L172-L176

This opens the way for retry logic implementations that can re-run on a cadence without blocking `StateMachine` processors.

* `StateMachine::process` is not a sync method
* `StateId` is now part of the `SmContext`:
  https://github.com/knox-networks/rex-sm/blob/0bbc75433fa07cef1a3ff99d5fdbf6455a9db2c1/src/manager.rs#L129-L134
* Added `timeout::Timeout` trait for `Kind`s that handle timeout items:
  https://github.com/knox-networks/rex-sm/blob/efb8b508209a02692a6bc4fd5d6064f54e45851c/src/timeout.rs#L159-L166
* Added `timeout::TimeoutMessage` for `RexMessage`s that handle timeoue items:
  https://github.com/knox-networks/rex-sm/blob/efb8b508209a02692a6bc4fd5d6064f54e45851c/src/timeout.rs#L150-L157
  • Loading branch information
mkatychev authored Sep 6, 2024
1 parent 4927774 commit d8991f7
Show file tree
Hide file tree
Showing 11 changed files with 461 additions and 1,226 deletions.
789 changes: 0 additions & 789 deletions Cargo.lock

This file was deleted.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rex-sm"
version = "0.5.3"
version = "0.6.0"
edition = "2021"
description = "Hierarchical state machine"
license = "MIT"
Expand All @@ -14,13 +14,16 @@ async-trait = "0.1"
bigerror = ">=0.8"
bs58 = "0.5.1"
dashmap = "5"
derive_more = { version = "1", features = ["display"] }
futures = "0.3"
parking_lot = "0.12"
thiserror = "1"
tokio = { version = "1", features = ["sync", "time", "rt", "macros"] }
tokio-stream = "0.1"
tracing = "0.1"
tracing-test = "0.2"
uuid = { version = "1", features = ["v4"] }
# cargo add async-trait dashmap bigerror futures thiserror tokio-stream tracing tracing-test uuid
# cargo add tokio --features sync,time
[dev-dependencies]
tracing-test = "0.2"
derive_more = { version = "1", features = ["display", "try_into", "try_from", "from"] }
86 changes: 21 additions & 65 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ use tokio::{
};

use crate::{
ingress::{BoxedStateRouter, IngressAdapter, PacketRouter},
manager::BoxedStateMachine,
ingress::{BoxedStateRouter, Ingress, IngressAdapter, PacketRouter},
manager::{BoxedStateMachine, EmptyContext},
notification::NotificationQueue,
timeout::{self, TimeoutInput, TimeoutManager},
NotificationManager, NotificationProcessor, Rex, RexMessage, SignalQueue, SmContext,
StateMachine, StateMachineManager,
timeout::{self, Timeout, TimeoutManager, TimeoutMessage},
NotificationManager, NotificationProcessor, Rex, RexMessage, SignalQueue, StateMachine,
StateMachineManager,
};

pub struct RexBuilder<K, In = (), Out = ()>
where
K: Rex,
In: Send + Sync + std::fmt::Debug,
Out: Send + Sync + std::fmt::Debug,
{
signal_queue: SignalQueue<K>,
notification_queue: NotificationQueue<K::Message>,
Expand All @@ -38,18 +36,16 @@ pub struct BuilderContext<K: Rex> {
}

impl<K: Rex> RexBuilder<K, (), ()> {
#[must_use] pub fn new() -> Self {
#[must_use]
pub fn new() -> Self {
Self::default()
}
}

impl<K, In, Out> RexBuilder<K, In, Out>
where
K: Rex,
K::Message: From<TimeoutInput<K>> + TryInto<TimeoutInput<K>>,
<K::Message as TryInto<TimeoutInput<K>>>::Error: Send,
In: Send + Sync + std::fmt::Debug,
Out: Send + Sync + std::fmt::Debug,
K: Rex + Timeout,
K::Message: TimeoutMessage<K>,
TimeoutManager<K>: NotificationProcessor<K::Message>,
{
pub fn ctx(&self) -> BuilderContext<K> {
Expand Down Expand Up @@ -123,7 +119,7 @@ where
}
}

fn build_inner(mut self, join_set: &mut JoinSet<()>) -> SmContext<K> {
fn build_inner(mut self, join_set: &mut JoinSet<()>) -> EmptyContext<K> {
self.build_timeout_manager();

if !self.notification_processors.is_empty() {
Expand All @@ -141,70 +137,32 @@ where
);

sm_manager.init(join_set);
sm_manager.context()
sm_manager.ctx_builder()
}

pub fn build(self) -> SmContext<K> {
pub fn build(self) -> EmptyContext<K> {
let mut join_set = JoinSet::new();
let ctx = self.build_inner(&mut join_set);
join_set.detach_all();

ctx
}

pub fn build_with_handle(self, join_set: &mut JoinSet<()>) -> SmContext<K> {
pub fn build_with_handle(self, join_set: &mut JoinSet<()>) -> EmptyContext<K> {
self.build_inner(join_set)
}

// this does not return `Self` so that we can get access to an inbound_tx
#[must_use]
pub fn into_ingress_builder<In2, Out2>(
self,
outbound_tx: UnboundedSender<Out2>,
) -> (UnboundedSender<In2>, RexBuilder<K, In2, Out2>)
where
for<'a> K: TryFrom<&'a In2, Error = Report<ConversionError>>,
In2: Send + Sync + std::fmt::Debug + 'static,
Out2: Send + Sync + std::fmt::Debug + 'static,
K::Input: TryFrom<In2, Error = Report<ConversionError>>,
K::Message: TryInto<Out2, Error = Report<ConversionError>>,
{
let (inbound_tx, inbound_rx) = mpsc::unbounded_channel::<In2>();
(
inbound_tx.clone(),
RexBuilder::<K, In2, Out2> {
signal_queue: self.signal_queue,
notification_queue: self.notification_queue,
state_machines: self.state_machines,
notification_processors: self.notification_processors,
timeout_topic: self.timeout_topic,
tick_rate: self.tick_rate,
outbound_tx: Some(outbound_tx),
ingress_channel: Some((inbound_tx, inbound_rx)),
},
)
}
}

impl<K, In, Out> RexBuilder<K, In, Out>
impl<K> RexBuilder<K, K::In, K::Out>
where
K: Rex,

for<'a> K: TryFrom<&'a In, Error = Report<ConversionError>>,
In: Send + Sync + std::fmt::Debug + 'static,
Out: Send + Sync + std::fmt::Debug + 'static,

K::Input: TryFrom<In, Error = Report<ConversionError>>,
K::Message: TryInto<Out, Error = Report<ConversionError>>,
K::Message: From<TimeoutInput<K>> + TryInto<TimeoutInput<K>>,
<K::Message as TryInto<TimeoutInput<K>>>::Error: Send,
K: Rex + Timeout + Ingress,
K::Message: TimeoutMessage<K> + TryInto<K::Out, Error = Report<ConversionError>>,
K::Input: TryFrom<K::In, Error = Report<ConversionError>>,
TimeoutManager<K>: NotificationProcessor<K::Message>,
{
#[must_use]
pub fn new_connected(
outbound_tx: UnboundedSender<Out>,
) -> (UnboundedSender<In>, RexBuilder<K, In, Out>) {
let (inbound_tx, inbound_rx) = mpsc::unbounded_channel::<In>();
pub fn new_connected(outbound_tx: UnboundedSender<K::Out>) -> (UnboundedSender<K::In>, Self) {
let (inbound_tx, inbound_rx) = mpsc::unbounded_channel::<K::In>();
(
inbound_tx.clone(),
Self {
Expand All @@ -216,7 +174,7 @@ where
}

#[must_use]
pub fn ingress_tx(&self) -> UnboundedSender<In> {
pub fn ingress_tx(&self) -> UnboundedSender<K::In> {
self.ingress_channel
.as_ref()
.map(|(tx, _)| tx.clone())
Expand All @@ -226,7 +184,7 @@ where
#[must_use]
pub fn with_ingress_adapter(
mut self,
state_routers: Vec<BoxedStateRouter<K, In>>,
state_routers: Vec<BoxedStateRouter<K, K::In>>,
ingress_topic: <K::Message as RexMessage>::Topic,
) -> Self {
assert!(!state_routers.is_empty());
Expand Down Expand Up @@ -254,8 +212,6 @@ where
impl<K, In, Out> Default for RexBuilder<K, In, Out>
where
K: Rex,
In: Send + Sync + std::fmt::Debug,
Out: Send + Sync + std::fmt::Debug,
{
fn default() -> Self {
Self {
Expand Down
63 changes: 32 additions & 31 deletions src/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,38 +68,35 @@ where
}

/// Represents a bidirectional network connection
pub struct IngressAdapter<K, In, Out>
pub struct IngressAdapter<K>
where
K: Rex,
In: Send + Sync + fmt::Debug,
Out: Send + Sync + fmt::Debug,
K: Rex + Ingress,
K::Input: TryFrom<K::In, Error = Report<ConversionError>>,
K::Message: TryInto<K::Out, Error = Report<ConversionError>>,
{
pub(crate) outbound_tx: UnboundedSender<Out>,
pub(crate) outbound_tx: UnboundedSender<K::Out>,
pub(crate) signal_queue: SignalQueue<K>,
pub(crate) router: PacketRouter<K, In>,
pub inbound_tx: UnboundedSender<In>,
pub(crate) router: PacketRouter<K, K::In>,
pub inbound_tx: UnboundedSender<K::In>,
// `self.inbound_rx.take()` will be used on initialization
pub(crate) inbound_rx: Option<UnboundedReceiver<In>>,
pub(crate) inbound_rx: Option<UnboundedReceiver<K::In>>,
pub(crate) topic: <K::Message as RexMessage>::Topic,
}

impl<K, In, Out> IngressAdapter<K, In, Out>
impl<K> IngressAdapter<K>
where
K: Rex,
for<'a> K: TryFrom<&'a In, Error = Report<ConversionError>>,
K::Input: TryFrom<In, Error = Report<ConversionError>>,
K::Message: TryInto<Out, Error = Report<ConversionError>>,
In: Send + Sync + fmt::Debug + 'static,
Out: Send + Sync + fmt::Debug + 'static,
K: Rex + Ingress,
K::Input: TryFrom<K::In, Error = Report<ConversionError>>,
K::Message: TryInto<K::Out, Error = Report<ConversionError>>,
{
#[must_use]
pub fn new(
signal_queue: SignalQueue<K>,
outbound_tx: UnboundedSender<Out>,
state_routers: Vec<BoxedStateRouter<K, In>>,
outbound_tx: UnboundedSender<K::Out>,
state_routers: Vec<BoxedStateRouter<K, K::In>>,
topic: impl Into<<K::Message as RexMessage>::Topic>,
) -> Self {
let (inbound_tx, inbound_rx) = mpsc::unbounded_channel::<In>();
let (inbound_tx, inbound_rx) = mpsc::unbounded_channel::<K::In>();

Self {
signal_queue,
Expand All @@ -119,9 +116,9 @@ where
}

async fn process_inbound(
router: PacketRouter<K, In>,
router: PacketRouter<K, K::In>,
signal_queue: Arc<StreamableDeque<Signal<K>>>,
mut packet_rx: UnboundedReceiver<In>,
mut packet_rx: UnboundedReceiver<K::In>,
) {
debug!(target: "state_machine", spawning = "IngressAdapter.packet_tx");
while let Some(packet) = packet_rx.recv().await {
Expand All @@ -146,14 +143,21 @@ where
}
}

impl<K, In, Out> NotificationProcessor<K::Message> for IngressAdapter<K, In, Out>
pub trait Ingress: Rex
where
K: Rex,
for<'a> K: TryFrom<&'a In, Error = Report<ConversionError>>,
K::Input: TryFrom<In, Error = Report<ConversionError>>,
K::Message: TryInto<Out, Error = Report<ConversionError>>,
In: Send + Sync + fmt::Debug + 'static,
Out: Send + Sync + fmt::Debug + 'static,
Self::Input: TryFrom<Self::In, Error = Report<ConversionError>>,
Self::Message: TryInto<Self::Out, Error = Report<ConversionError>>,
for<'a> Self: TryFrom<&'a Self::In, Error = Report<ConversionError>>,
{
type In: Send + Sync + fmt::Debug + 'static;
type Out: Send + Sync + fmt::Debug + 'static;
}

impl<K> NotificationProcessor<K::Message> for IngressAdapter<K>
where
K: Rex + Ingress,
K::Input: TryFrom<K::In, Error = Report<ConversionError>>,
K::Message: TryInto<K::Out, Error = Report<ConversionError>>,
{
fn init(&mut self, join_set: &mut JoinSet<()>) -> UnboundedSender<Notification<K::Message>> {
debug!("calling IngressAdapter::process_inbound");
Expand Down Expand Up @@ -203,10 +207,7 @@ mod tests {
RexBuilder, StateId,
};

type TestIngressAdapter = (
IngressAdapter<TestKind, InPacket, OutPacket>,
UnboundedReceiver<OutPacket>,
);
type TestIngressAdapter = (IngressAdapter<TestKind>, UnboundedReceiver<OutPacket>);

impl TestDefault for TestIngressAdapter {
fn test_default() -> Self {
Expand Down
19 changes: 14 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub use notification::{
GetTopic, Notification, NotificationManager, NotificationProcessor, NotificationQueue,
Operation, Request, RequestInner, RexMessage, RexTopic, Subscriber, UnaryRequest,
};
pub use timeout::Timeout;

/// A trait for types representing state machine lifecycles. These types should be field-less
/// enumerations or enumerations whose variants only contain field-less enumerations; note that
Expand Down Expand Up @@ -106,11 +107,10 @@ pub trait Kind: fmt::Debug + Send {
/// type hierarchy defined by validly implementing the [`Kind`] and [`Rex`] traits,
/// double colons`::` directed down and right represent type associations:
/// ```text
/// Topic
/// ::
///
/// Kind -> Rex::Message
/// :: :: ::
/// State Input Topic
/// :: :: ::
/// State Input Topic
/// ```
pub trait Rex: Kind + HashKind {
type Input: Send + Sync + 'static + fmt::Debug;
Expand Down Expand Up @@ -147,7 +147,9 @@ where
f,
"{:?}<{}>",
self.kind,
bs58::encode(self.uuid).into_string()
self.is_nil()
.then(|| bs58::encode(self.uuid).into_string())
.unwrap_or_else(|| "NIL".to_string())
)
}
}
Expand All @@ -160,6 +162,13 @@ impl<K: Kind> StateId<K> {
pub fn new_rand(kind: K) -> Self {
Self::new(kind, Uuid::new_v4())
}

pub fn nil(kind: K) -> Self {
Self::new(kind, Uuid::nil())
}
pub fn is_nil(&self) -> bool {
self.uuid == Uuid::nil()
}
// for testing purposes, easily distinguish UUIDs
// by numerical value
#[cfg(test)]
Expand Down
Loading

0 comments on commit d8991f7

Please sign in to comment.