Skip to content

Commit 605fe65

Browse files
committed
Add bars_timestamp_on_close config option for Databento
1 parent 5811d04 commit 605fe65

File tree

14 files changed

+261
-53
lines changed

14 files changed

+261
-53
lines changed

RELEASES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Released on TBD (UTC).
1717
- Added support for `MarkPriceUpdate` streaming from catalog (#2582), thanks @bartolootrit
1818
- Added support for Binance Futures margin type (#2660), thanks @bartolootrit
1919
- Added support for Binances mark price stream across all markets (#2670), thanks @sunlei
20+
- Added `bars_timestamp_on_close` config option for Databento which defaults to `True` to consistently align with Nautilus conventions
2021
- Added `activation_price` support for trailing stop orders (#2610), thanks @hope2see
2122
- Added trailing stops for OrderFactory bracket orders (#2654), thanks @hope2see
2223
- Added `raise_exception` config option for `BacktestRunConfig` (default `False` to retain current behavior) which will raise exceptions to interrupt a nodes run process

crates/adapters/databento/src/decode.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -843,14 +843,22 @@ pub fn decode_ohlcv_msg(
843843
instrument_id: InstrumentId,
844844
price_precision: u8,
845845
ts_init: Option<UnixNanos>,
846+
timestamp_on_close: bool,
846847
) -> anyhow::Result<Bar> {
847848
let bar_type = decode_bar_type(msg, instrument_id)?;
848849
let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
849850

850-
// Adjust `ts_event` from open to close of bar
851-
let ts_event = msg.hd.ts_event.into();
852-
let ts_init = ts_init.unwrap_or(ts_event);
853-
let ts_init = cmp::max(ts_init, ts_event) + ts_event_adjustment;
851+
let ts_event_raw = msg.hd.ts_event.into();
852+
let ts_init_raw = ts_init.unwrap_or(ts_event_raw);
853+
854+
let (ts_event, ts_init) = if timestamp_on_close {
855+
// Both ts_event and ts_init are set to close time
856+
let ts_close = cmp::max(ts_init_raw, ts_event_raw) + ts_event_adjustment;
857+
(ts_close, ts_close)
858+
} else {
859+
// Both ts_event and ts_init are set to open time
860+
(ts_event_raw, ts_event_raw)
861+
};
854862

855863
let bar = Bar::new(
856864
bar_type,
@@ -905,6 +913,7 @@ pub fn decode_record(
905913
price_precision: u8,
906914
ts_init: Option<UnixNanos>,
907915
include_trades: bool,
916+
bars_timestamp_on_close: bool,
908917
) -> anyhow::Result<(Option<Data>, Option<Data>)> {
909918
// We don't handle `TbboMsg` here as Nautilus separates this schema
910919
// into quotes and trades when loading, and the live client will
@@ -955,7 +964,13 @@ pub fn decode_record(
955964
(Some(Data::from(depth)), None)
956965
} else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
957966
let ts_init = determine_timestamp(ts_init, msg.hd.ts_event.into());
958-
let bar = decode_ohlcv_msg(msg, instrument_id, price_precision, Some(ts_init))?;
967+
let bar = decode_ohlcv_msg(
968+
msg,
969+
instrument_id,
970+
price_precision,
971+
Some(ts_init),
972+
bars_timestamp_on_close,
973+
)?;
959974
(Some(Data::Bar(bar)), None)
960975
} else {
961976
anyhow::bail!("DBN message type is not currently supported")
@@ -1659,7 +1674,7 @@ mod tests {
16591674
let msg = dbn_stream.next().unwrap().unwrap();
16601675

16611676
let instrument_id = InstrumentId::from("ESM4.GLBX");
1662-
let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
1677+
let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
16631678

16641679
assert_eq!(
16651680
bar.bar_type,

crates/adapters/databento/src/live.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,13 @@ pub struct DatabentoFeedHandler {
7676
symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
7777
replay: bool,
7878
use_exchange_as_venue: bool,
79+
bars_timestamp_on_close: bool,
7980
}
8081

8182
impl DatabentoFeedHandler {
8283
/// Creates a new [`DatabentoFeedHandler`] instance.
8384
#[must_use]
85+
#[allow(clippy::too_many_arguments)]
8486
pub const fn new(
8587
key: String,
8688
dataset: String,
@@ -89,6 +91,7 @@ impl DatabentoFeedHandler {
8991
publisher_venue_map: IndexMap<PublisherId, Venue>,
9092
symbol_venue_map: Arc<RwLock<HashMap<Symbol, Venue>>>,
9193
use_exchange_as_venue: bool,
94+
bars_timestamp_on_close: bool,
9295
) -> Self {
9396
Self {
9497
key,
@@ -99,6 +102,7 @@ impl DatabentoFeedHandler {
99102
symbol_venue_map,
100103
replay: false,
101104
use_exchange_as_venue,
105+
bars_timestamp_on_close,
102106
}
103107
}
104108

@@ -319,6 +323,7 @@ impl DatabentoFeedHandler {
319323
&mut instrument_id_map,
320324
ts_init,
321325
&initialized_books,
326+
self.bars_timestamp_on_close,
322327
)
323328
} {
324329
Ok(decoded) => decoded,
@@ -564,6 +569,7 @@ fn handle_statistics_msg(
564569
decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
565570
}
566571

572+
#[allow(clippy::too_many_arguments)]
567573
fn handle_record(
568574
record: dbn::RecordRef,
569575
symbol_map: &PitSymbolMap,
@@ -572,6 +578,7 @@ fn handle_record(
572578
instrument_id_map: &mut HashMap<u32, InstrumentId>,
573579
ts_init: UnixNanos,
574580
initialized_books: &HashSet<InstrumentId>,
581+
bars_timestamp_on_close: bool,
575582
) -> anyhow::Result<(Option<Data>, Option<Data>)> {
576583
let instrument_id = update_instrument_id_map(
577584
&record,
@@ -590,5 +597,6 @@ fn handle_record(
590597
price_precision,
591598
Some(ts_init),
592599
include_trades,
600+
bars_timestamp_on_close,
593601
)
594602
}

crates/adapters/databento/src/loader.rs

Lines changed: 117 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ impl DatabentoDataLoader {
278278
instrument_id: Option<InstrumentId>,
279279
price_precision: Option<u8>,
280280
include_trades: bool,
281+
bars_timestamp_on_close: Option<bool>,
281282
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>> + '_>
282283
where
283284
T: dbn::Record + dbn::HasRType + 'static,
@@ -313,6 +314,7 @@ impl DatabentoDataLoader {
313314
price_precision,
314315
None,
315316
include_trades,
317+
bars_timestamp_on_close.unwrap_or(true),
316318
)?;
317319
Ok(Some((item1, item2)))
318320
} else {
@@ -349,7 +351,7 @@ impl DatabentoDataLoader {
349351
instrument_id: Option<InstrumentId>,
350352
price_precision: Option<u8>,
351353
) -> anyhow::Result<Vec<OrderBookDelta>> {
352-
self.read_records::<dbn::MboMsg>(filepath, instrument_id, price_precision, false)?
354+
self.read_records::<dbn::MboMsg>(filepath, instrument_id, price_precision, false, None)?
353355
.filter_map(|result| match result {
354356
Ok((Some(item1), _)) => {
355357
if let Data::Delta(delta) = item1 {
@@ -373,7 +375,7 @@ impl DatabentoDataLoader {
373375
instrument_id: Option<InstrumentId>,
374376
price_precision: Option<u8>,
375377
) -> anyhow::Result<Vec<OrderBookDepth10>> {
376-
self.read_records::<dbn::Mbp10Msg>(filepath, instrument_id, price_precision, false)?
378+
self.read_records::<dbn::Mbp10Msg>(filepath, instrument_id, price_precision, false, None)?
377379
.filter_map(|result| match result {
378380
Ok((Some(item1), _)) => {
379381
if let Data::Depth10(depth) = item1 {
@@ -397,7 +399,7 @@ impl DatabentoDataLoader {
397399
instrument_id: Option<InstrumentId>,
398400
price_precision: Option<u8>,
399401
) -> anyhow::Result<Vec<QuoteTick>> {
400-
self.read_records::<dbn::Mbp1Msg>(filepath, instrument_id, price_precision, false)?
402+
self.read_records::<dbn::Mbp1Msg>(filepath, instrument_id, price_precision, false, None)?
401403
.filter_map(|result| match result {
402404
Ok((Some(item1), _)) => {
403405
if let Data::Quote(quote) = item1 {
@@ -421,7 +423,7 @@ impl DatabentoDataLoader {
421423
instrument_id: Option<InstrumentId>,
422424
price_precision: Option<u8>,
423425
) -> anyhow::Result<Vec<QuoteTick>> {
424-
self.read_records::<dbn::BboMsg>(filepath, instrument_id, price_precision, false)?
426+
self.read_records::<dbn::BboMsg>(filepath, instrument_id, price_precision, false, None)?
425427
.filter_map(|result| match result {
426428
Ok((Some(item1), _)) => {
427429
if let Data::Quote(quote) = item1 {
@@ -445,7 +447,7 @@ impl DatabentoDataLoader {
445447
instrument_id: Option<InstrumentId>,
446448
price_precision: Option<u8>,
447449
) -> anyhow::Result<Vec<TradeTick>> {
448-
self.read_records::<dbn::TbboMsg>(filepath, instrument_id, price_precision, false)?
450+
self.read_records::<dbn::TbboMsg>(filepath, instrument_id, price_precision, false, None)?
449451
.filter_map(|result| match result {
450452
Ok((_, maybe_item2)) => {
451453
if let Some(Data::Trade(trade)) = maybe_item2 {
@@ -468,7 +470,7 @@ impl DatabentoDataLoader {
468470
instrument_id: Option<InstrumentId>,
469471
price_precision: Option<u8>,
470472
) -> anyhow::Result<Vec<TradeTick>> {
471-
self.read_records::<dbn::TradeMsg>(filepath, instrument_id, price_precision, false)?
473+
self.read_records::<dbn::TradeMsg>(filepath, instrument_id, price_precision, false, None)?
472474
.filter_map(|result| match result {
473475
Ok((Some(item1), _)) => {
474476
if let Data::Trade(trade) = item1 {
@@ -491,20 +493,27 @@ impl DatabentoDataLoader {
491493
filepath: &Path,
492494
instrument_id: Option<InstrumentId>,
493495
price_precision: Option<u8>,
496+
timestamp_on_close: Option<bool>,
494497
) -> anyhow::Result<Vec<Bar>> {
495-
self.read_records::<dbn::OhlcvMsg>(filepath, instrument_id, price_precision, false)?
496-
.filter_map(|result| match result {
497-
Ok((Some(item1), _)) => {
498-
if let Data::Bar(bar) = item1 {
499-
Some(Ok(bar))
500-
} else {
501-
None
502-
}
498+
self.read_records::<dbn::OhlcvMsg>(
499+
filepath,
500+
instrument_id,
501+
price_precision,
502+
false,
503+
timestamp_on_close,
504+
)?
505+
.filter_map(|result| match result {
506+
Ok((Some(item1), _)) => {
507+
if let Data::Bar(bar) = item1 {
508+
Some(Ok(bar))
509+
} else {
510+
None
503511
}
504-
Ok((None, _)) => None,
505-
Err(e) => Some(Err(e)),
506-
})
507-
.collect()
512+
}
513+
Ok((None, _)) => None,
514+
Err(e) => Some(Err(e)),
515+
})
516+
.collect()
508517
}
509518

510519
/// # Errors
@@ -792,8 +801,97 @@ mod tests {
792801
#[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
793802
fn test_load_bars(loader: DatabentoDataLoader, #[case] path: PathBuf) {
794803
let instrument_id = InstrumentId::from("ESM4.GLBX");
795-
let bars = loader.load_bars(&path, Some(instrument_id), None).unwrap();
804+
let bars = loader
805+
.load_bars(&path, Some(instrument_id), None, None)
806+
.unwrap();
807+
808+
assert_eq!(bars.len(), 2);
809+
}
810+
811+
#[rstest]
812+
#[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
813+
fn test_load_bars_timestamp_on_close_true(loader: DatabentoDataLoader, #[case] path: PathBuf) {
814+
let instrument_id = InstrumentId::from("ESM4.GLBX");
815+
let bars = loader
816+
.load_bars(&path, Some(instrument_id), None, Some(true))
817+
.unwrap();
796818

797819
assert_eq!(bars.len(), 2);
820+
821+
// When bars_timestamp_on_close is true, both ts_event and ts_init should be equal (close time)
822+
for bar in &bars {
823+
assert_eq!(
824+
bar.ts_event, bar.ts_init,
825+
"ts_event and ts_init should be equal when bars_timestamp_on_close=true"
826+
);
827+
// For 1-second bars, ts_event should be 1 second after the open time
828+
// This confirms the bar is timestamped at close
829+
}
830+
}
831+
832+
#[rstest]
833+
#[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
834+
fn test_load_bars_timestamp_on_close_false(loader: DatabentoDataLoader, #[case] path: PathBuf) {
835+
let instrument_id = InstrumentId::from("ESM4.GLBX");
836+
let bars = loader
837+
.load_bars(&path, Some(instrument_id), None, Some(false))
838+
.unwrap();
839+
840+
assert_eq!(bars.len(), 2);
841+
842+
// When bars_timestamp_on_close is false, both ts_event and ts_init should be equal (open time)
843+
for bar in &bars {
844+
assert_eq!(
845+
bar.ts_event, bar.ts_init,
846+
"ts_event and ts_init should be equal when bars_timestamp_on_close=false"
847+
);
848+
}
849+
}
850+
851+
#[rstest]
852+
#[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 0)]
853+
#[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 1)]
854+
fn test_load_bars_timestamp_comparison(
855+
loader: DatabentoDataLoader,
856+
#[case] path: PathBuf,
857+
#[case] bar_index: usize,
858+
) {
859+
let instrument_id = InstrumentId::from("ESM4.GLBX");
860+
861+
let bars_close = loader
862+
.load_bars(&path, Some(instrument_id), None, Some(true))
863+
.unwrap();
864+
865+
let bars_open = loader
866+
.load_bars(&path, Some(instrument_id), None, Some(false))
867+
.unwrap();
868+
869+
assert_eq!(bars_close.len(), bars_open.len());
870+
assert_eq!(bars_close.len(), 2);
871+
872+
let bar_close = &bars_close[bar_index];
873+
let bar_open = &bars_open[bar_index];
874+
875+
// Bars should have the same OHLCV data
876+
assert_eq!(bar_close.open, bar_open.open);
877+
assert_eq!(bar_close.high, bar_open.high);
878+
assert_eq!(bar_close.low, bar_open.low);
879+
assert_eq!(bar_close.close, bar_open.close);
880+
assert_eq!(bar_close.volume, bar_open.volume);
881+
882+
// The close-timestamped bar should have later timestamp than open-timestamped bar
883+
// For 1-second bars, this should be exactly 1 second difference
884+
assert!(
885+
bar_close.ts_event > bar_open.ts_event,
886+
"Close-timestamped bar should have later timestamp than open-timestamped bar"
887+
);
888+
889+
// The difference should be exactly 1 second (1_000_000_000 nanoseconds) for 1s bars
890+
const ONE_SECOND_NS: u64 = 1_000_000_000;
891+
assert_eq!(
892+
bar_close.ts_event.as_u64() - bar_open.ts_event.as_u64(),
893+
ONE_SECOND_NS,
894+
"Timestamp difference should be exactly 1 second for 1s bars"
895+
);
798896
}
799897
}

crates/adapters/databento/src/python/historical.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ impl DatabentoHistoricalClient {
318318
price_precision,
319319
None,
320320
false, // Don't include trades
321+
true,
321322
)
322323
.map_err(to_pyvalue_err)?;
323324

@@ -421,6 +422,7 @@ impl DatabentoHistoricalClient {
421422
price_precision,
422423
None,
423424
false, // Not applicable (trade will be decoded regardless)
425+
true,
424426
)
425427
.map_err(to_pyvalue_err)?;
426428

@@ -437,7 +439,7 @@ impl DatabentoHistoricalClient {
437439
}
438440

439441
#[pyo3(name = "get_range_bars")]
440-
#[pyo3(signature = (dataset, instrument_ids, aggregation, start, end=None, limit=None, price_precision=None))]
442+
#[pyo3(signature = (dataset, instrument_ids, aggregation, start, end=None, limit=None, price_precision=None, timestamp_on_close=true))]
441443
#[allow(clippy::too_many_arguments)]
442444
fn py_get_range_bars<'py>(
443445
&self,
@@ -449,6 +451,7 @@ impl DatabentoHistoricalClient {
449451
end: Option<u64>,
450452
limit: Option<u64>,
451453
price_precision: Option<u8>,
454+
timestamp_on_close: bool,
452455
) -> PyResult<Bound<'py, PyAny>> {
453456
let client = self.inner.clone();
454457
let mut symbol_venue_map = self.symbol_venue_map.write().unwrap();
@@ -512,6 +515,7 @@ impl DatabentoHistoricalClient {
512515
price_precision,
513516
None,
514517
false, // Not applicable
518+
timestamp_on_close,
515519
)
516520
.map_err(to_pyvalue_err)?;
517521

0 commit comments

Comments
 (0)