Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added TimeoutManager item retention #6

Merged
merged 17 commits into from
Sep 6, 2024
789 changes: 0 additions & 789 deletions Cargo.lock

This file was deleted.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rex-sm"
version = "0.5.3"
version = "0.6.0"
edition = "2021"
description = "Hierarchical state machine"
license = "MIT"
Expand All @@ -14,13 +14,16 @@ async-trait = "0.1"
bigerror = ">=0.8"
bs58 = "0.5.1"
dashmap = "5"
derive_more = { version = "1", features = ["display"] }
futures = "0.3"
parking_lot = "0.12"
thiserror = "1"
tokio = { version = "1", features = ["sync", "time", "rt", "macros"] }
tokio-stream = "0.1"
tracing = "0.1"
tracing-test = "0.2"
uuid = { version = "1", features = ["v4"] }
# cargo add async-trait dashmap bigerror futures thiserror tokio-stream tracing tracing-test uuid
# cargo add tokio --features sync,time
[dev-dependencies]
tracing-test = "0.2"
derive_more = { version = "1", features = ["display", "try_into", "try_from", "from"] }
86 changes: 21 additions & 65 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ use tokio::{
};

use crate::{
ingress::{BoxedStateRouter, IngressAdapter, PacketRouter},
manager::BoxedStateMachine,
ingress::{BoxedStateRouter, Ingress, IngressAdapter, PacketRouter},
manager::{BoxedStateMachine, EmptyContext},
notification::NotificationQueue,
timeout::{self, TimeoutInput, TimeoutManager},
NotificationManager, NotificationProcessor, Rex, RexMessage, SignalQueue, SmContext,
StateMachine, StateMachineManager,
timeout::{self, Timeout, TimeoutManager, TimeoutMessage},
NotificationManager, NotificationProcessor, Rex, RexMessage, SignalQueue, StateMachine,
StateMachineManager,
};

