Skip to content

Commit

Permalink
Have one thread per chain on benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-ds committed Feb 20, 2025
1 parent 56771fa commit 2bc26c1
Show file tree
Hide file tree
Showing 9 changed files with 461 additions and 319 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ colored = "2.1.0"
comfy-table = "7.1.0"
convert_case = "0.6.0"
criterion = { version = "0.5.1", default-features = false }
crossbeam-channel = "0.5.14"
custom_debug_derive = "0.6.1"
dashmap = "5.5.3"
derive_more = "1.0.0"
Expand Down
8 changes: 7 additions & 1 deletion linera-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ version.workspace = true

[features]
test = ["linera-views/test", "linera-execution/test"]
benchmark = ["linera-base/test", "dep:linera-sdk", "dep:tokio-util"]
benchmark = [
"linera-base/test",
"dep:linera-sdk",
"dep:tokio-util",
"dep:crossbeam-channel",
]
wasmer = [
"linera-core/wasmer",
"linera-execution/wasmer",
Expand Down Expand Up @@ -55,6 +60,7 @@ bcs.workspace = true
cfg-if.workspace = true
chrono = { workspace = true, features = ["clock"] }
clap.workspace = true
crossbeam-channel = { workspace = true, optional = true }
derive_more = { workspace = true, features = ["deref", "deref_mut"] }
dirs.workspace = true
futures.workspace = true
Expand Down
335 changes: 335 additions & 0 deletions linera-client/src/benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
// Copyright (c) Zefchain Labs, Inc.

Check warning on line 1 in linera-client/src/benchmark.rs

View workflow job for this annotation

GitHub Actions / lint-cargo-fmt

Diff in /home/runner/work/linera-protocol/linera-protocol/linera-client/src/benchmark.rs
// SPDX-License-Identifier: Apache-2.0

use crate::Error;
use linera_base::{
crypto::AccountSecretKey, data_types::Timestamp, hashed::Hashed, identifiers::ChainId,
identifiers::Owner, listen_for_shutdown_signals, time::Instant,
};
use linera_chain::{
data_types::{BlockProposal, ProposedBlock, SignatureAggregator, Vote},
types::{CertificateValue, ConfirmedBlock, GenericCertificate},

Check warning on line 11 in linera-client/src/benchmark.rs

View workflow job for this annotation

GitHub Actions / lint-cargo-fmt

Diff in /home/runner/work/linera-protocol/linera-protocol/linera-client/src/benchmark.rs
};
use linera_core::{
client::ChainClient, local_node::LocalNodeClient, node::CrossChainMessageDelivery,
node::ValidatorNode,
};
use linera_execution::{
committee::{Committee, Epoch},
system::SystemOperation,
Operation,
};

Check warning on line 21 in linera-client/src/benchmark.rs

View workflow job for this annotation

GitHub Actions / lint-cargo-fmt

Diff in /home/runner/work/linera-protocol/linera-protocol/linera-client/src/benchmark.rs
use linera_rpc::node_provider::NodeProvider;
use linera_storage::Storage;
use std::collections::HashMap;
use tokio::{runtime::Handle, task, time};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn, Instrument as _};

Check warning on line 28 in linera-client/src/benchmark.rs

View workflow job for this annotation

GitHub Actions / lint-cargo-fmt

Diff in /home/runner/work/linera-protocol/linera-protocol/linera-client/src/benchmark.rs
/// Namespace type for the benchmark driver.
///
/// The type holds no data: the storage parameter `S` only selects which storage
/// backend the associated functions operate on. The trait bounds live on the
/// `impl` block that needs them rather than on the struct definition, so the
/// struct itself places no requirements on `S`.
pub struct Benchmark<S> {
    /// Ties the otherwise unused storage parameter to the type.
    _phantom: std::marker::PhantomData<S>,
}

