From 00082dd3a9db56a8328f7e802dea278de6bbdb3c Mon Sep 17 00:00:00 2001
From: Lukasz Rzasik
Date: Fri, 28 Feb 2025 13:10:52 +0100
Subject: [PATCH 1/3] wait_for_next_epoch_qc returns Result instead of Option

---
 hotshot-task-impls/src/consensus/handlers.rs |  9 +++---
 hotshot-task-impls/src/consensus/mod.rs      | 11 +++----
 hotshot-task-impls/src/helpers.rs            | 18 ++++++-----
 .../src/quorum_proposal/handlers.rs          | 31 +++++++++++--------
 4 files changed, 38 insertions(+), 31 deletions(-)

diff --git a/hotshot-task-impls/src/consensus/handlers.rs b/hotshot-task-impls/src/consensus/handlers.rs
index 64b8e19960..45e4769259 100644
--- a/hotshot-task-impls/src/consensus/handlers.rs
+++ b/hotshot-task-impls/src/consensus/handlers.rs
@@ -173,7 +173,7 @@ pub async fn send_high_qc, V: Versions> ConsensusTaskSt
                 tracing::debug!("We formed QC but not eQC. Do nothing");
                 return Ok(());
             }
-            if wait_for_next_epoch_qc(
                 quorum_cert,
                 &self.consensus,
                 self.timeout,
@@ -163,11 +163,10 @@ impl, V: Versions> ConsensusTaskSt
                 &receiver,
             )
             .await
-            .is_none()
-            {
-                tracing::warn!("We formed eQC but we don't have corresponding next epoch eQC.");
-                return Ok(());
-            }
+            .context(warn!(
+                "We formed eQC but we don't have corresponding next epoch eQC."
+            ))?;
+
             let cert_block_number = self
                 .consensus
                 .read()
