From f474fc98599499518f2f3306f745174057fb3d2a Mon Sep 17 00:00:00 2001 From: Rob Date: Tue, 12 Nov 2024 15:46:52 -0500 Subject: [PATCH] synchronously drop tasks --- crates/task-impls/src/consensus/handlers.rs | 10 ++-------- crates/task-impls/src/consensus/mod.rs | 10 +++------- crates/task-impls/src/da.rs | 2 +- crates/task-impls/src/helpers.rs | 7 +------ crates/task-impls/src/network.rs | 16 ++++++++-------- crates/task-impls/src/quorum_proposal/mod.rs | 19 +++++++------------ .../src/quorum_proposal_recv/mod.rs | 11 +++++------ crates/task-impls/src/quorum_vote/mod.rs | 10 +++++----- crates/task-impls/src/request.rs | 4 ++-- crates/task-impls/src/rewind.rs | 2 +- crates/task-impls/src/transactions.rs | 2 +- crates/task-impls/src/upgrade.rs | 2 +- crates/task-impls/src/vid.rs | 2 +- crates/task-impls/src/view_sync.rs | 14 +++++++------- crates/task/src/task.rs | 6 +++--- 15 files changed, 48 insertions(+), 69 deletions(-) diff --git a/crates/task-impls/src/consensus/handlers.rs b/crates/task-impls/src/consensus/handlers.rs index 41db9a2e13..50a6e4ba2a 100644 --- a/crates/task-impls/src/consensus/handlers.rs +++ b/crates/task-impls/src/consensus/handlers.rs @@ -23,9 +23,7 @@ use utils::anytrace::*; use super::ConsensusTaskState; use crate::{ - consensus::Versions, - events::HotShotEvent, - helpers::{broadcast_event, cancel_task}, + consensus::Versions, events::HotShotEvent, helpers::broadcast_event, vote_collection::handle_vote, }; @@ -170,11 +168,7 @@ pub(crate) async fn handle_view_change< }); // Cancel the old timeout task - cancel_task(std::mem::replace( - &mut task_state.timeout_task, - new_timeout_task, - )) - .await; + std::mem::replace(&mut task_state.timeout_task, new_timeout_task).abort(); let consensus_reader = task_state.consensus.read().await; consensus_reader diff --git a/crates/task-impls/src/consensus/mod.rs b/crates/task-impls/src/consensus/mod.rs index 2de584199a..5490ecad8f 100644 --- a/crates/task-impls/src/consensus/mod.rs +++ b/crates/task-impls/src/consensus/mod.rs @@ -28,7 +28,7 @@ use utils::anytrace::Result; use self::handlers::{ handle_quorum_vote_recv, handle_timeout, handle_timeout_vote_recv, handle_view_change, }; -use crate::{events::HotShotEvent, helpers::cancel_task, vote_collection::VoteCollectorsMap}; +use crate::{events::HotShotEvent, vote_collection::VoteCollectorsMap}; /// Event handlers for use in the `handle` method. mod handlers; @@ -167,12 +167,8 @@ impl, V: Versions> TaskState } /// Joins all subtasks. - async fn cancel_subtasks(&mut self) { + fn cancel_subtasks(&mut self) { // Cancel the old timeout task - cancel_task(std::mem::replace( - &mut self.timeout_task, - tokio::spawn(async {}), - )) - .await; + std::mem::replace(&mut self.timeout_task, tokio::spawn(async {})).abort(); } } diff --git a/crates/task-impls/src/da.rs b/crates/task-impls/src/da.rs index 50469b9809..33e36c04f0 100644 --- a/crates/task-impls/src/da.rs +++ b/crates/task-impls/src/da.rs @@ -359,5 +359,5 @@ impl, V: Versions> TaskState self.handle(event, sender.clone()).await } - async fn cancel_subtasks(&mut self) {} + fn cancel_subtasks(&mut self) {} } diff --git a/crates/task-impls/src/helpers.rs b/crates/task-impls/src/helpers.rs index 8fbfd6251d..d66bf44abf 100644 --- a/crates/task-impls/src/helpers.rs +++ b/crates/task-impls/src/helpers.rs @@ -30,7 +30,7 @@ use hotshot_types::{ utils::{Terminator, View, ViewInner}, vote::{Certificate, HasViewNumber}, }; -use tokio::{task::JoinHandle, time::timeout}; +use tokio::time::timeout; use tracing::instrument; use utils::anytrace::*; @@ -640,11 +640,6 @@ pub(crate) async fn validate_proposal_view_and_certs< Ok(()) } -/// Cancel a task -pub async fn cancel_task(task: JoinHandle) { - task.abort(); -} - /// Helper function to send events and log errors pub async fn broadcast_event(event: E, sender: &Sender) { match sender.broadcast_direct(event).await { diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index 4ffab75b68..22ce352a35 100644 --- a/crates/task-impls/src/network.rs +++ b/crates/task-impls/src/network.rs @@ -12,7 +12,6 @@ use std::{ use async_broadcast::{Receiver, Sender}; use async_lock::RwLock; use async_trait::async_trait; -use futures::future::join_all; use hotshot_task::task::TaskState; use hotshot_types::{ consensus::Consensus, @@ -39,7 +38,7 @@ use utils::anytrace::*; use crate::{ events::{HotShotEvent, HotShotTaskCompleted}, - helpers::{broadcast_event, cancel_task}, + helpers::broadcast_event, }; /// the network message task state @@ -232,7 +231,7 @@ impl< Ok(()) } - async fn cancel_subtasks(&mut self) {} + fn cancel_subtasks(&mut self) {} } impl< @@ -340,13 +339,14 @@ impl< /// Cancel all tasks for previous views pub fn cancel_tasks(&mut self, view: TYPES::View) { let keep = self.transmit_tasks.split_off(&view); - let mut cancel = Vec::new(); + while let Some((_, tasks)) = self.transmit_tasks.pop_first() { - let mut to_cancel = tasks.into_iter().map(cancel_task).collect(); - cancel.append(&mut to_cancel); + for task in tasks { + task.abort(); + } } + self.transmit_tasks = keep; - spawn(async move { join_all(cancel).await }); } /// Parses a `HotShotEvent` and returns a tuple of: (sender's public key, `MessageKind`, `TransmitType`) @@ -801,7 +801,7 @@ pub mod test { Ok(()) } - async fn cancel_subtasks(&mut self) {} + fn cancel_subtasks(&mut self) {} } impl< diff --git a/crates/task-impls/src/quorum_proposal/mod.rs b/crates/task-impls/src/quorum_proposal/mod.rs index 1cc857f335..66f149abcc 100644 --- a/crates/task-impls/src/quorum_proposal/mod.rs +++ b/crates/task-impls/src/quorum_proposal/mod.rs @@ -10,7 +10,6 @@ use async_broadcast::{Receiver, Sender}; use async_lock::RwLock; use async_trait::async_trait; use either::Either; -use futures::future::join_all; use hotshot_task::{ dependency::{AndDependency, EventDependency, OrDependency}, dependency_task::DependencyTask, @@ -34,10 +33,7 @@ use tracing::instrument; use utils::anytrace::*; use self::handlers::{ProposalDependency, ProposalDependencyHandle}; -use crate::{ - events::HotShotEvent, - helpers::{broadcast_event, cancel_task}, -}; +use crate::{events::HotShotEvent, helpers::broadcast_event}; mod handlers; @@ -350,7 +346,7 @@ impl, V: Versions> for view in (*self.latest_proposed_view + 1)..=(*new_view) { if let Some(dependency) = self.proposal_dependencies.remove(&TYPES::View::new(view)) { - cancel_task(dependency).await; + dependency.abort(); } } @@ -527,21 +523,20 @@ impl, V: Versions> )?; } HotShotEvent::ViewChange(view) | HotShotEvent::Timeout(view) => { - self.cancel_tasks(*view).await; + self.cancel_tasks(*view); } _ => {} } Ok(()) } + /// Cancel all tasks the consensus tasks has spawned before the given view - pub async fn cancel_tasks(&mut self, view: TYPES::View) { + pub fn cancel_tasks(&mut self, view: TYPES::View) { let keep = self.proposal_dependencies.split_off(&view); - let mut cancel = Vec::new(); while let Some((_, task)) = self.proposal_dependencies.pop_first() { - cancel.push(cancel_task(task)); + task.abort(); } self.proposal_dependencies = keep; - join_all(cancel).await; } } @@ -560,7 +555,7 @@ impl, V: Versions> TaskState self.handle(event, receiver.clone(), sender.clone()).await } - async fn cancel_subtasks(&mut self) { + fn cancel_subtasks(&mut self) { while let Some((_, handle)) = self.proposal_dependencies.pop_first() { handle.abort(); } diff --git a/crates/task-impls/src/quorum_proposal_recv/mod.rs b/crates/task-impls/src/quorum_proposal_recv/mod.rs index f228edbc87..fc729596f0 100644 --- a/crates/task-impls/src/quorum_proposal_recv/mod.rs +++ b/crates/task-impls/src/quorum_proposal_recv/mod.rs @@ -33,7 +33,7 @@ use vbs::version::Version; use self::handlers::handle_quorum_proposal_recv; use crate::{ events::{HotShotEvent, ProposalMissing}, - helpers::{broadcast_event, cancel_task, parent_leaf_and_state}, + helpers::{broadcast_event, parent_leaf_and_state}, }; /// Event handlers for this task. mod handlers; @@ -108,13 +108,12 @@ impl, V: Versions> /// Cancel all tasks the consensus tasks has spawned before the given view pub fn cancel_tasks(&mut self, view: TYPES::View) { let keep = self.spawned_tasks.split_off(&view); - let mut cancel = Vec::new(); while let Some((_, tasks)) = self.spawned_tasks.pop_first() { - let mut to_cancel = tasks.into_iter().map(cancel_task).collect(); - cancel.append(&mut to_cancel); + for task in tasks { + task.abort(); + } } self.spawned_tasks = keep; - tokio::spawn(async move { join_all(cancel).await }); } /// Handles all consensus events relating to propose and vote-enabling events. @@ -192,7 +191,7 @@ impl, V: Versions> TaskState Ok(()) } - async fn cancel_subtasks(&mut self) { + fn cancel_subtasks(&mut self) { while !self.spawned_tasks.is_empty() { let Some((_, handles)) = self.spawned_tasks.pop_first() else { break; diff --git a/crates/task-impls/src/quorum_vote/mod.rs b/crates/task-impls/src/quorum_vote/mod.rs index 563fa6fa41..90a8ea44a5 100644 --- a/crates/task-impls/src/quorum_vote/mod.rs +++ b/crates/task-impls/src/quorum_vote/mod.rs @@ -37,7 +37,7 @@ use vbs::version::StaticVersionType; use crate::{ events::HotShotEvent, - helpers::{broadcast_event, cancel_task}, + helpers::broadcast_event, quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state}, }; @@ -395,7 +395,7 @@ impl, V: Versions> QuorumVoteTaskS // Cancel the old dependency tasks. for view in *self.latest_voted_view..(*new_view) { if let Some(dependency) = self.vote_dependencies.remove(&TYPES::View::new(view)) { - cancel_task(dependency).await; + dependency.abort(); tracing::debug!("Vote dependency removed for view {:?}", view); } } @@ -578,7 +578,7 @@ impl, V: Versions> QuorumVoteTaskS // cancel old tasks let current_tasks = self.vote_dependencies.split_off(&view); while let Some((_, task)) = self.vote_dependencies.pop_last() { - cancel_task(task).await; + task.abort(); } self.vote_dependencies = current_tasks; } @@ -587,7 +587,7 @@ impl, V: Versions> QuorumVoteTaskS // cancel old tasks let current_tasks = self.vote_dependencies.split_off(&view); while let Some((_, task)) = self.vote_dependencies.pop_last() { - cancel_task(task).await; + task.abort(); } self.vote_dependencies = current_tasks; } @@ -720,7 +720,7 @@ impl, V: Versions> TaskState self.handle(event, receiver.clone(), sender.clone()).await } - async fn cancel_subtasks(&mut self) { + fn cancel_subtasks(&mut self) { while let Some((_, handle)) = self.vote_dependencies.pop_last() { handle.abort(); } diff --git a/crates/task-impls/src/request.rs b/crates/task-impls/src/request.rs index 6674ad3e2b..23646c447e 100644 --- a/crates/task-impls/src/request.rs +++ b/crates/task-impls/src/request.rs @@ -75,7 +75,7 @@ pub struct NetworkRequestState> { impl> Drop for NetworkRequestState { fn drop(&mut self) { - futures::executor::block_on(async move { self.cancel_subtasks().await }); + self.cancel_subtasks(); } } @@ -123,7 +123,7 @@ impl> TaskState for NetworkRequest } } - async fn cancel_subtasks(&mut self) { + fn cancel_subtasks(&mut self) { self.shutdown_flag.store(true, Ordering::Relaxed); while !self.spawned_tasks.is_empty() { diff --git a/crates/task-impls/src/rewind.rs b/crates/task-impls/src/rewind.rs index 9ae424b62b..4f62359aeb 100644 --- a/crates/task-impls/src/rewind.rs +++ b/crates/task-impls/src/rewind.rs @@ -45,7 +45,7 @@ impl TaskState for RewindTaskState { Ok(()) } - async fn cancel_subtasks(&mut self) { + fn cancel_subtasks(&mut self) { tracing::info!("Node ID {} Recording {} events", self.id, self.events.len()); let filename = format!("rewind_{}.log", self.id); let mut file = match OpenOptions::new() diff --git a/crates/task-impls/src/transactions.rs b/crates/task-impls/src/transactions.rs index 7d2090f9d5..8e84aa28b5 100644 --- a/crates/task-impls/src/transactions.rs +++ b/crates/task-impls/src/transactions.rs @@ -822,5 +822,5 @@ impl, V: Versions> TaskState self.handle(event, sender.clone()).await } - async fn cancel_subtasks(&mut self) {} + fn cancel_subtasks(&mut self) {} } diff --git a/crates/task-impls/src/upgrade.rs b/crates/task-impls/src/upgrade.rs index 56107613cd..1eb60cf98f 100644 --- a/crates/task-impls/src/upgrade.rs +++ b/crates/task-impls/src/upgrade.rs @@ -336,5 +336,5 @@ impl, V: Versions> TaskState Ok(()) } - async fn cancel_subtasks(&mut self) {} + fn cancel_subtasks(&mut self) {} } diff --git a/crates/task-impls/src/vid.rs b/crates/task-impls/src/vid.rs index 68afabf6d0..3f4e6cda37 100644 --- a/crates/task-impls/src/vid.rs +++ b/crates/task-impls/src/vid.rs @@ -181,5 +181,5 @@ impl> TaskState for VidTaskState, V: Versions> TaskState self.handle(event, sender.clone()).await } - async fn cancel_subtasks(&mut self) {} + fn cancel_subtasks(&mut self) {} } /// State of a view sync replica task @@ -197,7 +197,7 @@ impl, V: Versions> TaskState Ok(()) } - async fn cancel_subtasks(&mut self) {} + fn cancel_subtasks(&mut self) {} } impl, V: Versions> ViewSyncTaskState { @@ -572,7 +572,7 @@ impl, V: Versions> } if let Some(timeout_task) = self.timeout_task.take() { - cancel_task(timeout_task).await; + timeout_task.abort(); } self.timeout_task = Some(spawn({ @@ -665,7 +665,7 @@ impl, V: Versions> .await; if let Some(timeout_task) = self.timeout_task.take() { - cancel_task(timeout_task).await; + timeout_task.abort(); } self.timeout_task = Some(spawn({ let stream = event_stream.clone(); @@ -721,7 +721,7 @@ impl, V: Versions> } if let Some(timeout_task) = self.timeout_task.take() { - cancel_task(timeout_task).await; + timeout_task.abort(); } broadcast_event( @@ -792,7 +792,7 @@ impl, V: Versions> // Shouldn't ever receive a timeout for a relay higher than ours if TYPES::View::new(*round) == self.next_view && *relay == self.relay { if let Some(timeout_task) = self.timeout_task.take() { - cancel_task(timeout_task).await; + timeout_task.abort(); } self.relay += 1; match last_seen_certificate { diff --git a/crates/task/src/task.rs b/crates/task/src/task.rs index 70367b0cef..2b4784d00c 100644 --- a/crates/task/src/task.rs +++ b/crates/task/src/task.rs @@ -28,7 +28,7 @@ pub trait TaskState: Send { type Event: TaskEvent + Clone + Send + Sync; /// Joins all subtasks. - async fn cancel_subtasks(&mut self); + fn cancel_subtasks(&mut self); /// Handles an event, providing direct access to the specific channel we received the event on. async fn handle_event( @@ -77,7 +77,7 @@ impl Task { match self.receiver.recv_direct().await { Ok(input) => { if *input == S::Event::shutdown_event() { - self.state.cancel_subtasks().await; + self.state.cancel_subtasks(); break self.boxed_state(); } @@ -129,7 +129,7 @@ impl ConsensusTaskRegistry { while let Some(handle) = handles.pop() { let mut task_state = handle.await.unwrap(); - task_state.cancel_subtasks().await; + task_state.cancel_subtasks(); } } /// Take a task, run it, and register it