Skip to content

Commit af821d3

Browse files
committed
async-sync Rust cache and Postgres adapter refactoring
1 parent f376e51 commit af821d3

File tree

7 files changed

+396
-158
lines changed

7 files changed

+396
-158
lines changed

nautilus_core/common/src/cache/core.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2404,10 +2404,10 @@ impl Cache {
24042404

24052405
/// Returns references to all instrument IDs for the given `venue`.
24062406
#[must_use]
2407-
pub fn instrument_ids(&self, venue: &Venue) -> Vec<&InstrumentId> {
2407+
pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
24082408
self.instruments
24092409
.keys()
2410-
.filter(|i| &i.venue == venue)
2410+
.filter(|i| venue.is_none() || &i.venue == venue.unwrap())
24112411
.collect()
24122412
}
24132413

nautilus_core/common/src/cache/database.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,12 @@ pub trait CacheDatabaseAdapter {
5959

6060
fn load_index_order_client(&mut self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>>;
6161

62-
fn load_currency(&mut self, code: &Ustr) -> anyhow::Result<Currency>;
62+
fn load_currency(&mut self, code: &Ustr) -> anyhow::Result<Option<Currency>>;
6363

64-
fn load_instrument(&mut self, instrument_id: &InstrumentId) -> anyhow::Result<InstrumentAny>;
64+
fn load_instrument(
65+
&mut self,
66+
instrument_id: &InstrumentId,
67+
) -> anyhow::Result<Option<InstrumentAny>>;
6568

6669
fn load_synthetic(
6770
&mut self,
@@ -70,7 +73,7 @@ pub trait CacheDatabaseAdapter {
7073

7174
fn load_account(&mut self, account_id: &AccountId) -> anyhow::Result<Box<dyn Account>>;
7275

73-
fn load_order(&mut self, client_order_id: &ClientOrderId) -> anyhow::Result<OrderAny>;
76+
fn load_order(&mut self, client_order_id: &ClientOrderId) -> anyhow::Result<Option<OrderAny>>;
7477

7578
fn load_position(&mut self, position_id: &PositionId) -> anyhow::Result<Position>;
7679

nautilus_core/infrastructure/src/python/sql/cache_database.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
use std::collections::HashMap;
1717

18-
use nautilus_common::runtime::get_runtime;
18+
use nautilus_common::{cache::database::CacheDatabaseAdapter, runtime::get_runtime};
1919
use nautilus_core::python::to_pyruntime_err;
2020
use nautilus_model::{
2121
identifiers::{ClientOrderId, InstrumentId},
@@ -70,15 +70,13 @@ impl PostgresCacheDatabase {
7070
}
7171

7272
#[pyo3(name = "add")]
73-
fn py_add(slf: PyRef<'_, Self>, key: String, value: Vec<u8>) -> PyResult<()> {
74-
let result = get_runtime().block_on(async { slf.add(key, value).await });
75-
result.map_err(to_pyruntime_err)
73+
fn py_add(mut slf: PyRefMut<'_, Self>, key: String, value: Vec<u8>) -> PyResult<()> {
74+
slf.add(key, value).map_err(to_pyruntime_err)
7675
}
7776

7877
#[pyo3(name = "add_currency")]
79-
fn py_add_currency(slf: PyRef<'_, Self>, currency: Currency) -> PyResult<()> {
80-
let result = get_runtime().block_on(async { slf.add_currency(currency).await });
81-
result.map_err(to_pyruntime_err)
78+
fn py_add_currency(mut slf: PyRefMut<'_, Self>, currency: Currency) -> PyResult<()> {
79+
slf.add_currency(&currency).map_err(to_pyruntime_err)
8280
}
8381

8482
#[pyo3(name = "load_instrument")]
@@ -88,7 +86,7 @@ impl PostgresCacheDatabase {
8886
py: Python<'_>,
8987
) -> PyResult<Option<PyObject>> {
9088
get_runtime().block_on(async {
91-
let result = DatabaseQueries::load_instrument(&slf.pool, instrument_id)
89+
let result = DatabaseQueries::load_instrument(&slf.pool, &instrument_id)
9290
.await
9391
.unwrap();
9492
match result {
@@ -116,27 +114,29 @@ impl PostgresCacheDatabase {
116114

117115
#[pyo3(name = "add_instrument")]
118116
fn py_add_instrument(
119-
slf: PyRef<'_, Self>,
117+
mut slf: PyRefMut<'_, Self>,
120118
instrument: PyObject,
121119
py: Python<'_>,
122120
) -> PyResult<()> {
123121
let instrument_any = pyobject_to_instrument_any(py, instrument)?;
124-
let result = get_runtime().block_on(async { slf.add_instrument(instrument_any).await });
125-
result.map_err(to_pyruntime_err)
122+
slf.add_instrument(&instrument_any)
123+
.map_err(to_pyruntime_err)
126124
}
127125

128126
#[pyo3(name = "add_order")]
129-
fn py_add_order(slf: PyRef<'_, Self>, order: PyObject, py: Python<'_>) -> PyResult<()> {
127+
fn py_add_order(mut slf: PyRefMut<'_, Self>, order: PyObject, py: Python<'_>) -> PyResult<()> {
130128
let order_any = convert_pyobject_to_order_any(py, order)?;
131-
let result = get_runtime().block_on(async { slf.add_order(order_any).await });
132-
result.map_err(to_pyruntime_err)
129+
slf.add_order(&order_any).map_err(to_pyruntime_err)
133130
}
134131

135132
#[pyo3(name = "update_order")]
136-
fn py_update_order(slf: PyRef<'_, Self>, order: PyObject, py: Python<'_>) -> PyResult<()> {
133+
fn py_update_order(
134+
mut slf: PyRefMut<'_, Self>,
135+
order: PyObject,
136+
py: Python<'_>,
137+
) -> PyResult<()> {
137138
let order_any = convert_pyobject_to_order_any(py, order)?;
138-
let result = get_runtime().block_on(async { slf.update_order(order_any).await });
139-
result.map_err(to_pyruntime_err)
139+
slf.update_order(&order_any).map_err(to_pyruntime_err)
140140
}
141141

142142
#[pyo3(name = "load_order")]

nautilus_core/infrastructure/src/redis/cache.rs

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -703,10 +703,16 @@ impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
703703
for key in self.database.keys(&format!("{CURRENCIES}*"))? {
704704
let parts: Vec<&str> = key.as_str().rsplitn(2, ':').collect();
705705
let currency_code = Ustr::from(parts.first().unwrap());
706-
let currency = self.load_currency(&currency_code)?;
707-
currencies.insert(currency_code, currency);
706+
let result = self.load_currency(&currency_code)?;
707+
match result {
708+
Some(currency) => {
709+
currencies.insert(currency_code, currency);
710+
}
711+
None => {
712+
error!("Currency not found: {currency_code}");
713+
}
714+
}
708715
}
709-
710716
Ok(currencies)
711717
}
712718

@@ -716,8 +722,15 @@ impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
716722
for key in self.database.keys(&format!("{INSTRUMENTS}*"))? {
717723
let parts: Vec<&str> = key.as_str().rsplitn(2, ':').collect();
718724
let instrument_id = InstrumentId::from_str(parts.first().unwrap())?;
719-
let instrument = self.load_instrument(&instrument_id)?;
720-
instruments.insert(instrument_id, instrument);
725+
let result = self.load_instrument(&instrument_id)?;
726+
match result {
727+
Some(instrument) => {
728+
instruments.insert(instrument_id, instrument);
729+
}
730+
None => {
731+
error!("Instrument not found: {instrument_id}");
732+
}
733+
}
721734
}
722735

723736
Ok(instruments)
@@ -755,10 +768,16 @@ impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
755768
for key in self.database.keys(&format!("{ORDERS}*"))? {
756769
let parts: Vec<&str> = key.as_str().rsplitn(2, ':').collect();
757770
let client_order_id = ClientOrderId::from(*parts.first().unwrap());
758-
let order = self.load_order(&client_order_id)?;
759-
orders.insert(client_order_id, order);
771+
let result = self.load_order(&client_order_id)?;
772+
match result {
773+
Some(order) => {
774+
orders.insert(client_order_id, order);
775+
}
776+
None => {
777+
error!("Order not found: {client_order_id}");
778+
}
779+
}
760780
}
761-
762781
Ok(orders)
763782
}
764783

@@ -783,11 +802,14 @@ impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
783802
todo!()
784803
}
785804

786-
fn load_currency(&mut self, code: &Ustr) -> anyhow::Result<Currency> {
805+
fn load_currency(&mut self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
787806
todo!()
788807
}
789808

790-
fn load_instrument(&mut self, instrument_id: &InstrumentId) -> anyhow::Result<InstrumentAny> {
809+
fn load_instrument(
810+
&mut self,
811+
instrument_id: &InstrumentId,
812+
) -> anyhow::Result<Option<InstrumentAny>> {
791813
todo!()
792814
}
793815

@@ -802,7 +824,7 @@ impl CacheDatabaseAdapter for RedisCacheDatabaseAdapter {
802824
todo!()
803825
}
804826

805-
fn load_order(&mut self, client_order_id: &ClientOrderId) -> anyhow::Result<OrderAny> {
827+
fn load_order(&mut self, client_order_id: &ClientOrderId) -> anyhow::Result<Option<OrderAny>> {
806828
todo!()
807829
}
808830

0 commit comments

Comments
 (0)