Validators in two epochs should verify that they got both VID shares #2705

Draft: wants to merge 4 commits into main
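As read from the changes themselves: this draft adds a check that a validator straddling an epoch boundary holds both of its VID shares, the current-epoch share and the next-epoch share, before acting on them. To support that, the diff keys stored VID shares by epoch as well as by view and recipient, changes `wait_for_next_epoch_qc` to return a `Result` with context instead of an `Option`, introduces an internal `ExtendedQc2Formed` event that fires only once matching current- and next-epoch eQCs are both present, and adds a `wait_for_second_vid_share` helper that waits, with a timeout, for whichever share is still missing.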
9 changes: 5 additions & 4 deletions hotshot-task-impls/src/consensus/handlers.rs
@@ -173,17 +173,18 @@ pub async fn send_high_qc<TYPES: NodeType, V: Versions, I: NodeImplementation<TY
drop(consensus_reader);

if is_eqc {
let Some(next_epoch_high_qc) = wait_for_next_epoch_qc(
let next_epoch_high_qc = wait_for_next_epoch_qc(
&high_qc,
&task_state.consensus,
task_state.timeout,
task_state.view_start_time,
receiver,
)
.await
else {
bail!("We've seen an extended QC but we don't have a corresponding next epoch extended QC");
};
.context(warn!(
"We've seen an extended QC but we don't have a corresponding next epoch extended QC"
))?;

tracing::debug!(
"Broadcasting Extended QC for view {:?} and epoch {:?}, my id {:?}.",
high_qc.view_number(),
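The hunk above swaps a `let ... else { bail!(...) }` for `Result` propagation with attached context, now that `wait_for_next_epoch_qc` returns a `Result`. A minimal sketch of the same shape, using `anyhow` in place of `hotshot_utils::anytrace` (an assumption for illustration; the real macros differ):

```rust
// Illustration only: anyhow stands in for hotshot_utils::anytrace.
use anyhow::{Context, Result};

// Stand-in for the real async helper, which now returns a Result itself.
fn wait_for_next_epoch_qc() -> Option<u64> {
    None
}

fn send_high_qc() -> Result<()> {
    // Before: `let Some(qc) = ... else { bail!("...") };`
    // After: attach context to the failure and propagate it with `?`.
    let qc = wait_for_next_epoch_qc().context(
        "We've seen an extended QC but we don't have a corresponding next epoch extended QC",
    )?;
    println!("next epoch high QC: {qc}");
    Ok(())
}
```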
65 changes: 53 additions & 12 deletions hotshot-task-impls/src/consensus/mod.rs
@@ -30,8 +30,11 @@ use tracing::instrument;
use self::handlers::{
handle_quorum_vote_recv, handle_timeout, handle_timeout_vote_recv, handle_view_change,
};
use crate::helpers::{validate_qc_and_next_epoch_qc, wait_for_next_epoch_qc};
use crate::{events::HotShotEvent, helpers::broadcast_event, vote_collection::VoteCollectorsMap};
use crate::{
events::HotShotEvent,
helpers::{broadcast_event, validate_qc_and_next_epoch_qc},
vote_collection::VoteCollectorsMap,
};

/// Event handlers for use in the `handle` method.
mod handlers;
@@ -155,25 +158,63 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusTaskSt
tracing::debug!("We formed QC but not eQC. Do nothing");
return Ok(());
}
if wait_for_next_epoch_qc(
quorum_cert,
&self.consensus,
self.timeout,
self.view_start_time,
&receiver,

let consensus_reader = self.consensus.read().await;
let Some(next_epoch_qc) = consensus_reader.next_epoch_high_qc() else {
tracing::debug!("We formed the current epoch eQC but we don't have the next epoch eQC at all.");
return Ok(());
};
if quorum_cert.view_number() != next_epoch_qc.view_number()
|| quorum_cert.data != *next_epoch_qc.data
{
tracing::debug!("We formed the current epoch eQC but we don't have the corresponding next epoch eQC.");
return Ok(());
}
drop(consensus_reader);

broadcast_event(
Arc::new(HotShotEvent::ExtendedQc2Formed(quorum_cert.clone())),
&sender,
)
.await
.is_none()
.await;
}
HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) => {
let cert_view = next_epoch_qc.view_number();
if !self.upgrade_lock.epochs_enabled(cert_view).await {
tracing::debug!("Next epoch QC2 formed but epochs not enabled. Do nothing");
return Ok(());
}
if !self
.consensus
.read()
.await
.is_leaf_extended(next_epoch_qc.data.leaf_commit)
{
tracing::warn!("We formed eQC but we don't have corresponding next epoch eQC.");
tracing::debug!("We formed next epoch QC but not eQC. Do nothing");
return Ok(());
}

let consensus_reader = self.consensus.read().await;
let high_qc = consensus_reader.high_qc();
if high_qc.view_number() != next_epoch_qc.view_number()
|| high_qc.data != *next_epoch_qc.data
{
tracing::debug!("We formed the current epoch eQC but we don't have the corresponding next epoch eQC.");
return Ok(());
}
let high_qc = high_qc.clone();
drop(consensus_reader);

broadcast_event(Arc::new(HotShotEvent::ExtendedQc2Formed(high_qc)), &sender).await;
}
HotShotEvent::ExtendedQc2Formed(eqc) => {
let cert_view = eqc.view_number();
let cert_block_number = self
.consensus
.read()
.await
.saved_leaves()
.get(&quorum_cert.data.leaf_commit)
.get(&eqc.data.leaf_commit)
.context(error!(
"Could not find the leaf for the eQC. It shouldn't happen."
))?
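The new `Qc2Formed` and `NextEpochQc2Formed` arms above gate the `ExtendedQc2Formed` broadcast on the same correspondence test, so whichever certificate arrives second is the one that triggers the event. A sketch of that test, with the certificate types reduced to plain structs (an assumption for illustration):

```rust
// Illustrative stand-ins for the real certificate types.
#[derive(Clone, PartialEq, Debug)]
struct QcData {
    leaf_commit: u64,
}

struct Qc {
    view_number: u64,
    data: QcData,
}

/// A current-epoch eQC and a next-epoch eQC correspond only when they
/// certify the same view and the same underlying data; only then is
/// `ExtendedQc2Formed` broadcast.
fn eqcs_correspond(current: &Qc, next_epoch: &Qc) -> bool {
    current.view_number == next_epoch.view_number && current.data == next_epoch.data
}
```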
9 changes: 7 additions & 2 deletions hotshot-task-impls/src/da.rs
@@ -292,12 +292,17 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> DaTaskState<TYP
&upgrade_lock,
)
.await;
if let Some(Some(vid_share)) = consensus
if let Some(vid_share) = consensus
.read()
.await
.vid_shares()
.get(&view_number)
.map(|shares| shares.get(&public_key).cloned())
.and_then(|key_map| key_map.get(&public_key))
.and_then(|epoch_map| {
epoch_map
.get(&epoch_number)
.or_else(|| epoch_map.get(&epoch_number.map(|e| e + 1)))
})
{
broadcast_event(
Arc::new(HotShotEvent::VidShareRecv(
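The lookup above now descends a per-epoch map and falls back to the successor epoch, so a share stored under either epoch of a transition is found. A runnable sketch of the same lookup over an assumed map shape (view → recipient key → epoch → share; names here are illustrative):

```rust
use std::collections::{BTreeMap, HashMap};

// Assumed shape of the shared VID store after this change:
// view -> recipient key -> epoch -> share; the epoch is an Option because
// views before epochs are enabled carry no epoch number.
type VidShares = BTreeMap<u64, HashMap<String, BTreeMap<Option<u64>, String>>>;

/// Fetch a share for (view, key), preferring `epoch` and falling back to the
/// successor epoch, mirroring the lookup in da.rs above.
fn get_share(shares: &VidShares, view: u64, key: &str, epoch: Option<u64>) -> Option<String> {
    shares
        .get(&view)
        .and_then(|key_map| key_map.get(key))
        .and_then(|epoch_map| {
            epoch_map
                .get(&epoch)
                .or_else(|| epoch_map.get(&epoch.map(|e| e + 1)))
        })
        .cloned()
}
```

The same nested lookup, keyed by `leaf.epoch(epoch_height)`, appears in `decide_from_proposal` in helpers.rs below.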
6 changes: 6 additions & 0 deletions hotshot-task-impls/src/events.rs
@@ -135,6 +135,8 @@ pub enum HotShotEvent<TYPES: NodeType> {
Qc2Formed(Either<QuorumCertificate2<TYPES>, TimeoutCertificate2<TYPES>>),
/// The next leader has collected enough votes from the next epoch nodes to form a QC; emitted by the next leader in the consensus task; an internal event only
NextEpochQc2Formed(Either<NextEpochQuorumCertificate2<TYPES>, TimeoutCertificate<TYPES>>),
/// A validator formed both a current epoch eQC and a next epoch eQC
ExtendedQc2Formed(QuorumCertificate2<TYPES>),
/// The DA leader has collected enough votes to form a DAC; emitted by the DA leader in the DA task; sent to the entire network via the networking task
DacSend(DaCertificate2<TYPES>, TYPES::SignatureKey),
/// The current view has changed; emitted by the replica in the consensus task or replica in the view sync task; received by almost all other tasks
@@ -312,6 +314,7 @@ impl<TYPES: NodeType> HotShotEvent<TYPES> {
either::Left(qc) => Some(qc.view_number()),
either::Right(tc) => Some(tc.view_number()),
},
HotShotEvent::ExtendedQc2Formed(cert) => Some(cert.view_number()),
HotShotEvent::ViewSyncCommitVoteSend(vote)
| HotShotEvent::ViewSyncCommitVoteRecv(vote) => Some(vote.view_number()),
HotShotEvent::ViewSyncPreCommitVoteRecv(vote)
@@ -447,6 +450,9 @@ impl<TYPES: NodeType> Display for HotShotEvent<TYPES> {
write!(f, "NextEpochQc2Formed(view_number={:?})", tc.view_number())
}
},
HotShotEvent::ExtendedQc2Formed(cert) => {
write!(f, "ExtendedQc2Formed(view_number={:?})", cert.view_number())
}
HotShotEvent::DacSend(cert, _) => {
write!(f, "DacSend(view_number={:?})", cert.view_number())
}
141 changes: 126 additions & 15 deletions hotshot-task-impls/src/helpers.rs
@@ -4,11 +4,14 @@
// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};
use async_broadcast::{Receiver, SendError, Sender};
use async_lock::RwLock;
use committable::{Commitment, Committable};
use either::Either;
use hotshot_task::dependency::{Dependency, EventDependency};
use hotshot_types::data::VidDisperseShare;
use hotshot_types::simple_certificate::DaCertificate2;
use hotshot_types::{
consensus::OuterConsensus,
data::{Leaf2, QuorumProposalWrapper, ViewChangeEvidence2},
@@ -33,15 +36,13 @@ use hotshot_types::{
};
use hotshot_utils::anytrace::*;
use std::{
collections::{HashMap, HashSet},
collections::HashSet,
sync::Arc,
time::{Duration, Instant},
};
use tokio::time::timeout;
use tracing::instrument;

use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};

/// Trigger a request to the network for a proposal for a view and wait for the response or timeout.
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
@@ -372,6 +373,7 @@ pub async fn decide_from_proposal<TYPES: NodeType, V: Versions>(
public_key: &TYPES::SignatureKey,
with_epochs: bool,
membership: &Arc<RwLock<TYPES::Membership>>,
epoch_height: u64,
) -> LeafChainTraversalOutcome<TYPES> {
let consensus_reader = consensus.read().await;
let existing_upgrade_cert_reader = existing_upgrade_cert.read().await;
@@ -451,10 +453,13 @@
let vid_share = consensus_reader
.vid_shares()
.get(&leaf.view_number())
.unwrap_or(&HashMap::new())
.get(public_key)
.cloned()
.map(|prop| prop.data);
.and_then(|key_map| key_map.get(public_key))
.and_then(|epoch_map| {
epoch_map
.get(&leaf.epoch(epoch_height))
.or_else(|| epoch_map.get(&leaf.epoch(epoch_height).map(|e| e + 1)))
})
.map(|prop| prop.data.clone());

// Add our data into a new `LeafInfo`
res.leaf_views.push(LeafInfo::new(
@@ -862,12 +867,12 @@ pub async fn wait_for_next_epoch_qc<TYPES: NodeType>(
timeout: u64,
view_start_time: Instant,
receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
) -> Option<NextEpochQuorumCertificate2<TYPES>> {
) -> Result<NextEpochQuorumCertificate2<TYPES>> {
tracing::debug!("getting the next epoch QC");
if let Some(next_epoch_qc) = consensus.read().await.next_epoch_high_qc() {
if next_epoch_qc.data.leaf_commit == high_qc.data.leaf_commit {
// We have it already, no reason to wait
return Some(next_epoch_qc.clone());
return Ok(next_epoch_qc.clone());
}
};

@@ -876,11 +881,13 @@
// TODO configure timeout
let Some(time_spent) = Instant::now().checked_duration_since(view_start_time) else {
// Shouldn't be possible, now must be after the start
return None;
return Err(warn!(
"Now is earlier than the view start time. Shouldn't be possible."
));
};
let Some(time_left) = wait_duration.checked_sub(time_spent) else {
// No time left
return None;
return Err(warn!("Run out of time waiting for the next epoch QC."));
};
let receiver = receiver.clone();
let Ok(Some(event)) = tokio::time::timeout(time_left, async move {
@@ -904,16 +911,16 @@
// Check again, there is a chance we missed it
if let Some(next_epoch_qc) = consensus.read().await.next_epoch_high_qc() {
if next_epoch_qc.data.leaf_commit == high_qc.data.leaf_commit {
return Some(next_epoch_qc.clone());
return Ok(next_epoch_qc.clone());
}
};
return None;
return Err(warn!("Error while waiting for the next epoch QC."));
};
let HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) = event.as_ref() else {
// this shouldn't happen
return None;
return Err(warn!("Received event is not NextEpochQc2Formed but we checked it earlier. Shouldn't be possible."));
};
Some(next_epoch_qc.clone())
Ok(next_epoch_qc.clone())
}

/// Validates qc's signatures and, if provided, validates next_epoch_qc's signatures and whether it
@@ -970,3 +977,107 @@ pub async fn validate_qc_and_next_epoch_qc<TYPES: NodeType, V: Versions>(
}
Ok(())
}

/// Gets the second VID share, the current or the next epoch accordingly, from the shared consensus state;
/// makes sure it corresponds to the given DA certificate;
/// if it's not yet available, waits for it with the given timeout.
pub async fn wait_for_second_vid_share<TYPES: NodeType>(
vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
da_cert: &DaCertificate2<TYPES>,
consensus: &OuterConsensus<TYPES>,
timeout: u64,
view_start_time: Instant,
receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
) -> Result<Proposal<TYPES, VidDisperseShare<TYPES>>> {
// If the VID share that we already have is for the current epoch, get the next epoch VID.
// And the other way around.
let target_epoch = if vid_share.data.epoch() == vid_share.data.target_epoch() {
vid_share.data.target_epoch().map(|e| e + 1)
} else {
vid_share.data.target_epoch().map(|e| e - 1)
};
tracing::debug!("getting the second VID share for epoch {:?}", target_epoch);
let maybe_second_vid_share = consensus
.read()
.await
.vid_shares()
.get(&vid_share.data.view_number())
.and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
.and_then(|epoch_map| epoch_map.get(&target_epoch))
.cloned();
if let Some(second_vid_share) = maybe_second_vid_share {
if (target_epoch == da_cert.epoch()
&& second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
|| (target_epoch != da_cert.epoch()
&& Some(second_vid_share.data.payload_commitment())
== da_cert.data().next_epoch_payload_commit)
{
return Ok(second_vid_share);
}
}

let wait_duration = Duration::from_millis(timeout / 2);

// TODO configure timeout
let Some(time_spent) = Instant::now().checked_duration_since(view_start_time) else {
// Shouldn't be possible, now must be after the start
return Err(warn!(
"Now is earlier than the view start time. Shouldn't be possible."
));
};
let Some(time_left) = wait_duration.checked_sub(time_spent) else {
// No time left
return Err(warn!("Run out of time waiting for the second VID share."));
};
let receiver = receiver.clone();
let Ok(Some(event)) = tokio::time::timeout(time_left, async move {
let da_cert_clone = da_cert.clone();
EventDependency::new(
receiver,
Box::new(move |event| {
let event = event.as_ref();
if let HotShotEvent::VidShareValidated(second_vid_share) = event {
if target_epoch == da_cert_clone.epoch() {
second_vid_share.data.payload_commitment()
== da_cert_clone.data().payload_commit
} else {
Some(second_vid_share.data.payload_commitment())
== da_cert_clone.data().next_epoch_payload_commit
}
} else {
false
}
}),
)
.completed()
.await
})
.await
else {
// Check again, there is a chance we missed it
let maybe_second_vid_share = consensus
.read()
.await
.vid_shares()
.get(&vid_share.data.view_number())
.and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
.and_then(|epoch_map| epoch_map.get(&target_epoch))
.cloned();
if let Some(second_vid_share) = maybe_second_vid_share {
if (target_epoch == da_cert.epoch()
&& second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
|| (target_epoch != da_cert.epoch()
&& Some(second_vid_share.data.payload_commitment())
== da_cert.data().next_epoch_payload_commit)
{
return Ok(second_vid_share);
}
}
return Err(warn!("Error while waiting for the second VID share."));
};
let HotShotEvent::VidShareValidated(second_vid_share) = event.as_ref() else {
// this shouldn't happen
return Err(warn!("Received event is not VidShareValidated but we checked it earlier. Shouldn't be possible."));
};
Ok(second_vid_share.clone())
}
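
The `target_epoch` selection at the top of `wait_for_second_vid_share` picks whichever share is missing: holding the current-epoch share means waiting for the next-epoch one, and vice versa. A sketch with epochs reduced to `Option<u64>` (the function name is illustrative):

```rust
/// Mirror of the target-epoch selection in `wait_for_second_vid_share`.
/// If the share we hold targets its own epoch, the missing share targets
/// the next epoch; otherwise we already hold the next-epoch share and the
/// missing one targets the current epoch.
fn second_share_target(epoch: Option<u64>, target_epoch: Option<u64>) -> Option<u64> {
    if epoch == target_epoch {
        target_epoch.map(|e| e + 1)
    } else {
        target_epoch.map(|e| e - 1)
    }
}
```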