pub struct RexBuilder<K, In = (), Out = ()>
where
K: Rex,
In: Send + Sync + std::fmt::Debug,
Out: Send + Sync + std::fmt::Debug,
{
signal_queue: SignalQueue<K>,
notification_queue: NotificationQueue<K::Message>,
Expand All @@ -38,18 +36,16 @@ pub struct BuilderContext<K: Rex> {
}

impl<K: Rex> RexBuilder<K, (), ()> {
#[must_use] pub fn new() -> Self {
#[must_use]
pub fn new() -> Self {
Self::default()
}
}

impl<K, In, Out> RexBuilder<K, In, Out>
where
K: Rex,
K::Message: From<TimeoutInput<K>> + TryInto<TimeoutInput<K>>,
<K::Message as TryInto<TimeoutInput<K>>>::Error: Send,
In: Send + Sync + std::fmt::Debug,
Out: Send + Sync + std::fmt::Debug,
K: Rex + Timeout,
K::Message: TimeoutMessage<K>,
TimeoutManager<K>: NotificationProcessor<K::Message>,
{
pub fn ctx(&self) -> BuilderContext<K> {
Expand Down Expand Up @@ -123,7 +119,7 @@ where
}
}

fn build_inner(mut self, join_set: &mut JoinSet<()>) -> SmContext<K> {
fn build_inner(mut self, join_set: &mut JoinSet<()>) -> EmptyContext<K> {
self.build_timeout_manager();

if !self.notification_processors.is_empty() {
Expand All @@ -141,70 +137,32 @@ where
);

sm_manager.init(join_set);
sm_manager.context()
sm_manager.ctx_builder()
}

pub fn build(self) -> SmContext<K> {
pub fn build(self) -> EmptyContext<K> {
let mut join_set = JoinSet::new();
let ctx = self.build_inner(&mut join_set);
join_set.detach_all();

ctx
}

pub fn build_with_handle(self, join_set: &mut JoinSet<()>) -> SmContext<K> {
pub fn build_with_handle(self, join_set: &mut JoinSet<()>) -> EmptyContext<K> {
self.build_inner(join_set)
}

// this does not return `Self` so that we can get access to an inbound_tx
#[must_use]
pub fn into_ingress_builder<In2, Out2>(
self,
outbound_tx: UnboundedSender<Out2>,
) -> (UnboundedSender<In2>, RexBuilder<K, In2, Out2>)
where
for<'a> K: TryFrom<&'a In2, Error = Report<ConversionError>>,
In2: Send + Sync + std::fmt::Debug + 'static,
Out2: Send + Sync + std::fmt::Debug + 'static,
K::Input: TryFrom<In2, Error = Report<ConversionError>>,
K::Message: TryInto<Out2, Error = Report<ConversionError>>,
{
let (inbound_tx, inbound_rx) = mpsc::unbounded_channel::<In2>();
(
inbound_tx.clone(),
RexBuilder::<K, In2, Out2> {
signal_queue: self.signal_queue,
notification_queue: self.notification_queue,
state_machines: self.state_machines,
notification_processors: self.notification_processors,
timeout_topic: self.timeout_topic,
tick_rate: self.tick_rate,
outbound_tx: Some(outbound_tx),
ingress_channel: Some((inbound_tx, inbound_rx)),
},
)
}
}

impl<K, In, Out> RexBuilder<K, In, Out>
impl<K> RexBuilder<K, K::In, K::Out>
where
K: Rex,

for<'a> K: TryFrom<&'a In, Error = Report<ConversionError>>,
In: Send + Sync + std::fmt::Debug + 'static,
Out: Send + Sync + std::fmt::Debug + 'static,

K::Input: TryFrom<In, Error = Report<ConversionError>>,
K::Message: TryInto<Out, Error = Report<ConversionError>>,
K::Message: From<TimeoutInput<K>> + TryInto<TimeoutInput<K>>,
<K::Message as TryInto<TimeoutInput<K>>>::Error: Send,
K: Rex + Timeout + Ingress,
K::Message: TimeoutMessage<K> + TryInto<K::Out, Error = Report<ConversionError>>,
K::Input: TryFrom<K::In, Error = Report<ConversionError>>,
TimeoutManager<K>: NotificationProcessor<K::Message>,
{
#[must_use]
pub fn new_connected(
outbound_tx: UnboundedSender<Out>,
) -> (UnboundedSender<In>, RexBuilder<K, In, Out>) {
let (inbound_tx, inbound_rx) = mpsc::unbounded_channel::<In>();
pub fn new_connected(outbound_tx: UnboundedSender<K::Out>) -> (UnboundedSender<K::In>, Self) {
let (inbound_tx, inbound_rx) = mpsc::unbounded_channel::<K::In>();
(
inbound_tx.clone(),
Self {
Expand All @@ -216,7 +174,7 @@ where
}

#[must_use]
pub fn ingress_tx(&self) -> UnboundedSender<In> {
pub fn ingress_tx(&self) -> UnboundedSender<K::In> {
self.ingress_channel
.as_ref()
.map(|(tx, _)| tx.clone())
Expand All @@ -226,7 +184,7 @@ where
#[must_use]
pub fn with_ingress_adapter(
mut self,
state_routers: Vec<BoxedStateRouter<K, In>>,
state_routers: Vec<BoxedStateRouter<K, K::In>>,
ingress_topic: <K::Message as RexMessage>::Topic,
) -> Self {
assert!(!state_routers.is_empty());
Expand Down Expand Up @@ -254,8 +212,6 @@ where
impl<K, In, Out> Default for RexBuilder<K, In, Out>
where
K: Rex,
In: Send + Sync + std::fmt::Debug,
Out: Send + Sync + std::fmt::Debug,
{
fn default() -> Self {
Self {
Expand Down
63 changes: 32 additions & 31 deletions src/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,38 +68,35 @@ where
}

/// Represents a bidirectional network connection
pub struct IngressAdapter<K, In, Out>
pub struct IngressAdapter<K>
where
K: Rex,
In: Send + Sync + fmt::Debug,
Out: Send + Sync + fmt::Debug,
K: Rex + Ingress,
K::Input: TryFrom<K::In, Error = Report<ConversionError>>,
K::Message: TryInto<K::Out, Error = Report<ConversionError>>,
{
pub(crate) outbound_tx: UnboundedSender<Out>,
pub(crate) outbound_tx: UnboundedSender<K::Out>,
pub(crate) signal_queue: SignalQueue<K>,
pub(crate) router: PacketRouter<K, In>,
pub inbound_tx: UnboundedSender<In>,
pub(crate) router: PacketRouter<K, K::In>,
pub inbound_tx: UnboundedSender<K::In>,
// `self.inbound_rx.take()` will be used on initialization
pub(crate) inbound_rx: Option<UnboundedReceiver<In>>,
pub(crate) inbound_rx: Option<UnboundedReceiver<K::In>>,
pub(crate) topic: <K::Message as RexMessage>::Topic,
}

impl<K, In, Out> IngressAdapter<K, In, Out>
impl<K> IngressAdapter<K>
where
K: Rex,
for<'a> K: TryFrom<&'a In, Error = Report<ConversionError>>,
K::Input: TryFrom<In, Error = Report<ConversionError>>,
K::Message: TryInto<Out, Error = Report<ConversionError>>,
In: Send + Sync + fmt::Debug + 'static,
Out: Send + Sync + fmt::Debug + 'static,
K: Rex + Ingress,
K::Input: TryFrom<K::In, Error = Report<ConversionError>>,
K::Message: TryInto<K::Out, Error = Report<ConversionError>>,
{
#[must_use]
pub fn new(
signal_queue: SignalQueue<K>,
outbound_tx: UnboundedSender<Out>,
state_routers: Vec<BoxedStateRouter<K, In>>,
outbound_tx: UnboundedSender<K::Out>,
state_routers: Vec<BoxedStateRouter<K, K::In>>,
topic: impl Into<<K::Message as RexMessage>::Topic>,
) -> Self {
let (inbound_tx, inbound_rx) = mpsc::unbounded_channel::<In>();
let (inbound_tx, inbound_rx) = mpsc::unbounded_channel::<K::In>();

Self {
signal_queue,
Expand All @@ -119,9 +116,9 @@ where
}

async fn process_inbound(
router: PacketRouter<K, In>,
router: PacketRouter<K, K::In>,
signal_queue: Arc<StreamableDeque<Signal<K>>>,
mut packet_rx: UnboundedReceiver<In>,
mut packet_rx: UnboundedReceiver<K::In>,
) {
debug!(target: "state_machine", spawning = "IngressAdapter.packet_tx");
while let Some(packet) = packet_rx.recv().await {
Expand All @@ -146,14 +143,21 @@ where
}
}

impl<K, In, Out> NotificationProcessor<K::Message> for IngressAdapter<K, In, Out>
pub trait Ingress: Rex
where
K: Rex,
for<'a> K: TryFrom<&'a In, Error = Report<ConversionError>>,
K::Input: TryFrom<In, Error = Report<ConversionError>>,
K::Message: TryInto<Out, Error = Report<ConversionError>>,
In: Send + Sync + fmt::Debug + 'static,
Out: Send + Sync + fmt::Debug + 'static,
Self::Input: TryFrom<Self::In, Error = Report<ConversionError>>,
Self::Message: TryInto<Self::Out, Error = Report<ConversionError>>,
for<'a> Self: TryFrom<&'a Self::In, Error = Report<ConversionError>>,
{
type In: Send + Sync + fmt::Debug + 'static;
type Out: Send + Sync + fmt::Debug + 'static;
}

impl<K> NotificationProcessor<K::Message> for IngressAdapter<K>
where
K: Rex + Ingress,
K::Input: TryFrom<K::In, Error = Report<ConversionError>>,
K::Message: TryInto<K::Out, Error = Report<ConversionError>>,
{
fn init(&mut self, join_set: &mut JoinSet<()>) -> UnboundedSender<Notification<K::Message>> {
debug!("calling IngressAdapter::process_inbound");
Expand Down Expand Up @@ -203,10 +207,7 @@ mod tests {
RexBuilder, StateId,
};

type TestIngressAdapter = (
IngressAdapter<TestKind, InPacket, OutPacket>,
UnboundedReceiver<OutPacket>,
);
type TestIngressAdapter = (IngressAdapter<TestKind>, UnboundedReceiver<OutPacket>);

impl TestDefault for TestIngressAdapter {
fn test_default() -> Self {
Expand Down
19 changes: 14 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub use notification::{
GetTopic, Notification, NotificationManager, NotificationProcessor, NotificationQueue,
Operation, Request, RequestInner, RexMessage, RexTopic, Subscriber, UnaryRequest,
};
pub use timeout::Timeout;

/// A trait for types representing state machine lifecycles. These types should be field-less
/// enumerations or enumerations whose variants only contain field-less enumerations; note that
Expand Down Expand Up @@ -106,11 +107,10 @@ pub trait Kind: fmt::Debug + Send {
/// type hierarchy defined by validly implementing the [`Kind`] and [`Rex`] traits,
/// double colons`::` directed down and right represent type associations:
/// ```text
/// Topic
/// ::
///
/// Kind -> Rex::Message
/// :: :: ::
/// State Input Topic
/// :: :: ::
/// State Input Topic
/// ```
pub trait Rex: Kind + HashKind {
type Input: Send + Sync + 'static + fmt::Debug;
Expand Down Expand Up @@ -147,7 +147,9 @@ where
f,
"{:?}<{}>",
self.kind,
bs58::encode(self.uuid).into_string()
self.is_nil()
.then(|| bs58::encode(self.uuid).into_string())
.unwrap_or_else(|| "NIL".to_string())
)
}
}
Expand All @@ -160,6 +162,13 @@ impl<K: Kind> StateId<K> {
pub fn new_rand(kind: K) -> Self {
Self::new(kind, Uuid::new_v4())
}

pub fn nil(kind: K) -> Self {
Self::new(kind, Uuid::nil())
}
pub fn is_nil(&self) -> bool {
self.uuid == Uuid::nil()
}
// for testing purposes, easily distinguish UUIDs
// by numerical value
#[cfg(test)]
Expand Down
Loading