diff --git a/hotshot-task-impls/src/helpers.rs b/hotshot-task-impls/src/helpers.rs
index 4b78a68922..314c5c5e93 100644
--- a/hotshot-task-impls/src/helpers.rs
+++ b/hotshot-task-impls/src/helpers.rs
@@ -873,12 +873,12 @@ pub async fn wait_for_next_epoch_qc(
     timeout: u64,
     view_start_time: Instant,
     receiver: &Receiver>>,
-) -> Option> {
+) -> Result> {
     tracing::debug!("getting the next epoch QC");
     if let Some(next_epoch_qc) = consensus.read().await.next_epoch_high_qc() {
         if next_epoch_qc.data.leaf_commit == high_qc.data.leaf_commit {
             // We have it already, no reason to wait
-            return Some(next_epoch_qc.clone());
+            return Ok(next_epoch_qc.clone());
         }
     };
@@ -887,11 +887,13 @@ pub async fn wait_for_next_epoch_qc(
     // TODO configure timeout
     let Some(time_spent) = Instant::now().checked_duration_since(view_start_time) else {
         // Shouldn't be possible, now must be after the start
-        return None;
+        return Err(warn!(
+            "Now is earlier than the view start time. Shouldn't be possible."
+        ));
     };
     let Some(time_left) = wait_duration.checked_sub(time_spent) else {
         // No time left
-        return None;
+        return Err(warn!("Ran out of time waiting for the next epoch QC."));
     };
     let receiver = receiver.clone();
     let Ok(Some(event)) = tokio::time::timeout(time_left, async move {
@@ -915,16 +917,16 @@ pub async fn wait_for_next_epoch_qc(
         // Check again, there is a chance we missed it
         if let Some(next_epoch_qc) = consensus.read().await.next_epoch_high_qc() {
             if next_epoch_qc.data.leaf_commit == high_qc.data.leaf_commit {
-                return Some(next_epoch_qc.clone());
+                return Ok(next_epoch_qc.clone());
             }
         };
-        return None;
+        return Err(warn!("Error while waiting for the next epoch QC."));
     };
     let HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) = event.as_ref() else {
         // this shouldn't happen
-        return None;
+        return Err(warn!("Received event is not NextEpochQc2Formed but we checked it earlier.
Shouldn't be possible."));
     };
-    Some(next_epoch_qc.clone())
+    Ok(next_epoch_qc.clone())
 }
 
 /// Validates qc's signatures and, if provided, validates next_epoch_qc's signatures and whether it
diff --git a/hotshot-task-impls/src/quorum_proposal/handlers.rs b/hotshot-task-impls/src/quorum_proposal/handlers.rs
index 4ac06ff2f3..0d08ea1b6f 100644
--- a/hotshot-task-impls/src/quorum_proposal/handlers.rs
+++ b/hotshot-task-impls/src/quorum_proposal/handlers.rs
@@ -13,6 +13,11 @@ use std::{
     time::{Duration, Instant},
 };
 
+use crate::{
+    events::HotShotEvent,
+    helpers::{broadcast_event, parent_leaf_and_state, wait_for_next_epoch_qc},
+    quorum_proposal::{UpgradeLock, Versions},
+};
 use anyhow::{ensure, Context, Result};
 use async_broadcast::{Receiver, Sender};
 use async_lock::RwLock;
@@ -34,12 +39,6 @@ use hotshot_utils::anytrace::*;
 use tracing::instrument;
 use vbs::version::StaticVersionType;
 
-use crate::{
-    events::HotShotEvent,
-    helpers::{broadcast_event, parent_leaf_and_state, wait_for_next_epoch_qc},
-    quorum_proposal::{UpgradeLock, Versions},
-};
-
 /// Proposal dependency types. These types represent events that precipitate a proposal.
 #[derive(PartialEq, Debug)]
 pub(crate) enum ProposalDependency {
@@ -329,14 +328,20 @@ impl ProposalDependencyHandle {
         let next_epoch_qc = if self.upgrade_lock.epochs_enabled(self.view_number).await
             && is_high_qc_for_last_block
         {
-            wait_for_next_epoch_qc(
-                &parent_qc,
-                &self.consensus,
-                self.timeout,
-                self.view_start_time,
-                &self.receiver,
+            Some(
+                wait_for_next_epoch_qc(
+                    &parent_qc,
+                    &self.consensus,
+                    self.timeout,
+                    self.view_start_time,
+                    &self.receiver,
+                )
+                .await
+                .context(
+                    "Justify QC on our proposal is for the last block in the epoch \
+                    but we don't have the corresponding next epoch QC. Do not propose.",
+                )?,
             )
-            .await
         } else {
             None
         };

From c31f7fcca05cc77f3f5248d195075d420bc4102b Mon Sep 17 00:00:00 2001
From: Lukasz Rzasik
Date: Tue, 4 Mar 2025 12:42:54 +0100
Subject: [PATCH 2/3] Add double VID check

---
 hotshot-task-impls/src/da.rs                  |   9 +-
 hotshot-task-impls/src/helpers.rs             | 123 ++++++++++++++++-
 .../src/quorum_vote/handlers.rs               |   1 +
 hotshot-task-impls/src/quorum_vote/mod.rs     | 126 +++++++++++++++---
 hotshot-task-impls/src/request.rs             |   4 +-
 hotshot-task-impls/src/response.rs            |  11 +-
 hotshot-testing/src/spinning_task.rs          |   8 +-
 hotshot-types/src/consensus.rs                |  28 ++--
 hotshot/src/tasks/task_state.rs               |   1 +
 9 files changed, 271 insertions(+), 40 deletions(-)

diff --git a/hotshot-task-impls/src/da.rs b/hotshot-task-impls/src/da.rs
index 9ce863d193..ccd5bc8154 100644
--- a/hotshot-task-impls/src/da.rs
+++ b/hotshot-task-impls/src/da.rs
@@ -292,12 +292,17 @@ impl, V: Versions> DaTaskState.
+use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};
 use async_broadcast::{Receiver, SendError, Sender};
 use async_lock::RwLock;
 use committable::{Commitment, Committable};
 use either::Either;
 use hotshot_task::dependency::{Dependency, EventDependency};
+use hotshot_types::data::VidDisperseShare;
+use hotshot_types::simple_certificate::DaCertificate2;
 use hotshot_types::{
     consensus::OuterConsensus,
     data::{Leaf2, QuorumProposalWrapper, ViewChangeEvidence2},
@@ -32,15 +35,13 @@ use hotshot_types::{
 };
 use hotshot_utils::anytrace::*;
 use std::{
-    collections::{HashMap, HashSet},
+    collections::HashSet,
     sync::Arc,
     time::{Duration, Instant},
 };
 use tokio::time::timeout;
 use tracing::instrument;
 
-use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};
-
 /// Trigger a request to the network for a proposal for a view and wait for the response or timeout.
 #[instrument(skip_all)]
 #[allow(clippy::too_many_arguments)]
@@ -383,6 +384,7 @@ pub async fn decide_from_proposal(
     public_key: &TYPES::SignatureKey,
     with_epochs: bool,
     membership: &Arc>,
+    epoch_height: u64,
 ) -> LeafChainTraversalOutcome {
     let consensus_reader = consensus.read().await;
     let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
@@ -462,10 +464,13 @@ pub async fn decide_from_proposal(
             let vid_share = consensus_reader
                 .vid_shares()
                 .get(&leaf.view_number())
-                .unwrap_or(&HashMap::new())
-                .get(public_key)
-                .cloned()
-                .map(|prop| prop.data);
+                .and_then(|key_map| key_map.get(public_key))
+                .and_then(|epoch_map| {
+                    epoch_map
+                        .get(&leaf.epoch(epoch_height))
+                        .or_else(|| epoch_map.get(&leaf.epoch(epoch_height).map(|e| e + 1)))
+                })
+                .map(|prop| prop.data.clone());
 
             // Add our data into a new `LeafInfo`
             res.leaf_views.push(LeafInfo::new(
@@ -983,3 +988,107 @@ pub async fn validate_qc_and_next_epoch_qc(
     }
     Ok(())
 }
+
+/// Gets the second VID share, the current or the next epoch accordingly, from the shared consensus state;
+/// makes sure it corresponds to the given DA certificate;
+/// if it's not yet available, waits for it with the given timeout.
+pub async fn wait_for_second_vid_share(
+    vid_share: &Proposal>,
+    da_cert: &DaCertificate2,
+    consensus: &OuterConsensus,
+    timeout: u64,
+    view_start_time: Instant,
+    receiver: &Receiver>>,
+) -> Result>> {
+    // If the VID share that we already have is for the current epoch, get the next epoch VID.
+    // And the other way around.
+    let target_epoch = if vid_share.data.epoch() == vid_share.data.target_epoch() {
+        vid_share.data.target_epoch().map(|e| e + 1)
+    } else {
+        vid_share.data.target_epoch().map(|e| e - 1)
+    };
+    tracing::debug!("getting the second VID share for epoch {:?}", target_epoch);
+    let maybe_second_vid_share = consensus
+        .read()
+        .await
+        .vid_shares()
+        .get(&vid_share.data.view_number())
+        .and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
+        .and_then(|epoch_map| epoch_map.get(&target_epoch))
+        .cloned();
+    if let Some(second_vid_share) = maybe_second_vid_share {
+        if (target_epoch == da_cert.epoch()
+            && second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
+            || (target_epoch != da_cert.epoch()
+                && Some(second_vid_share.data.payload_commitment())
+                    == da_cert.data().next_epoch_payload_commit)
+        {
+            return Ok(second_vid_share);
+        }
+    }
+
+    let wait_duration = Duration::from_millis(timeout / 2);
+
+    // TODO configure timeout
+    let Some(time_spent) = Instant::now().checked_duration_since(view_start_time) else {
+        // Shouldn't be possible, now must be after the start
+        return Err(warn!(
+            "Now is earlier than the view start time. Shouldn't be possible."
+        ));
+    };
+    let Some(time_left) = wait_duration.checked_sub(time_spent) else {
+        // No time left
+        return Err(warn!("Ran out of time waiting for the second VID share."));
+    };
+    let receiver = receiver.clone();
+    let Ok(Some(event)) = tokio::time::timeout(time_left, async move {
+        let da_cert_clone = da_cert.clone();
+        EventDependency::new(
+            receiver,
+            Box::new(move |event| {
+                let event = event.as_ref();
+                if let HotShotEvent::VidShareValidated(second_vid_share) = event {
+                    if target_epoch == da_cert_clone.epoch() {
+                        second_vid_share.data.payload_commitment()
+                            == da_cert_clone.data().payload_commit
+                    } else {
+                        Some(second_vid_share.data.payload_commitment())
+                            == da_cert_clone.data().next_epoch_payload_commit
+                    }
+                } else {
+                    false
+                }
+            }),
+        )
+        .completed()
+        .await
+    })
+    .await
+    else {
+        // Check again, there is a chance we missed it
+        let maybe_second_vid_share = consensus
+            .read()
+            .await
+            .vid_shares()
+            .get(&vid_share.data.view_number())
+            .and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
+            .and_then(|epoch_map| epoch_map.get(&target_epoch))
+            .cloned();
+        if let Some(second_vid_share) = maybe_second_vid_share {
+            if (target_epoch == da_cert.epoch()
+                && second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
+                || (target_epoch != da_cert.epoch()
+                    && Some(second_vid_share.data.payload_commitment())
+                        == da_cert.data().next_epoch_payload_commit)
+            {
+                return Ok(second_vid_share);
+            }
+        }
+        return Err(warn!("Error while waiting for the second VID share."));
+    };
+    let HotShotEvent::VidShareValidated(second_vid_share) = event.as_ref() else {
+        // this shouldn't happen
+        return Err(warn!("Received event is not VidShareValidated but we checked it earlier.
Shouldn't be possible."));
+    };
+    Ok(second_vid_share.clone())
+}
diff --git a/hotshot-task-impls/src/quorum_vote/handlers.rs b/hotshot-task-impls/src/quorum_vote/handlers.rs
index 7f1937c253..00bd0cae75 100644
--- a/hotshot-task-impls/src/quorum_vote/handlers.rs
+++ b/hotshot-task-impls/src/quorum_vote/handlers.rs
@@ -386,6 +386,7 @@ pub(crate) async fn handle_quorum_proposal_validated<
             &task_state.public_key,
             version >= V::Epochs::VERSION,
             &task_state.membership,
+            task_state.epoch_height,
         )
         .await
     };
diff --git a/hotshot-task-impls/src/quorum_vote/mod.rs b/hotshot-task-impls/src/quorum_vote/mod.rs
index 5493a166e7..d593beaded 100644
--- a/hotshot-task-impls/src/quorum_vote/mod.rs
+++ b/hotshot-task-impls/src/quorum_vote/mod.rs
@@ -4,8 +4,11 @@
 // You should have received a copy of the MIT License
 // along with the HotShot repository. If not, see .
 
-use std::{collections::BTreeMap, sync::Arc};
-
+use crate::{
+    events::HotShotEvent,
+    helpers::{broadcast_event, wait_for_second_vid_share},
+    quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state},
+};
 use async_broadcast::{InactiveReceiver, Receiver, Sender};
 use async_lock::RwLock;
 use async_trait::async_trait;
@@ -15,6 +18,7 @@ use hotshot_task::{
     dependency_task::{DependencyTask, HandleDepOutput},
     task::TaskState,
 };
+use hotshot_types::utils::is_last_block_in_epoch;
 use hotshot_types::{
     consensus::{ConsensusMetricsValue, OuterConsensus},
     data::{Leaf2, QuorumProposalWrapper},
@@ -33,16 +37,12 @@ use hotshot_types::{
     vote::{Certificate, HasViewNumber},
 };
 use hotshot_utils::anytrace::*;
+use std::time::Instant;
+use std::{collections::BTreeMap, sync::Arc};
 use tokio::task::JoinHandle;
 use tracing::instrument;
 use vbs::version::StaticVersionType;
 
-use crate::{
-    events::HotShotEvent,
-    helpers::broadcast_event,
-    quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state},
-};
-
 /// Event handlers for `QuorumProposalValidated`.
 mod handlers;
 
@@ -97,6 +97,12 @@ pub struct VoteDependencyHandle, V
 
     /// Number of blocks in an epoch, zero means there are no epochs
     pub epoch_height: u64,
+
+    /// View timeout from config.
+    pub timeout: u64,
+
+    /// The time this view started
+    pub view_start_time: Instant,
 }
 
 impl + 'static, V: Versions> HandleDepOutput
@@ -111,6 +117,7 @@ impl + 'static, V: Versions> Handl
         let mut next_epoch_payload_commitment = None;
         let mut leaf = None;
         let mut vid_share = None;
+        let mut da_cert = None;
         let mut parent_view_number = None;
         for event in res {
             match event.as_ref() {
@@ -185,6 +192,7 @@ impl + 'static, V: Versions> Handl
                     } else {
                         next_epoch_payload_commitment = next_epoch_cert_payload_comm;
                     }
+                    da_cert = Some(cert.clone());
                 }
                 HotShotEvent::VidShareValidated(share) => {
                     let vid_payload_commitment = &share.data.payload_commitment();
@@ -230,6 +238,52 @@ impl + 'static, V: Versions> Handl
             return;
         };
 
+        let Some(da_cert) = da_cert else {
+            tracing::error!(
+                "We don't have the DA cert for this view {:?}, but we should, because the vote dependencies have completed.",
+                self.view_number
+            );
+            return;
+        };
+
+        // If this is the last block in the epoch, we might need two VID shares.
+        if self.upgrade_lock.epochs_enabled(leaf.view_number()).await
+            && is_last_block_in_epoch(leaf.block_header().block_number(), self.epoch_height)
+        {
+            let current_epoch = option_epoch_from_block_number::(
+                leaf.with_epoch,
+                leaf.block_header().block_number(),
+                self.epoch_height,
+            );
+
+            let membership_reader = self.membership.read().await;
+            let committee_member_in_current_epoch =
+                membership_reader.has_stake(&self.public_key, current_epoch);
+            let committee_member_in_next_epoch =
+                membership_reader.has_stake(&self.public_key, current_epoch.map(|e| e + 1));
+            drop(membership_reader);
+
+            // If we belong to both epochs, we require VID shares from both epochs.
+            if committee_member_in_current_epoch && committee_member_in_next_epoch {
+                if let Err(e) = wait_for_second_vid_share(
+                    &vid_share,
+                    &da_cert,
+                    &self.consensus,
+                    self.timeout,
+                    self.view_start_time,
+                    &self.receiver.activate_cloned(),
+                )
+                .await
+                {
+                    tracing::warn!(
+                        "This is the last block in epoch, we are in both epochs \
+                        but we received only one VID share. Do not vote! Error: {e:?}"
+                    );
+                    return;
+                }
+            }
+        }
+
         // Update internal state
         if let Err(e) = update_shared_state::(
             OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
@@ -337,6 +391,9 @@ pub struct QuorumVoteTaskState, V:
 
     /// Number of blocks in an epoch, zero means there are no epochs
     pub epoch_height: u64,
+
+    /// View timeout from config.
+    pub timeout: u64,
 }
 
 impl, V: Versions> QuorumVoteTaskState {
@@ -442,6 +499,8 @@ impl, V: Versions> QuorumVoteTaskS
                     id: self.id,
                     epoch_height: self.epoch_height,
                     consensus_metrics: Arc::clone(&self.consensus_metrics),
+                    timeout: self.timeout,
+                    view_start_time: Instant::now(),
                 },
             );
             self.vote_dependencies
@@ -685,24 +744,57 @@ impl, V: Versions> QuorumVoteTaskS
             "Reached end of epoch. Proposed leaf has the same height and payload as its parent."
         );
 
+        let current_epoch = proposal.data.epoch();
+        let next_epoch = proposal.data.epoch().map(|e| e + 1);
+
+        let membership_reader = self.membership.read().await;
+        let committee_member_in_current_epoch =
+            membership_reader.has_stake(&self.public_key, current_epoch);
+        let committee_member_in_next_epoch =
+            membership_reader.has_stake(&self.public_key, next_epoch);
+        drop(membership_reader);
+
         let mut consensus_writer = self.consensus.write().await;
 
-        let vid_shares = consensus_writer
+        let key_map = consensus_writer
             .vid_shares()
             .get(&parent_leaf.view_number())
             .context(warn!(
                 "Proposed leaf is the same as its parent but we don't have our VID for it"
             ))?;
 
-        let vid = vid_shares.get(&self.public_key).context(warn!(
+        let epoch_map = key_map.get(&self.public_key).cloned().context(warn!(
             "Proposed leaf is the same as its parent but we don't have our VID for it"
         ))?;
 
-        let mut updated_vid = vid.clone();
-        updated_vid
-            .data
-            .set_view_number(proposal.data.view_number());
-        consensus_writer.update_vid_shares(updated_vid.data.view_number(), updated_vid.clone());
+        if committee_member_in_current_epoch {
+            ensure!(
+                epoch_map.contains_key(&current_epoch),
+                warn!(
+                    "We belong to the current epoch but we don't have the corresponding VID share."
+                )
+            )
+        }
+
+        if committee_member_in_next_epoch {
+            ensure!(
+                epoch_map.contains_key(&next_epoch),
+                warn!("We belong to the next epoch but we don't have the corresponding VID share.")
+            )
+        }
+
+        for (_, vid) in epoch_map.iter() {
+            let mut updated_vid = vid.clone();
+            updated_vid
+                .data
+                .set_view_number(proposal.data.view_number());
+            consensus_writer.update_vid_shares(updated_vid.data.view_number(), updated_vid.clone());
+        }
+
+        let Some((_, vid_share)) = epoch_map.first_key_value() else {
+            bail!(warn!("We don't have our VID share but we just checked that we do. This shouldn't happen."));
+        };
+        let vid_share = vid_share.clone();
 
         drop(consensus_writer);
 
@@ -734,7 +826,7 @@ impl, V: Versions> QuorumVoteTaskS
             Arc::clone(&self.instance_state),
             Arc::clone(&self.storage),
             &proposed_leaf,
-            &updated_vid,
+            &vid_share,
             Some(parent_leaf.view_number()),
             self.epoch_height,
         )
@@ -780,7 +872,7 @@ impl, V: Versions> QuorumVoteTaskS
             proposal.data.view_number(),
             Arc::clone(&self.storage),
             proposed_leaf,
-            updated_vid,
+            vid_share,
             is_vote_leaf_extended,
             self.epoch_height,
         )
diff --git a/hotshot-task-impls/src/request.rs b/hotshot-task-impls/src/request.rs
index 02d10a284d..53fc580a17 100644
--- a/hotshot-task-impls/src/request.rs
+++ b/hotshot-task-impls/src/request.rs
@@ -375,7 +375,9 @@ impl> NetworkRequestState *view;
diff --git a/hotshot-task-impls/src/response.rs b/hotshot-task-impls/src/response.rs
index f0000a7c14..203bb004a6 100644
--- a/hotshot-task-impls/src/response.rs
+++ b/hotshot-task-impls/src/response.rs
@@ -163,9 +163,11 @@ impl NetworkResponseState {
         key: &TYPES::SignatureKey,
     ) -> Option>> {
         let consensus_reader = self.consensus.read().await;
-        if let Some(view) = consensus_reader.vid_shares().get(&view) {
-            if let Some(share) = view.get(key) {
-                return Some(share.clone());
+        if let Some(key_map) = consensus_reader.vid_shares().get(&view) {
+            if let Some(epoch_map) = key_map.get(key) {
+                if let Some(share) = epoch_map.get(&target_epoch) {
+                    return Some(share.clone());
+                }
             }
         }
 
@@ -200,7 +202,8 @@ impl NetworkResponseState {
             .await
             .vid_shares()
             .get(&view)?
-            .get(key)
+            .get(key)?
+            .get(&target_epoch)
             .cloned();
     }
 
diff --git a/hotshot-testing/src/spinning_task.rs b/hotshot-testing/src/spinning_task.rs
index 8f54eaee0a..3694f7604e 100644
--- a/hotshot-testing/src/spinning_task.rs
+++ b/hotshot-testing/src/spinning_task.rs
@@ -258,7 +258,13 @@ where
         for (view, hash_map) in read_storage.vids_cloned().await {
             let mut converted_hash_map = HashMap::new();
             for (key, proposal) in hash_map {
-                converted_hash_map.insert(key, convert_proposal(proposal));
+                converted_hash_map
+                    .entry(key)
+                    .or_insert_with(BTreeMap::new)
+                    .insert(
+                        proposal.data.target_epoch,
+                        convert_proposal(proposal),
+                    );
             }
             vid_shares.insert(view, converted_hash_map);
         }
diff --git a/hotshot-types/src/consensus.rs b/hotshot-types/src/consensus.rs
index c775880206..09f96319c4 100644
--- a/hotshot-types/src/consensus.rs
+++ b/hotshot-types/src/consensus.rs
@@ -6,19 +6,19 @@
 //!
Provides the core consensus types
 
+use async_lock::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
+use committable::{Commitment, Committable};
+use hotshot_utils::anytrace::*;
 use std::{
     collections::{BTreeMap, HashMap},
     mem::ManuallyDrop,
     ops::{Deref, DerefMut},
     sync::Arc,
 };
-
-use async_lock::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
-use committable::{Commitment, Committable};
-use hotshot_utils::anytrace::*;
 use tracing::instrument;
 use vec1::Vec1;
 
+use crate::simple_vote::HasEpoch;
 pub use crate::utils::{View, ViewInner};
 use crate::{
     data::{Leaf2, QuorumProposalWrapper, VidCommitment, VidDisperse, VidDisperseShare},
@@ -44,10 +44,13 @@ use crate::{
 /// A type alias for `HashMap, T>`
 pub type CommitmentMap = HashMap, T>;
 
-/// A type alias for `BTreeMap>>>`
+/// A type alias for `BTreeMap>>>>`
 pub type VidShares = BTreeMap<
     ::View,
-    HashMap<::SignatureKey, Proposal>>,
+    HashMap<
+        ::SignatureKey,
+        BTreeMap::Epoch>, Proposal>>,
+    >,
 >;
 
 /// Type alias for consensus state wrapped in a lock.
@@ -533,6 +536,7 @@ impl Consensus {
         public_key: &TYPES::SignatureKey,
     ) -> Option> {
         let parent_view_number = leaf.justify_qc().view_number();
+        let parent_epoch = leaf.justify_qc().epoch();
         let parent_leaf = self
             .saved_leaves
             .get(&leaf.justify_qc().data().leaf_commit)?;
@@ -543,7 +547,13 @@ impl Consensus {
         let parent_vid = self
             .vid_shares()
             .get(&parent_view_number)
-            .and_then(|inner_map| inner_map.get(public_key).cloned())
+            .and_then(|key_map| key_map.get(public_key).cloned())
+            .and_then(|epoch_map| {
+                epoch_map
+                    .get(&parent_epoch)
+                    .or_else(|| epoch_map.get(&parent_epoch.map(|e| e + 1)))
+                    .cloned()
+            })
             .map(|prop| prop.data);
 
         Some(LeafInfo {
@@ -801,7 +811,7 @@ impl Consensus {
         self.vid_shares
             .entry(view_number)
            .or_default()
-            .insert(disperse.data.recipient_key().clone(), disperse);
+            .entry(disperse.data.recipient_key().clone())
+            .or_default()
+            .insert(disperse.data.target_epoch(), disperse);
     }
 
     /// Add a new entry to the da_certs map.
diff --git a/hotshot/src/tasks/task_state.rs b/hotshot/src/tasks/task_state.rs
index b0c1b9a11d..7c7ce46318 100644
--- a/hotshot/src/tasks/task_state.rs
+++ b/hotshot/src/tasks/task_state.rs
@@ -245,6 +245,7 @@ impl, V: Versions> CreateTaskState
             upgrade_lock: handle.hotshot.upgrade_lock.clone(),
             epoch_height: handle.hotshot.config.epoch_height,
             consensus_metrics,
+            timeout: handle.hotshot.config.next_view_timeout,
         }
     }
 }

From bcf657403135be6a927eeedb5e84b165ced47345 Mon Sep 17 00:00:00 2001
From: Lukasz Rzasik
Date: Wed, 5 Mar 2025 16:53:40 +0100
Subject: [PATCH 3/3] Split eQC formed handling to unblock Consensus task

---
 hotshot-task-impls/src/consensus/mod.rs | 68 ++++++++++++++++++++-----
 hotshot-task-impls/src/events.rs        |  6 +++
 sequencer/src/network/cdn.rs            |  4 +-
 3 files changed, 63 insertions(+), 15 deletions(-)

diff --git a/hotshot-task-impls/src/consensus/mod.rs b/hotshot-task-impls/src/consensus/mod.rs
index 510a8de1d8..b4bc5a62ab 100644
--- a/hotshot-task-impls/src/consensus/mod.rs
+++ b/hotshot-task-impls/src/consensus/mod.rs
@@ -30,8 +30,11 @@ use tracing::instrument;
 use self::handlers::{
     handle_quorum_vote_recv, handle_timeout, handle_timeout_vote_recv, handle_view_change,
 };
-use crate::helpers::{validate_qc_and_next_epoch_qc, wait_for_next_epoch_qc};
-use crate::{events::HotShotEvent, helpers::broadcast_event, vote_collection::VoteCollectorsMap};
+use crate::{
+    events::HotShotEvent,
+    helpers::{broadcast_event, validate_qc_and_next_epoch_qc},
+    vote_collection::VoteCollectorsMap,
+};
 
 /// Event handlers for use in the `handle` method.
 mod handlers;
@@ -155,24 +158,63 @@ impl, V: Versions> ConsensusTaskSt
                     tracing::debug!("We formed QC but not eQC. Do nothing");
                     return Ok(());
                 }
-                wait_for_next_epoch_qc(
-                    quorum_cert,
-                    &self.consensus,
-                    self.timeout,
-                    self.view_start_time,
-                    &receiver,
+
+                let consensus_reader = self.consensus.read().await;
+                let Some(next_epoch_qc) = consensus_reader.next_epoch_high_qc() else {
+                    tracing::debug!("We formed the current epoch eQC but we don't have the next epoch eQC at all.");
+                    return Ok(());
+                };
+                if quorum_cert.view_number() != next_epoch_qc.view_number()
+                    || quorum_cert.data != *next_epoch_qc.data
+                {
+                    tracing::debug!("We formed the current epoch eQC but we don't have the corresponding next epoch eQC.");
+                    return Ok(());
+                }
+                drop(consensus_reader);
+
+                broadcast_event(
+                    Arc::new(HotShotEvent::ExtendedQc2Formed(quorum_cert.clone())),
+                    &sender,
                 )
-                .await
-                .context(warn!(
-                    "We formed eQC but we don't have corresponding next epoch eQC."
-                ))?;
+                .await;
+            }
+            HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) => {
+                let cert_view = next_epoch_qc.view_number();
+                if !self.upgrade_lock.epochs_enabled(cert_view).await {
+                    tracing::debug!("Next epoch QC2 formed but epochs not enabled. Do nothing");
+                    return Ok(());
+                }
+                if !self
+                    .consensus
+                    .read()
+                    .await
+                    .is_leaf_extended(next_epoch_qc.data.leaf_commit)
+                {
+                    tracing::debug!("We formed next epoch QC but not eQC.
Do nothing"); + return Ok(()); + } + let consensus_reader = self.consensus.read().await; + let high_qc = consensus_reader.high_qc(); + if high_qc.view_number() != next_epoch_qc.view_number() + || high_qc.data != *next_epoch_qc.data + { + tracing::debug!("We formed the current epoch eQC but we don't have the corresponding next epoch eQC."); + return Ok(()); + } + let high_qc = high_qc.clone(); + drop(consensus_reader); + + broadcast_event(Arc::new(HotShotEvent::ExtendedQc2Formed(high_qc)), &sender).await; + } + HotShotEvent::ExtendedQc2Formed(eqc) => { + let cert_view = eqc.view_number(); let cert_block_number = self .consensus .read() .await .saved_leaves() - .get(&quorum_cert.data.leaf_commit) + .get(&eqc.data.leaf_commit) .context(error!( "Could not find the leaf for the eQC. It shouldn't happen." ))? diff --git a/hotshot-task-impls/src/events.rs b/hotshot-task-impls/src/events.rs index e34f856476..a1e828f27a 100644 --- a/hotshot-task-impls/src/events.rs +++ b/hotshot-task-impls/src/events.rs @@ -135,6 +135,8 @@ pub enum HotShotEvent { Qc2Formed(Either, TimeoutCertificate2>), /// The next leader has collected enough votes from the next epoch nodes to form a QC; emitted by the next leader in the consensus task; an internal event only NextEpochQc2Formed(Either, TimeoutCertificate>), + /// A validator formed both a current epoch eQC and a next epoch eQC + ExtendedQc2Formed(QuorumCertificate2), /// The DA leader has collected enough votes to form a DAC; emitted by the DA leader in the DA task; sent to the entire network via the networking task DacSend(DaCertificate2, TYPES::SignatureKey), /// The current view has changed; emitted by the replica in the consensus task or replica in the view sync task; received by almost all other tasks @@ -312,6 +314,7 @@ impl HotShotEvent { either::Left(qc) => Some(qc.view_number()), either::Right(tc) => Some(tc.view_number()), }, + HotShotEvent::ExtendedQc2Formed(cert) => Some(cert.view_number()), HotShotEvent::ViewSyncCommitVoteSend(vote) | HotShotEvent::ViewSyncCommitVoteRecv(vote) => Some(vote.view_number()), HotShotEvent::ViewSyncPreCommitVoteRecv(vote) @@ -447,6 +450,9 @@ impl Display for HotShotEvent { write!(f, "NextEpochQc2Formed(view_number={:?})", tc.view_number()) } }, + HotShotEvent::ExtendedQc2Formed(cert) => { + write!(f, "ExtendedQc2Formed(view_number={:?})", cert.view_number()) + } HotShotEvent::DacSend(cert, _) => { write!(f, "DacSend(view_number={:?})", cert.view_number()) } diff --git a/sequencer/src/network/cdn.rs b/sequencer/src/network/cdn.rs index adce13a758..7e0eef765a 100644 --- a/sequencer/src/network/cdn.rs +++ b/sequencer/src/network/cdn.rs @@ -80,7 +80,7 @@ impl SignatureScheme for WrappedSignatureKey { }; todo_by!( - "2025-3-4", + "2025-3-12", "Only accept the namespaced message once everyone has upgraded" ); public_key.0.validate(&signature, message) @@ -112,7 +112,7 @@ impl RunDef for ProductionDef { } todo_by!( - "2025-3-4", + "2025-3-12", "Remove this, switching to TCP+TLS singularly when everyone has updated" ); /// The user definition for the Push CDN.