@@ -61,9 +61,10 @@ pub async fn get_pg_cache_database() -> anyhow::Result<PostgresCacheDatabase> {
61
61
mod tests {
62
62
use std:: time:: Duration ;
63
63
64
+ use nautilus_common:: testing:: wait_until_async;
64
65
use nautilus_core:: equality:: entirely_equal;
65
66
use nautilus_model:: {
66
- enums:: { CurrencyType , OrderSide } ,
67
+ enums:: { CurrencyType , OrderSide , OrderStatus } ,
67
68
identifiers:: {
68
69
stubs:: account_id, AccountId , ClientOrderId , InstrumentId , TradeId , VenueOrderId ,
69
70
} ,
@@ -91,7 +92,14 @@ mod tests {
91
92
. add ( String :: from ( "test_id" ) , test_id_value. clone ( ) )
92
93
. await
93
94
. unwrap ( ) ;
94
- tokio:: time:: sleep ( Duration :: from_secs ( 2 ) ) . await ;
95
+ wait_until_async (
96
+ || async {
97
+ let result = pg_cache. load ( ) . await . unwrap ( ) ;
98
+ result. keys ( ) . len ( ) > 0
99
+ } ,
100
+ Duration :: from_secs ( 2 ) ,
101
+ )
102
+ . await ;
95
103
let result = pg_cache. load ( ) . await . unwrap ( ) ;
96
104
assert_eq ! ( result. keys( ) . len( ) , 1 ) ;
97
105
assert_eq ! (
@@ -149,7 +157,15 @@ mod tests {
149
157
. add_instrument ( InstrumentAny :: OptionsContract ( options_contract) )
150
158
. await
151
159
. unwrap ( ) ;
152
- tokio:: time:: sleep ( Duration :: from_secs ( 2 ) ) . await ;
160
+ // Wait for the cache to update
161
+ wait_until_async (
162
+ || async {
163
+ let currencies = pg_cache. load_currencies ( ) . await . unwrap ( ) ;
164
+ currencies. len ( ) >= 4
165
+ } ,
166
+ Duration :: from_secs ( 2 ) ,
167
+ )
168
+ . await ;
153
169
// Check that currency list is correct
154
170
let currencies = pg_cache. load_currencies ( ) . await . unwrap ( ) ;
155
171
assert_eq ! ( currencies. len( ) , 4 ) ;
@@ -261,7 +277,22 @@ mod tests {
261
277
) ;
262
278
pg_cache. add_order ( market_order. clone ( ) ) . await . unwrap ( ) ;
263
279
pg_cache. add_order ( limit_order. clone ( ) ) . await . unwrap ( ) ;
264
- tokio:: time:: sleep ( Duration :: from_secs ( 2 ) ) . await ;
280
+ wait_until_async (
281
+ || async {
282
+ pg_cache
283
+ . load_order ( & market_order. client_order_id ( ) )
284
+ . await
285
+ . unwrap ( )
286
+ . is_some ( )
287
+ && pg_cache
288
+ . load_order ( & limit_order. client_order_id ( ) )
289
+ . await
290
+ . unwrap ( )
291
+ . is_some ( )
292
+ } ,
293
+ Duration :: from_secs ( 2 ) ,
294
+ )
295
+ . await ;
265
296
let market_order_result = pg_cache
266
297
. load_order ( & market_order. client_order_id ( ) )
267
298
. await
@@ -320,7 +351,17 @@ mod tests {
320
351
) ;
321
352
market_order. apply ( filled) . unwrap ( ) ;
322
353
pg_cache. update_order ( market_order. clone ( ) ) . await . unwrap ( ) ;
323
- tokio:: time:: sleep ( Duration :: from_secs ( 2 ) ) . await ;
354
+ wait_until_async (
355
+ || async {
356
+ let result = pg_cache
357
+ . load_order ( & market_order. client_order_id ( ) )
358
+ . await
359
+ . unwrap ( ) ;
360
+ result. is_some ( ) && result. unwrap ( ) . status ( ) == OrderStatus :: Filled
361
+ } ,
362
+ Duration :: from_secs ( 2 ) ,
363
+ )
364
+ . await ;
324
365
// Assert
325
366
let market_order_result = pg_cache
326
367
. load_order ( & market_order. client_order_id ( ) )
0 commit comments