Skip to content

Commit 8aeaa88

Browse files
committed
Continue DataActor in Rust
1 parent 0bbe8fa commit 8aeaa88

File tree

7 files changed

+142
-11
lines changed

7 files changed

+142
-11
lines changed

crates/common/src/actor/data_actor.rs

+22-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ use crate::{
5050
cache::Cache,
5151
clock::Clock,
5252
enums::{ComponentState, ComponentTrigger},
53-
logging::{CMD, RECV, SENT},
53+
logging::{CMD, RECV, REQ, SEND},
5454
messages::{
5555
data::{
5656
DataCommand, DataResponse, RequestBars, RequestBookSnapshot, RequestCommand,
@@ -731,13 +731,33 @@ impl DataActorCore {
731731

732732
fn send_data_cmd(&self, command: DataCommand) {
733733
if self.config.log_commands {
734-
log::info!("{CMD}{SENT} {command:?}");
734+
log::info!("{CMD}{SEND} {command:?}");
735735
}
736736

737737
let endpoint = MessagingSwitchboard::data_engine_execute();
738738
msgbus::send(&endpoint, command.as_any())
739739
}
740740

741+
fn send_data_req<A: DataActor>(&self, request: RequestCommand) {
742+
if self.config.log_commands {
743+
log::info!("{REQ}{SEND} {request:?}");
744+
}
745+
746+
let actor_id = self.actor_id.inner();
747+
let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
748+
move |response: &DataResponse| {
749+
get_actor_unchecked::<A>(&actor_id).handle_data_response(response);
750+
},
751+
)));
752+
753+
let msgbus = get_message_bus()
754+
.borrow_mut()
755+
.register_response_handler(request.request_id(), handler);
756+
757+
let endpoint = MessagingSwitchboard::data_engine_execute();
758+
msgbus::send(&endpoint, request.as_any())
759+
}
760+
741761
pub fn start(&mut self) -> anyhow::Result<()> {
742762
self.transition_state(ComponentTrigger::Start)?; // -> Starting
743763

crates/common/src/logging/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use self::{
3939
use crate::enums::LogLevel;
4040

4141
pub const RECV: &str = "<--";
42-
pub const SENT: &str = "-->";
42+
pub const SEND: &str = "-->";
4343
pub const CMD: &str = "[CMD]";
4444
pub const EVT: &str = "[EVT]";
4545
pub const DOC: &str = "[DOC]";

crates/common/src/messages/data.rs

+17
Original file line numberDiff line numberDiff line change
@@ -1008,6 +1008,23 @@ pub enum RequestCommand {
10081008
}
10091009

10101010
impl RequestCommand {
1011+
/// Converts the command to a dyn Any trait object for messaging.
1012+
pub fn as_any(&self) -> &dyn Any {
1013+
self
1014+
}
1015+
1016+
pub fn request_id(&self) -> &UUID4 {
1017+
match self {
1018+
Self::Data(cmd) => &cmd.request_id,
1019+
Self::Instruments(cmd) => &cmd.request_id,
1020+
Self::Instrument(cmd) => &cmd.request_id,
1021+
Self::BookSnapshot(cmd) => &cmd.request_id,
1022+
Self::Quotes(cmd) => &cmd.request_id,
1023+
Self::Trades(cmd) => &cmd.request_id,
1024+
Self::Bars(cmd) => &cmd.request_id,
1025+
}
1026+
}
1027+
10111028
pub fn client_id(&self) -> Option<&ClientId> {
10121029
match self {
10131030
Self::Data(cmd) => Some(&cmd.client_id),

crates/common/src/msgbus/handler.rs

+14
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,14 @@ pub trait MessageHandler: Any {
3131
fn as_any(&self) -> &dyn Any;
3232
}
3333

34+
impl PartialEq for dyn MessageHandler {
35+
fn eq(&self, other: &Self) -> bool {
36+
self.id() == other.id()
37+
}
38+
}
39+
40+
impl Eq for dyn MessageHandler {}
41+
3442
#[derive(Debug)]
3543
pub struct TypedMessageHandler<T: 'static + ?Sized, F: Fn(&T) + 'static> {
3644
id: Ustr,
@@ -124,6 +132,12 @@ fn generate_deterministic_handler_id<T: 'static + ?Sized, F: 'static + Fn(&T)>(
124132
#[derive(Clone)]
125133
pub struct ShareableMessageHandler(pub Rc<dyn MessageHandler>);
126134

135+
impl ShareableMessageHandler {
136+
pub fn id(&self) -> Ustr {
137+
self.0.id()
138+
}
139+
}
140+
127141
impl Debug for ShareableMessageHandler {
128142
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129143
f.debug_struct(stringify!(ShareableMessageHandler))

crates/common/src/msgbus/mod.rs

+43-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use std::{
3939
sync::OnceLock,
4040
};
4141

42+
use ahash::AHashMap;
4243
use handler::ShareableMessageHandler;
4344
use indexmap::IndexMap;
4445
use nautilus_core::UUID4;
@@ -86,6 +87,21 @@ pub fn send(endpoint: &Ustr, message: &dyn Any) {
8687
}
8788
}
8889

90+
/// Sends the `response` to the handler registered for the `correlation_id` (if found).
91+
pub fn response(correlation_id: &UUID4, message: &dyn Any) {
92+
let handler = get_message_bus()
93+
.borrow()
94+
.get_response_handler(correlation_id)
95+
.cloned();
96+
if let Some(handler) = handler {
97+
handler.0.handle(message);
98+
} else {
99+
log::error!(
100+
"Failed to handle response: handler not found for correlation_id {correlation_id}"
101+
)
102+
}
103+
}
104+
89105
/// Publishes the `message` to the `topic`.
90106
pub fn publish(topic: &Ustr, message: &dyn Any) {
91107
log::trace!("Publishing topic '{topic}' {message:?}");
@@ -303,8 +319,10 @@ pub struct MessageBus {
303319
/// Maps a pattern to all the handlers registered for it
304320
/// this is updated whenever a new subscription is created.
305321
patterns: IndexMap<Ustr, Vec<Subscription>>,
306-
/// Handles a message or a request destined for a specific endpoint.
322+
/// Index of endpoint addresses and their handlers.
307323
endpoints: IndexMap<Ustr, ShareableMessageHandler>,
324+
/// Index of request correlation IDs and their response handlers.
325+
correlation_index: AHashMap<UUID4, ShareableMessageHandler>,
308326
}
309327

310328
// SAFETY: Message bus is not meant to be passed between threads
@@ -328,6 +346,7 @@ impl MessageBus {
328346
subscriptions: IndexMap::new(),
329347
patterns: IndexMap::new(),
330348
endpoints: IndexMap::new(),
349+
correlation_index: AHashMap::new(),
331350
has_backing: false,
332351
}
333352
}
@@ -401,12 +420,19 @@ impl MessageBus {
401420
// TODO: Integrate the backing database
402421
Ok(())
403422
}
423+
404424
/// Returns the handler for the given `endpoint`.
405425
#[must_use]
406426
pub fn get_endpoint<T: AsRef<str>>(&self, endpoint: T) -> Option<&ShareableMessageHandler> {
407427
self.endpoints.get(&Ustr::from(endpoint.as_ref()))
408428
}
409429

430+
/// Returns the handler for the given `correlation_id`.
431+
#[must_use]
432+
pub fn get_response_handler(&self, correlation_id: &UUID4) -> Option<&ShareableMessageHandler> {
433+
self.correlation_index.get(correlation_id)
434+
}
435+
410436
#[must_use]
411437
pub fn matching_subscriptions(&self, pattern: &Ustr) -> Vec<Subscription> {
412438
let mut matching_subs: Vec<Subscription> = Vec::new();
@@ -433,6 +459,22 @@ impl MessageBus {
433459
matching_subs
434460
}
435461

462+
pub fn register_response_handler(
463+
&mut self,
464+
correlation_id: &UUID4,
465+
handler: ShareableMessageHandler,
466+
) -> anyhow::Result<()> {
467+
if self.correlation_index.contains_key(correlation_id) {
468+
return Err(anyhow::anyhow!(
469+
"Correlation ID <{correlation_id}> already has a registered handler",
470+
));
471+
}
472+
473+
self.correlation_index.insert(*correlation_id, handler);
474+
475+
Ok(())
476+
}
477+
436478
fn matching_handlers<'a>(
437479
&'a self,
438480
pattern: &'a Ustr,

crates/common/src/msgbus/tests.rs

+38
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,44 @@ fn test_is_subscribed_when_no_subscriptions() {
5555
assert!(!msgbus.borrow().is_subscribed("my-topic", handler));
5656
}
5757

58+
#[rstest]
59+
fn test_get_response_handler_when_no_handler() {
60+
let msgbus = get_message_bus();
61+
let msgbus_ref = msgbus.borrow();
62+
let handler = msgbus_ref.get_response_handler(&UUID4::new());
63+
assert!(handler.is_none());
64+
}
65+
66+
#[rstest]
67+
fn test_get_response_handler_when_already_registered() {
68+
let msgbus = get_message_bus();
69+
let mut msgbus_ref = msgbus.borrow_mut();
70+
let handler = get_stub_shareable_handler(None);
71+
72+
let request_id = UUID4::new();
73+
msgbus_ref
74+
.register_response_handler(&request_id, handler.clone())
75+
.unwrap();
76+
77+
let result = msgbus_ref.register_response_handler(&request_id, handler.clone());
78+
assert!(result.is_err());
79+
}
80+
81+
#[rstest]
82+
fn test_get_response_handler_when_registered() {
83+
let msgbus = get_message_bus();
84+
let mut msgbus_ref = msgbus.borrow_mut();
85+
let handler = get_stub_shareable_handler(None);
86+
87+
let request_id = UUID4::new();
88+
msgbus_ref
89+
.register_response_handler(&request_id, handler.clone())
90+
.unwrap();
91+
92+
let handler = msgbus_ref.get_response_handler(&request_id).unwrap();
93+
assert_eq!(handler.id(), handler.id());
94+
}
95+
5896
#[rstest]
5997
fn test_is_registered_when_no_registrations() {
6098
let msgbus = get_message_bus();

crates/execution/src/order_manager/manager.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::{cell::RefCell, collections::HashMap, rc::Rc};
1818
use nautilus_common::{
1919
cache::Cache,
2020
clock::Clock,
21-
logging::{CMD, EVT, SENT},
21+
logging::{CMD, EVT, SEND},
2222
msgbus::{self},
2323
};
2424
use nautilus_core::UUID4;
@@ -515,13 +515,13 @@ impl OrderManager {
515515

516516
// Message sending methods
517517
pub fn send_emulator_command(&self, command: TradingCommand) {
518-
log::info!("{CMD}{SENT} {command}");
518+
log::info!("{CMD}{SEND} {command}");
519519

520520
msgbus::send(&Ustr::from("OrderEmulator.execute"), &command);
521521
}
522522

523523
pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
524-
log::info!("{CMD}{SENT} {command}");
524+
log::info!("{CMD}{SEND} {command}");
525525

526526
let endpoint = format!("{exec_algorithm_id}.execute");
527527
msgbus::send(
@@ -531,22 +531,22 @@ impl OrderManager {
531531
}
532532

533533
pub fn send_risk_command(&self, command: TradingCommand) {
534-
log::info!("{CMD}{SENT} {command}");
534+
log::info!("{CMD}{SEND} {command}");
535535
msgbus::send(&Ustr::from("RiskEngine.execute"), &command);
536536
}
537537

538538
pub fn send_exec_command(&self, command: TradingCommand) {
539-
log::info!("{CMD}{SENT} {command}");
539+
log::info!("{CMD}{SEND} {command}");
540540
msgbus::send(&Ustr::from("ExecEngine.execute"), &command);
541541
}
542542

543543
pub fn send_risk_event(&self, event: OrderEventAny) {
544-
log::info!("{EVT}{SENT} {event}");
544+
log::info!("{EVT}{SEND} {event}");
545545
msgbus::send(&Ustr::from("RiskEngine.process"), &event);
546546
}
547547

548548
pub fn send_exec_event(&self, event: OrderEventAny) {
549-
log::info!("{EVT}{SENT} {event}");
549+
log::info!("{EVT}{SEND} {event}");
550550
msgbus::send(&Ustr::from("ExecEngine.process"), &event);
551551
}
552552
}

0 commit comments

Comments
 (0)