diff --git a/src/aggregate.rs b/src/aggregate.rs index 22f2461..6477430 100644 --- a/src/aggregate.rs +++ b/src/aggregate.rs @@ -3,6 +3,7 @@ use std::marker::PhantomData; use crate::decider::{Decider, EventComputation, StateComputation}; use crate::saga::{ActionComputation, Saga}; +use crate::Identifier; /// Event Repository trait /// @@ -316,7 +317,11 @@ where } } /// Handles the command by fetching the events from the repository, computing new events based on the current events and the command, and saving the new events to the repository. - pub async fn handle(&self, command: &C) -> Result, Error> { + pub async fn handle(&self, command: &C) -> Result, Error> + where + E: Identifier, + C: Identifier, + { let events: Vec<(E, Version)> = self.fetch_events(command).await?; let mut current_events: Vec = vec![]; for (event, _) in events { @@ -336,7 +341,11 @@ where &self, current_events: &[E], command: &C, - ) -> Result, Error> { + ) -> Result, Error> + where + E: Identifier, + C: Identifier, + { let current_state: S = current_events .iter() .fold((self.decider.initial_state)(), |state, event| { @@ -361,7 +370,11 @@ where .iter() .map(|(e, _)| e.clone()) .collect::>(), - initial_events.clone(), + initial_events + .clone() + .into_iter() + .filter(|e| e.identifier() == command.identifier()) + .collect::>(), ] .concat(); diff --git a/src/lib.rs b/src/lib.rs index 82e38ec..6890301 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -340,3 +340,10 @@ pub enum Sum { /// Second variant Second(B), } + +/// Identify the state/command/event. +/// It is used to identify the concept to what the state/command/event belongs to. For example, the `order_id` or `restaurant_id`. +pub trait Identifier { + /// Returns the identifier of the state/command/event + fn identifier(&self) -> String; +} diff --git a/tests/aggregate_combined_test.rs b/tests/aggregate_combined_test.rs index 0fb5c7c..fc4f711 100644 --- a/tests/aggregate_combined_test.rs +++ b/tests/aggregate_combined_test.rs @@ -8,6 +8,7 @@ use fmodel_rust::aggregate::{ }; use fmodel_rust::decider::Decider; use fmodel_rust::saga::Saga; +use fmodel_rust::Identifier; use crate::api::{ CancelOrderCommand, CreateOrderCommand, CreateShipmentCommand, OrderCancelledEvent, @@ -16,7 +17,6 @@ use crate::api::{ }; use crate::application::{ command_from_sum, event_from_sum, sum_to_command, sum_to_event, AggregateError, Command, Event, - Id, }; mod api; @@ -44,7 +44,7 @@ impl EventRepository for InMemoryEventRepos .unwrap() .clone() .into_iter() - .filter(|(event, _)| event.id() == command.id()) + .filter(|(event, _)| event.identifier() == command.identifier()) .collect()) } @@ -75,7 +75,7 @@ impl EventRepository for InMemoryEventRepos .unwrap() .clone() .into_iter() - .filter(|(e, _)| e.id() == event.id()) + .filter(|(e, _)| e.identifier() == event.identifier()) .map(|(_, version)| version) .last()) } @@ -102,7 +102,12 @@ impl StateRepository &self, command: &Command, ) -> Result, AggregateError> { - Ok(self.states.lock().unwrap().get(&command.id()).cloned()) + Ok(self + .states + .lock() + .unwrap() + .get(&command.identifier().parse::().unwrap()) + .cloned()) } async fn save( @@ -114,7 +119,7 @@ impl StateRepository self.states .lock() .unwrap() - .insert(state.id(), (state.clone(), version + 1)); + .insert(state.0.order_id, (state.clone(), version + 1)); Ok((state.clone(), version)) } } diff --git a/tests/aggregate_test.rs b/tests/aggregate_test.rs index dad6fc5..6a8861c 100644 --- a/tests/aggregate_test.rs +++ b/tests/aggregate_test.rs @@ -6,6 +6,7 @@ use fmodel_rust::aggregate::{ EventRepository, EventSourcedAggregate, StateRepository, StateStoredAggregate, }; use fmodel_rust::decider::Decider; +use fmodel_rust::Identifier; use crate::api::{ CancelOrderCommand, CreateOrderCommand, OrderCancelledEvent, OrderCommand, OrderCreatedEvent, @@ -43,7 +44,7 @@ impl EventRepository .unwrap() .clone() .into_iter() - .filter(|(event, _)| event.id() == command.id()) + .filter(|(event, _)| event.identifier() == command.identifier()) .collect()) } @@ -74,7 +75,7 @@ impl EventRepository .unwrap() .clone() .into_iter() - .filter(|(e, _)| e.id() == event.id()) + .filter(|(e, _)| e.identifier() == event.identifier()) .map(|(_, version)| version) .last()) } @@ -100,7 +101,12 @@ impl StateRepository &self, command: &OrderCommand, ) -> Result, AggregateError> { - Ok(self.states.lock().unwrap().get(&command.id()).cloned()) + Ok(self + .states + .lock() + .unwrap() + .get(&command.identifier().parse::().unwrap()) + .cloned()) } async fn save( diff --git a/tests/api/mod.rs b/tests/api/mod.rs index c7d2adb..a47aadc 100644 --- a/tests/api/mod.rs +++ b/tests/api/mod.rs @@ -2,6 +2,8 @@ // ############################ Order API ############################ // ################################################################### +use fmodel_rust::Identifier; + /// The state of the Order entity #[derive(Debug, Clone, PartialEq)] pub struct OrderState { @@ -47,14 +49,13 @@ pub struct CancelOrderCommand { pub order_id: u32, } -/// Provides a way to get the id of the Order commands -impl OrderCommand { +impl Identifier for OrderCommand { #[allow(dead_code)] - pub fn id(&self) -> u32 { + fn identifier(&self) -> String { match self { - OrderCommand::Create(c) => c.order_id.to_owned(), - OrderCommand::Update(c) => c.order_id.to_owned(), - OrderCommand::Cancel(c) => c.order_id.to_owned(), + OrderCommand::Create(c) => c.order_id.to_string(), + OrderCommand::Update(c) => c.order_id.to_string(), + OrderCommand::Cancel(c) => c.order_id.to_string(), } } } @@ -87,13 +88,13 @@ pub struct OrderCancelledEvent { } /// Provides a way to get the id of the Order events -impl OrderEvent { +impl Identifier for OrderEvent { #[allow(dead_code)] - pub fn id(&self) -> u32 { + fn identifier(&self) -> String { match self { - OrderEvent::Created(c) => c.order_id.to_owned(), - OrderEvent::Updated(c) => c.order_id.to_owned(), - OrderEvent::Cancelled(c) => c.order_id.to_owned(), + OrderEvent::Created(c) => c.order_id.to_string(), + OrderEvent::Updated(c) => c.order_id.to_string(), + OrderEvent::Cancelled(c) => c.order_id.to_string(), } } } @@ -136,11 +137,11 @@ pub struct CreateShipmentCommand { } /// Provides a way to get the id of the Shipment commands -impl ShipmentCommand { +impl Identifier for ShipmentCommand { #[allow(dead_code)] - pub fn id(&self) -> u32 { + fn identifier(&self) -> String { match self { - ShipmentCommand::Create(c) => c.shipment_id.to_owned(), + ShipmentCommand::Create(c) => c.shipment_id.to_string(), } } } diff --git a/tests/application/mod.rs b/tests/application/mod.rs index 3eaa7fd..3eaf68d 100644 --- a/tests/application/mod.rs +++ b/tests/application/mod.rs @@ -1,12 +1,11 @@ use derive_more::Display; -use fmodel_rust::Sum; +use fmodel_rust::{Identifier, Sum}; use std::error::Error; use crate::api::{ CancelOrderCommand, CreateOrderCommand, CreateShipmentCommand, OrderCancelledEvent, - OrderCommand, OrderCreatedEvent, OrderEvent, OrderState, OrderUpdatedEvent, OrderViewState, - ShipmentCommand, ShipmentCreatedEvent, ShipmentEvent, ShipmentState, ShipmentViewState, - UpdateOrderCommand, + OrderCommand, OrderCreatedEvent, OrderEvent, OrderUpdatedEvent, ShipmentCommand, + ShipmentCreatedEvent, ShipmentEvent, UpdateOrderCommand, }; /// The command enum for all the domain commands (shipment and order) @@ -105,46 +104,28 @@ pub fn sum_to_event(event: &Sum) -> Event { } } -/// A trait to provide a way to get the id of the messages/entities -#[allow(dead_code)] -pub trait Id { - fn id(&self) -> u32; -} - -impl Id for Event { - fn id(&self) -> u32 { +impl Identifier for Event { + fn identifier(&self) -> String { match self { - Event::OrderCreated(event) => event.order_id, - Event::OrderCancelled(event) => event.order_id, - Event::OrderUpdated(event) => event.order_id, - Event::ShipmentCreated(event) => event.shipment_id, + Event::ShipmentCreated(evt) => evt.shipment_id.to_string(), + Event::OrderCreated(evt) => evt.order_id.to_string(), + Event::OrderUpdated(evt) => evt.order_id.to_string(), + Event::OrderCancelled(evt) => evt.order_id.to_string(), } } } -impl Id for Command { - fn id(&self) -> u32 { +impl Identifier for Command { + fn identifier(&self) -> String { match self { - Command::OrderCreate(cmd) => cmd.order_id, - Command::OrderUpdate(cmd) => cmd.order_id, - Command::OrderCancel(cmd) => cmd.order_id, - Command::ShipmentCreate(cmd) => cmd.shipment_id, + Command::OrderCreate(cmd) => cmd.order_id.to_string(), + Command::OrderUpdate(cmd) => cmd.order_id.to_string(), + Command::OrderCancel(cmd) => cmd.order_id.to_string(), + Command::ShipmentCreate(cmd) => cmd.shipment_id.to_string(), } } } -impl Id for (OrderState, ShipmentState) { - fn id(&self) -> u32 { - self.0.order_id - } -} - -impl Id for (OrderViewState, ShipmentViewState) { - fn id(&self) -> u32 { - self.0.order_id - } -} - /// Error type for the application/aggregate #[derive(Debug, Display)] #[allow(dead_code)] diff --git a/tests/materialized_view_combined_test.rs b/tests/materialized_view_combined_test.rs index 8e5eff0..fe80960 100644 --- a/tests/materialized_view_combined_test.rs +++ b/tests/materialized_view_combined_test.rs @@ -4,12 +4,13 @@ use std::thread; use fmodel_rust::materialized_view::{MaterializedView, ViewStateRepository}; use fmodel_rust::view::View; +use fmodel_rust::Identifier; use crate::api::{ OrderCancelledEvent, OrderCreatedEvent, OrderEvent, OrderUpdatedEvent, OrderViewState, ShipmentEvent, ShipmentViewState, }; -use crate::application::{event_from_sum, Event, Id, MaterializedViewError}; +use crate::application::{event_from_sum, Event, MaterializedViewError}; mod api; mod application; @@ -85,7 +86,12 @@ impl ViewStateRepository Result, MaterializedViewError> { - Ok(self.states.lock().unwrap().get(&event.id()).cloned()) + Ok(self + .states + .lock() + .unwrap() + .get(&event.identifier().parse::().unwrap()) + .cloned()) } async fn save( @@ -95,7 +101,7 @@ impl ViewStateRepository &self, event: &OrderEvent, ) -> Result, MaterializedViewError> { - Ok(self.states.lock().unwrap().get(&event.id()).cloned()) + Ok(self + .states + .lock() + .unwrap() + .get(&event.identifier().parse::().unwrap()) + .cloned()) } async fn save(&self, state: &OrderViewState) -> Result {