-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Have one thread per chain on benchmark
- Loading branch information
Showing
8 changed files
with
465 additions
and
319 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,342 @@ | ||
// Copyright (c) Zefchain Labs, Inc. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use std::collections::HashMap; | ||
|
||
use linera_base::{ | ||
crypto::AccountSecretKey, | ||
data_types::Timestamp, | ||
hashed::Hashed, | ||
identifiers::{ChainId, Owner}, | ||
listen_for_shutdown_signals, | ||
time::Instant, | ||
}; | ||
use linera_chain::{ | ||
data_types::{BlockProposal, ProposedBlock, SignatureAggregator, Vote}, | ||
types::{CertificateValue, ConfirmedBlock, GenericCertificate}, | ||
}; | ||
use linera_core::{ | ||
client::ChainClient, | ||
local_node::LocalNodeClient, | ||
node::{CrossChainMessageDelivery, ValidatorNode}, | ||
}; | ||
use linera_execution::{ | ||
committee::{Committee, Epoch}, | ||
system::SystemOperation, | ||
Operation, | ||
}; | ||
use linera_rpc::node_provider::NodeProvider; | ||
use linera_storage::Storage; | ||
use tokio::{runtime::Handle, task, time}; | ||
use tokio_util::sync::CancellationToken; | ||
use tracing::{debug, error, info, trace, warn, Instrument as _}; | ||
|
||
use crate::Error; | ||
|
||
pub struct Benchmark<Storage> | ||
where | ||
Storage: linera_storage::Storage, | ||
{ | ||
_phantom: std::marker::PhantomData<Storage>, | ||
} | ||
|
||
impl<S> Benchmark<S> | ||
where | ||
S: Storage + Clone + Send + Sync + 'static, | ||
{ | ||
#[allow(clippy::too_many_arguments)] | ||
pub async fn run_benchmark( | ||
num_chains: usize, | ||
transactions_per_block: usize, | ||
bps: Option<usize>, | ||
chain_clients: HashMap<ChainId, ChainClient<NodeProvider, S>>, | ||
epoch: Epoch, | ||
blocks_infos: Vec<(ChainId, Vec<Operation>, AccountSecretKey)>, | ||
clients: Vec<linera_rpc::Client>, | ||
committee: Committee, | ||
local_node: LocalNodeClient<S>, | ||
) -> Result<(), Error> { | ||
let shutdown_notifier = CancellationToken::new(); | ||
tokio::spawn(listen_for_shutdown_signals(shutdown_notifier.clone())); | ||
|
||
// The bps control task will control the BPS from the threads. `crossbeam_channel` is used | ||
// for two reasons: | ||
// 1. it allows bounded channels with zero sized buffers. | ||
// 2. it blocks the current thread if the message can't be sent, which is exactly | ||
// what we want to happen here. `tokio::sync::mpsc` doesn't do that. `std::sync::mpsc` | ||
// does, but it is slower than `crossbeam_channel`. | ||
// Number 1 is the main reason. `tokio::sync::mpsc` doesn't allow 0 sized buffers. | ||
// With a channel with a buffer of size 1 or larger, even if we have already reached | ||
// the desired BPS, the tasks would continue sending block proposals until the channel's | ||
// buffer is filled, which would cause us to not properly control the BPS rate. | ||
let (sender, receiver) = crossbeam_channel::bounded(0); | ||
let bps_control_task = tokio::spawn(async move { | ||
let mut recv_count = 0; | ||
let mut start = time::Instant::now(); | ||
while let Ok(()) = receiver.recv() { | ||
recv_count += 1; | ||
if recv_count == num_chains { | ||
let elapsed = start.elapsed(); | ||
if let Some(bps) = bps { | ||
if elapsed > time::Duration::from_secs(1) { | ||
warn!( | ||
"Failed to achieve {} BPS/{} TPS in {} ms", | ||
bps, | ||
bps * transactions_per_block, | ||
elapsed.as_millis(), | ||
); | ||
} else { | ||
time::sleep(time::Duration::from_secs(1) - elapsed).await; | ||
info!( | ||
"Achieved {} BPS/{} TPS in {} ms", | ||
bps, | ||
bps * transactions_per_block, | ||
elapsed.as_millis(), | ||
); | ||
} | ||
} else { | ||
let achieved_bps = num_chains as f64 / elapsed.as_secs_f64(); | ||
info!( | ||
"Achieved {} BPS/{} TPS in {} ms", | ||
achieved_bps, | ||
achieved_bps * transactions_per_block as f64, | ||
elapsed.as_millis(), | ||
); | ||
} | ||
|
||
recv_count = 0; | ||
start = time::Instant::now(); | ||
} | ||
} | ||
|
||
info!("Exiting logging task..."); | ||
}); | ||
|
||
let mut bps_remainder = bps.unwrap_or_default() % num_chains; | ||
let bps_share = bps.map(|bps| bps / num_chains); | ||
|
||
let mut join_set = task::JoinSet::<Result<(), Error>>::new(); | ||
for (chain_id, operations, key_pair) in blocks_infos { | ||
let bps_share = if bps_remainder > 0 { | ||
bps_remainder -= 1; | ||
Some(bps_share.unwrap() + 1) | ||
} else { | ||
bps_share | ||
}; | ||
|
||
let shutdown_notifier = shutdown_notifier.clone(); | ||
let sender = sender.clone(); | ||
let handle = Handle::current(); | ||
let clients = clients.clone(); | ||
let committee = committee.clone(); | ||
let local_node = local_node.clone(); | ||
let chain_client = chain_clients[&chain_id].clone(); | ||
let short_chain_id = format!("{:?}", chain_id); | ||
join_set.spawn_blocking(move || { | ||
handle.block_on( | ||
async move { | ||
Self::run_benchmark_internal( | ||
bps_share, | ||
chain_id, | ||
operations, | ||
key_pair, | ||
clients, | ||
epoch, | ||
chain_client, | ||
shutdown_notifier, | ||
sender, | ||
committee, | ||
local_node, | ||
) | ||
.await?; | ||
|
||
Ok(()) | ||
} | ||
.instrument(tracing::info_span!( | ||
"benchmark_chain_id", | ||
chain_id = short_chain_id | ||
)), | ||
) | ||
}); | ||
} | ||
|
||
join_set | ||
.join_all() | ||
.await | ||
.into_iter() | ||
.collect::<Result<Vec<_>, _>>()?; | ||
drop(sender); | ||
info!("All benchmark tasks completed"); | ||
bps_control_task.await?; | ||
|
||
Ok(()) | ||
} | ||
|
||
#[allow(clippy::too_many_arguments)] | ||
async fn run_benchmark_internal( | ||
bps: Option<usize>, | ||
chain_id: ChainId, | ||
operations: Vec<Operation>, | ||
key_pair: AccountSecretKey, | ||
clients: Vec<linera_rpc::Client>, | ||
epoch: Epoch, | ||
chain_client: ChainClient<NodeProvider, S>, | ||
shutdown_notifier: CancellationToken, | ||
sender: crossbeam_channel::Sender<()>, | ||
committee: Committee, | ||
local_node: LocalNodeClient<S>, | ||
) -> Result<(), Error> { | ||
info!( | ||
"Starting benchmark at target BPS of {:?}, for chain {:?}", | ||
bps, chain_id | ||
); | ||
let mut num_sent_proposals = 0; | ||
loop { | ||
if shutdown_notifier.is_cancelled() { | ||
info!("Shutdown signal received, stopping benchmark"); | ||
break; | ||
} | ||
let block = ProposedBlock { | ||
epoch, | ||
chain_id, | ||
incoming_bundles: Vec::new(), | ||
operations: operations.clone(), | ||
previous_block_hash: chain_client.block_hash(), | ||
height: chain_client.next_block_height(), | ||
authenticated_signer: Some(Owner::from(key_pair.public())), | ||
timestamp: chain_client.timestamp().max(Timestamp::now()), | ||
}; | ||
let executed_block = local_node | ||
.stage_block_execution(block.clone(), None) | ||
.await? | ||
.0; | ||
|
||
let value = Hashed::new(ConfirmedBlock::new(executed_block)); | ||
let proposal = | ||
BlockProposal::new_initial(linera_base::data_types::Round::Fast, block, &key_pair); | ||
|
||
let mut join_set = task::JoinSet::new(); | ||
for client in &clients { | ||
let client = client.clone(); | ||
let proposal = proposal.clone(); | ||
join_set.spawn(async move { client.handle_block_proposal(proposal).await }); | ||
} | ||
let votes = join_set | ||
.join_all() | ||
.await | ||
.into_iter() | ||
.collect::<Result<Vec<_>, _>>()? | ||
.into_iter() | ||
.filter_map(|response| { | ||
let vote = response.info.manager.pending?; | ||
vote.clone().with_value(value.clone()) | ||
}) | ||
.collect::<Vec<_>>(); | ||
|
||
let certificate = Self::make_benchmark_certificate_from_votes(votes, committee.clone()); | ||
|
||
let mut join_set = task::JoinSet::new(); | ||
for client in &clients { | ||
let client = client.clone(); | ||
let certificate = certificate.clone(); | ||
join_set.spawn(async move { | ||
client | ||
.handle_confirmed_certificate( | ||
certificate, | ||
CrossChainMessageDelivery::NonBlocking, | ||
) | ||
.await | ||
}); | ||
} | ||
|
||
join_set | ||
.join_all() | ||
.await | ||
.into_iter() | ||
.collect::<Result<Vec<_>, _>>()?; | ||
|
||
// Replay the certificate locally. | ||
// No required certificates from other chains: This is only used with benchmark. | ||
chain_client | ||
.process_certificate(certificate.clone()) | ||
.await?; | ||
|
||
num_sent_proposals += 1; | ||
if let Some(bps) = bps { | ||
if num_sent_proposals == bps { | ||
sender.send(())?; | ||
num_sent_proposals = 0; | ||
} | ||
} else { | ||
break; | ||
} | ||
} | ||
|
||
if bps.is_none() { | ||
sender.send(())?; | ||
} | ||
|
||
Self::close_benchmark_chain(chain_client).await?; | ||
info!("Exiting task..."); | ||
Ok(()) | ||
} | ||
|
||
/// Tries to aggregate votes into a certificate. | ||
fn make_benchmark_certificate_from_votes<T>( | ||
votes: Vec<Vote<T>>, | ||
committee: Committee, | ||
) -> GenericCertificate<T> | ||
where | ||
T: std::fmt::Debug + CertificateValue, | ||
{ | ||
let mut aggregators = HashMap::new(); | ||
for vote in votes { | ||
let chain_id = vote.value().inner().chain_id(); | ||
trace!( | ||
"Processing vote on {:?}'s block by {:?}", | ||
chain_id, | ||
vote.public_key, | ||
); | ||
let aggregator = aggregators.entry(chain_id).or_insert_with(|| { | ||
SignatureAggregator::new( | ||
vote.value, | ||
linera_base::data_types::Round::Fast, | ||
&committee, | ||
) | ||
}); | ||
match aggregator.append(vote.public_key, vote.signature) { | ||
Ok(Some(certificate)) => { | ||
trace!("Found certificate: {:?}", certificate); | ||
return certificate; | ||
} | ||
Ok(None) => { | ||
trace!("Added one vote"); | ||
} | ||
Err(error) => { | ||
error!("Failed to aggregate vote: {}", error); | ||
} | ||
} | ||
} | ||
|
||
panic!("Could not build a certificate from votes"); | ||
} | ||
|
||
/// Closes the chain that was created for the benchmark. | ||
async fn close_benchmark_chain( | ||
chain_client: ChainClient<NodeProvider, S>, | ||
) -> Result<(), Error> { | ||
let start = Instant::now(); | ||
chain_client | ||
.execute_operation(Operation::System(SystemOperation::CloseChain)) | ||
.await? | ||
.expect("Close chain operation should not fail!"); | ||
|
||
debug!( | ||
"Closed chain {:?} in {} ms", | ||
chain_client.chain_id(), | ||
start.elapsed().as_millis() | ||
); | ||
|
||
Ok(()) | ||
} | ||
} |
Oops, something went wrong.