Skip to content

Commit

Permalink
Have one thread per chain on benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-ds committed Feb 19, 2025
1 parent 10e8da9 commit b6acbe6
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 114 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ colored = "2.1.0"
comfy-table = "7.1.0"
convert_case = "0.6.0"
criterion = { version = "0.5.1", default-features = false }
crossbeam-channel = "0.5.14"
custom_debug_derive = "0.6.1"
dashmap = "5.5.3"
derive_more = "1.0.0"
Expand Down
8 changes: 7 additions & 1 deletion linera-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ version.workspace = true

[features]
test = ["linera-views/test", "linera-execution/test"]
benchmark = ["linera-base/test", "dep:linera-sdk", "dep:tokio-util"]
benchmark = [
"linera-base/test",
"dep:linera-sdk",
"dep:tokio-util",
"dep:crossbeam-channel",
]
wasmer = [
"linera-core/wasmer",
"linera-execution/wasmer",
Expand Down Expand Up @@ -55,6 +60,7 @@ bcs.workspace = true
cfg-if.workspace = true
chrono = { workspace = true, features = ["clock"] }
clap.workspace = true
crossbeam-channel = { workspace = true, optional = true }
derive_more = { workspace = true, features = ["deref", "deref_mut"] }
dirs.workspace = true
futures.workspace = true
Expand Down
124 changes: 39 additions & 85 deletions linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,12 @@ use {
data_types::Amount,
hashed::Hashed,
identifiers::{AccountOwner, ApplicationId, Owner},
listen_for_shutdown_signals,
},
linera_chain::{
data_types::{BlockProposal, ExecutedBlock, ProposedBlock, SignatureAggregator, Vote},
types::{CertificateValue, ConfirmedBlock, GenericCertificate},
},
linera_core::{client::ChainClientError, data_types::ChainInfoQuery, node::ValidatorNode},
linera_core::{data_types::ChainInfoQuery, node::ValidatorNode},
linera_execution::{
committee::Epoch,
system::{OpenChainConfig, Recipient, SystemOperation, OPEN_CHAIN_MESSAGE_INDEX},
Expand All @@ -49,7 +48,7 @@ use {
std::{collections::HashMap, iter},
tokio::task,
tokio_util::sync::CancellationToken,
tracing::{error, trace, warn},
tracing::{error, trace},
};
#[cfg(feature = "fs")]
use {
Expand Down Expand Up @@ -585,28 +584,30 @@ where
{
#[allow(clippy::too_many_arguments)]
pub async fn run_benchmark(
&mut self,
&self,
bps: Option<usize>,
blocks_infos_iter: impl Iterator<Item = &(ChainId, Vec<Operation>, AccountSecretKey)>,
chain_id: ChainId,
operations: Vec<Operation>,
key_pair: AccountSecretKey,
clients: Vec<linera_rpc::Client>,
transactions_per_block: usize,
epoch: Epoch,
chain_clients: HashMap<ChainId, ChainClient<NodeProvider, S>>,
chain_client: ChainClient<NodeProvider, S>,
shutdown_notifier: CancellationToken,
sender: crossbeam_channel::Sender<()>,
) -> Result<(), Error> {
let shutdown_notifier = CancellationToken::new();
tokio::spawn(listen_for_shutdown_signals(shutdown_notifier.clone()));

info!(
"Starting benchmark at target BPS of {:?}, for chain {:?}",
bps, chain_id
);
let mut num_sent_proposals = 0;
let mut start = Instant::now();
for (chain_id, operations, key_pair) in blocks_infos_iter {
loop {
if shutdown_notifier.is_cancelled() {
info!("Shutdown signal received, stopping benchmark");
break;
}
let chain_client = &chain_clients[chain_id];
let block = ProposedBlock {
epoch,
chain_id: *chain_id,
chain_id,
incoming_bundles: Vec::new(),
operations: operations.clone(),
previous_block_hash: chain_client.block_hash(),
Expand All @@ -617,7 +618,7 @@ where
let executed_block = self.stage_block_execution(block.clone(), None).await?;
let value = Hashed::new(ConfirmedBlock::new(executed_block));
let proposal =
BlockProposal::new_initial(linera_base::data_types::Round::Fast, block, key_pair);
BlockProposal::new_initial(linera_base::data_types::Round::Fast, block, &key_pair);

let mut join_set = task::JoinSet::new();
for client in &clients {
Expand All @@ -637,14 +638,8 @@ where
})
.collect::<Vec<_>>();

let certificates = self.make_benchmark_certificates_from_votes(votes);
assert_eq!(
certificates.len(),
1,
"Unable to build all the expected certificates from received votes"
);
let certificate = self.make_benchmark_certificate_from_votes(votes);

let certificate = &certificates[0];
let mut join_set = task::JoinSet::new();
for client in &clients {
let client = client.clone();
Expand Down Expand Up @@ -679,40 +674,20 @@ where
num_sent_proposals += 1;
if let Some(bps) = bps {
if num_sent_proposals == bps {
let elapsed = start.elapsed();
if elapsed > Duration::from_secs(1) {
warn!(
"Failed to achieve {} BPS/{} TPS, took {} ms",
bps,
bps * transactions_per_block,
elapsed.as_millis()
);
} else {
tokio::time::sleep(Duration::from_secs(1) - elapsed).await;
info!(
"Achieved {} BPS/{} TPS in {} ms",
bps,
bps * transactions_per_block,
elapsed.as_millis()
);
}
start = Instant::now();
sender.send(())?;
num_sent_proposals = 0;
}
} else {
break;
}
}

if bps.is_none() {
let elapsed = start.elapsed();
let bps = num_sent_proposals as f64 / elapsed.as_secs_f64();
info!(
"Achieved {} BPS/{} TPS",
bps,
bps * transactions_per_block as f64
);
sender.send(())?;
}

self.close_benchmark_chains(chain_clients).await?;
self.close_benchmark_chain(chain_client).await?;
info!("Exiting task...");
Ok(())
}

Expand Down Expand Up @@ -835,35 +810,20 @@ where
Ok((benchmark_chains, chain_clients))
}

/// Closes the chains that were created for the benchmark.
pub async fn close_benchmark_chains(
&mut self,
chain_clients: HashMap<ChainId, ChainClient<NodeProvider, S>>,
/// Closes the chain that was created for the benchmark.
pub async fn close_benchmark_chain(
&self,
chain_client: ChainClient<NodeProvider, S>,
) -> Result<(), Error> {
let num_chains = chain_clients.len();

let mut join_set = task::JoinSet::new();
for chain_client in chain_clients.values() {
let chain_client = chain_client.clone();
join_set.spawn(async move {
chain_client
.execute_operation(Operation::System(SystemOperation::CloseChain))
.await?
.expect("Close chain operation should not fail!");
Ok::<_, ChainClientError>(())
});
}

let start = Instant::now();
join_set
.join_all()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
chain_client
.execute_operation(Operation::System(SystemOperation::CloseChain))
.await?
.expect("Close chain operation should not fail!");

info!(
"Closed {} chains in {} ms",
num_chains,
"Closed chain {:?} in {} ms",
chain_client.chain_id(),
start.elapsed().as_millis()
);

Expand Down Expand Up @@ -1025,24 +985,18 @@ where
blocks_infos
}

/// Tries to aggregate votes into certificates.
pub fn make_benchmark_certificates_from_votes<T>(
/// Tries to aggregate votes into a certificate.
pub fn make_benchmark_certificate_from_votes<T>(
&self,
votes: Vec<Vote<T>>,
) -> Vec<GenericCertificate<T>>
) -> GenericCertificate<T>
where
T: std::fmt::Debug + CertificateValue,
{
let committee = self.wallet.genesis_config().create_committee();
let mut aggregators = HashMap::new();
let mut certificates = Vec::new();
let mut done_senders = HashSet::new();
for vote in votes {
// We aggregate votes indexed by sender.
let chain_id = vote.value().inner().chain_id();
if done_senders.contains(&chain_id) {
continue;
}
trace!(
"Processing vote on {:?}'s block by {:?}",
chain_id,
Expand All @@ -1058,8 +1012,7 @@ where
match aggregator.append(vote.validator, vote.signature) {
Ok(Some(certificate)) => {
trace!("Found certificate: {:?}", certificate);
certificates.push(certificate);
done_senders.insert(chain_id);
return certificate;
}
Ok(None) => {
trace!("Added one vote");
Expand All @@ -1069,7 +1022,8 @@ where
}
}
}
certificates

panic!("Could not build a certificate from votes");
}

pub async fn update_wallet_from_certificate(&mut self, certificate: ConfirmedBlockCertificate) {
Expand Down
3 changes: 3 additions & 0 deletions linera-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub(crate) enum Inner {
LocalNode(#[from] linera_core::local_node::LocalNodeError),
#[error("remote node operation failed: {0}")]
RemoteNode(#[from] linera_core::node::NodeError),
#[cfg(feature = "benchmark")]
#[error("failed to send message: {0}")]
SendError(#[from] crossbeam_channel::SendError<()>),
}

thiserror_context::impl_context!(Error(Inner));
Expand Down
2 changes: 2 additions & 0 deletions linera-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ benchmark = [
"linera-base/test",
"linera-client/benchmark",
"linera-chain/benchmark",
"dep:crossbeam-channel",
]
wasmer = ["linera-client/wasmer", "linera-execution/wasmer", "linera-storage/wasmer"]
wasmtime = [
Expand Down Expand Up @@ -76,6 +77,7 @@ clap-markdown.workspace = true
colored.workspace = true
comfy-table.workspace = true
convert_case.workspace = true
crossbeam-channel = { workspace = true, optional = true }
current_platform = "0.2.0"
fs-err = { workspace = true, features = ["tokio"] }
fs_extra = { workspace = true, optional = true }
Expand Down
Loading

0 comments on commit b6acbe6

Please sign in to comment.