Skip to content

Commit 4191f67

Browse files
committed
fix precommit
1 parent 6d50216 commit 4191f67

File tree

16 files changed

+166
-193
lines changed

16 files changed

+166
-193
lines changed

nautilus_core/cli/src/database/postgres.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313
// limitations under the License.
1414
// -------------------------------------------------------------------------------------------------
1515

16-
use nautilus_infrastructure::sql::pg::{connect_pg, drop_postgres, get_postgres_connect_options, init_postgres};
17-
use crate::opt::{DatabaseCommand, DatabaseOpt};
16+
use nautilus_infrastructure::sql::pg::{
17+
connect_pg, drop_postgres, get_postgres_connect_options, init_postgres,
18+
};
1819

20+
use crate::opt::{DatabaseCommand, DatabaseOpt};
1921

2022
pub async fn run_database_command(opt: DatabaseOpt) -> anyhow::Result<()> {
2123
let command = opt.command.clone();

nautilus_core/infrastructure/src/python/sql/cache_database.rs

Lines changed: 26 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,16 @@
1414
// -------------------------------------------------------------------------------------------------
1515

1616
use std::collections::HashMap;
17-
use pyo3::prelude::*;
18-
use nautilus_core::python::{to_pyruntime_err};
17+
1918
use nautilus_common::runtime::get_runtime;
19+
use nautilus_core::python::to_pyruntime_err;
2020
use nautilus_model::types::currency::Currency;
21-
use crate::sql::cache_database::PostgresCacheDatabase;
22-
use crate::sql::pg::delete_nautilus_postgres_tables;
23-
use crate::sql::queries::DatabaseQueries;
21+
use pyo3::prelude::*;
2422

23+
use crate::sql::{
24+
cache_database::PostgresCacheDatabase, pg::delete_nautilus_postgres_tables,
25+
queries::DatabaseQueries,
26+
};
2527

2628
#[pymethods]
2729
impl PostgresCacheDatabase {
@@ -32,7 +34,7 @@ impl PostgresCacheDatabase {
3234
port: Option<u16>,
3335
username: Option<String>,
3436
password: Option<String>,
35-
database: Option<String>
37+
database: Option<String>,
3638
) -> PyResult<Self> {
3739
let result = get_runtime().block_on(async {
3840
PostgresCacheDatabase::connect(host, port, username, password, database).await
@@ -41,79 +43,48 @@ impl PostgresCacheDatabase {
4143
}
4244

4345
#[pyo3(name = "load")]
44-
fn py_load<'py>(
45-
slf: PyRef<'_, Self>,
46-
) -> PyResult<HashMap<String, Vec<u8>>> {
47-
let result = get_runtime().block_on(async {
48-
slf.load().await
49-
});
46+
fn py_load(slf: PyRef<'_, Self>) -> PyResult<HashMap<String, Vec<u8>>> {
47+
let result = get_runtime().block_on(async { slf.load().await });
5048
result.map_err(to_pyruntime_err)
5149
}
5250

53-
5451
#[pyo3(name = "load_currency")]
55-
fn py_load_currency(
56-
slf: PyRef<'_, Self>,
57-
code: &str,
58-
) -> PyResult<Option<Currency>> {
59-
let result = get_runtime().block_on(async {
60-
DatabaseQueries::load_currency(&slf.pool, code).await
61-
});
52+
fn py_load_currency(slf: PyRef<'_, Self>, code: &str) -> PyResult<Option<Currency>> {
53+
let result =
54+
get_runtime().block_on(async { DatabaseQueries::load_currency(&slf.pool, code).await });
6255
result.map_err(to_pyruntime_err)
6356
}
6457

6558
#[pyo3(name = "load_currencies")]
66-
fn py_load_currencies<'py>(
67-
slf: PyRef<'_, Self>,
68-
) -> PyResult<Vec<Currency>> {
69-
let result = get_runtime().block_on(async {
70-
DatabaseQueries::load_currencies(&slf.pool).await
71-
});
59+
fn py_load_currencies(slf: PyRef<'_, Self>) -> PyResult<Vec<Currency>> {
60+
let result =
61+
get_runtime().block_on(async { DatabaseQueries::load_currencies(&slf.pool).await });
7262
result.map_err(to_pyruntime_err)
7363
}
7464

7565
#[pyo3(name = "add")]
76-
fn py_add(
77-
slf: PyRef<'_, Self>,
78-
key: String,
79-
value: Vec<u8>
80-
) -> PyResult<()> {
81-
let result = get_runtime().block_on(async {
82-
slf.add(key,value).await
83-
});
66+
fn py_add(slf: PyRef<'_, Self>, key: String, value: Vec<u8>) -> PyResult<()> {
67+
let result = get_runtime().block_on(async { slf.add(key, value).await });
8468
result.map_err(to_pyruntime_err)
8569
}
8670

8771
#[pyo3(name = "add_currency")]
88-
fn py_add_currency(
89-
slf: PyRef<'_, Self>,
90-
currency: Currency,
91-
) -> PyResult<()> {
92-
let result = get_runtime().block_on(async {
93-
slf.add_currency(currency).await
94-
});
72+
fn py_add_currency(slf: PyRef<'_, Self>, currency: Currency) -> PyResult<()> {
73+
let result = get_runtime().block_on(async { slf.add_currency(currency).await });
9574
result.map_err(to_pyruntime_err)
9675
}
9776

9877
#[pyo3(name = "flush_db")]
99-
fn py_drop_schema(
100-
slf: PyRef<'_, Self>,
101-
) -> PyResult<()> {
102-
let result = get_runtime().block_on(async {
103-
delete_nautilus_postgres_tables(&slf.pool)
104-
.await
105-
});
78+
fn py_drop_schema(slf: PyRef<'_, Self>) -> PyResult<()> {
79+
let result =
80+
get_runtime().block_on(async { delete_nautilus_postgres_tables(&slf.pool).await });
10681
result.map_err(to_pyruntime_err)
10782
}
10883

10984
#[pyo3(name = "truncate")]
110-
fn py_truncate(
111-
slf: PyRef<'_, Self>,
112-
table: String
113-
) -> PyResult<()> {
114-
let result = get_runtime().block_on(async {
115-
DatabaseQueries::truncate(&slf.pool, table).await
116-
});
85+
fn py_truncate(slf: PyRef<'_, Self>, table: String) -> PyResult<()> {
86+
let result =
87+
get_runtime().block_on(async { DatabaseQueries::truncate(&slf.pool, table).await });
11788
result.map_err(to_pyruntime_err)
11889
}
11990
}

nautilus_core/infrastructure/src/python/sql/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@
1313
// limitations under the License.
1414
// -------------------------------------------------------------------------------------------------
1515

16-
pub mod cache_database;
16+
pub mod cache_database;

nautilus_core/infrastructure/src/sql/cache_database.rs

Lines changed: 40 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,23 @@
1313
// limitations under the License.
1414
// -------------------------------------------------------------------------------------------------
1515

16-
use std::collections::{HashMap, VecDeque};
17-
use tokio::sync::mpsc::{Receiver,Sender,channel};
18-
use std::time::{Duration, Instant};
19-
use sqlx::{PgPool};
20-
use sqlx::postgres::PgConnectOptions;
21-
use tokio::sync::mpsc::error::TryRecvError;
22-
use tokio::time::sleep;
23-
use nautilus_model::types::currency::Currency;
24-
use crate::sql::models::general::GeneralRow;
25-
use crate::sql::pg::{connect_pg, get_postgres_connect_options};
26-
use crate::sql::queries::DatabaseQueries;
16+
use std::{
17+
collections::{HashMap, VecDeque},
18+
time::{Duration, Instant},
19+
};
2720

21+
use nautilus_model::types::currency::Currency;
22+
use sqlx::{postgres::PgConnectOptions, PgPool};
23+
use tokio::{
24+
sync::mpsc::{channel, error::TryRecvError, Receiver, Sender},
25+
time::sleep,
26+
};
27+
28+
use crate::sql::{
29+
models::general::GeneralRow,
30+
pg::{connect_pg, get_postgres_connect_options},
31+
queries::DatabaseQueries,
32+
};
2833

2934
#[derive(Debug)]
3035
#[cfg_attr(
@@ -36,28 +41,25 @@ pub struct PostgresCacheDatabase {
3641
tx: Sender<DatabaseQuery>,
3742
}
3843

39-
4044
#[derive(Debug, Clone)]
41-
pub enum DatabaseQuery{
42-
Add(String,Vec<u8>),
45+
pub enum DatabaseQuery {
46+
Add(String, Vec<u8>),
4347
AddCurrency(Currency),
4448
}
4549

46-
47-
4850
fn get_buffer_interval() -> Duration {
4951
Duration::from_millis(0)
5052
}
5153

5254
async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
5355
for cmd in buffer.drain(..) {
5456
match cmd {
55-
DatabaseQuery::Add(key,value) => {
56-
DatabaseQueries ::add(pool,key,value).await.unwrap();
57-
},
57+
DatabaseQuery::Add(key, value) => {
58+
DatabaseQueries::add(pool, key, value).await.unwrap();
59+
}
5860
DatabaseQuery::AddCurrency(currency) => {
59-
DatabaseQueries::add_currency(pool,currency).await.unwrap();
60-
},
61+
DatabaseQueries::add_currency(pool, currency).await.unwrap();
62+
}
6163
}
6264
}
6365
}
@@ -68,9 +70,10 @@ impl PostgresCacheDatabase {
6870
port: Option<u16>,
6971
username: Option<String>,
7072
password: Option<String>,
71-
database: Option<String>
72-
) -> Result<Self,sqlx::Error> {
73-
let pg_connect_options = get_postgres_connect_options(host,port,username,password,database).unwrap();
73+
database: Option<String>,
74+
) -> Result<Self, sqlx::Error> {
75+
let pg_connect_options =
76+
get_postgres_connect_options(host, port, username, password, database).unwrap();
7477
let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
7578
let (tx, rx) = channel::<DatabaseQuery>(1000);
7679
// spawn a thread to handle messages
@@ -80,10 +83,7 @@ impl PostgresCacheDatabase {
8083
Ok(PostgresCacheDatabase { pool, tx })
8184
}
8285

83-
async fn handle_message(
84-
mut rx: Receiver<DatabaseQuery>,
85-
pg_connect_options: PgConnectOptions
86-
){
86+
async fn handle_message(mut rx: Receiver<DatabaseQuery>, pg_connect_options: PgConnectOptions) {
8787
let pool = connect_pg(pg_connect_options).await.unwrap();
8888
// Buffering
8989
let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
@@ -106,13 +106,13 @@ impl PostgresCacheDatabase {
106106
}
107107
}
108108
// rain any remaining message
109-
if !buffer.is_empty(){
110-
drain_buffer(&pool,&mut buffer).await;
109+
if !buffer.is_empty() {
110+
drain_buffer(&pool, &mut buffer).await;
111111
}
112112
}
113113

114-
pub async fn load(&self) -> Result<HashMap<String, Vec<u8>>,sqlx::Error> {
115-
let query = sqlx::query_as::<_,GeneralRow>("SELECT * FROM general");
114+
pub async fn load(&self) -> Result<HashMap<String, Vec<u8>>, sqlx::Error> {
115+
let query = sqlx::query_as::<_, GeneralRow>("SELECT * FROM general");
116116
let result = query.fetch_all(&self.pool).await;
117117
match result {
118118
Ok(rows) => {
@@ -127,20 +127,18 @@ impl PostgresCacheDatabase {
127127
}
128128
}
129129
}
130-
131-
132130

133131
pub async fn add(&self, key: String, value: Vec<u8>) -> anyhow::Result<()> {
134-
let query = DatabaseQuery::Add(key,value);
135-
self.tx.send(query)
136-
.await
137-
.map_err(|err| anyhow::anyhow!("Failed to send query to database message handler: {err}"))
132+
let query = DatabaseQuery::Add(key, value);
133+
self.tx.send(query).await.map_err(|err| {
134+
anyhow::anyhow!("Failed to send query to database message handler: {err}")
135+
})
138136
}
139137

140138
pub async fn add_currency(&self, currency: Currency) -> anyhow::Result<()> {
141139
let query = DatabaseQuery::AddCurrency(currency);
142-
self.tx.send(query)
143-
.await
144-
.map_err(|err| anyhow::anyhow!("Failed to query add_currency to database message handler: {err}"))
140+
self.tx.send(query).await.map_err(|err| {
141+
anyhow::anyhow!("Failed to query add_currency to database message handler: {err}")
142+
})
145143
}
146-
}
144+
}

nautilus_core/infrastructure/src/sql/mod.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,9 @@
1414
// -------------------------------------------------------------------------------------------------
1515

1616
// Be careful about ordering and foreign key constraints when deleting data.
17-
pub const NAUTILUS_TABLES: [&str; 2] = [
18-
"general",
19-
"currency",
20-
];
17+
pub const NAUTILUS_TABLES: [&str; 2] = ["general", "currency"];
2118

19+
pub mod cache_database;
2220
pub mod models;
2321
pub mod pg;
24-
pub mod cache_database;
25-
pub mod queries;
22+
pub mod queries;

nautilus_core/infrastructure/src/sql/models/general.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,8 @@
1313
// limitations under the License.
1414
// -------------------------------------------------------------------------------------------------
1515

16-
1716
#[derive(Debug, sqlx::FromRow)]
1817
pub struct GeneralRow {
1918
pub key: String,
2019
pub value: Vec<u8>,
21-
}
20+
}

nautilus_core/infrastructure/src/sql/models/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@
1313
// limitations under the License.
1414
// -------------------------------------------------------------------------------------------------
1515

16-
pub mod instruments;
1716
pub mod general;
17+
pub mod instruments;
1818
pub mod types;

nautilus_core/infrastructure/src/sql/models/types.rs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,30 @@
1414
// -------------------------------------------------------------------------------------------------
1515

1616
use std::str::FromStr;
17-
use sqlx::{FromRow, Row};
18-
use sqlx::postgres::PgRow;
19-
use nautilus_model::enums::CurrencyType;
20-
use nautilus_model::types::currency::Currency;
17+
18+
use nautilus_model::{enums::CurrencyType, types::currency::Currency};
19+
use sqlx::{postgres::PgRow, FromRow, Row};
2120

2221
pub struct CurrencyModel(pub Currency);
2322

24-
impl <'r> FromRow<'r, PgRow> for CurrencyModel {
23+
impl<'r> FromRow<'r, PgRow> for CurrencyModel {
2524
fn from_row(row: &'r PgRow) -> Result<Self, sqlx::Error> {
26-
let code = row.try_get::<String,_>("code")?;
27-
let precision = row.try_get::<i32,_>("precision")?;
28-
let iso4217 = row.try_get::<i32,_>("iso4217")?;
29-
let name = row.try_get::<String,_>("name")?;
30-
let currency_type = row.try_get::<String,_>("currency_type")
25+
let code = row.try_get::<String, _>("code")?;
26+
let precision = row.try_get::<i32, _>("precision")?;
27+
let iso4217 = row.try_get::<i32, _>("iso4217")?;
28+
let name = row.try_get::<String, _>("name")?;
29+
let currency_type = row
30+
.try_get::<String, _>("currency_type")
3131
.map(|res| CurrencyType::from_str(res.as_str()).unwrap())?;
32-
32+
3333
let currency = Currency::new(
34-
code.as_str(),
35-
precision as u8,
36-
iso4217 as u16,
37-
name.as_str(),
38-
currency_type,
39-
).unwrap();
34+
code.as_str(),
35+
precision as u8,
36+
iso4217 as u16,
37+
name.as_str(),
38+
currency_type,
39+
)
40+
.unwrap();
4041
Ok(CurrencyModel(currency))
4142
}
4243
}

0 commit comments

Comments
 (0)