Skip to content

Commit

Permalink
Merge pull request #53 from fraktalio/feature/event_sourced_orchestra…
Browse files Browse the repository at this point in the history
…ting_aggregate

Identifier trait added - domain
  • Loading branch information
idugalic authored Jan 25, 2025
2 parents 33613d7 + 1f22bc7 commit f7a4faf
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 63 deletions.
19 changes: 16 additions & 3 deletions src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::marker::PhantomData;

use crate::decider::{Decider, EventComputation, StateComputation};
use crate::saga::{ActionComputation, Saga};
use crate::Identifier;

/// Event Repository trait
///
Expand Down Expand Up @@ -316,7 +317,11 @@ where
}
}
/// Handles the command by fetching the events from the repository, computing new events based on the current events and the command, and saving the new events to the repository.
pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error> {
pub async fn handle(&self, command: &C) -> Result<Vec<(E, Version)>, Error>
where
E: Identifier,
C: Identifier,
{
let events: Vec<(E, Version)> = self.fetch_events(command).await?;
let mut current_events: Vec<E> = vec![];
for (event, _) in events {
Expand All @@ -336,7 +341,11 @@ where
&self,
current_events: &[E],
command: &C,
) -> Result<Vec<E>, Error> {
) -> Result<Vec<E>, Error>
where
E: Identifier,
C: Identifier,
{
let current_state: S = current_events
.iter()
.fold((self.decider.initial_state)(), |state, event| {
Expand All @@ -361,7 +370,11 @@ where
.iter()
.map(|(e, _)| e.clone())
.collect::<Vec<E>>(),
initial_events.clone(),
initial_events
.clone()
.into_iter()
.filter(|e| e.identifier() == command.identifier())
.collect::<Vec<E>>(),
]
.concat();

Expand Down
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,10 @@ pub enum Sum<A, B> {
/// Second variant
Second(B),
}

/// Identify the state/command/event.
/// It is used to identify the concept to what the state/command/event belongs to. For example, the `order_id` or `restaurant_id`.
pub trait Identifier {
/// Returns the identifier of the state/command/event
fn identifier(&self) -> String;
}
15 changes: 10 additions & 5 deletions tests/aggregate_combined_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use fmodel_rust::aggregate::{
};
use fmodel_rust::decider::Decider;
use fmodel_rust::saga::Saga;
use fmodel_rust::Identifier;

use crate::api::{
CancelOrderCommand, CreateOrderCommand, CreateShipmentCommand, OrderCancelledEvent,
Expand All @@ -16,7 +17,6 @@ use crate::api::{
};
use crate::application::{
command_from_sum, event_from_sum, sum_to_command, sum_to_event, AggregateError, Command, Event,
Id,
};

mod api;
Expand Down Expand Up @@ -44,7 +44,7 @@ impl EventRepository<Command, Event, i32, AggregateError> for InMemoryEventRepos
.unwrap()
.clone()
.into_iter()
.filter(|(event, _)| event.id() == command.id())
.filter(|(event, _)| event.identifier() == command.identifier())
.collect())
}

Expand Down Expand Up @@ -75,7 +75,7 @@ impl EventRepository<Command, Event, i32, AggregateError> for InMemoryEventRepos
.unwrap()
.clone()
.into_iter()
.filter(|(e, _)| e.id() == event.id())
.filter(|(e, _)| e.identifier() == event.identifier())
.map(|(_, version)| version)
.last())
}
Expand All @@ -102,7 +102,12 @@ impl StateRepository<Command, (OrderState, ShipmentState), i32, AggregateError>
&self,
command: &Command,
) -> Result<Option<((OrderState, ShipmentState), i32)>, AggregateError> {
Ok(self.states.lock().unwrap().get(&command.id()).cloned())
Ok(self
.states
.lock()
.unwrap()
.get(&command.identifier().parse::<u32>().unwrap())
.cloned())
}

async fn save(
Expand All @@ -114,7 +119,7 @@ impl StateRepository<Command, (OrderState, ShipmentState), i32, AggregateError>
self.states
.lock()
.unwrap()
.insert(state.id(), (state.clone(), version + 1));
.insert(state.0.order_id, (state.clone(), version + 1));
Ok((state.clone(), version))
}
}
Expand Down
12 changes: 9 additions & 3 deletions tests/aggregate_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use fmodel_rust::aggregate::{
EventRepository, EventSourcedAggregate, StateRepository, StateStoredAggregate,
};
use fmodel_rust::decider::Decider;
use fmodel_rust::Identifier;