impl<S> Benchmark<S>
where
S: Storage + Clone + Send + Sync + 'static,
{
#[allow(clippy::too_many_arguments)]
pub async fn run_benchmark(
num_chains: usize,
transactions_per_block: usize,
bps: Option<usize>,
chain_clients: HashMap<ChainId, ChainClient<NodeProvider, S>>,
epoch: Epoch,
blocks_infos: Vec<(ChainId, Vec<Operation>, AccountSecretKey)>,
clients: Vec<linera_rpc::Client>,
committee: Committee,
local_node: LocalNodeClient<S>,
) -> Result<(), Error> {
let shutdown_notifier = CancellationToken::new();
tokio::spawn(listen_for_shutdown_signals(shutdown_notifier.clone()));

// The bps control task will control the BPS from the threads. `crossbeam_channel` is used
// for two reasons:
// 1. it allows bounded channels with zero sized buffers.
// 2. it blocks the current thread if the message can't be sent, which is exactly
// what we want to happen here. `tokio::sync::mpsc` doesn't do that. `std::sync::mpsc`
// does, but it is slower than `crossbeam_channel`.
// Number 1 is the main reason. `tokio::sync::mpsc` doesn't allow 0 sized buffers.
// With a channel with a buffer of size 1 or larger, even if we have already reached
// the desired BPS, the tasks would continue sending block proposals until the channel's
// buffer is filled, which would cause us to not properly control the BPS rate.
let (sender, receiver) = crossbeam_channel::bounded(0);
let bps_control_task = tokio::spawn(async move {
let mut recv_count = 0;
let mut start = time::Instant::now();
while let Ok(()) = receiver.recv() {
recv_count += 1;
if recv_count == num_chains {
let elapsed = start.elapsed();
if let Some(bps) = bps {
if elapsed > time::Duration::from_secs(1) {
warn!(
"Failed to achieve {} BPS/{} TPS in {} ms",
bps,
bps * transactions_per_block,
elapsed.as_millis(),
);
} else {
time::sleep(time::Duration::from_secs(1) - elapsed).await;
info!(
"Achieved {} BPS/{} TPS in {} ms",
bps,
bps * transactions_per_block,
elapsed.as_millis(),
);
}
} else {
let achieved_bps = num_chains as f64 / elapsed.as_secs_f64();
info!(
"Achieved {} BPS/{} TPS in {} ms",
achieved_bps,
achieved_bps * transactions_per_block as f64,
elapsed.as_millis(),
);
}

recv_count = 0;
start = time::Instant::now();
}
}

info!("Exiting logging task...");
});

let mut bps_remainder = bps.unwrap_or_default() % num_chains;
let bps_share = bps.map(|bps| bps / num_chains);

let mut join_set = task::JoinSet::<Result<(), Error>>::new();
for (chain_id, operations, key_pair) in blocks_infos {
let bps_share = if bps_remainder > 0 {
bps_remainder -= 1;
Some(bps_share.unwrap() + 1)
} else {
bps_share
};

let shutdown_notifier = shutdown_notifier.clone();
let sender = sender.clone();
let handle = Handle::current();
let clients = clients.clone();
let committee = committee.clone();
let local_node = local_node.clone();
let chain_client = chain_clients[&chain_id].clone();
let short_chain_id = format!("{:?}", chain_id);
join_set.spawn_blocking(move || {
handle.block_on(
async move {
Self::run_benchmark_internal(
bps_share,
chain_id,
operations,
key_pair,
clients,
epoch,
chain_client,
shutdown_notifier,
sender,
committee,
local_node,
)
.await?;

Ok(())
}
.instrument(tracing::info_span!(
"benchmark_chain_id",
chain_id = short_chain_id
)),
)
});
}

join_set
.join_all()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
drop(sender);
info!("All benchmark tasks completed");
bps_control_task.await?;

Ok(())
}

/// Benchmark loop for a single chain.
///
/// Each iteration builds a block containing `operations`, stages it on `local_node`,
/// sends the proposal to every client in `clients`, aggregates the returned votes into
/// a certificate, submits that certificate to every client, and finally replays it on
/// `chain_client`.
///
/// * `bps` - this chain's share of the target rate. With `Some(n)`, the loop signals
///   `sender` after every `n` blocks and keeps going until `shutdown_notifier` is
///   cancelled. With `None`, a single block is produced and `sender` is signalled once.
/// * `sender` - rendezvous channel to the BPS control task; `send` blocks the current
///   thread until the control task receives.
///
/// The chain is closed before returning successfully.
///
/// # Errors
///
/// Returns an error if staging the block, any validator request, replaying the
/// certificate, signalling the control task, or closing the chain fails.
///
/// # Panics
///
/// Panics if the collected votes are not sufficient to build a certificate.
#[allow(clippy::too_many_arguments)]
async fn run_benchmark_internal(
    bps: Option<usize>,
    chain_id: ChainId,
    operations: Vec<Operation>,
    key_pair: AccountSecretKey,
    clients: Vec<linera_rpc::Client>,
    epoch: Epoch,
    chain_client: ChainClient<NodeProvider, S>,
    shutdown_notifier: CancellationToken,
    sender: crossbeam_channel::Sender<()>,
    committee: Committee,
    local_node: LocalNodeClient<S>,
) -> Result<(), Error> {
    info!(
        "Starting benchmark at target BPS of {:?}, for chain {:?}",
        bps, chain_id
    );
    let mut num_sent_proposals = 0;
    loop {
        if shutdown_notifier.is_cancelled() {
            info!("Shutdown signal received, stopping benchmark");
            break;
        }
        let block = ProposedBlock {
            epoch,
            chain_id,
            incoming_bundles: Vec::new(),
            operations: operations.clone(),
            previous_block_hash: chain_client.block_hash(),
            height: chain_client.next_block_height(),
            authenticated_signer: Some(Owner::from(key_pair.public())),
            timestamp: chain_client.timestamp().max(Timestamp::now()),
        };
        let executed_block = local_node
            .stage_block_execution(block.clone(), None)
            .await?
            .0;

        let value = Hashed::new(ConfirmedBlock::new(executed_block));
        let proposal =
            BlockProposal::new_initial(linera_base::data_types::Round::Fast, block, &key_pair);

        // Send the proposal to all validators concurrently.
        let mut join_set = task::JoinSet::new();
        for client in &clients {
            let client = client.clone();
            let proposal = proposal.clone();
            join_set.spawn(async move { client.handle_block_proposal(proposal).await });
        }
        // Responses without a pending vote, or whose vote does not match `value`,
        // are skipped. The vote is already owned here, so it is consumed directly.
        let votes = join_set
            .join_all()
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()?
            .into_iter()
            .filter_map(|response| {
                let vote = response.info.manager.pending?;
                vote.with_value(value.clone())
            })
            .collect::<Vec<_>>();

        let certificate = Self::make_benchmark_certificate_from_votes(votes, committee.clone());

        // Submit the certificate to all validators concurrently.
        let mut join_set = task::JoinSet::new();
        for client in &clients {
            let client = client.clone();
            let certificate = certificate.clone();
            join_set.spawn(async move {
                client
                    .handle_confirmed_certificate(
                        certificate,
                        CrossChainMessageDelivery::NonBlocking,
                    )
                    .await
            });
        }

        join_set
            .join_all()
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()?;

        // Replay the certificate locally.
        // No required certificates from other chains: This is only used with benchmark.
        // This is the certificate's last use, so it is moved rather than cloned.
        chain_client.process_certificate(certificate).await?;

        num_sent_proposals += 1;
        if let Some(bps) = bps {
            // NOTE(review): a share of `Some(0)` never equals the counter (it is
            // already 1 here), so such a chain would never signal — confirm callers
            // never pass zero.
            if num_sent_proposals == bps {
                // Blocks this thread until the control task is ready to receive.
                sender.send(())?;
                num_sent_proposals = 0;
            }
        } else {
            break;
        }
    }

    // Without a target rate the chain reports exactly once, after leaving the loop.
    if bps.is_none() {
        sender.send(())?;
    }

    Self::close_benchmark_chain(chain_client).await?;
    info!("Exiting task...");
    Ok(())
}

/// Tries to aggregate votes into a certificate.
fn make_benchmark_certificate_from_votes<T>(
votes: Vec<Vote<T>>,
committee: Committee,
) -> GenericCertificate<T>
where
T: std::fmt::Debug + CertificateValue,
{
let mut aggregators = HashMap::new();
for vote in votes {
let chain_id = vote.value().inner().chain_id();
trace!(
"Processing vote on {:?}'s block by {:?}",
chain_id,
vote.public_key,
);
let aggregator = aggregators.entry(chain_id).or_insert_with(|| {
SignatureAggregator::new(
vote.value,
linera_base::data_types::Round::Fast,
&committee,
)
});
match aggregator.append(vote.public_key, vote.signature) {
Ok(Some(certificate)) => {
trace!("Found certificate: {:?}", certificate);
return certificate;
}
Ok(None) => {
trace!("Added one vote");
}
Err(error) => {
error!("Failed to aggregate vote: {}", error);
}
}
}

panic!("Could not build a certificate from votes");
}

/// Closes the benchmark chain behind `chain_client` and logs how long it took.
///
/// # Errors
///
/// Returns an error if executing the `CloseChain` system operation fails.
///
/// # Panics
///
/// Panics if the operation completes with an outcome on which `expect` fails.
async fn close_benchmark_chain(
    chain_client: ChainClient<NodeProvider, S>,
) -> Result<(), Error> {
    let started_at = Instant::now();
    let close_chain = Operation::System(SystemOperation::CloseChain);
    let outcome = chain_client.execute_operation(close_chain).await?;
    outcome.expect("Close chain operation should not fail!");

    debug!(
        "Closed chain {:?} in {} ms",
        chain_client.chain_id(),
        started_at.elapsed().as_millis()
    );

    Ok(())
}
}
Loading

0 comments on commit 2bc26c1

Please sign in to comment.