Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent messages from taking too long to be sent #902

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 39 additions & 31 deletions chain-signatures/node/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::time::{Duration, Instant};
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

use near_account_id::AccountId;

#[derive(Debug, Clone, clap::Parser)]
#[group(id = "message_options")]
pub struct Options {
Expand Down Expand Up @@ -46,7 +48,7 @@ pub enum SendError {

async fn send_encrypted<U: IntoUrl>(
from: Participant,
client: &Client,
client: Client,
url: U,
message: Vec<Ciphered>,
request_timeout: Duration,
Expand Down Expand Up @@ -97,14 +99,16 @@ async fn send_encrypted<U: IntoUrl>(
pub struct MessageQueue {
deque: VecDeque<(ParticipantInfo, MpcMessage, Instant)>,
seen_counts: HashSet<String>,
account_id: AccountId,
message_options: Options,
}

impl MessageQueue {
pub fn new(options: Options) -> Self {
pub fn new(id: &AccountId, options: Options) -> Self {
Self {
deque: VecDeque::default(),
seen_counts: HashSet::default(),
deque: VecDeque::new(),
seen_counts: HashSet::new(),
account_id: id.clone(),
message_options: options,
}
}
Expand Down Expand Up @@ -162,54 +166,58 @@ impl MessageQueue {
encrypted.push((encrypted_msg, (info, msg, instant)));
}

let mut compacted = 0;
let mut tasks = tokio::task::JoinSet::new();
for (id, encrypted) in encrypted {
for partition in partition_ciphered_256kb(encrypted) {
let (encrypted_partition, msgs): (Vec<_>, Vec<_>) = partition.into_iter().unzip();
let (encrypted_partition, _msgs): (Vec<_>, Vec<_>) = partition.into_iter().unzip();
// guaranteed to unwrap due to our previous loop check:
let info = participants.get(&Participant::from(id)).unwrap();
let id = Participant::from(id);
let info = participants.get(&id).unwrap();
let account_id = &info.account_id;

let start = Instant::now();
crate::metrics::NUM_SEND_ENCRYPTED_TOTAL
.with_label_values(&[account_id.as_str()])
.inc();
if let Err(err) = send_encrypted(

tasks.spawn(send_encrypted(
from,
client,
&info.url,
client.clone(),
info.url.clone(),
encrypted_partition,
Duration::from_millis(self.message_options.timeout),
)
.await
{
crate::metrics::NUM_SEND_ENCRYPTED_FAILURE
.with_label_values(&[account_id.as_str()])
.inc();
crate::metrics::FAILED_SEND_ENCRYPTED_LATENCY
.with_label_values(&[account_id.as_str()])
.observe(start.elapsed().as_millis() as f64);

// since we failed, put back all the messages related to this
failed.extend(msgs);
));
}
}

let mut compacted = 0;
while let Some(result) = tasks.join_next().await {
match result {
Ok(result) => {
let Err(err) = result else {
compacted += 1;
continue;
};
errors.push(err);
} else {
compacted += msgs.len();
crate::metrics::SEND_ENCRYPTED_LATENCY
.with_label_values(&[account_id.as_str()])
.observe(start.elapsed().as_millis() as f64);
}
Err(err) => {
tracing::error!(?err, "message queue task failure");
}
}
}

if uncompacted > 0 {
let elapsed = outer.elapsed();
if elapsed > Duration::from_millis(100) && uncompacted > 0 {
tracing::info!(
uncompacted,
compacted,
"{from:?} sent messages in {:?};",
outer.elapsed()
"{from:?} sent messages in {:?}",
elapsed,
);
}
crate::metrics::SEND_ENCRYPTED_LATENCY
.with_label_values(&[self.account_id.as_str()])
.observe(elapsed.as_millis() as f64);

// only add the participant count if it hasn't been seen before.
let counts = format!("{participant_counter:?}");
if !participant_counter.is_empty() && self.seen_counts.insert(counts.clone()) {
Expand Down
19 changes: 0 additions & 19 deletions chain-signatures/node/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,6 @@ pub(crate) static PROTOCOL_LATENCY_ITER_MESSAGE: Lazy<HistogramVec> = Lazy::new(
.unwrap()
});

pub(crate) static NUM_SEND_ENCRYPTED_FAILURE: Lazy<CounterVec> = Lazy::new(|| {
try_create_counter_vec(
"multichain_send_encrypted_failure",
"number of successful send encrypted",
&["node_account_id"],
)
.unwrap()
});

pub(crate) static NUM_SEND_ENCRYPTED_TOTAL: Lazy<CounterVec> = Lazy::new(|| {
try_create_counter_vec(
"multichain_send_encrypted_total",
Expand All @@ -325,16 +316,6 @@ pub(crate) static NUM_SEND_ENCRYPTED_TOTAL: Lazy<CounterVec> = Lazy::new(|| {
.unwrap()
});

pub(crate) static FAILED_SEND_ENCRYPTED_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"multichain_failed_send_encrypted_ms",
"Latency of failed send encrypted.",
&["node_account_id"],
Some(exponential_buckets(0.5, 1.5, 20).unwrap()),
)
.unwrap()
});

pub(crate) static NUM_TOTAL_HISTORICAL_SIGNATURE_GENERATORS: Lazy<CounterVec> = Lazy::new(|| {
try_create_counter_vec(
"multichain_num_total_historical_signature_generators",
Expand Down
3 changes: 3 additions & 0 deletions chain-signatures/node/src/protocol/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ impl ConsensusProtocol for StartedState {
),
)),
messages: Arc::new(RwLock::new(MessageQueue::new(
ctx.my_account_id(),
ctx.message_options().clone(),
))),
}))
Expand Down Expand Up @@ -229,6 +230,7 @@ impl ConsensusProtocol for StartedState {
threshold: contract_state.threshold,
protocol,
messages: Arc::new(RwLock::new(MessageQueue::new(
ctx.my_account_id(),
ctx.message_options().clone(),
))),
}))
Expand Down Expand Up @@ -767,6 +769,7 @@ async fn start_resharing<C: ConsensusCtx>(
public_key: contract_state.public_key,
protocol,
messages: Arc::new(RwLock::new(MessageQueue::new(
ctx.my_account_id(),
ctx.message_options().clone(),
))),
}))
Expand Down
7 changes: 5 additions & 2 deletions chain-signatures/node/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,17 @@ impl MpcSignProtocol {
crate::metrics::PROTOCOL_ITER_CNT
.with_label_values(&[my_account_id.as_str()])
.inc();

let msg_time = Instant::now();
let mut msg_count = 0;
loop {
let msg_result = self.receiver.try_recv();
match msg_result {
Ok(msg) => {
tracing::debug!("received a new message");
msg_count += 1;
queue.push(msg);
}
Err(TryRecvError::Empty) => {
tracing::debug!("no new messages received");
break;
}
Err(TryRecvError::Disconnected) => {
Expand All @@ -266,6 +268,7 @@ impl MpcSignProtocol {
}
}
}
tracing::debug!("received {msg_count} messages in {:?}", msg_time.elapsed());

let contract_state = if last_state_update.elapsed() > Duration::from_secs(1) {
let contract_state = match rpc_client::fetch_mpc_contract_state(
Expand Down
Loading