use crate::api::{
CancelOrderCommand, CreateOrderCommand, OrderCancelledEvent, OrderCommand, OrderCreatedEvent,
Expand Down Expand Up @@ -43,7 +44,7 @@ impl EventRepository<OrderCommand, OrderEvent, i32, AggregateError>
.unwrap()
.clone()
.into_iter()
.filter(|(event, _)| event.id() == command.id())
.filter(|(event, _)| event.identifier() == command.identifier())
.collect())
}

Expand Down Expand Up @@ -74,7 +75,7 @@ impl EventRepository<OrderCommand, OrderEvent, i32, AggregateError>
.unwrap()
.clone()
.into_iter()
.filter(|(e, _)| e.id() == event.id())
.filter(|(e, _)| e.identifier() == event.identifier())
.map(|(_, version)| version)
.last())
}
Expand All @@ -100,7 +101,12 @@ impl StateRepository<OrderCommand, OrderState, i32, AggregateError>
&self,
command: &OrderCommand,
) -> Result<Option<(OrderState, i32)>, AggregateError> {
Ok(self.states.lock().unwrap().get(&command.id()).cloned())
Ok(self
.states
.lock()
.unwrap()
.get(&command.identifier().parse::<u32>().unwrap())
.cloned())
}

async fn save(
Expand Down
29 changes: 15 additions & 14 deletions tests/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// ############################ Order API ############################
// ###################################################################

use fmodel_rust::Identifier;

/// The state of the Order entity
#[derive(Debug, Clone, PartialEq)]
pub struct OrderState {
Expand Down Expand Up @@ -47,14 +49,13 @@ pub struct CancelOrderCommand {
pub order_id: u32,
}

/// Provides a way to get the id of the Order commands
impl OrderCommand {
impl Identifier for OrderCommand {
#[allow(dead_code)]
pub fn id(&self) -> u32 {
fn identifier(&self) -> String {
match self {
OrderCommand::Create(c) => c.order_id.to_owned(),
OrderCommand::Update(c) => c.order_id.to_owned(),
OrderCommand::Cancel(c) => c.order_id.to_owned(),
OrderCommand::Create(c) => c.order_id.to_string(),
OrderCommand::Update(c) => c.order_id.to_string(),
OrderCommand::Cancel(c) => c.order_id.to_string(),
}
}
}
Expand Down Expand Up @@ -87,13 +88,13 @@ pub struct OrderCancelledEvent {
}

/// Provides a way to get the id of the Order events
impl OrderEvent {
impl Identifier for OrderEvent {
#[allow(dead_code)]
pub fn id(&self) -> u32 {
fn identifier(&self) -> String {
match self {
OrderEvent::Created(c) => c.order_id.to_owned(),
OrderEvent::Updated(c) => c.order_id.to_owned(),
OrderEvent::Cancelled(c) => c.order_id.to_owned(),
OrderEvent::Created(c) => c.order_id.to_string(),
OrderEvent::Updated(c) => c.order_id.to_string(),
OrderEvent::Cancelled(c) => c.order_id.to_string(),
}
}
}
Expand Down Expand Up @@ -136,11 +137,11 @@ pub struct CreateShipmentCommand {
}

/// Provides a way to get the id of the Shipment commands
impl ShipmentCommand {
impl Identifier for ShipmentCommand {
#[allow(dead_code)]
pub fn id(&self) -> u32 {
fn identifier(&self) -> String {
match self {
ShipmentCommand::Create(c) => c.shipment_id.to_owned(),
ShipmentCommand::Create(c) => c.shipment_id.to_string(),
}
}
}
Expand Down
49 changes: 15 additions & 34 deletions tests/application/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use derive_more::Display;
use fmodel_rust::Sum;
use fmodel_rust::{Identifier, Sum};
use std::error::Error;

use crate::api::{
CancelOrderCommand, CreateOrderCommand, CreateShipmentCommand, OrderCancelledEvent,
OrderCommand, OrderCreatedEvent, OrderEvent, OrderState, OrderUpdatedEvent, OrderViewState,
ShipmentCommand, ShipmentCreatedEvent, ShipmentEvent, ShipmentState, ShipmentViewState,
UpdateOrderCommand,
OrderCommand, OrderCreatedEvent, OrderEvent, OrderUpdatedEvent, ShipmentCommand,
ShipmentCreatedEvent, ShipmentEvent, UpdateOrderCommand,
};

/// The command enum for all the domain commands (shipment and order)
Expand Down Expand Up @@ -105,46 +104,28 @@ pub fn sum_to_event(event: &Sum<OrderEvent, ShipmentEvent>) -> Event {
}
}

/// A trait to provide a way to get the id of the messages/entities
#[allow(dead_code)]
pub trait Id {
fn id(&self) -> u32;
}

impl Id for Event {
fn id(&self) -> u32 {
impl Identifier for Event {
fn identifier(&self) -> String {
match self {
Event::OrderCreated(event) => event.order_id,
Event::OrderCancelled(event) => event.order_id,
Event::OrderUpdated(event) => event.order_id,
Event::ShipmentCreated(event) => event.shipment_id,
Event::ShipmentCreated(evt) => evt.shipment_id.to_string(),
Event::OrderCreated(evt) => evt.order_id.to_string(),
Event::OrderUpdated(evt) => evt.order_id.to_string(),
Event::OrderCancelled(evt) => evt.order_id.to_string(),
}
}
}

impl Id for Command {
fn id(&self) -> u32 {
impl Identifier for Command {
fn identifier(&self) -> String {
match self {
Command::OrderCreate(cmd) => cmd.order_id,
Command::OrderUpdate(cmd) => cmd.order_id,
Command::OrderCancel(cmd) => cmd.order_id,
Command::ShipmentCreate(cmd) => cmd.shipment_id,
Command::OrderCreate(cmd) => cmd.order_id.to_string(),
Command::OrderUpdate(cmd) => cmd.order_id.to_string(),
Command::OrderCancel(cmd) => cmd.order_id.to_string(),
Command::ShipmentCreate(cmd) => cmd.shipment_id.to_string(),
}
}
}

impl Id for (OrderState, ShipmentState) {
fn id(&self) -> u32 {
self.0.order_id
}
}

impl Id for (OrderViewState, ShipmentViewState) {
fn id(&self) -> u32 {
self.0.order_id
}
}

/// Error type for the application/aggregate
#[derive(Debug, Display)]
#[allow(dead_code)]
Expand Down
12 changes: 9 additions & 3 deletions tests/materialized_view_combined_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ use std::thread;

use fmodel_rust::materialized_view::{MaterializedView, ViewStateRepository};
use fmodel_rust::view::View;
use fmodel_rust::Identifier;

use crate::api::{
OrderCancelledEvent, OrderCreatedEvent, OrderEvent, OrderUpdatedEvent, OrderViewState,
ShipmentEvent, ShipmentViewState,
};
use crate::application::{event_from_sum, Event, Id, MaterializedViewError};
use crate::application::{event_from_sum, Event, MaterializedViewError};

mod api;
mod application;
Expand Down Expand Up @@ -85,7 +86,12 @@ impl ViewStateRepository<Event, (OrderViewState, ShipmentViewState), Materialize
&self,
event: &Event,
) -> Result<Option<(OrderViewState, ShipmentViewState)>, MaterializedViewError> {
Ok(self.states.lock().unwrap().get(&event.id()).cloned())
Ok(self
.states
.lock()
.unwrap()
.get(&event.identifier().parse::<u32>().unwrap())
.cloned())
}

async fn save(
Expand All @@ -95,7 +101,7 @@ impl ViewStateRepository<Event, (OrderViewState, ShipmentViewState), Materialize
self.states
.lock()
.unwrap()
.insert(state.id(), state.clone());
.insert(state.0.order_id, state.clone());
Ok(state.clone())
}
}
Expand Down
8 changes: 7 additions & 1 deletion tests/materialized_view_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::thread;

use fmodel_rust::materialized_view::{MaterializedView, ViewStateRepository};
use fmodel_rust::view::View;
use fmodel_rust::Identifier;

use crate::api::{
OrderCancelledEvent, OrderCreatedEvent, OrderEvent, OrderUpdatedEvent, OrderViewState,
Expand Down Expand Up @@ -61,7 +62,12 @@ impl ViewStateRepository<OrderEvent, OrderViewState, MaterializedViewError>
&self,
event: &OrderEvent,
) -> Result<Option<OrderViewState>, MaterializedViewError> {
Ok(self.states.lock().unwrap().get(&event.id()).cloned())
Ok(self
.states
.lock()
.unwrap()
.get(&event.identifier().parse::<u32>().unwrap())
.cloned())
}

async fn save(&self, state: &OrderViewState) -> Result<OrderViewState, MaterializedViewError> {
Expand Down

0 comments on commit f7a4faf

Please sign in to comment.