Skip to content

Commit

Permalink
Use tokio::task::JoinSet directly in mass_broadcast (#3263)
Browse files Browse the repository at this point in the history
## Motivation

Using `futures::future::join_all` should not be necessary here

## Proposal

Use `tokio::task::JoinSet` directly instead, and capture results.

## Test Plan

CI

## Release Plan

- Nothing to do / These changes follow the usual release cycle.
  • Loading branch information
ndr-ds authored Feb 6, 2025
1 parent 8ef379f commit 2362f1f
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use {
},
linera_sdk::abis::fungible,
std::{collections::HashMap, iter},
tokio::task,
tracing::{error, trace},
};
#[cfg(feature = "fs")]
Expand Down Expand Up @@ -847,11 +848,10 @@ where
) -> Vec<RpcMessage> {
let time_start = Instant::now();
info!("Broadcasting {} {}", proposals.len(), phase);
let mut join_set = JoinSet::new();
let mut handles = Vec::new();
let mut join_set = task::JoinSet::new();
for client in self.make_validator_mass_clients() {
let proposals = proposals.clone();
let handle = join_set.spawn_task(async move {
join_set.spawn(async move {
debug!("Sending {} requests", proposals.len());
let responses = client
.send(proposals, max_in_flight)
Expand All @@ -860,14 +860,13 @@ where
debug!("Done sending requests");
responses
});
handles.push(handle);
}
let responses = futures::future::join_all(handles)
let responses = join_set
.join_all()
.await
.into_iter()
.flatten()
.flatten()
.collect::<Vec<RpcMessage>>();
.collect::<Vec<_>>();
let time_elapsed = time_start.elapsed();
info!(
"Received {} responses in {} ms.",
Expand Down

0 comments on commit 2362f1f

Please sign in to comment.