Skip to content

Commit b6f3397

Browse files
committed
Continue LiveNode in Rust
1 parent a638d9d commit b6f3397

File tree

2 files changed

+37
-15
lines changed

2 files changed

+37
-15
lines changed

crates/live/src/node.rs

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
#![allow(dead_code)]
1818
#![allow(unused_variables)]
1919

20-
use std::collections::HashMap;
20+
use std::{cell::RefCell, collections::HashMap, rc::Rc};
2121

22-
use nautilus_common::enums::Environment;
22+
use nautilus_common::{clock::LiveClock, component::Component, enums::Environment};
2323
use nautilus_core::UUID4;
2424
use nautilus_model::identifiers::TraderId;
2525
use nautilus_system::{
@@ -28,17 +28,15 @@ use nautilus_system::{
2828
kernel::NautilusKernel,
2929
};
3030

31-
use crate::{
32-
config::LiveNodeConfig,
33-
runner::{AsyncRunner, Runner},
34-
};
31+
use crate::{config::LiveNodeConfig, runner::AsyncRunner};
3532

3633
/// High-level abstraction for a live Nautilus system node.
3734
///
3835
/// Provides a simplified interface for running live systems
3936
/// with automatic client management and lifecycle handling.
4037
#[derive(Debug)]
4138
pub struct LiveNode {
39+
clock: Rc<RefCell<LiveClock>>,
4240
kernel: NautilusKernel,
4341
runner: AsyncRunner,
4442
config: LiveNodeConfig,
@@ -80,12 +78,14 @@ impl LiveNode {
8078
}
8179
}
8280

81+
let clock = Rc::new(RefCell::new(LiveClock::new()));
8382
let kernel = NautilusKernel::new(name, config.clone())?;
84-
let runner = AsyncRunner::new();
83+
let runner = AsyncRunner::new(clock.clone());
8584

8685
log::info!("LiveNode built successfully with kernel config");
8786

8887
Ok(Self {
88+
clock,
8989
kernel,
9090
runner,
9191
config,
@@ -182,6 +182,28 @@ impl LiveNode {
182182
pub const fn is_running(&self) -> bool {
183183
self.is_running
184184
}
185+
186+
/// Adds an actor to the trader.
187+
///
188+
/// This method provides a high-level interface for adding actors to the underlying
189+
/// trader without requiring direct access to the kernel. Actors should be added
190+
/// after the node is built but before starting the node.
191+
///
192+
/// # Errors
193+
///
194+
/// Returns an error if:
195+
/// - The trader is not in a valid state for adding components.
196+
/// - An actor with the same ID is already registered.
197+
/// - The node is currently running.
198+
pub fn add_actor(&mut self, actor: Box<dyn Component>) -> anyhow::Result<()> {
199+
if self.is_running {
200+
anyhow::bail!(
201+
"Cannot add actor while node is running. Add actors before calling start()."
202+
);
203+
}
204+
205+
self.kernel.trader.add_actor(actor)
206+
}
185207
}
186208

187209
/// Builder for constructing a [`LiveNode`] with a fluent API.
@@ -357,10 +379,12 @@ impl LiveNodeBuilder {
357379
log::info!("LiveNode built successfully");
358380

359381
// Create kernel directly with the config
382+
let clock = Rc::new(RefCell::new(LiveClock::new()));
360383
let kernel = NautilusKernel::new("LiveNode".to_string(), self.config.clone())?;
361-
let runner = AsyncRunner::new();
384+
let runner = AsyncRunner::new(clock.clone());
362385

363386
Ok(LiveNode {
387+
clock,
364388
kernel,
365389
runner,
366390
config: self.config,

crates/live/src/runner.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use futures::StreamExt;
1919
use nautilus_common::{
2020
clock::{Clock, LiveClock},
2121
messages::DataEvent,
22-
runner::{DataQueue, RunnerEvent, get_data_cmd_queue, set_data_evt_queue, set_global_clock},
22+
runner::{DataQueue, RunnerEvent, get_data_cmd_queue, set_data_evt_queue},
2323
runtime::get_runtime,
2424
};
2525
use nautilus_data::engine::DataEngine;
@@ -43,7 +43,6 @@ impl DataQueue for AsyncDataQueue {
4343

4444
// TODO: Use message bus instead of direct reference to DataEngine
4545
pub trait Runner {
46-
fn new() -> Self;
4746
fn run(&mut self, data_engine: &mut DataEngine);
4847
}
4948

@@ -60,17 +59,16 @@ impl Debug for AsyncRunner {
6059
}
6160
}
6261

63-
impl Runner for AsyncRunner {
64-
fn new() -> Self {
62+
impl AsyncRunner {
63+
pub fn new(clock: Rc<RefCell<LiveClock>>) -> Self {
6564
let (data_tx, data_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
6665
set_data_evt_queue(Rc::new(RefCell::new(AsyncDataQueue(data_tx))));
6766

68-
let clock = Rc::new(RefCell::new(LiveClock::new()));
69-
set_global_clock(clock.clone());
70-
7167
Self { clock, data_rx }
7268
}
69+
}
7370

71+
impl Runner for AsyncRunner {
7472
fn run(&mut self, data_engine: &mut DataEngine) {
7573
let mut time_event_stream = self.clock.borrow().get_event_stream();
7674
let data_cmd_queue = get_data_cmd_queue();

0 commit comments

Comments
 (0)