diff --git a/code/crates/core-consensus/Cargo.toml b/code/crates/core-consensus/Cargo.toml index 8a4dace0e..8a212dd8d 100644 --- a/code/crates/core-consensus/Cargo.toml +++ b/code/crates/core-consensus/Cargo.toml @@ -14,7 +14,7 @@ all-features = true [features] default = ["std", "metrics"] -std = [] +std = ["malachitebft-core-driver/std"] metrics = ["std", "dep:malachitebft-metrics"] debug = ["std", "malachitebft-core-driver/debug"] diff --git a/code/crates/core-consensus/src/effect.rs b/code/crates/core-consensus/src/effect.rs index fae2ecf6b..d9f8902ca 100644 --- a/code/crates/core-consensus/src/effect.rs +++ b/code/crates/core-consensus/src/effect.rs @@ -157,6 +157,7 @@ where Ctx::Height, Round, VoteSet, + Option>, resume::Continue, ), diff --git a/code/crates/core-consensus/src/handle.rs b/code/crates/core-consensus/src/handle.rs index dc2a3f553..95be21631 100644 --- a/code/crates/core-consensus/src/handle.rs +++ b/code/crates/core-consensus/src/handle.rs @@ -64,8 +64,8 @@ where Input::VoteSetRequest(request_id, height, round) => { on_vote_set_request(co, state, metrics, request_id, height, round).await } - Input::VoteSetResponse(vote_set) => { - on_vote_set_response(co, state, metrics, vote_set).await + Input::VoteSetResponse(vote_set, polka_certificate) => { + on_vote_set_response(co, state, metrics, vote_set, polka_certificate).await } } } diff --git a/code/crates/core-consensus/src/handle/decide.rs b/code/crates/core-consensus/src/handle/decide.rs index ef5adcccc..985bf5bc3 100644 --- a/code/crates/core-consensus/src/handle/decide.rs +++ b/code/crates/core-consensus/src/handle/decide.rs @@ -51,7 +51,7 @@ where // Look for an existing certificate let (certificate, extensions) = state .driver - .get_certificate(proposal_round, value.id()) + .get_commit_certificate(proposal_round, value.id()) .cloned() .map(|certificate| (certificate, VoteExtensions::default())) .unwrap_or_else(|| { diff --git a/code/crates/core-consensus/src/handle/driver.rs b/code/crates/core-consensus/src/handle/driver.rs index 29617b127..16d43b337 100644 --- a/code/crates/core-consensus/src/handle/driver.rs +++ b/code/crates/core-consensus/src/handle/driver.rs @@ -73,7 +73,19 @@ where DriverInput::CommitCertificate(certificate) => { if certificate.height != state.driver.height() { warn!( - "Ignoring certificate for height {}, current height: {}", + "Ignoring commit certificate for height {}, current height: {}", + certificate.height, + state.driver.height() + ); + + return Ok(()); + } + } + + DriverInput::PolkaCertificate(certificate) => { + if certificate.height != state.driver.height() { + warn!( + "Ignoring polka certificate for height {}, current height: {}", certificate.height, state.driver.height() ); diff --git a/code/crates/core-consensus/src/handle/vote_set.rs b/code/crates/core-consensus/src/handle/vote_set.rs index ec235596a..aa4343100 100644 --- a/code/crates/core-consensus/src/handle/vote_set.rs +++ b/code/crates/core-consensus/src/handle/vote_set.rs @@ -1,3 +1,4 @@ +use crate::handle::driver::apply_driver_input; use crate::handle::vote::on_vote; use crate::input::RequestId; use crate::prelude::*; @@ -15,14 +16,17 @@ where { debug!(%height, %round, %request_id, "Received vote set request, retrieve the votes and send response if set is not empty"); - let votes = state.restore_votes(height, round); - - if !votes.is_empty() { - let vote_set = VoteSet::new(votes); - + if let Some((votes, polka_certificate)) = state.restore_votes(height, round) { perform!( co, - Effect::SendVoteSetResponse(request_id, height, round, vote_set, Default::default()) + Effect::SendVoteSetResponse( + request_id, + height, + round, + VoteSet::new(votes), + polka_certificate, + Default::default() + ) ); } @@ -33,18 +37,31 @@ pub async fn on_vote_set_response( co: &Co, state: &mut State, metrics: &Metrics, - response: VoteSet, + vote_set: VoteSet, + polka_certificate: Option>, ) -> Result<(), Error> where Ctx: Context, { debug!( - height = %state.height(), round = %state.round(), votes.count = %response.len(), + height = %state.height(), round = %state.round(), + votes.count = %vote_set.len(), + polka.certificate = %polka_certificate.is_some(), "Received vote set response" ); - for vote in response.votes { - let _ = on_vote(co, state, metrics, vote).await; + if let Some(certificate) = polka_certificate { + apply_driver_input( + co, + state, + metrics, + DriverInput::PolkaCertificate(certificate), + ) + .await?; + } else { + for vote in vote_set.votes { + on_vote(co, state, metrics, vote).await?; + } } Ok(()) diff --git a/code/crates/core-consensus/src/input.rs b/code/crates/core-consensus/src/input.rs index 3e829d2fa..46a2f5901 100644 --- a/code/crates/core-consensus/src/input.rs +++ b/code/crates/core-consensus/src/input.rs @@ -1,6 +1,7 @@ use derive_where::derive_where; use malachitebft_core_types::{ - CommitCertificate, Context, Round, SignedProposal, SignedVote, Timeout, ValueOrigin, VoteSet, + CommitCertificate, Context, PolkaCertificate, Round, SignedProposal, SignedVote, Timeout, + ValueOrigin, VoteSet, }; use crate::types::ProposedValue; @@ -40,5 +41,5 @@ where VoteSetRequest(RequestId, Ctx::Height, Round), /// Vote set to be sent to peer - VoteSetResponse(VoteSet), + VoteSetResponse(VoteSet, Option>), } diff --git a/code/crates/core-consensus/src/state.rs b/code/crates/core-consensus/src/state.rs index bf37f65b9..ceda7068f 100644 --- a/code/crates/core-consensus/src/state.rs +++ b/code/crates/core-consensus/src/state.rs @@ -134,17 +134,30 @@ where .collect() } - pub fn restore_votes(&mut self, height: Ctx::Height, round: Round) -> Vec> { - // TODO optimization - get votes for all rounds higher than or equal to `round` + #[allow(clippy::type_complexity)] + pub fn restore_votes( + &mut self, + height: Ctx::Height, + round: Round, + ) -> Option<(Vec>, Option>)> { + // TODO: optimization - get votes for all rounds higher than or equal to `round` if height != self.driver.height() { - return vec![]; + return None; } - if let Some(per_round) = self.driver.votes().per_round(round) { - per_round.received_votes().iter().cloned().collect() - } else { - vec![] + let per_round = self.driver.votes().per_round(round)?; + let votes = per_round + .received_votes() + .iter() + .cloned() + .collect::>(); + + if votes.is_empty() { + return None; } + + let polka_certificate = self.driver.get_polka_certificate(round).cloned(); + Some((votes, polka_certificate)) } pub fn full_proposal_at_round_and_value( diff --git a/code/crates/core-driver/src/driver.rs b/code/crates/core-driver/src/driver.rs index 6fd673de7..5632ed728 100644 --- a/code/crates/core-driver/src/driver.rs +++ b/code/crates/core-driver/src/driver.rs @@ -7,9 +7,11 @@ use malachitebft_core_state_machine::output::Output as RoundOutput; use malachitebft_core_state_machine::state::{RoundValue, State as RoundState, Step}; use malachitebft_core_state_machine::state_machine::Info; use malachitebft_core_types::{ - CommitCertificate, Context, Proposal, Round, SignedProposal, SignedVote, Timeout, TimeoutKind, - Validator, ValidatorSet, Validity, ValueId, Vote, VoteType, + CommitCertificate, Context, NilOrVal, PolkaCertificate, Proposal, Round, SignedProposal, + SignedVote, Timeout, TimeoutKind, Validator, ValidatorSet, Validity, Value, ValueId, Vote, + VoteType, }; +use malachitebft_core_votekeeper::keeper::Output as VKOutput; use malachitebft_core_votekeeper::keeper::VoteKeeper; use crate::input::Input; @@ -46,8 +48,11 @@ where /// The vote keeper. pub(crate) vote_keeper: VoteKeeper, - /// The certificate keeper - pub(crate) certificates: Vec>, + /// The commit certificates + pub(crate) commit_certificates: Vec>, + + /// The polka certificates + pub(crate) polka_certificates: Vec>, /// The state of the round state machine. pub(crate) round_state: RoundState, @@ -89,7 +94,8 @@ where round_state, proposer: None, pending_inputs: vec![], - certificates: vec![], + commit_certificates: vec![], + polka_certificates: vec![], last_prevote: None, last_precommit: None, } @@ -112,7 +118,8 @@ where self.vote_keeper = vote_keeper; self.round_state = round_state; self.pending_inputs = vec![]; - self.certificates = vec![]; + self.commit_certificates = vec![]; + self.polka_certificates = vec![]; self.last_prevote = None; self.last_precommit = None; } @@ -197,16 +204,24 @@ where } /// Get a commit certificate for the given round and value id. - pub fn get_certificate( + pub fn get_commit_certificate( &self, round: Round, value_id: ValueId, ) -> Option<&CommitCertificate> { - self.certificates + self.commit_certificates .iter() .find(|c| c.round == round && c.value_id == value_id) } + /// Get a polka certificate for the given round. + pub fn get_polka_certificate(&self, round: Round) -> Option<&PolkaCertificate> { + self.polka_certificates + .iter() + .filter(|c| c.round <= round) + .last() + } + /// Store the last vote that we have cast fn set_last_vote_cast(&mut self, vote: &Ctx::Vote) { assert_eq!(vote.height(), self.height()); @@ -274,18 +289,28 @@ where // - That vote is for a higher height than our last vote // - That vote is for a higher round than our last vote // - That vote is the same as our last vote + // Precommits have the additional constraint that the value must match the valid value let can_vote = match vote.vote_type() { VoteType::Prevote => self.last_prevote.as_ref().map_or(true, |prev| { prev.height() < vote.height() || prev.round() < vote.round() || prev == &vote }), - VoteType::Precommit => self.last_precommit.as_ref().map_or(true, |prev| { - prev.height() < vote.height() || prev.round() < vote.round() || prev == &vote - }), + VoteType::Precommit => { + let good_precommit = self.last_precommit.as_ref().map_or(true, |prev| { + prev.height() < vote.height() || prev.round() < vote.round() || prev == &vote + }); + let match_valid = self.round_state.valid.as_ref().map_or(true, |valid| { + if let NilOrVal::Val(value_id) = vote.value() { + &valid.value.id() == value_id + } else { + true + } + }); + good_precommit && match_valid + } }; if can_vote { self.set_last_vote_cast(&vote); - outputs.push(Output::Vote(vote)); } } @@ -294,6 +319,7 @@ where fn apply(&mut self, input: Input) -> Result>, Error> { match input { Input::CommitCertificate(certificate) => self.apply_commit_certificate(certificate), + Input::PolkaCertificate(certificate) => self.apply_polka_certificate(certificate), Input::NewRound(height, round, proposer) => { self.apply_new_round(height, round, proposer) } @@ -317,12 +343,29 @@ where let round = certificate.round; - match self.store_and_multiplex_certificate(certificate) { + match self.store_and_multiplex_commit_certificate(certificate) { Some(round_input) => self.apply_input(round, round_input), None => Ok(None), } } + fn apply_polka_certificate( + &mut self, + certificate: PolkaCertificate, + ) -> Result>, Error> { + if self.height() != certificate.height { + return Err(Error::InvalidCertificateHeight { + certificate_height: certificate.height, + consensus_height: self.height(), + }); + } + + match dbg!(self.store_and_multiplex_polka_certificate(certificate)) { + Some(round_input) => dbg!(self.apply_input(self.round(), round_input)), + None => Ok(None), + } + } + fn apply_new_round( &mut self, height: Ctx::Height, @@ -395,8 +438,11 @@ where return Ok(None); }; - let round_input = self.multiplex_vote_threshold(output, vote_round); + if let VKOutput::PolkaValue(val) = &output { + self.store_polka_certificate(vote_round, val); + } + let round_input = self.multiplex_vote_threshold(output, vote_round); if round_input == RoundInput::NoInput { return Ok(None); } @@ -404,6 +450,27 @@ where self.apply_input(vote_round, round_input) } + fn store_polka_certificate(&mut self, vote_round: Round, value_id: &ValueId) { + let Some(per_round) = self.vote_keeper.per_round(vote_round) else { + return; + }; + + self.polka_certificates.push(PolkaCertificate { + height: self.height(), + round: vote_round, + value_id: value_id.clone(), + votes: per_round + .received_votes() + .iter() + .filter(|v| { + v.vote_type() == VoteType::Prevote + && v.value().as_ref() == NilOrVal::Val(value_id) + }) + .cloned() + .collect(), + }) + } + fn apply_timeout(&mut self, timeout: Timeout) -> Result>, Error> { let input = match timeout.kind { TimeoutKind::Propose => RoundInput::TimeoutPropose, diff --git a/code/crates/core-driver/src/input.rs b/code/crates/core-driver/src/input.rs index defd688ab..c34e1c658 100644 --- a/code/crates/core-driver/src/input.rs +++ b/code/crates/core-driver/src/input.rs @@ -1,5 +1,6 @@ use malachitebft_core_types::{ - CommitCertificate, Context, Round, SignedProposal, SignedVote, Timeout, Validity, + CommitCertificate, Context, PolkaCertificate, Round, SignedProposal, SignedVote, Timeout, + Validity, }; use derive_where::derive_where; @@ -25,6 +26,9 @@ where /// Received a commit certificate CommitCertificate(CommitCertificate), + /// Received a polka certificate + PolkaCertificate(PolkaCertificate), + /// Receive a timeout TimeoutElapsed(Timeout), } diff --git a/code/crates/core-driver/src/mux.rs b/code/crates/core-driver/src/mux.rs index 918a0645b..3511a0181 100644 --- a/code/crates/core-driver/src/mux.rs +++ b/code/crates/core-driver/src/mux.rs @@ -31,7 +31,7 @@ use alloc::vec::Vec; use malachitebft_core_state_machine::input::Input as RoundInput; use malachitebft_core_state_machine::state::Step; -use malachitebft_core_types::{CommitCertificate, SignedProposal}; +use malachitebft_core_types::{CommitCertificate, PolkaCertificate, SignedProposal}; use malachitebft_core_types::{Context, Proposal, Round, Validity, Value, ValueId, VoteType}; use malachitebft_core_votekeeper::keeper::Output as VKOutput; use malachitebft_core_votekeeper::keeper::VoteKeeper; @@ -121,7 +121,7 @@ where // L49 if self.round_state.decision.is_none() && self - .get_certificate(proposal.round(), proposal.value().id()) + .get_commit_certificate(proposal.round(), proposal.value().id()) .is_some() { return Some(RoundInput::ProposalAndPrecommitValue(proposal)); @@ -181,7 +181,7 @@ where self.multiplex_proposal(proposal, validity) } - pub(crate) fn store_and_multiplex_certificate( + pub(crate) fn store_and_multiplex_commit_certificate( &mut self, certificate: CommitCertificate, ) -> Option> { @@ -192,7 +192,7 @@ where let certificate_value_id = certificate.value_id.clone(); // Store the certificate - self.certificates.push(certificate); + self.commit_certificates.push(certificate); if let Some((signed_proposal, validity)) = self .proposal_keeper @@ -207,6 +207,41 @@ where None } + pub(crate) fn store_and_multiplex_polka_certificate( + &mut self, + certificate: PolkaCertificate, + ) -> Option> { + // Should only receive proposals for our height. + assert_eq!(self.height(), certificate.height); + + dbg!(&certificate); + + let certificate_round = certificate.round; + let certificate_value_id = certificate.value_id.clone(); + + // Store the certificate + self.polka_certificates.push(certificate); + + let Some((signed_proposal, validity)) = self + .proposal_keeper + .get_proposal_and_validity_for_round(certificate_round) + else { + return Some(RoundInput::PolkaAny); + }; + + let proposal = &signed_proposal.message; + + if dbg!(dbg!(certificate_value_id) == dbg!(proposal.value().id())) { + if dbg!(validity.is_valid()) { + Some(RoundInput::ProposalAndPolkaCurrent(proposal.clone())) + } else { + None + } + } else { + Some(RoundInput::PolkaAny) + } + } + /// After a vote threshold change for a given round, check if we have a polka for nil, some value or any, /// based on the type of threshold and the current proposal. pub(crate) fn multiplex_vote_threshold( diff --git a/code/crates/core-types/src/certificate.rs b/code/crates/core-types/src/certificate.rs index 2df86eacd..c12576790 100644 --- a/code/crates/core-types/src/certificate.rs +++ b/code/crates/core-types/src/certificate.rs @@ -86,6 +86,19 @@ impl CommitCertificate { } } +/// Represents a certificate witnessing a Polka at a given height and round. +#[derive_where(Clone, Debug, PartialEq, Eq)] +pub struct PolkaCertificate { + /// The height at which a Polka was witnessed + pub height: Ctx::Height, + /// The round at which a Polka that was witnessed + pub round: Round, + /// The value that the Polka is for + pub value_id: ValueId, + /// The votes that make up the Polka + pub votes: Vec>, +} + /// Represents an error that can occur when verifying a certificate. #[derive_where(Clone, Debug)] #[derive(Error)] diff --git a/code/crates/core-types/src/lib.rs b/code/crates/core-types/src/lib.rs index 9129d43ee..b2c40ef04 100644 --- a/code/crates/core-types/src/lib.rs +++ b/code/crates/core-types/src/lib.rs @@ -53,7 +53,9 @@ pub type SignedProposalPart = SignedMessage::Proposal /// A signed vote extension pub type SignedExtension = SignedMessage::Extension>; -pub use certificate::{AggregatedSignature, CertificateError, CommitCertificate, CommitSignature}; +pub use certificate::{ + AggregatedSignature, CertificateError, CommitCertificate, CommitSignature, PolkaCertificate, +}; pub use context::Context; pub use height::Height; pub use proposal::{Proposal, Validity}; diff --git a/code/crates/engine/src/consensus.rs b/code/crates/engine/src/consensus.rs index 01b491660..1f97e4ab3 100644 --- a/code/crates/engine/src/consensus.rs +++ b/code/crates/engine/src/consensus.rs @@ -429,6 +429,7 @@ where height, round, vote_set, + polka_certificate, }), ) => { if vote_set.votes.is_empty() { @@ -442,7 +443,7 @@ where .process_input( &myself, state, - ConsensusInput::VoteSetResponse(vote_set), + ConsensusInput::VoteSetResponse(vote_set, polka_certificate), ) .await { @@ -1031,15 +1032,28 @@ where Ok(r.resume_with(())) } - Effect::SendVoteSetResponse(request_id_str, height, round, vote_set, r) => { + Effect::SendVoteSetResponse( + request_id_str, + height, + round, + vote_set, + polka_certificate, + r, + ) => { let Some(sync) = self.sync.as_ref() else { warn!("Responding to a vote set request but sync actor is not available"); return Ok(r.resume_with(())); }; let vote_count = vote_set.len(); - let response = - Response::VoteSetResponse(VoteSetResponse::new(height, round, vote_set)); + let with_polka = polka_certificate.is_some(); + + let response = Response::VoteSetResponse(VoteSetResponse::new( + height, + round, + vote_set, + polka_certificate, + )); let request_id = InboundRequestId::new(request_id_str); @@ -1057,7 +1071,7 @@ where })?; self.tx_event - .send(|| Event::SentVoteSetResponse(height, round, vote_count)); + .send(|| Event::SentVoteSetResponse(height, round, vote_count, with_polka)); Ok(r.resume_with(())) } diff --git a/code/crates/engine/src/util/events.rs b/code/crates/engine/src/util/events.rs index e154a62f7..d8ef4cae2 100644 --- a/code/crates/engine/src/util/events.rs +++ b/code/crates/engine/src/util/events.rs @@ -50,7 +50,7 @@ pub enum Event { Decided(CommitCertificate), Rebroadcast(SignedVote), RequestedVoteSet(Ctx::Height, Round), - SentVoteSetResponse(Ctx::Height, Round, usize), + SentVoteSetResponse(Ctx::Height, Round, usize, bool), WalReplayBegin(Ctx::Height, usize), WalReplayConsensus(SignedConsensusMsg), WalReplayTimeout(Timeout), @@ -78,10 +78,10 @@ impl fmt::Display for Event { Event::RequestedVoteSet(height, round) => { write!(f, "RequestedVoteSet(height: {height}, round: {round})") } - Event::SentVoteSetResponse(height, round, count) => { + Event::SentVoteSetResponse(height, round, count, with_polka) => { write!( f, - "SentVoteSetResponse(height: {height}, round: {round}, count: {count})" + "SentVoteSetResponse(height: {height}, round: {round}, count: {count}, polka_certificate: {with_polka})" ) } Event::WalReplayBegin(height, count) => { diff --git a/code/crates/starknet/host/src/block_store.rs b/code/crates/starknet/host/src/block_store.rs index 91c2ec9fd..718017728 100644 --- a/code/crates/starknet/host/src/block_store.rs +++ b/code/crates/starknet/host/src/block_store.rs @@ -29,11 +29,11 @@ pub struct DecidedBlock { fn decode_certificate(bytes: &[u8]) -> Result, ProtoError> { let proto = proto::sync::CommitCertificate::decode(bytes)?; - codec::decode_certificate(proto) + codec::decode_commit_certificate(proto) } fn encode_certificate(certificate: &CommitCertificate) -> Result, ProtoError> { - let proto = codec::encode_certificate(certificate)?; + let proto = codec::encode_commit_certificate(certificate)?; Ok(proto.encode_to_vec()) } diff --git a/code/crates/starknet/host/src/codec.rs b/code/crates/starknet/host/src/codec.rs index a813f88e8..58afba99b 100644 --- a/code/crates/starknet/host/src/codec.rs +++ b/code/crates/starknet/host/src/codec.rs @@ -5,7 +5,8 @@ use prost::Message; use malachitebft_codec::Codec; use malachitebft_core_types::{ - AggregatedSignature, CommitCertificate, CommitSignature, Round, SignedVote, Validity, + AggregatedSignature, CommitCertificate, CommitSignature, PolkaCertificate, Round, SignedVote, + Validity, }; use malachitebft_engine::util::streaming::{StreamContent, StreamMessage}; use malachitebft_sync::{ @@ -288,11 +289,15 @@ pub fn decode_sync_response( let vote_set = vote_set_response .vote_set .ok_or_else(|| ProtoError::missing_field::("vote_set"))?; + let polka_certificate = vote_set_response.polka_certificate; sync::Response::VoteSetResponse(VoteSetResponse::new( height, round, decode_vote_set(vote_set)?, + polka_certificate + .map(decode_polka_certificate) + .transpose()?, )) } }; @@ -326,6 +331,11 @@ pub fn encode_sync_response( .as_u32() .expect("round should not be nil"), vote_set: Some(encode_vote_set(&vote_set_response.vote_set)?), + polka_certificate: vote_set_response + .polka_certificate + .as_ref() + .map(encode_polka_certificate) + .transpose()?, }, )), }, @@ -478,7 +488,7 @@ impl Codec> for ProtobufCodec { } } -pub fn decode_certificate( +pub fn decode_commit_certificate( certificate: proto::sync::CommitCertificate, ) -> Result, ProtoError> { let value_id = if let Some(block_hash) = certificate.block_hash { @@ -507,7 +517,7 @@ pub fn decode_certificate( Ok(certificate) } -pub fn encode_certificate( +pub fn encode_commit_certificate( certificate: &CommitCertificate, ) -> Result { Ok(proto::sync::CommitCertificate { @@ -525,13 +535,60 @@ impl Codec> for ProtobufCodec { type Error = ProtoError; fn decode(&self, bytes: Bytes) -> Result, Self::Error> { - decode_certificate( + decode_commit_certificate( proto::sync::CommitCertificate::decode(bytes).map_err(ProtoError::Decode)?, ) } fn encode(&self, msg: &CommitCertificate) -> Result { - encode_certificate(msg).map(|proto| proto.encode_to_bytes()) + encode_commit_certificate(msg).map(|proto| proto.encode_to_bytes()) + } +} + +pub(crate) fn encode_polka_certificate( + certificate: &PolkaCertificate, +) -> Result { + Ok(proto::sync::PolkaCertificate { + fork_id: certificate.height.fork_id, + block_number: certificate.height.block_number, + block_hash: Some(certificate.value_id.to_proto()?), + round: certificate.round.as_u32().unwrap(), + votes: certificate + .votes + .iter() + .map(encode_vote) + .collect::>()?, + }) +} + +pub(crate) fn decode_polka_certificate( + certificate: proto::sync::PolkaCertificate, +) -> Result, ProtoError> { + let block_hash = certificate + .block_hash + .ok_or_else(|| ProtoError::missing_field::("block_hash"))?; + + Ok(PolkaCertificate { + height: Height::new(certificate.block_number, certificate.fork_id), + round: Round::new(certificate.round), + value_id: BlockHash::from_proto(block_hash)?, + votes: certificate + .votes + .into_iter() + .map(decode_vote) + .collect::, _>>()?, + }) +} + +impl Codec> for ProtobufCodec { + type Error = ProtoError; + + fn decode(&self, bytes: Bytes) -> Result, Self::Error> { + decode_polka_certificate(proto::sync::PolkaCertificate::decode(bytes)?) + } + + fn encode(&self, msg: &PolkaCertificate) -> Result { + encode_polka_certificate(msg).map(|proto| proto.encode_to_bytes()) } } @@ -540,7 +597,7 @@ pub fn encode_synced_value( ) -> Result { Ok(proto::sync::SyncedValue { value_bytes: synced_value.value_bytes.clone(), - certificate: Some(encode_certificate(&synced_value.certificate)?), + certificate: Some(encode_commit_certificate(&synced_value.certificate)?), }) } @@ -555,7 +612,7 @@ pub fn decode_synced_value( Ok(sync::RawDecidedValue { value_bytes: proto.value_bytes, - certificate: decode_certificate(certificate)?, + certificate: decode_commit_certificate(certificate)?, }) } @@ -591,8 +648,8 @@ pub(crate) fn decode_vote_set( votes: vote_set .signed_votes .into_iter() - .filter_map(decode_vote) - .collect(), + .map(decode_vote) + .collect::, _>>()?, }) } @@ -600,8 +657,8 @@ pub(crate) fn encode_vote(vote: &SignedVote) -> Result Option> { +pub(crate) fn decode_vote(msg: proto::Vote) -> Result, ProtoError> { let signature = Signature::dummy(); - let vote = Vote::from_proto(msg).ok()?; - Some(SignedVote::new(vote, signature)) + let vote = Vote::from_proto(msg)?; + Ok(SignedVote::new(vote, signature)) } diff --git a/code/crates/starknet/p2p-proto/build.rs b/code/crates/starknet/p2p-proto/build.rs index 98d316e8d..6dd13f8c7 100644 --- a/code/crates/starknet/p2p-proto/build.rs +++ b/code/crates/starknet/p2p-proto/build.rs @@ -1,7 +1,6 @@ fn main() -> Result<(), Box> { let protos = &[ "./proto/sync.proto", - "./proto/certificate.proto", "./proto/p2p/proto/common.proto", "./proto/p2p/proto/transaction.proto", "./proto/p2p/proto/consensus/consensus.proto", diff --git a/code/crates/starknet/p2p-proto/proto/certificate.proto b/code/crates/starknet/p2p-proto/proto/certificate.proto deleted file mode 100644 index 5e069c366..000000000 --- a/code/crates/starknet/p2p-proto/proto/certificate.proto +++ /dev/null @@ -1,26 +0,0 @@ -syntax = "proto3"; - -package certificate; - -import "p2p/proto/common.proto"; -import "p2p/proto/consensus/consensus.proto"; -import "p2p/proto/transaction.proto"; - -message CommitSignature { - // TODO - add flag (no vote, nil, value?) - Address validator_address = 1; - ConsensusSignature signature = 2; -} - -message AggregatedSignature { - repeated CommitSignature signatures = 1; -} - -message CommitCertificate { - uint64 fork_id = 1; - uint64 block_number = 2; - uint32 round = 3; - Hash block_hash = 4; - AggregatedSignature aggregated_signature = 5; -} - diff --git a/code/crates/starknet/p2p-proto/proto/sync.proto b/code/crates/starknet/p2p-proto/proto/sync.proto index 389099fe6..381303421 100644 --- a/code/crates/starknet/p2p-proto/proto/sync.proto +++ b/code/crates/starknet/p2p-proto/proto/sync.proto @@ -58,6 +58,14 @@ message CommitCertificate { AggregatedSignature aggregated_signature = 5; } +message PolkaCertificate { + uint64 fork_id = 1; + uint64 block_number = 2; + uint32 round = 3; + Hash block_hash = 4; + repeated Vote votes = 5; +} + message ProposedValue { uint64 fork_id = 1; uint64 block_number = 2; @@ -79,6 +87,7 @@ message VoteSetResponse { uint64 block_number = 2; uint32 round = 3; VoteSet vote_set = 4; + optional PolkaCertificate polka_certificate = 5; } message VoteSet { diff --git a/code/crates/starknet/p2p-proto/src/lib.rs b/code/crates/starknet/p2p-proto/src/lib.rs index 8e10780b3..36e5f633b 100644 --- a/code/crates/starknet/p2p-proto/src/lib.rs +++ b/code/crates/starknet/p2p-proto/src/lib.rs @@ -6,9 +6,9 @@ pub mod sync { include!(concat!(env!("OUT_DIR"), "/sync.rs")); } -pub mod certificate { - include!(concat!(env!("OUT_DIR"), "/certificate.rs")); -} +// pub mod certificate { +// include!(concat!(env!("OUT_DIR"), "/certificate.rs")); +// } impl From for u128 { fn from(value: Uint128) -> Self { diff --git a/code/crates/sync/src/types.rs b/code/crates/sync/src/types.rs index 5bf8e8418..aa06d700f 100644 --- a/code/crates/sync/src/types.rs +++ b/code/crates/sync/src/types.rs @@ -4,7 +4,7 @@ use displaydoc::Display; use libp2p::request_response; use serde::{Deserialize, Serialize}; -use malachitebft_core_types::{CommitCertificate, Context, Round, VoteSet}; +use malachitebft_core_types::{CommitCertificate, Context, PolkaCertificate, Round, VoteSet}; pub use malachitebft_peer::PeerId; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Display)] @@ -122,17 +122,28 @@ impl VoteSetRequest { #[derive_where(Clone, Debug, PartialEq, Eq)] pub struct VoteSetResponse { + /// The height of the vote set pub height: Ctx::Height, + /// The round of the vote set pub round: Round, + /// The set of votes at this height and round pub vote_set: VoteSet, + /// A certificate witnessing a Polka at this height and round (if any) + pub polka_certificate: Option>, } impl VoteSetResponse { - pub fn new(height: Ctx::Height, round: Round, vote_set: VoteSet) -> Self { + pub fn new( + height: Ctx::Height, + round: Round, + vote_set: VoteSet, + polka_certificate: Option>, + ) -> Self { Self { height, round, vote_set, + polka_certificate, } } } diff --git a/code/crates/test/app/src/store.rs b/code/crates/test/app/src/store.rs index b67b1c8e1..b4001184c 100644 --- a/code/crates/test/app/src/store.rs +++ b/code/crates/test/app/src/store.rs @@ -28,11 +28,11 @@ pub struct DecidedValue { fn decode_certificate(bytes: &[u8]) -> Result, ProtoError> { let proto = proto::CommitCertificate::decode(bytes)?; - codec::decode_certificate(proto) + codec::decode_commit_certificate(proto) } fn encode_certificate(certificate: &CommitCertificate) -> Result, ProtoError> { - let proto = codec::encode_certificate(certificate)?; + let proto = codec::encode_commit_certificate(certificate)?; Ok(proto.encode_to_vec()) } diff --git a/code/crates/test/proto/sync.proto b/code/crates/test/proto/sync.proto index 2b4405761..fd0901c7b 100644 --- a/code/crates/test/proto/sync.proto +++ b/code/crates/test/proto/sync.proto @@ -45,6 +45,13 @@ message CommitCertificate { AggregatedSignature aggregated_signature = 4; } +message PolkaCertificate { + uint64 height = 1; + uint32 round = 2; + ValueId value_id = 3; + repeated SignedMessage votes = 4; +} + message ProposedValue { uint64 height = 1; uint32 round = 2; @@ -63,6 +70,7 @@ message VoteSetResponse { uint64 height = 1; uint32 round = 2; VoteSet vote_set = 3; + optional PolkaCertificate polka_certificate = 5; } message VoteSet { diff --git a/code/crates/test/src/codec/proto/mod.rs b/code/crates/test/src/codec/proto/mod.rs index cf7089d96..42641d492 100644 --- a/code/crates/test/src/codec/proto/mod.rs +++ b/code/crates/test/src/codec/proto/mod.rs @@ -5,8 +5,8 @@ use malachitebft_app::streaming::{StreamContent, StreamId, StreamMessage}; use malachitebft_codec::Codec; use malachitebft_core_consensus::{ProposedValue, SignedConsensusMsg}; use malachitebft_core_types::{ - AggregatedSignature, CommitCertificate, CommitSignature, Round, SignedExtension, - SignedProposal, SignedVote, Validity, VoteSet, + AggregatedSignature, CommitCertificate, CommitSignature, PolkaCertificate, Round, + SignedExtension, SignedProposal, SignedVote, Validity, VoteSet, }; use malachitebft_proto::{Error as ProtoError, Protobuf}; use malachitebft_signing_ed25519::Signature; @@ -300,6 +300,10 @@ pub fn decode_sync_response( height, round, decode_vote_set(vote_set)?, + vote_set_response + .polka_certificate + .map(decode_polka_certificate) + .transpose()?, )) } }; @@ -331,6 +335,11 @@ pub fn encode_sync_response( .as_u32() .expect("round should not be nil"), vote_set: Some(encode_vote_set(&vote_set_response.vote_set)?), + polka_certificate: vote_set_response + .polka_certificate + .as_ref() + .map(encode_polka_certificate) + .transpose()?, }, )), }, @@ -344,7 +353,7 @@ pub fn encode_synced_value( ) -> Result { Ok(proto::SyncedValue { value_bytes: synced_value.value_bytes.clone(), - certificate: Some(encode_certificate(&synced_value.certificate)?), + certificate: Some(encode_commit_certificate(&synced_value.certificate)?), }) } @@ -357,11 +366,49 @@ pub fn decode_synced_value( Ok(sync::RawDecidedValue { value_bytes: proto.value_bytes, - certificate: decode_certificate(certificate)?, + certificate: decode_commit_certificate(certificate)?, + }) +} + +pub(crate) fn encode_polka_certificate( + polka_certificate: &PolkaCertificate, +) -> Result { + Ok(proto::PolkaCertificate { + height: polka_certificate.height.as_u64(), + round: polka_certificate + .round + .as_u32() + .expect("round should not be nil"), + value_id: Some(polka_certificate.value_id.to_proto()?), + votes: polka_certificate + .votes + .iter() + .map(encode_vote) + .collect::, _>>()?, + }) +} + +pub(crate) fn decode_polka_certificate( + certificate: proto::PolkaCertificate, +) -> Result, ProtoError> { + let value_id = certificate + .value_id + .ok_or_else(|| ProtoError::missing_field::("value_id")) + .and_then(ValueId::from_proto)?; + + Ok(PolkaCertificate { + height: Height::new(certificate.height), + round: Round::new(certificate.round), + value_id, + votes: certificate + .votes + .into_iter() + .map(decode_vote) + .collect::, _>>()?, }) } -pub fn decode_certificate( +pub fn decode_commit_certificate( certificate: proto::CommitCertificate, ) -> Result, ProtoError> { let value_id = certificate @@ -386,7 +433,7 @@ pub fn decode_certificate( Ok(certificate) } -pub fn encode_certificate( +pub fn encode_commit_certificate( certificate: &CommitCertificate, ) -> Result { Ok(proto::CommitCertificate { @@ -484,21 +531,26 @@ pub fn decode_vote_set(vote_set: proto::VoteSet) -> Result, votes: vote_set .signed_votes .into_iter() - .filter_map(decode_vote) - .collect(), + .map(decode_vote) + .collect::, _>>()?, }) } -pub fn decode_vote(msg: proto::SignedMessage) -> Option> { - let signature = msg.signature?; +pub fn decode_vote(msg: proto::SignedMessage) -> Result, ProtoError> { + let signature = msg + .signature + .ok_or_else(|| ProtoError::missing_field::("signature"))?; + let vote = match msg.message { - Some(proto::signed_message::Message::Vote(v)) => Some(v), - _ => None, + Some(proto::signed_message::Message::Vote(v)) => Ok(v), + _ => Err(ProtoError::Other( + "Invalid message type: not a vote".to_string(), + )), }?; - let signature = decode_signature(signature).ok()?; - let vote = Vote::from_proto(vote).ok()?; - Some(SignedVote::new(vote, signature)) + let signature = decode_signature(signature)?; + let vote = Vote::from_proto(vote)?; + Ok(SignedVote::new(vote, signature)) } pub fn encode_signature(signature: &Signature) -> proto::Signature { diff --git a/code/crates/test/tests/it/wal.rs b/code/crates/test/tests/it/wal.rs index d67d4a838..101ebdff2 100644 --- a/code/crates/test/tests/it/wal.rs +++ b/code/crates/test/tests/it/wal.rs @@ -229,6 +229,7 @@ async fn byzantine_proposer_crashes_after_proposing_1(params: TestParams) { .wait_until(CRASH_HEIGHT) .crash() .restart_after(Duration::from_secs(5)) + .wait_until(CRASH_HEIGHT + 2) .success(); test.add_node() @@ -237,6 +238,7 @@ async fn byzantine_proposer_crashes_after_proposing_1(params: TestParams) { .wait_until(CRASH_HEIGHT) .crash() .restart_after(Duration::from_secs(5)) + .wait_until(CRASH_HEIGHT + 2) .success(); test.add_node() @@ -392,7 +394,7 @@ async fn byzantine_proposer_crashes_after_proposing_2(params: TestParams) { test.build() .run_with_params( - Duration::from_secs(60), + Duration::from_secs(90), TestParams { timeout_step: Duration::from_secs(5), value_payload: ValuePayload::ProposalAndParts, diff --git a/code/examples/channel/src/store.rs b/code/examples/channel/src/store.rs index b33d56606..a08f19c38 100644 --- a/code/examples/channel/src/store.rs +++ b/code/examples/channel/src/store.rs @@ -32,11 +32,11 @@ pub struct DecidedValue { fn decode_certificate(bytes: &[u8]) -> Result, ProtoError> { let proto = proto::CommitCertificate::decode(bytes)?; - codec::decode_certificate(proto) + codec::decode_commit_certificate(proto) } fn encode_certificate(certificate: &CommitCertificate) -> Result, ProtoError> { - let proto = codec::encode_certificate(certificate)?; + let proto = codec::encode_commit_certificate(certificate)?; Ok(proto.encode_to_vec()) }