diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs index d42f392675..d32a671b94 100644 --- a/crates/hotshot/src/tasks/mod.rs +++ b/crates/hotshot/src/tasks/mod.rs @@ -82,7 +82,7 @@ pub fn add_response_task, V: Versi ) { let state = NetworkResponseState::::new( handle.hotshot.consensus(), - Arc::clone(&handle.hotshot.memberships), + Arc::clone(&handle.memberships), handle.public_key().clone(), handle.private_key().clone(), handle.hotshot.id, @@ -156,7 +156,9 @@ pub fn add_network_message_task< message = network.recv_message().fuse() => { // Make sure the message did not fail let message = match message { - Ok(message) => message, + Ok(message) => { + message + } Err(e) => { tracing::error!("Failed to receive message: {:?}", e); continue; diff --git a/crates/hotshot/src/tasks/task_state.rs b/crates/hotshot/src/tasks/task_state.rs index 7a610ac6e1..9b0230c990 100644 --- a/crates/hotshot/src/tasks/task_state.rs +++ b/crates/hotshot/src/tasks/task_state.rs @@ -219,6 +219,7 @@ impl, V: Versions> CreateTaskState .marketplace_config .fallback_builder_url .clone(), + epoch_height: handle.epoch_height, } } } diff --git a/crates/hotshot/src/traits/networking/push_cdn_network.rs b/crates/hotshot/src/traits/networking/push_cdn_network.rs index 553b2545ef..3601c38f2d 100644 --- a/crates/hotshot/src/traits/networking/push_cdn_network.rs +++ b/crates/hotshot/src/traits/networking/push_cdn_network.rs @@ -6,7 +6,7 @@ #[cfg(feature = "hotshot-testing")] use std::sync::atomic::{AtomicBool, Ordering}; -use std::{marker::PhantomData, sync::Arc}; +use std::{collections::VecDeque, marker::PhantomData, sync::Arc}; #[cfg(feature = "hotshot-testing")] use std::{path::Path, time::Duration}; @@ -46,6 +46,7 @@ use hotshot_types::{ BoxSyncFuture, }; use num_enum::{IntoPrimitive, TryFromPrimitive}; +use parking_lot::Mutex; #[cfg(feature = "hotshot-testing")] use rand::{rngs::StdRng, RngCore, SeedableRng}; use tokio::{spawn, sync::mpsc::error::TrySendError, time::sleep}; @@ -191,6 +192,10 @@ pub struct PushCdnNetwork { client: Client>, /// The CDN-specific metrics metrics: Arc, + /// The internal queue for messages to ourselves + internal_queue: Arc>>>, + /// The public key of this node + public_key: K, /// Whether or not the underlying network is supposed to be paused #[cfg(feature = "hotshot-testing")] is_paused: Arc, @@ -229,7 +234,7 @@ impl PushCdnNetwork { let config = ClientConfig { endpoint: marshal_endpoint, subscribed_topics: topics.into_iter().map(|t| t as u8).collect(), - keypair, + keypair: keypair.clone(), use_local_authority: true, }; @@ -239,6 +244,8 @@ impl PushCdnNetwork { Ok(Self { client, metrics: Arc::from(metrics), + internal_queue: Arc::new(Mutex::new(VecDeque::new())), + public_key: keypair.public_key.0, // Start unpaused #[cfg(feature = "hotshot-testing")] is_paused: Arc::from(AtomicBool::new(false)), @@ -422,7 +429,7 @@ impl TestableNetworkingImplementation let client_config: ClientConfig> = ClientConfig { keypair: KeyPair { - public_key: WrappedSignatureKey(public_key), + public_key: WrappedSignatureKey(public_key.clone()), private_key, }, subscribed_topics: topics, @@ -434,6 +441,8 @@ impl TestableNetworkingImplementation Arc::new(PushCdnNetwork { client: Client::new(client_config), metrics: Arc::new(CdnMetricsValue::default()), + internal_queue: Arc::new(Mutex::new(VecDeque::new())), + public_key, #[cfg(feature = "hotshot-testing")] is_paused: Arc::from(AtomicBool::new(false)), }) @@ -533,6 +542,12 @@ impl ConnectedNetwork for PushCdnNetwork { return Ok(()); } + // 
If the message is to ourselves, just add it to the internal queue + if recipient == self.public_key { + self.internal_queue.lock().push_back(message); + return Ok(()); + } + // Send the message if let Err(e) = self .client @@ -554,7 +569,12 @@ impl ConnectedNetwork for PushCdnNetwork { /// # Errors /// - If we fail to receive messages. Will trigger a retry automatically. async fn recv_message(&self) -> Result, NetworkError> { - // Receive a message + // If we have a message in the internal queue, return it + if let Some(message) = self.internal_queue.lock().pop_front() { + return Ok(message); + } + + // Receive a message from the network let message = self.client.receive_message().await; // If we're paused, receive but don't process messages diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs index 42b2235dd3..7d6664f39c 100644 --- a/crates/task-impls/src/network.rs +++ b/crates/task-impls/src/network.rs @@ -404,7 +404,6 @@ impl< ) -> std::result::Result<(), ()> { if let Some(mut action) = maybe_action { if !consensus.write().await.update_action(action, view) { - tracing::warn!("Already actioned {:?} in view {:?}", action, view); return Err(()); } // If the action was view sync record it as a vote, but we don't diff --git a/crates/task-impls/src/quorum_vote/mod.rs b/crates/task-impls/src/quorum_vote/mod.rs index 5ec3e54b1a..2b30bab7a9 100644 --- a/crates/task-impls/src/quorum_vote/mod.rs +++ b/crates/task-impls/src/quorum_vote/mod.rs @@ -173,7 +173,13 @@ impl + 'static, V: Versions> Handl } } HotShotEvent::VidShareValidated(share) => { - let vid_payload_commitment = &share.data.payload_commitment; + let vid_payload_commitment = if let Some(ref data_epoch_payload_commitment) = + share.data.data_epoch_payload_commitment + { + data_epoch_payload_commitment + } else { + &share.data.payload_commitment + }; vid_share = Some(share.clone()); if let Some(ref comm) = payload_commitment { if vid_payload_commitment != comm { @@ -372,8 +378,12 @@ impl, V: Versions> QuorumVoteTaskS view_number: TYPES::View, event_receiver: Receiver>>, event_sender: &Sender>>, - event: Option>>, + event: Arc>, ) { + tracing::debug!( + "Attempting to make dependency task for view {view_number:?} and event {event:?}" + ); + if self.vote_dependencies.contains_key(&view_number) { return; } @@ -388,10 +398,8 @@ impl, V: Versions> QuorumVoteTaskS let vid_dependency = self.create_event_dependency(VoteDependency::Vid, view_number, event_receiver.clone()); // If we have an event provided to us - if let Some(event) = event { - if let HotShotEvent::QuorumProposalValidated(..) = event.as_ref() { - quorum_proposal_dependency.mark_as_completed(event); - } + if let HotShotEvent::QuorumProposalValidated(..) = event.as_ref() { + quorum_proposal_dependency.mark_as_completed(event); } let deps = vec![quorum_proposal_dependency, dac_dependency, vid_dependency]; @@ -500,7 +508,7 @@ impl, V: Versions> QuorumVoteTaskS proposal.data.view_number, event_receiver, &event_sender, - Some(Arc::clone(&event)), + Arc::clone(&event), ); } } @@ -544,7 +552,12 @@ impl, V: Versions> QuorumVoteTaskS &event_sender.clone(), ) .await; - self.create_dependency_task_if_new(view, event_receiver, &event_sender, None); + self.create_dependency_task_if_new( + view, + event_receiver, + &event_sender, + Arc::clone(&event), + ); } HotShotEvent::VidShareRecv(sender, disperse) => { let view = disperse.data.view_number(); @@ -557,25 +570,25 @@ impl, V: Versions> QuorumVoteTaskS // Validate the VID share. 
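Aside on the PushCdnNetwork change above: messages addressed to our own public key are now short-circuited into an internal VecDeque instead of being round-tripped through the CDN, and recv_message drains that queue before polling the broker, saving a network hop for self-addressed traffic. Below is a minimal, self-contained sketch of that pattern; the LoopbackNetwork type, the String key, the Vec<u8> payload, and the std Mutex are illustrative stand-ins (the actual change lives in the ConnectedNetwork impl and uses the node's signature key and parking_lot::Mutex).

// Sketch only: not the HotShot types, just the loopback-queue pattern from the diff.
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

struct LoopbackNetwork {
    public_key: String,
    internal_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
}

impl LoopbackNetwork {
    fn direct_message(&self, message: Vec<u8>, recipient: &str) {
        // If the message is to ourselves, just add it to the internal queue.
        if recipient == self.public_key {
            self.internal_queue.lock().unwrap().push_back(message);
            return;
        }
        // ... otherwise hand the message to the CDN broker, as before ...
    }

    fn recv_message(&self) -> Option<Vec<u8>> {
        // Drain self-addressed messages first, then fall back to the network.
        if let Some(message) = self.internal_queue.lock().unwrap().pop_front() {
            return Some(message);
        }
        // ... otherwise await a message from the CDN client ...
        None
    }
}

fn main() {
    let net = LoopbackNetwork {
        public_key: "node-0".into(),
        internal_queue: Arc::new(Mutex::new(VecDeque::new())),
    };
    net.direct_message(b"hello".to_vec(), "node-0");
    assert_eq!(net.recv_message(), Some(b"hello".to_vec()));
}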
let payload_commitment = &disperse.data.payload_commitment; - let disperse_epoch = disperse.data.epoch; - // Check that the signature is valid ensure!( sender.validate(&disperse.signature, payload_commitment.as_ref()), "VID share signature is invalid" ); + let vid_epoch = disperse.data.epoch; + let target_epoch = disperse.data.target_epoch; let membership_reader = self.membership.read().await; // ensure that the VID share was sent by a DA member OR the view leader ensure!( membership_reader - .da_committee_members(view, disperse_epoch) + .da_committee_members(view, vid_epoch) .contains(sender) - || *sender == membership_reader.leader(view, disperse_epoch)?, + || *sender == membership_reader.leader(view, vid_epoch)?, "VID share was not sent by a DA member or the view leader." ); - let membership_total_nodes = membership_reader.total_nodes(disperse_epoch); + let membership_total_nodes = membership_reader.total_nodes(target_epoch); drop(membership_reader); // NOTE: `verify_share` returns a nested `Result`, so we must check both the inner @@ -606,7 +619,12 @@ impl, V: Versions> QuorumVoteTaskS &event_sender.clone(), ) .await; - self.create_dependency_task_if_new(view, event_receiver, &event_sender, None); + self.create_dependency_task_if_new( + view, + event_receiver, + &event_sender, + Arc::clone(&event), + ); } HotShotEvent::Timeout(view, ..) => { let view = TYPES::View::new(view.saturating_sub(1)); @@ -717,25 +735,30 @@ impl, V: Versions> QuorumVoteTaskS current_block_number, self.epoch_height, )); - tracing::trace!( - "Sending ViewChange for view {} and epoch {}", - proposal.data.view_number() + 1, - *current_epoch - ); - broadcast_event( - Arc::new(HotShotEvent::ViewChange( - proposal.data.view_number() + 1, - current_epoch, - )), - &event_sender, - ) - .await; let is_vote_leaf_extended = self .consensus .read() .await .is_leaf_extended(proposed_leaf.commit()); + if !is_vote_leaf_extended { + // We're voting for the proposal that will probably form the eQC. We don't want to change + // the view here because we will probably change it when we form the eQC. + // The main reason is to handle view change event only once in the transaction task. 
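Aside on the VidShareValidated handling above: a share produced across an epoch boundary carries two commitments, one sized for the epoch the data belongs to and one sized for the epoch of the recipients, and the vote task compares against the data-epoch commitment whenever it is present. A small sketch of that selection rule, with a plain u64 standing in for VidCommitment and a local ShareData struct standing in for the real share type:

// Sketch only: selection of which payload commitment to check during an epoch transition.
struct ShareData {
    payload_commitment: u64,
    data_epoch_payload_commitment: Option<u64>,
}

fn commitment_to_check(share: &ShareData) -> u64 {
    // Prefer the data-epoch commitment if the share spans an epoch boundary.
    share
        .data_epoch_payload_commitment
        .unwrap_or(share.payload_commitment)
}

fn main() {
    let transitioning = ShareData { payload_commitment: 1, data_epoch_payload_commitment: Some(2) };
    let steady_state = ShareData { payload_commitment: 1, data_epoch_payload_commitment: None };
    assert_eq!(commitment_to_check(&transitioning), 2);
    assert_eq!(commitment_to_check(&steady_state), 1);
}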
+ tracing::trace!( + "Sending ViewChange for view {} and epoch {}", + proposal.data.view_number() + 1, + *current_epoch + ); + broadcast_event( + Arc::new(HotShotEvent::ViewChange( + proposal.data.view_number() + 1, + current_epoch, + )), + &event_sender, + ) + .await; + } if let Err(e) = submit_vote::( event_sender.clone(), Arc::clone(&self.membership), diff --git a/crates/task-impls/src/request.rs b/crates/task-impls/src/request.rs index 5e1bdef390..c3bb9a2fb8 100644 --- a/crates/task-impls/src/request.rs +++ b/crates/task-impls/src/request.rs @@ -220,6 +220,7 @@ impl> NetworkRequestState = spawn(async move { // Do the delay only if primary is up and then start sending if !network.is_primary_down() { @@ -261,8 +262,9 @@ impl> NetworkRequestState, V /// fallback builder url pub fallback_builder_url: Url, + + /// Number of blocks in an epoch, zero means there are no epochs + pub epoch_height: u64, } impl, V: Versions> TransactionTaskState { @@ -477,9 +480,13 @@ impl, V: Versions> TransactionTask } HotShotEvent::ViewChange(view, epoch) => { let view = TYPES::View::new(std::cmp::max(1, **view)); - + let epoch = if self.epoch_height != 0 { + TYPES::Epoch::new(std::cmp::max(1, **epoch)) + } else { + *epoch + }; ensure!( - *view > *self.cur_view || *epoch > self.cur_epoch, + *view > *self.cur_view && *epoch >= *self.cur_epoch, debug!( "Received a view change to an older view and epoch: tried to change view to {:?}\ and epoch {:?} though we are at view {:?} and epoch {:?}", @@ -487,11 +494,11 @@ impl, V: Versions> TransactionTask ) ); self.cur_view = view; - self.cur_epoch = *epoch; + self.cur_epoch = epoch; - let leader = self.membership.read().await.leader(view, *epoch)?; + let leader = self.membership.read().await.leader(view, epoch)?; if leader == self.public_key { - self.handle_view_change(&event_stream, view, *epoch).await; + self.handle_view_change(&event_stream, view, epoch).await; return Ok(()); } } diff --git a/crates/task-impls/src/vid.rs b/crates/task-impls/src/vid.rs index 1f9b218aaf..3754e2a01d 100644 --- a/crates/task-impls/src/vid.rs +++ b/crates/task-impls/src/vid.rs @@ -102,7 +102,7 @@ impl> VidTaskState { &Arc::clone(&self.membership), *view_number, epoch, - None, + epoch, vid_precompute.clone(), ) .await; @@ -208,7 +208,7 @@ impl> VidTaskState { &Arc::clone(&self.membership), proposal_view_number, target_epoch, - Some(sender_epoch), + sender_epoch, None, ) .await; diff --git a/crates/testing/src/helpers.rs b/crates/testing/src/helpers.rs index 9ec7e68b93..6d945894f3 100644 --- a/crates/testing/src/helpers.rs +++ b/crates/testing/src/helpers.rs @@ -336,6 +336,7 @@ pub async fn build_vid_proposal( vid.disperse(&encoded_transactions).unwrap(), membership, epoch_number, + epoch_number, None, ) .await; diff --git a/crates/testing/src/test_builder.rs b/crates/testing/src/test_builder.rs index 6a2d981cba..a4083f461c 100644 --- a/crates/testing/src/test_builder.rs +++ b/crates/testing/src/test_builder.rs @@ -4,7 +4,9 @@ // You should have received a copy of the MIT License // along with the HotShot repository. If not, see . 
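Aside on the TransactionTaskState change above: the task now carries epoch_height, clamps the incoming epoch to at least 1 when epochs are enabled, and only accepts a ViewChange whose view strictly advances and whose epoch does not move backwards. A sketch of that acceptance rule, with plain u64s standing in for TYPES::View and TYPES::Epoch:

// Sketch only: the view-change acceptance rule from the transaction task.
fn accept_view_change(
    cur_view: u64,
    cur_epoch: u64,
    new_view: u64,
    new_epoch: u64,
    epoch_height: u64,
) -> Option<(u64, u64)> {
    let view = new_view.max(1);
    // With epochs enabled (nonzero epoch height), epoch 0 is clamped to 1.
    let epoch = if epoch_height != 0 { new_epoch.max(1) } else { new_epoch };
    // Reject changes to an older view or an older epoch.
    if view > cur_view && epoch >= cur_epoch {
        Some((view, epoch))
    } else {
        None
    }
}

fn main() {
    assert_eq!(accept_view_change(3, 1, 4, 1, 10), Some((4, 1)));
    assert_eq!(accept_view_change(3, 2, 4, 1, 10), None); // epoch went backwards
    assert_eq!(accept_view_change(3, 1, 3, 2, 10), None); // view did not advance
}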
-use std::{collections::HashMap, num::NonZeroUsize, rc::Rc, sync::Arc, time::Duration}; +use std::{ + any::TypeId, collections::HashMap, num::NonZeroUsize, rc::Rc, sync::Arc, time::Duration, +}; use anyhow::{ensure, Result}; use async_lock::RwLock; @@ -15,8 +17,8 @@ use hotshot::{ HotShotInitializer, MarketplaceConfig, SystemContext, TwinsHandlerState, }; use hotshot_example_types::{ - auction_results_provider_types::TestAuctionResultsProvider, state_types::TestInstanceState, - storage_types::TestStorage, testable_delay::DelayConfig, + auction_results_provider_types::TestAuctionResultsProvider, node_types::EpochsTestVersions, + state_types::TestInstanceState, storage_types::TestStorage, testable_delay::DelayConfig, }; use hotshot_types::{ consensus::ConsensusMetricsValue, @@ -99,8 +101,6 @@ pub struct TestDescription, V: Ver pub start_solver: bool, /// boxed closure used to validate the resulting transactions pub validate_transactions: TransactionValidator, - /// Number of blocks in an epoch, zero means there are no epochs - pub epoch_height: u64, } pub fn nonempty_block_threshold(threshold: (u64, u64)) -> TransactionValidator { @@ -385,7 +385,7 @@ impl, V: Versions> Default /// by default, just a single round #[allow(clippy::redundant_field_names)] fn default() -> Self { - let num_nodes_with_stake = 6; + let num_nodes_with_stake = 7; Self { timing_data: TimingData::default(), num_nodes_with_stake, @@ -418,7 +418,6 @@ impl, V: Versions> Default upgrade_view: None, start_solver: true, validate_transactions: Arc::new(|_| Ok(())), - epoch_height: 0, } } } @@ -456,7 +455,6 @@ where timing_data, da_staked_committee_size, unreliable_network, - epoch_height, .. } = self.clone(); @@ -490,6 +488,11 @@ where 0 < da_staked_committee_size, ); // let da_committee_nodes = known_nodes[0..da_committee_size].to_vec(); + let epoch_height = if TypeId::of::() == TypeId::of::() { + 10 + } else { + 0 + }; let config = HotShotConfig { start_threshold: (1, 1), num_nodes_with_stake: NonZeroUsize::new(num_nodes_with_stake).unwrap(), diff --git a/crates/testing/tests/tests_1/test_success.rs b/crates/testing/tests/tests_1/test_success.rs index bdf55c1f6a..e7bc509593 100644 --- a/crates/testing/tests/tests_1/test_success.rs +++ b/crates/testing/tests/tests_1/test_success.rs @@ -4,7 +4,7 @@ // You should have received a copy of the MIT License // along with the HotShot repository. If not, see . 
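Aside on the test_builder.rs change above: epoch_height is no longer a TestDescription field; the launcher derives it from the Versions type parameter, enabling a 10-block epoch only for the epoch-enabled test versions. A sketch of that kind of TypeId dispatch; the local marker structs here are stand-ins for the hotshot_example_types version bundles:

// Sketch only: deriving the epoch height from the Versions type instead of a config field.
use std::any::{Any, TypeId};

struct EpochsTestVersions;
struct TestVersions;

fn epoch_height_for<V: Any>() -> u64 {
    // Epoch-enabled version bundles get a 10-block epoch; everything else runs epoch-free.
    if TypeId::of::<V>() == TypeId::of::<EpochsTestVersions>() {
        10
    } else {
        0
    }
}

fn main() {
    assert_eq!(epoch_height_for::<EpochsTestVersions>(), 10);
    assert_eq!(epoch_height_for::<TestVersions>(), 0);
}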
-use std::{sync::Arc, time::Duration}; +use std::time::Duration; use hotshot_example_types::{ node_types::{ @@ -18,7 +18,6 @@ use hotshot_macros::cross_tests; use hotshot_testing::{ block_builder::SimpleBuilderImplementation, completion_task::{CompletionTaskDescription, TimeBasedCompletionTaskDescription}, - overall_safety_task::OverallSafetyPropertiesDescription, spinning_task::{ChangeNode, NodeAction, SpinningTaskDescription}, test_builder::TestDescription, view_sync_task::ViewSyncTaskDescription, @@ -43,6 +42,25 @@ cross_tests!( }, ); +cross_tests!( + TestName: test_success_with_epochs, + Impls: [Libp2pImpl, PushCdnImpl, CombinedImpl], + Types: [TestTypes, TestTypesRandomizedLeader, TestTwoStakeTablesTypes], + Versions: [EpochsTestVersions], + Ignore: false, + Metadata: { + TestDescription { + // allow more time to pass in CI + completion_task_description: CompletionTaskDescription::TimeBasedCompletionTaskBuilder( + TimeBasedCompletionTaskDescription { + duration: Duration::from_secs(60), + }, + ), + ..TestDescription::default() + } + }, +); + // cross_tests!( // TestName: test_epoch_success, // Impls: [MemoryImpl, Libp2pImpl, PushCdnImpl], @@ -94,6 +112,38 @@ cross_tests!( }, ); +cross_tests!( + TestName: test_success_with_async_delay_with_epochs, + Impls: [Libp2pImpl, PushCdnImpl, CombinedImpl], + Types: [TestTypes, TestTwoStakeTablesTypes], + Versions: [EpochsTestVersions], + Ignore: false, + Metadata: { + let mut metadata = TestDescription { + // allow more time to pass in CI + completion_task_description: CompletionTaskDescription::TimeBasedCompletionTaskBuilder( + TimeBasedCompletionTaskDescription { + duration: Duration::from_secs(60), + }, + ), + ..TestDescription::default() + }; + + metadata.overall_safety_properties.num_failed_views = 0; + metadata.overall_safety_properties.num_successful_views = 0; + let mut config = DelayConfig::default(); + let delay_settings = DelaySettings { + delay_option: DelayOptions::Random, + min_time_in_milliseconds: 10, + max_time_in_milliseconds: 100, + fixed_time_in_milliseconds: 0, + }; + config.add_settings_for_all_types(delay_settings); + metadata.async_delay_config = config; + metadata + }, +); + cross_tests!( TestName: test_success_with_async_delay_2, Impls: [MemoryImpl, Libp2pImpl, PushCdnImpl], @@ -134,6 +184,46 @@ cross_tests!( }, ); +cross_tests!( + TestName: test_success_with_async_delay_2_with_epochs, + Impls: [Libp2pImpl, PushCdnImpl, CombinedImpl], + Types: [TestTypes, TestTwoStakeTablesTypes], + Versions: [EpochsTestVersions], + Ignore: false, + Metadata: { + let mut metadata = TestDescription { + // allow more time to pass in CI + completion_task_description: CompletionTaskDescription::TimeBasedCompletionTaskBuilder( + TimeBasedCompletionTaskDescription { + duration: Duration::from_secs(60), + }, + ), + ..TestDescription::default() + }; + + metadata.overall_safety_properties.num_failed_views = 0; + metadata.overall_safety_properties.num_successful_views = 30; + let mut config = DelayConfig::default(); + let mut delay_settings = DelaySettings { + delay_option: DelayOptions::Random, + min_time_in_milliseconds: 10, + max_time_in_milliseconds: 100, + fixed_time_in_milliseconds: 15, + }; + config.add_setting(SupportedTraitTypesForAsyncDelay::Storage, &delay_settings); + + delay_settings.delay_option = DelayOptions::Fixed; + config.add_setting(SupportedTraitTypesForAsyncDelay::BlockHeader, &delay_settings); + + delay_settings.delay_option = DelayOptions::Random; + delay_settings.min_time_in_milliseconds = 5; + 
delay_settings.max_time_in_milliseconds = 20; + config.add_setting(SupportedTraitTypesForAsyncDelay::ValidatedState, &delay_settings); + metadata.async_delay_config = config; + metadata + }, +); + cross_tests!( TestName: test_with_double_leader_no_failures, Impls: [MemoryImpl, Libp2pImpl, PushCdnImpl], @@ -155,6 +245,27 @@ cross_tests!( } ); +cross_tests!( + TestName: test_with_double_leader_no_failures_with_epochs, + Impls: [Libp2pImpl, PushCdnImpl, CombinedImpl], + Types: [TestConsecutiveLeaderTypes, TestTwoStakeTablesTypes], + Versions: [EpochsTestVersions], + Ignore: false, + Metadata: { + let mut metadata = TestDescription::default_more_nodes(); + metadata.num_bootstrap_nodes = 10; + metadata.num_nodes_with_stake = 12; + metadata.da_staked_committee_size = 12; + metadata.start_nodes = 12; + + metadata.overall_safety_properties.num_failed_views = 0; + + metadata.view_sync_properties = ViewSyncTaskDescription::Threshold(0, 0); + + metadata + } +); + cross_tests!( TestName: test_epoch_end, Impls: [CombinedImpl, Libp2pImpl, PushCdnImpl], @@ -168,17 +279,10 @@ cross_tests!( duration: Duration::from_millis(100000), }, ), - epoch_height: 10, - num_nodes_with_stake: 10, - start_nodes: 10, - num_bootstrap_nodes: 10, - da_staked_committee_size: 10, - overall_safety_properties: OverallSafetyPropertiesDescription { - // Explicitly show that we use normal threshold, i.e. 2 nodes_len / 3 + 1 - // but we divide by two because only half of the nodes are active in each epoch - threshold_calculator: Arc::new(|_, nodes_len| 2 * nodes_len / 2 / 3 + 1), - ..OverallSafetyPropertiesDescription::default() - }, + num_nodes_with_stake: 11, + start_nodes: 11, + num_bootstrap_nodes: 11, + da_staked_committee_size: 11, ..TestDescription::default() } @@ -189,8 +293,8 @@ cross_tests!( // This test fails with the old decide rule cross_tests!( TestName: test_shorter_decide, - Impls: [MemoryImpl], - Types: [TestTypes], + Impls: [Libp2pImpl, PushCdnImpl, CombinedImpl], + Types: [TestTypes, TestTwoStakeTablesTypes], Versions: [EpochsTestVersions], Ignore: false, Metadata: { diff --git a/crates/testing/tests/tests_1/test_with_failures_2.rs b/crates/testing/tests/tests_1/test_with_failures_2.rs index c922b7fd6e..242b5ea2c0 100644 --- a/crates/testing/tests/tests_1/test_with_failures_2.rs +++ b/crates/testing/tests/tests_1/test_with_failures_2.rs @@ -9,7 +9,10 @@ use std::collections::HashMap; use hotshot_example_types::{ - node_types::{Libp2pImpl, MemoryImpl, PushCdnImpl, TestConsecutiveLeaderTypes, TestVersions}, + node_types::{ + CombinedImpl, EpochsTestVersions, Libp2pImpl, MemoryImpl, PushCdnImpl, + TestConsecutiveLeaderTypes, TestTwoStakeTablesTypes, TestVersions, + }, state_types::TestTypes, }; use hotshot_macros::cross_tests; @@ -70,6 +73,49 @@ cross_tests!( } ); +cross_tests!( + TestName: test_with_failures_2_with_epochs, + Impls: [Libp2pImpl, PushCdnImpl, CombinedImpl], + Types: [TestTwoStakeTablesTypes], + Versions: [EpochsTestVersions], + Ignore: false, + Metadata: { + let mut metadata = TestDescription::default_more_nodes(); + metadata.num_nodes_with_stake = 12; + metadata.da_staked_committee_size = 12; + metadata.start_nodes = 12; + let dead_nodes = vec![ + ChangeNode { + idx: 10, + updown: NodeAction::Down, + }, + ChangeNode { + idx: 11, + updown: NodeAction::Down, + }, + ]; + + metadata.spinning_properties = SpinningTaskDescription { + node_changes: vec![(5, dead_nodes)] + }; + + // 2 nodes fail triggering view sync, expect no other timeouts + metadata.overall_safety_properties.num_failed_views = 6; + // 
Make sure we keep committing rounds after the bad leaders, but not the full 50 because of the numerous timeouts + metadata.overall_safety_properties.num_successful_views = 20; + metadata.overall_safety_properties.expected_views_to_fail = HashMap::from([ + (ViewNumber::new(5), false), + (ViewNumber::new(11), false), + (ViewNumber::new(17), false), + (ViewNumber::new(23), false), + (ViewNumber::new(29), false), + (ViewNumber::new(35), false), + ]); + + metadata + } +); + cross_tests!( TestName: test_with_double_leader_failures, Impls: [MemoryImpl, Libp2pImpl, PushCdnImpl], diff --git a/crates/testing/tests/tests_1/vid_task.rs b/crates/testing/tests/tests_1/vid_task.rs index fc62f23819..adafcbfbf7 100644 --- a/crates/testing/tests/tests_1/vid_task.rs +++ b/crates/testing/tests/tests_1/vid_task.rs @@ -93,6 +93,7 @@ async fn test_vid_task() { vid_disperse, &membership, EpochNumber::new(0), + EpochNumber::new(0), None, ) .await; diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index f87c6f365f..2530417f90 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -950,7 +950,7 @@ impl Consensus { .view_inner .epoch()?; let vid = - VidDisperse::calculate_vid_disperse(txns, &membership, view, epoch, None, None).await; + VidDisperse::calculate_vid_disperse(txns, &membership, view, epoch, epoch, None).await; let shares = VidDisperseShare2::from_vid_disperse(vid); let mut consensus_writer = consensus.write().await; for share in shares { diff --git a/crates/types/src/data.rs b/crates/types/src/data.rs index 8cbd6ffdc8..0d6fcfff7a 100644 --- a/crates/types/src/data.rs +++ b/crates/types/src/data.rs @@ -210,10 +210,14 @@ where pub struct VidDisperse { /// The view number for which this VID data is intended pub view_number: TYPES::View, - /// Epoch this proposal applies to + /// Epoch the data of this proposal belongs to pub epoch: TYPES::Epoch, - /// Block payload commitment + /// Epoch to which the recipients of this VID belong to + pub target_epoch: TYPES::Epoch, + /// VidCommitment calculated based on the number of nodes in `target_epoch`. pub payload_commitment: VidCommitment, + /// VidCommitment calculated based on the number of nodes in `epoch`. Needed during epoch transition. 
+ pub data_epoch_payload_commitment: Option, /// A storage node's key and its corresponding VID share pub shares: BTreeMap, /// VID common data sent to all storage nodes @@ -229,7 +233,8 @@ impl VidDisperse { mut vid_disperse: JfVidDisperse, membership: &Arc>, target_epoch: TYPES::Epoch, - sender_epoch: Option, + data_epoch: TYPES::Epoch, + data_epoch_payload_commitment: Option, ) -> Self { let shares = membership .read() @@ -244,7 +249,9 @@ impl VidDisperse { shares, common: vid_disperse.common, payload_commitment: vid_disperse.commit, - epoch: sender_epoch.unwrap_or(target_epoch), + data_epoch_payload_commitment, + epoch: data_epoch, + target_epoch, } } @@ -260,23 +267,43 @@ impl VidDisperse { membership: &Arc>, view: TYPES::View, target_epoch: TYPES::Epoch, - sender_epoch: Option, + data_epoch: TYPES::Epoch, precompute_data: Option, ) -> Self { let num_nodes = membership.read().await.total_nodes(target_epoch); + let txns_clone = Arc::clone(&txns); let vid_disperse = spawn_blocking(move || { precompute_data .map_or_else( - || vid_scheme(num_nodes).disperse(Arc::clone(&txns)), - |data| vid_scheme(num_nodes).disperse_precompute(Arc::clone(&txns), &data) + || vid_scheme(num_nodes).disperse(&txns_clone), + |data| vid_scheme(num_nodes).disperse_precompute(&txns_clone, &data) ) - .unwrap_or_else(|err| panic!("VID disperse failure:(num_storage nodes,payload_byte_len)=({num_nodes},{}) error: {err}", txns.len())) + .unwrap_or_else(|err| panic!("VID disperse failure:(num_storage nodes,payload_byte_len)=({num_nodes},{}) error: {err}", txns_clone.len())) }).await; + let data_epoch_payload_commitment = if target_epoch == data_epoch { + None + } else { + let data_epoch_num_nodes = membership.read().await.total_nodes(data_epoch); + Some(spawn_blocking(move || { + vid_scheme(data_epoch_num_nodes).commit_only(&txns) + .unwrap_or_else(|err| panic!("VID commit_only failure:(num_storage nodes,payload_byte_len)=({num_nodes},{}) error: {err}", txns.len())) + }).await) + }; // Unwrap here will just propagate any panic from the spawned task, it's not a new place we can panic. let vid_disperse = vid_disperse.unwrap(); - - Self::from_membership(view, vid_disperse, membership, target_epoch, sender_epoch).await + let data_epoch_payload_commitment = + data_epoch_payload_commitment.map(|result| result.unwrap()); + + Self::from_membership( + view, + vid_disperse, + membership, + target_epoch, + data_epoch, + data_epoch_payload_commitment, + ) + .await } } @@ -364,7 +391,9 @@ impl VidDisperseShare { let mut vid_disperse = VidDisperse { view_number: first_vid_disperse_share.view_number, epoch: TYPES::Epoch::new(0), + target_epoch: TYPES::Epoch::new(0), payload_commitment: first_vid_disperse_share.payload_commitment, + data_epoch_payload_commitment: None, common: first_vid_disperse_share.common, shares: share_map, }; @@ -405,10 +434,14 @@ impl VidDisperseShare { pub struct VidDisperseShare2 { /// The view number for which this VID data is intended pub view_number: TYPES::View, - /// The epoch number for which this VID data is intended + /// The epoch number for which this VID data belongs to pub epoch: TYPES::Epoch, + /// The epoch number to which the recipient of this VID belongs to + pub target_epoch: TYPES::Epoch, /// Block payload commitment pub payload_commitment: VidCommitment, + /// VidCommitment calculated based on the number of nodes in `epoch`. Needed during epoch transition. 
+ pub data_epoch_payload_commitment: Option, /// A storage node's key and its corresponding VID share pub share: VidShare, /// VID common data sent to all storage nodes @@ -422,7 +455,9 @@ impl From> for VidDisperseShare let VidDisperseShare2 { view_number, epoch: _, + target_epoch: _, payload_commitment, + data_epoch_payload_commitment: _, share, common, recipient_key, @@ -451,7 +486,9 @@ impl From> for VidDisperseShare2 Self { view_number, epoch: TYPES::Epoch::new(0), + target_epoch: TYPES::Epoch::new(0), payload_commitment, + data_epoch_payload_commitment: None, share, common, recipient_key, @@ -462,7 +499,6 @@ impl From> for VidDisperseShare2 impl VidDisperseShare2 { /// Create a vector of `VidDisperseShare` from `VidDisperse` pub fn from_vid_disperse(vid_disperse: VidDisperse) -> Vec { - let epoch = vid_disperse.epoch; vid_disperse .shares .into_iter() @@ -472,7 +508,9 @@ impl VidDisperseShare2 { view_number: vid_disperse.view_number, common: vid_disperse.common.clone(), payload_commitment: vid_disperse.payload_commitment, - epoch, + data_epoch_payload_commitment: vid_disperse.data_epoch_payload_commitment, + epoch: vid_disperse.epoch, + target_epoch: vid_disperse.target_epoch, }) .collect() } @@ -501,7 +539,6 @@ impl VidDisperseShare2 { I: Iterator, { let first_vid_disperse_share = it.next()?.clone(); - let epoch = first_vid_disperse_share.epoch; let mut share_map = BTreeMap::new(); share_map.insert( first_vid_disperse_share.recipient_key, @@ -509,8 +546,10 @@ impl VidDisperseShare2 { ); let mut vid_disperse = VidDisperse { view_number: first_vid_disperse_share.view_number, - epoch, + epoch: first_vid_disperse_share.epoch, + target_epoch: first_vid_disperse_share.target_epoch, payload_commitment: first_vid_disperse_share.payload_commitment, + data_epoch_payload_commitment: first_vid_disperse_share.data_epoch_payload_commitment, common: first_vid_disperse_share.common, shares: share_map, }; @@ -527,7 +566,6 @@ impl VidDisperseShare2 { pub fn to_vid_share_proposals( vid_disperse_proposal: Proposal>, ) -> Vec> { - let epoch = vid_disperse_proposal.data.epoch; vid_disperse_proposal .data .shares @@ -539,7 +577,11 @@ impl VidDisperseShare2 { view_number: vid_disperse_proposal.data.view_number, common: vid_disperse_proposal.data.common.clone(), payload_commitment: vid_disperse_proposal.data.payload_commitment, - epoch, + data_epoch_payload_commitment: vid_disperse_proposal + .data + .data_epoch_payload_commitment, + epoch: vid_disperse_proposal.data.epoch, + target_epoch: vid_disperse_proposal.data.target_epoch, }, signature: vid_disperse_proposal.signature.clone(), _pd: vid_disperse_proposal._pd,
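Aside on the VidDisperse::calculate_vid_disperse change: shares are always sized for the target epoch's membership, and a second, commit-only pass over the data epoch's node count is produced only while an epoch transition is in flight (target_epoch != data_epoch), so the extra commitment is never paid for in steady state. A sketch of that rule; commit_for is a toy stand-in for the VID scheme's commit_only, not the real primitive, and the u64 commitments and node counts are illustrative.

// Sketch only: when the data epoch and target epoch differ, compute a second commitment
// sized for the data epoch's membership; otherwise leave it as None.
fn commit_for(num_nodes: usize, payload: &[u8]) -> u64 {
    // Toy commitment: mixes the node count into a simple checksum (placeholder only).
    payload.iter().map(|b| u64::from(*b)).sum::<u64>() ^ (num_nodes as u64)
}

fn disperse_commitments(
    payload: &[u8],
    data_epoch: u64,
    target_epoch: u64,
    data_epoch_nodes: usize,
    target_epoch_nodes: usize,
) -> (u64, Option<u64>) {
    // Shares are sized for the recipients' (target) epoch.
    let payload_commitment = commit_for(target_epoch_nodes, payload);
    // Only compute the extra data-epoch commitment during an epoch transition.
    let data_epoch_payload_commitment =
        (target_epoch != data_epoch).then(|| commit_for(data_epoch_nodes, payload));
    (payload_commitment, data_epoch_payload_commitment)
}

fn main() {
    // Transition: data belongs to epoch 3 but is dispersed to epoch 4's membership.
    let (commitment, extra) = disperse_commitments(b"txns", 3, 4, 7, 10);
    assert!(extra.is_some() && extra != Some(commitment));
    // Steady state: both epochs match, so no extra commitment is computed.
    assert!(disperse_commitments(b"txns", 4, 4, 10, 10).1.is_none());
}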