Skip to content

Commit 72f01d7

Browse files
committed
Add subscriber actor for databento crate
1 parent b6f3397 commit 72f01d7

File tree

2 files changed

+221
-0
lines changed

2 files changed

+221
-0
lines changed
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
// -------------------------------------------------------------------------------------------------
2+
// Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3+
// https://nautechsystems.io
4+
//
5+
// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6+
// You may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
// -------------------------------------------------------------------------------------------------
15+
16+
//! Basic Databento subscriber actor implementation for quotes and trades.
17+
18+
use std::{
19+
any::Any,
20+
cell::RefCell,
21+
ops::{Deref, DerefMut},
22+
rc::Rc,
23+
};
24+
25+
use nautilus_common::{
26+
actor::{Actor, DataActor, DataActorCore, data_actor::DataActorConfig},
27+
cache::Cache,
28+
clock::Clock,
29+
component::Component,
30+
enums::{ComponentState, ComponentTrigger},
31+
timer::TimeEvent,
32+
};
33+
use nautilus_model::{
34+
data::{QuoteTick, TradeTick},
35+
identifiers::{ClientId, ComponentId, InstrumentId},
36+
};
37+
use ustr::Ustr;
38+
39+
/// Configuration for the Databento subscriber actor.
40+
#[derive(Debug, Clone)]
41+
pub struct DatabentoSubscriberActorConfig {
42+
/// Base data actor configuration.
43+
pub base: DataActorConfig,
44+
/// Instrument IDs to subscribe to.
45+
pub instrument_ids: Vec<InstrumentId>,
46+
/// Client ID to use for subscriptions.
47+
pub client_id: ClientId,
48+
}
49+
50+
impl DatabentoSubscriberActorConfig {
51+
/// Creates a new [`DatabentoSubscriberActorConfig`] instance.
52+
#[must_use]
53+
pub fn new(instrument_ids: Vec<InstrumentId>, client_id: ClientId) -> Self {
54+
Self {
55+
base: DataActorConfig::default(),
56+
instrument_ids,
57+
client_id,
58+
}
59+
}
60+
}
61+
62+
/// A basic Databento subscriber actor that subscribes to quotes and trades.
63+
///
64+
/// This actor demonstrates how to use the DataActor trait to subscribe to market data
65+
/// from Databento for specified instruments. It logs received quotes and trades to
66+
/// demonstrate the data flow.
67+
#[derive(Debug)]
68+
pub struct DatabentoSubscriberActor {
69+
core: DataActorCore,
70+
config: DatabentoSubscriberActorConfig,
71+
pub received_quotes: Vec<QuoteTick>,
72+
pub received_trades: Vec<TradeTick>,
73+
}
74+
75+
impl Deref for DatabentoSubscriberActor {
76+
type Target = DataActorCore;
77+
78+
fn deref(&self) -> &Self::Target {
79+
&self.core
80+
}
81+
}
82+
83+
impl DerefMut for DatabentoSubscriberActor {
84+
fn deref_mut(&mut self) -> &mut Self::Target {
85+
&mut self.core
86+
}
87+
}
88+
89+
impl Actor for DatabentoSubscriberActor {
90+
fn id(&self) -> Ustr {
91+
self.core.actor_id.inner()
92+
}
93+
94+
fn handle(&mut self, msg: &dyn Any) {
95+
// Let the core handle message routing
96+
self.core.handle(msg);
97+
}
98+
99+
fn as_any(&self) -> &dyn Any {
100+
self
101+
}
102+
}
103+
104+
impl Component for DatabentoSubscriberActor {
105+
fn id(&self) -> ComponentId {
106+
ComponentId::from(self.core.actor_id.inner().as_str())
107+
}
108+
109+
fn state(&self) -> ComponentState {
110+
self.core.state()
111+
}
112+
113+
fn trigger(&self) -> ComponentTrigger {
114+
ComponentTrigger::Initialize
115+
}
116+
117+
fn is_running(&self) -> bool {
118+
matches!(self.core.state(), ComponentState::Running)
119+
}
120+
121+
fn is_stopped(&self) -> bool {
122+
matches!(self.core.state(), ComponentState::Stopped)
123+
}
124+
125+
fn is_disposed(&self) -> bool {
126+
matches!(self.core.state(), ComponentState::Disposed)
127+
}
128+
129+
fn start(&mut self) -> anyhow::Result<()> {
130+
self.core.start()
131+
}
132+
133+
fn stop(&mut self) -> anyhow::Result<()> {
134+
self.core.stop()
135+
}
136+
137+
fn reset(&mut self) -> anyhow::Result<()> {
138+
self.core.reset()
139+
}
140+
141+
fn dispose(&mut self) -> anyhow::Result<()> {
142+
self.core.dispose()
143+
}
144+
145+
fn handle_event(&mut self, _event: TimeEvent) {
146+
// No-op for now
147+
}
148+
}
149+
150+
impl DataActor for DatabentoSubscriberActor {
151+
fn state(&self) -> ComponentState {
152+
self.core.state()
153+
}
154+
155+
fn on_start(&mut self) -> anyhow::Result<()> {
156+
log::info!(
157+
"Starting Databento subscriber actor for {} instruments",
158+
self.config.instrument_ids.len()
159+
);
160+
161+
// Clone config values to avoid borrowing issues
162+
let instrument_ids = self.config.instrument_ids.clone();
163+
let client_id = self.config.client_id;
164+
165+
// Subscribe to quotes and trades for each instrument
166+
for instrument_id in instrument_ids {
167+
log::info!("Subscribing to quotes for {instrument_id}");
168+
self.subscribe_quotes::<DatabentoSubscriberActor>(instrument_id, Some(client_id), None);
169+
170+
log::info!("Subscribing to trades for {instrument_id}");
171+
self.subscribe_trades::<DatabentoSubscriberActor>(instrument_id, Some(client_id), None);
172+
}
173+
174+
log::info!("Databento subscriber actor started successfully");
175+
Ok(())
176+
}
177+
178+
fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
179+
log::info!("Received quote: {quote:?}");
180+
self.received_quotes.push(*quote);
181+
Ok(())
182+
}
183+
184+
fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
185+
log::info!("Received trade: {trade:?}");
186+
self.received_trades.push(*trade);
187+
Ok(())
188+
}
189+
}
190+
191+
impl DatabentoSubscriberActor {
192+
/// Creates a new [`DatabentoSubscriberActor`] instance.
193+
#[must_use]
194+
pub fn new(
195+
config: DatabentoSubscriberActorConfig,
196+
cache: Rc<RefCell<Cache>>,
197+
clock: Rc<RefCell<dyn Clock>>,
198+
) -> Self {
199+
Self {
200+
core: DataActorCore::new(config.base.clone(), cache, clock),
201+
config,
202+
received_quotes: Vec::new(),
203+
received_trades: Vec::new(),
204+
}
205+
}
206+
207+
/// Returns the number of quotes received by this actor.
208+
#[must_use]
209+
pub fn quote_count(&self) -> usize {
210+
self.received_quotes.len()
211+
}
212+
213+
/// Returns the number of trades received by this actor.
214+
#[must_use]
215+
pub fn trade_count(&self) -> usize {
216+
self.received_trades.len()
217+
}
218+
}

crates/adapters/databento/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,6 @@ pub mod factories;
5656

5757
#[cfg(feature = "live")]
5858
pub mod live;
59+
60+
#[cfg(feature = "live")]
61+
pub mod actor;

0 commit comments

Comments
 (0)