Skip to content

Commit

Permalink
Add flags to crank less aggressively (#128)
Browse files Browse the repository at this point in the history

* Wait for some time or sufficient events to crank

* Rust fmt

* Fixes

* Fix sign
  • Loading branch information
nathanielparke authored May 28, 2021
1 parent 6f446cf commit 60f9a32
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 59 additions & 13 deletions dex/crank/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
#![allow(dead_code)]

use std::borrow::Cow;
use std::cmp::min;
use std::cmp::{max, min};
use std::collections::BTreeSet;
use std::convert::identity;
use std::mem::size_of;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::{thread, time};

use anyhow::{format_err, Result};
Expand Down Expand Up @@ -128,6 +128,10 @@ pub enum Command {
num_accounts: Option<usize>,
#[clap(long)]
log_directory: String,
#[clap(long)]
max_q_length: Option<u64>,
#[clap(long)]
max_wait_for_events_delay: Option<u64>,
},
MatchOrders {
#[clap(long, short)]
Expand Down Expand Up @@ -262,6 +266,8 @@ pub fn start(ctx: Option<Context>, opts: Opts) -> Result<()> {
events_per_worker,
ref num_accounts,
ref log_directory,
ref max_q_length,
ref max_wait_for_events_delay,
} => {
init_logger(log_directory);
consume_events_loop(
Expand All @@ -274,6 +280,8 @@ pub fn start(ctx: Option<Context>, opts: Opts) -> Result<()> {
num_workers,
events_per_worker,
num_accounts.unwrap_or(32),
max_q_length.unwrap_or(1),
max_wait_for_events_delay.unwrap_or(60),
)?;
}
Command::MonitorQueue {
Expand Down Expand Up @@ -498,20 +506,37 @@ fn consume_events_loop(
num_workers: usize,
events_per_worker: usize,
num_accounts: usize,
max_q_length: u64,
max_wait_for_events_delay: u64,
) -> Result<()> {
info!("Getting market keys ...");
let client = opts.client();
let market_keys = get_keys_for_market(&client, &program_id, &market)?;
info!("{:#?}", market_keys);
let pool = threadpool::ThreadPool::new(num_workers);
let max_slot_height_mutex = Arc::new(Mutex::new(0_u64));
let mut last_cranked_at = std::time::Instant::now()
.checked_sub(std::time::Duration::from_secs(max_wait_for_events_delay))
.unwrap_or(std::time::Instant::now());

loop {
thread::sleep(time::Duration::from_millis(300));
thread::sleep(time::Duration::from_millis(1000));

let loop_start = std::time::Instant::now();
let start_time = std::time::Instant::now();
let event_q_data = client
.get_account_with_commitment(&market_keys.event_q, CommitmentConfig::recent())?
let event_q_value_and_context =
client.get_account_with_commitment(&market_keys.event_q, CommitmentConfig::recent())?;
let event_q_slot = event_q_value_and_context.context.slot;
let max_slot_height = max_slot_height_mutex.lock().unwrap();
if event_q_slot <= *max_slot_height {
info!(
"Skipping crank. Already cranked for slot. Event queue slot: {}, Max seen slot: {}",
event_q_slot, max_slot_height
);
continue;
}
drop(max_slot_height);
let event_q_data = event_q_value_and_context
.value
.ok_or(format_err!("Failed to retrieve account"))?
.data;
Expand All @@ -533,6 +558,18 @@ fn consume_events_loop(

if event_q_len == 0 {
continue;
} else if std::time::Duration::from_secs(max_wait_for_events_delay)
.gt(&last_cranked_at.elapsed())
&& (event_q_len as u64) < max_q_length
{
info!(
"Skipping crank. Last cranked {} seconds ago and queue only has {} events. \
Event queue slot: {}",
last_cranked_at.elapsed().as_secs(),
event_q_len,
event_q_slot
);
continue;
} else {
info!(
"Total event queue length: {}, market {}, coin {}, pc {}",
Expand Down Expand Up @@ -591,6 +628,7 @@ fn consume_events_loop(
let client = opts.client();
let account_metas = account_metas.clone();
let event_q = *market_keys.event_q;
let max_slot_height_mutex_clone = Arc::clone(&max_slot_height_mutex);
pool.execute(move || {
consume_events_wrapper(
&client,
Expand All @@ -600,14 +638,16 @@ fn consume_events_loop(
thread_num,
events_per_worker,
event_q,
max_slot_height_mutex_clone,
event_q_slot,
)
});
}
pool.join();
let loop_end = std::time::Instant::now();
last_cranked_at = std::time::Instant::now();
info!(
"Total loop time took {}",
loop_end.duration_since(loop_start).as_millis()
last_cranked_at.duration_since(loop_start).as_millis()
);
}
}
Expand All @@ -621,6 +661,8 @@ fn consume_events_wrapper(
thread_num: usize,
to_consume: usize,
event_q: Pubkey,
max_slot_height_mutex: Arc<Mutex<u64>>,
slot: u64,
) {
let start = std::time::Instant::now();
let result = consume_events_once(
Expand All @@ -633,12 +675,16 @@ fn consume_events_wrapper(
event_q,
);
match result {
Ok(signature) => info!(
"[thread {}] Successfully consumed events after {:?}: {}.",
thread_num,
start.elapsed(),
signature
),
Ok(signature) => {
info!(
"[thread {}] Successfully consumed events after {:?}: {}.",
thread_num,
start.elapsed(),
signature
);
let mut max_slot_height = max_slot_height_mutex.lock().unwrap();
*max_slot_height = max(slot, *max_slot_height);
}
Err(err) => {
error!("[thread {}] Received error: {:?}", thread_num, err);
}
Expand Down

0 comments on commit 60f9a32

Please sign in to comment.