diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 2599eb56670..2eec0b1d071 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3161,9 +3161,9 @@ impl BeaconChain { blobs: FixedBlobSidecarList, ) -> Result> { if let Some(slasher) = self.slasher.as_ref() { + let mut slashable_cache = self.observed_slashable.write(); for blob_sidecar in blobs.iter().filter_map(|blob| blob.clone()) { - self.observed_slashable - .write() + slashable_cache .observe_slashable( blob_sidecar.slot(), blob_sidecar.block_proposer_index(), diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 1b37139d2b7..d6d64da1f6b 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -58,6 +58,7 @@ use crate::execution_payload::{ is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block, AllowOptimisticImport, NotifyExecutionLayer, PayloadNotifier, }; +use crate::observed_block_producers::SeenBlock; use crate::snapshot_cache::PreProcessingSnapshot; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::validator_pubkey_cache::ValidatorPubkeyCache; @@ -955,13 +956,17 @@ impl GossipVerifiedBlock { // // It's important to double-check that the proposer still hasn't been observed so we don't // have a race-condition when verifying two blocks simultaneously. - if chain + match chain .observed_block_producers .write() - .observe_proposal(block.message()) + .observe_proposal(block_root, block.message()) .map_err(|e| BlockError::BeaconChainError(e.into()))? { - return Err(BlockError::BlockIsAlreadyKnown); + SeenBlock::Slashable => { + return Err(BlockError::Slashable); + } + SeenBlock::Duplicate => return Err(BlockError::BlockIsAlreadyKnown), + SeenBlock::UniqueNonSlashable => {} }; if block.message().proposer_index() != expected_proposer as u64 { @@ -1242,14 +1247,15 @@ impl ExecutionPendingBlock { notify_execution_layer: NotifyExecutionLayer, ) -> Result> { chain - .observed_block_producers + .observed_slashable .write() - .observe_proposal(block.message()) + .observe_slashable(block.slot(), block.message().proposer_index(), block_root) .map_err(|e| BlockError::BeaconChainError(e.into()))?; + chain - .observed_slashable + .observed_block_producers .write() - .observe_slashable(block.slot(), block.message().proposer_index(), block_root) + .observe_proposal(block_root, block.message()) .map_err(|e| BlockError::BeaconChainError(e.into()))?; if let Some(parent) = chain diff --git a/beacon_node/beacon_chain/src/observed_block_producers.rs b/beacon_node/beacon_chain/src/observed_block_producers.rs index 1e44190f6d5..096c8bff77d 100644 --- a/beacon_node/beacon_chain/src/observed_block_producers.rs +++ b/beacon_node/beacon_chain/src/observed_block_producers.rs @@ -1,9 +1,10 @@ //! Provides the `ObservedBlockProducers` struct which allows for rejecting gossip blocks from //! validators that have already produced a block. -use std::collections::HashSet; +use std::collections::hash_map::Entry; +use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; -use types::{BeaconBlockRef, Epoch, EthSpec, Slot, Unsigned}; +use types::{BeaconBlockRef, Epoch, EthSpec, Hash256, Slot, Unsigned}; #[derive(Debug, PartialEq)] pub enum Error { @@ -39,7 +40,7 @@ impl ProposalKey { /// known_distinct_shufflings` which is much smaller. pub struct ObservedBlockProducers { finalized_slot: Slot, - items: HashSet, + items: HashMap>, _phantom: PhantomData, } @@ -48,12 +49,30 @@ impl Default for ObservedBlockProducers { fn default() -> Self { Self { finalized_slot: Slot::new(0), - items: HashSet::new(), + items: HashMap::new(), _phantom: PhantomData, } } } +pub enum SeenBlock { + Duplicate, + Slashable, + UniqueNonSlashable, +} + +impl SeenBlock { + pub fn proposer_previously_observed(self) -> bool { + match self { + Self::Duplicate | Self::Slashable => true, + Self::UniqueNonSlashable => false, + } + } + pub fn is_slashable(&self) -> bool { + matches!(self, Self::Slashable) + } +} + impl ObservedBlockProducers { /// Observe that the `block` was produced by `block.proposer_index` at `block.slot`. This will /// update `self` so future calls to it indicate that this block is known. @@ -64,7 +83,11 @@ impl ObservedBlockProducers { /// /// - `block.proposer_index` is greater than `VALIDATOR_REGISTRY_LIMIT`. /// - `block.slot` is equal to or less than the latest pruned `finalized_slot`. - pub fn observe_proposal(&mut self, block: BeaconBlockRef<'_, E>) -> Result { + pub fn observe_proposal( + &mut self, + block_root: Hash256, + block: BeaconBlockRef<'_, E>, + ) -> Result { self.sanitize_block(block)?; let key = ProposalKey { @@ -72,9 +95,69 @@ impl ObservedBlockProducers { proposer: block.proposer_index(), }; - let already_exists = self.items.insert(key); + let entry = self.items.entry(key); + + let slashable_proposal = match entry { + Entry::Occupied(mut occupied_entry) => { + let block_roots = occupied_entry.get_mut(); + let newly_inserted = block_roots.insert(block_root); + + let is_equivocation = block_roots.len() > 1; + + if is_equivocation { + SeenBlock::Slashable + } else if !newly_inserted { + SeenBlock::Duplicate + } else { + SeenBlock::UniqueNonSlashable + } + } + Entry::Vacant(vacant_entry) => { + let block_roots = HashSet::from([block_root]); + vacant_entry.insert(block_roots); + + SeenBlock::UniqueNonSlashable + } + }; - Ok(!already_exists) + Ok(slashable_proposal) + } + + /// Returns `Ok(true)` if the `block` has been observed before, `Ok(false)` if not. Does not + /// update the cache, so calling this function multiple times will continue to return + /// `Ok(false)`, until `Self::observe_proposer` is called. + /// + /// ## Errors + /// + /// - `block.proposer_index` is greater than `VALIDATOR_REGISTRY_LIMIT`. + /// - `block.slot` is equal to or less than the latest pruned `finalized_slot`. + pub fn proposer_has_been_observed( + &self, + block: BeaconBlockRef<'_, E>, + block_root: Hash256, + ) -> Result { + self.sanitize_block(block)?; + + let key = ProposalKey { + slot: block.slot(), + proposer: block.proposer_index(), + }; + + if let Some(block_roots) = self.items.get(&key) { + let block_already_known = block_roots.contains(&block_root); + let no_prev_known_blocks = + block_roots.difference(&HashSet::from([block_root])).count() == 0; + + if !no_prev_known_blocks { + Ok(SeenBlock::Slashable) + } else if block_already_known { + Ok(SeenBlock::Duplicate) + } else { + Ok(SeenBlock::UniqueNonSlashable) + } + } else { + Ok(SeenBlock::UniqueNonSlashable) + } } /// Returns `Ok(())` if the given `block` is sane. @@ -106,14 +189,14 @@ impl ObservedBlockProducers { } self.finalized_slot = finalized_slot; - self.items.retain(|key| key.slot > finalized_slot); + self.items.retain(|key, _| key.slot > finalized_slot); } /// Returns `true` if the given `validator_index` has been stored in `self` at `epoch`. /// /// This is useful for doppelganger detection. pub fn index_seen_at_epoch(&self, validator_index: u64, epoch: Epoch) -> bool { - self.items.iter().any(|key| { + self.items.iter().any(|(key, _)| { key.slot.epoch(E::slots_per_epoch()) == epoch && key.proposer == validator_index }) } @@ -142,9 +225,12 @@ mod tests { // Slot 0, proposer 0 let block_a = get_block(0, 0); + let block_root = block_a.canonical_root(); assert_eq!( - cache.observe_proposal(block_a.to_ref()), + cache + .observe_proposal(block_root, block_a.to_ref()) + .map(SeenBlock::proposer_previously_observed), Ok(false), "can observe proposer, indicates proposer unobserved" ); @@ -155,14 +241,16 @@ mod tests { assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); assert_eq!(cache.items.len(), 1, "only one slot should be present"); - assert!( + assert_eq!( cache .items .get(&ProposalKey { slot: Slot::new(0), proposer: 0 }) - .is_some(), + .expect("slot zero should be present") + .len(), + 1, "only one proposer should be present" ); @@ -174,14 +262,16 @@ mod tests { assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); assert_eq!(cache.items.len(), 1, "only one slot should be present"); - assert!( + assert_eq!( cache .items .get(&ProposalKey { slot: Slot::new(0), proposer: 0 }) - .is_some(), + .expect("slot zero should be present") + .len(), + 1, "only one proposer should be present" ); @@ -203,9 +293,12 @@ mod tests { // First slot of finalized epoch, proposer 0 let block_b = get_block(E::slots_per_epoch(), 0); + let block_root_b = block_b.canonical_root(); assert_eq!( - cache.observe_proposal(block_b.to_ref()), + cache + .observe_proposal(block_root_b, block_b.to_ref()) + .map(SeenBlock::proposer_previously_observed), Err(Error::FinalizedBlock { slot: E::slots_per_epoch().into(), finalized_slot: E::slots_per_epoch().into(), @@ -225,20 +318,24 @@ mod tests { let block_b = get_block(three_epochs, 0); assert_eq!( - cache.observe_proposal(block_b.to_ref()), + cache + .observe_proposal(block_root_b, block_b.to_ref()) + .map(SeenBlock::proposer_previously_observed), Ok(false), "can insert non-finalized block" ); assert_eq!(cache.items.len(), 1, "only one slot should be present"); - assert!( + assert_eq!( cache .items .get(&ProposalKey { slot: Slot::new(three_epochs), proposer: 0 }) - .is_some(), + .expect("the three epochs slot should be present") + .len(), + 1, "only one proposer should be present" ); @@ -256,14 +353,16 @@ mod tests { ); assert_eq!(cache.items.len(), 1, "only one slot should be present"); - assert!( + assert_eq!( cache .items .get(&ProposalKey { slot: Slot::new(three_epochs), proposer: 0 }) - .is_some(), + .expect("the three epochs slot should be present") + .len(), + 1, "only one proposer should be present" ); } @@ -274,78 +373,141 @@ mod tests { // Slot 0, proposer 0 let block_a = get_block(0, 0); + let block_root_a = block_a.canonical_root(); assert_eq!( - cache.observe_proposal(block_a.to_ref()), + cache + .proposer_has_been_observed(block_a.to_ref(), block_a.canonical_root()) + .map(|x| x.proposer_previously_observed()), + Ok(false), + "no observation in empty cache" + ); + assert_eq!( + cache + .observe_proposal(block_root_a, block_a.to_ref()) + .map(SeenBlock::proposer_previously_observed), Ok(false), "can observe proposer, indicates proposer unobserved" ); assert_eq!( - cache.observe_proposal(block_a.to_ref()), + cache + .proposer_has_been_observed(block_a.to_ref(), block_a.canonical_root()) + .map(|x| x.proposer_previously_observed()), + Ok(true), + "observed block is indicated as true" + ); + assert_eq!( + cache + .observe_proposal(block_root_a, block_a.to_ref()) + .map(SeenBlock::proposer_previously_observed), Ok(true), "observing again indicates true" ); assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); assert_eq!(cache.items.len(), 1, "only one slot should be present"); - assert!( + assert_eq!( cache .items .get(&ProposalKey { slot: Slot::new(0), proposer: 0 }) - .is_some(), + .expect("slot zero should be present") + .len(), + 1, "only one proposer should be present" ); // Slot 1, proposer 0 let block_b = get_block(1, 0); + let block_root_b = block_b.canonical_root(); assert_eq!( - cache.observe_proposal(block_b.to_ref()), + cache + .proposer_has_been_observed(block_b.to_ref(), block_b.canonical_root()) + .map(|x| x.proposer_previously_observed()), + Ok(false), + "no observation for new slot" + ); + assert_eq!( + cache + .observe_proposal(block_root_b, block_b.to_ref()) + .map(SeenBlock::proposer_previously_observed), Ok(false), "can observe proposer for new slot, indicates proposer unobserved" ); assert_eq!( - cache.observe_proposal(block_b.to_ref()), + cache + .proposer_has_been_observed(block_b.to_ref(), block_b.canonical_root()) + .map(|x| x.proposer_previously_observed()), + Ok(true), + "observed block in slot 1 is indicated as true" + ); + assert_eq!( + cache + .observe_proposal(block_root_b, block_b.to_ref()) + .map(SeenBlock::proposer_previously_observed), Ok(true), "observing slot 1 again indicates true" ); assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); assert_eq!(cache.items.len(), 2, "two slots should be present"); - assert!( + assert_eq!( cache .items .get(&ProposalKey { slot: Slot::new(0), proposer: 0 }) - .is_some(), + .expect("slot zero should be present") + .len(), + 1, "only one proposer should be present in slot 0" ); - assert!( + assert_eq!( cache .items .get(&ProposalKey { slot: Slot::new(1), proposer: 0 }) - .is_some(), + .expect("slot zero should be present") + .len(), + 1, "only one proposer should be present in slot 1" ); // Slot 0, proposer 1 let block_c = get_block(0, 1); + let block_root_c = block_c.canonical_root(); assert_eq!( - cache.observe_proposal(block_c.to_ref()), + cache + .proposer_has_been_observed(block_c.to_ref(), block_c.canonical_root()) + .map(|x| x.proposer_previously_observed()), + Ok(false), + "no observation for new proposer" + ); + assert_eq!( + cache + .observe_proposal(block_root_c, block_c.to_ref()) + .map(SeenBlock::proposer_previously_observed), Ok(false), "can observe new proposer, indicates proposer unobserved" ); assert_eq!( - cache.observe_proposal(block_c.to_ref()), + cache + .proposer_has_been_observed(block_c.to_ref(), block_c.canonical_root()) + .map(|x| x.proposer_previously_observed()), + Ok(true), + "observed new proposer block is indicated as true" + ); + assert_eq!( + cache + .observe_proposal(block_root_c, block_c.to_ref()) + .map(SeenBlock::proposer_previously_observed), Ok(true), "observing new proposer again indicates true" ); @@ -356,7 +518,7 @@ mod tests { cache .items .iter() - .filter(|k| k.slot == cache.finalized_slot) + .filter(|(k, _)| k.slot == cache.finalized_slot) .count(), 2, "two proposers should be present in slot 0" @@ -365,7 +527,7 @@ mod tests { cache .items .iter() - .filter(|k| k.slot == Slot::new(1)) + .filter(|(k, _)| k.slot == Slot::new(1)) .count(), 1, "only one proposer should be present in slot 1" diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index e8d783c182c..8b037715405 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -19,9 +19,9 @@ use std::time::Duration; use tokio::sync::mpsc::UnboundedSender; use tree_hash::TreeHash; use types::{ - AbstractExecPayload, BeaconBlockRef, BlobSidecar, BlobSidecarList, EthSpec, ExecPayload, - ExecutionBlockHash, ForkName, FullPayload, FullPayloadMerge, Hash256, SignedBeaconBlock, - SignedBlindedBeaconBlock, VariableList, + AbstractExecPayload, BeaconBlockRef, BlobSidecarList, EthSpec, ExecPayload, ExecutionBlockHash, + ForkName, FullPayload, FullPayloadMerge, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, + VariableList, }; use warp::http::StatusCode; use warp::{reply::Response, Rejection, Reply}; @@ -60,7 +60,7 @@ pub async fn publish_block (block_contents, true), ProvenancedBlock::Builder(block_contents, _) => (block_contents, false), }; - let block = block_contents.inner_block(); + let block = block_contents.inner_block().clone(); let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock); debug!(log, "Signed block received in HTTP API"; "slot" => block.slot()); @@ -117,22 +117,6 @@ pub async fn publish_block { - if let Err(e) = check_slashable( - &chain_clone, - &None, - block_root.unwrap_or(block.canonical_root()), - &block, - &log_clone, - ) { - warn!( - log, - "Not publishing block - not gossip verified"; - "slot" => slot, - "error" => ?e - ); - return Err(warp_utils::reject::custom_bad_request(e.to_string())); - } - // Allow the status code for duplicate blocks to be overridden based on config. return Ok(warp::reply::with_status( warp::reply::json(&ErrorMessage {