diff --git a/Cargo.lock b/Cargo.lock index af63e984..77987299 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2950,7 +2950,7 @@ dependencies = [ [[package]] name = "serum_dex" -version = "0.2.0" +version = "0.3.0" dependencies = [ "arrayref", "bincode", diff --git a/dex/crank/src/lib.rs b/dex/crank/src/lib.rs index 46b2f1b4..8c8f10c6 100644 --- a/dex/crank/src/lib.rs +++ b/dex/crank/src/lib.rs @@ -2,12 +2,12 @@ #![allow(dead_code)] use std::borrow::Cow; -use std::cmp::min; +use std::cmp::{max, min}; use std::collections::BTreeSet; use std::convert::identity; use std::mem::size_of; use std::num::NonZeroU64; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::{thread, time}; use anyhow::{format_err, Result}; @@ -128,6 +128,10 @@ pub enum Command { num_accounts: Option, #[clap(long)] log_directory: String, + #[clap(long)] + max_q_length: Option, + #[clap(long)] + max_wait_for_events_delay: Option, }, MatchOrders { #[clap(long, short)] @@ -262,6 +266,8 @@ pub fn start(ctx: Option, opts: Opts) -> Result<()> { events_per_worker, ref num_accounts, ref log_directory, + ref max_q_length, + ref max_wait_for_events_delay, } => { init_logger(log_directory); consume_events_loop( @@ -274,6 +280,8 @@ pub fn start(ctx: Option, opts: Opts) -> Result<()> { num_workers, events_per_worker, num_accounts.unwrap_or(32), + max_q_length.unwrap_or(1), + max_wait_for_events_delay.unwrap_or(60), )?; } Command::MonitorQueue { @@ -498,20 +506,37 @@ fn consume_events_loop( num_workers: usize, events_per_worker: usize, num_accounts: usize, + max_q_length: u64, + max_wait_for_events_delay: u64, ) -> Result<()> { info!("Getting market keys ..."); let client = opts.client(); let market_keys = get_keys_for_market(&client, &program_id, &market)?; info!("{:#?}", market_keys); let pool = threadpool::ThreadPool::new(num_workers); + let max_slot_height_mutex = Arc::new(Mutex::new(0_u64)); + let mut last_cranked_at = std::time::Instant::now() + .checked_sub(std::time::Duration::from_secs(max_wait_for_events_delay)) + .unwrap_or(std::time::Instant::now()); loop { - thread::sleep(time::Duration::from_millis(300)); + thread::sleep(time::Duration::from_millis(1000)); let loop_start = std::time::Instant::now(); let start_time = std::time::Instant::now(); - let event_q_data = client - .get_account_with_commitment(&market_keys.event_q, CommitmentConfig::recent())? + let event_q_value_and_context = + client.get_account_with_commitment(&market_keys.event_q, CommitmentConfig::recent())?; + let event_q_slot = event_q_value_and_context.context.slot; + let max_slot_height = max_slot_height_mutex.lock().unwrap(); + if event_q_slot <= *max_slot_height { + info!( + "Skipping crank. Already cranked for slot. Event queue slot: {}, Max seen slot: {}", + event_q_slot, max_slot_height + ); + continue; + } + drop(max_slot_height); + let event_q_data = event_q_value_and_context .value .ok_or(format_err!("Failed to retrieve account"))? .data; @@ -533,6 +558,18 @@ fn consume_events_loop( if event_q_len == 0 { continue; + } else if std::time::Duration::from_secs(max_wait_for_events_delay) + .gt(&last_cranked_at.elapsed()) + && (event_q_len as u64) < max_q_length + { + info!( + "Skipping crank. Last cranked {} seconds ago and queue only has {} events. \ + Event queue slot: {}", + last_cranked_at.elapsed().as_secs(), + event_q_len, + event_q_slot + ); + continue; } else { info!( "Total event queue length: {}, market {}, coin {}, pc {}", @@ -591,6 +628,7 @@ fn consume_events_loop( let client = opts.client(); let account_metas = account_metas.clone(); let event_q = *market_keys.event_q; + let max_slot_height_mutex_clone = Arc::clone(&max_slot_height_mutex); pool.execute(move || { consume_events_wrapper( &client, @@ -600,14 +638,16 @@ fn consume_events_loop( thread_num, events_per_worker, event_q, + max_slot_height_mutex_clone, + event_q_slot, ) }); } pool.join(); - let loop_end = std::time::Instant::now(); + last_cranked_at = std::time::Instant::now(); info!( "Total loop time took {}", - loop_end.duration_since(loop_start).as_millis() + last_cranked_at.duration_since(loop_start).as_millis() ); } } @@ -621,6 +661,8 @@ fn consume_events_wrapper( thread_num: usize, to_consume: usize, event_q: Pubkey, + max_slot_height_mutex: Arc>, + slot: u64, ) { let start = std::time::Instant::now(); let result = consume_events_once( @@ -633,12 +675,16 @@ fn consume_events_wrapper( event_q, ); match result { - Ok(signature) => info!( - "[thread {}] Successfully consumed events after {:?}: {}.", - thread_num, - start.elapsed(), - signature - ), + Ok(signature) => { + info!( + "[thread {}] Successfully consumed events after {:?}: {}.", + thread_num, + start.elapsed(), + signature + ); + let mut max_slot_height = max_slot_height_mutex.lock().unwrap(); + *max_slot_height = max(slot, *max_slot_height); + } Err(err) => { error!("[thread {}] Received error: {:?}", thread_num, err); }