diff --git a/Cargo.lock b/Cargo.lock
index e41616d7dd1..760feb588cf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -860,7 +860,7 @@ dependencies = [
 
 [[package]]
 name = "beacon_node"
-version = "7.0.0-beta.0"
+version = "7.0.0-beta.1"
 dependencies = [
  "account_utils",
  "beacon_chain",
@@ -1108,7 +1108,7 @@ dependencies = [
 
 [[package]]
 name = "boot_node"
-version = "7.0.0-beta.0"
+version = "7.0.0-beta.1"
 dependencies = [
  "beacon_node",
  "bytes",
@@ -4811,7 +4811,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
 
 [[package]]
 name = "lcli"
-version = "7.0.0-beta.0"
+version = "7.0.0-beta.1"
 dependencies = [
  "account_utils",
  "beacon_chain",
@@ -5366,7 +5366,7 @@ dependencies = [
 
 [[package]]
 name = "lighthouse"
-version = "7.0.0-beta.0"
+version = "7.0.0-beta.1"
 dependencies = [
  "account_manager",
  "account_utils",
diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml
index f6948e87434..c441ca779e7 100644
--- a/beacon_node/Cargo.toml
+++ b/beacon_node/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "beacon_node"
-version = "7.0.0-beta.0"
+version = "7.0.0-beta.1"
 authors = [
     "Paul Hauner <paul@paulhauner.com>",
     "Age Manning <Age@AgeManning.com>",
diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ ... @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         let proposer_index = if let Some(proposer) = cached_proposer {
             proposer.index as u64
         } else {
-            if head_epoch + 2 < proposal_epoch {
+            if 2 + 2 == 5 && head_epoch + 2 < proposal_epoch {
                 warn!(
                     self.log,
                     "Skipping proposer preparation";
@@ -6089,8 +6089,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         //
         // This prevents the routine from running during sync.
         let head_slot = cached_head.head_slot();
-        if head_slot + T::EthSpec::slots_per_epoch() * PREPARE_PROPOSER_HISTORIC_EPOCHS
-            < current_slot
+        if 2 + 2 == 5
+            && head_slot
+                + T::EthSpec::slots_per_epoch() * PREPARE_PROPOSER_HISTORIC_EPOCHS
+                < current_slot
         {
             debug!(
                 chain.log,
@@ -7280,6 +7282,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         Ok(None)
     }
+
+    /// Retrieves block roots (in ascending slot order) within some slot range from fork choice.
+    pub fn block_roots_from_fork_choice(&self, start_slot: u64, count: u64) -> Vec<Hash256> {
+        let head_block_root = self.canonical_head.cached_head().head_block_root();
+        let fork_choice_read_lock = self.canonical_head.fork_choice_read_lock();
+        let block_roots_iter = fork_choice_read_lock
+            .proto_array()
+            .iter_block_roots(&head_block_root);
+        let end_slot = start_slot.saturating_add(count);
+        let mut roots = vec![];
+
+        for (root, slot) in block_roots_iter {
+            if slot < end_slot && slot >= start_slot {
+                roots.push(root);
+            }
+            if slot < start_slot {
+                break;
+            }
+        }
+
+        drop(fork_choice_read_lock);
+        // return in ascending slot order
+        roots.reverse();
+        roots
+    }
 }
 
 impl<T: BeaconChainTypes> Drop for BeaconChain<T> {
diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs
index 12652763763..5f8c90d12e8 100644
--- a/beacon_node/beacon_chain/src/block_verification.rs
+++ b/beacon_node/beacon_chain/src/block_verification.rs
@@ -90,6 +90,7 @@ use std::borrow::Cow;
 use std::fmt::Debug;
 use std::fs;
 use std::io::Write;
+use std::str::FromStr;
 use std::sync::Arc;
 use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp};
 use strum::AsRefStr;
@@ -146,7 +147,9 @@ pub enum BlockError {
     ///
     /// It's unclear if this block is valid, but it cannot be processed without already knowing
     /// its parent.
-    ParentUnknown { parent_root: Hash256 },
+    ParentUnknown {
+        parent_root: Hash256,
+    },
     /// The block slot is greater than the present slot.
/// /// ## Peer scoring @@ -161,7 +164,10 @@ pub enum BlockError { /// ## Peer scoring /// /// The peer has incompatible state transition logic and is faulty. - StateRootMismatch { block: Hash256, local: Hash256 }, + StateRootMismatch { + block: Hash256, + local: Hash256, + }, /// The block was a genesis block, these blocks cannot be re-imported. GenesisBlock, /// The slot is finalized, no need to import. @@ -180,7 +186,9 @@ pub enum BlockError { /// /// It's unclear if this block is valid, but it conflicts with finality and shouldn't be /// imported. - NotFinalizedDescendant { block_parent_root: Hash256 }, + NotFinalizedDescendant { + block_parent_root: Hash256, + }, /// Block is already known and valid, no need to re-import. /// /// ## Peer scoring @@ -207,7 +215,10 @@ pub enum BlockError { /// ## Peer scoring /// /// The block is invalid and the peer is faulty. - IncorrectBlockProposer { block: u64, local_shuffling: u64 }, + IncorrectBlockProposer { + block: u64, + local_shuffling: u64, + }, /// The `block.proposal_index` is not known. /// /// ## Peer scoring @@ -225,7 +236,10 @@ pub enum BlockError { /// ## Peer scoring /// /// The block is invalid and the peer is faulty. - BlockIsNotLaterThanParent { block_slot: Slot, parent_slot: Slot }, + BlockIsNotLaterThanParent { + block_slot: Slot, + parent_slot: Slot, + }, /// At least one block in the chain segment did not have it's parent root set to the root of /// the prior block. /// @@ -281,7 +295,10 @@ pub enum BlockError { /// If it's actually our fault (e.g. our execution node database is corrupt) we have bigger /// problems to worry about than losing peers, and we're doing the network a favour by /// disconnecting. - ParentExecutionPayloadInvalid { parent_root: Hash256 }, + ParentExecutionPayloadInvalid { + parent_root: Hash256, + }, + KnownInvalidExecutionPayload(Hash256), /// The block is a slashable equivocation from the proposer. /// /// ## Peer scoring @@ -1326,6 +1343,13 @@ impl ExecutionPendingBlock { chain: &Arc>, notify_execution_layer: NotifyExecutionLayer, ) -> Result { + if block_root + == Hash256::from_str("2db899881ed8546476d0b92c6aa9110bea9a4cd0dbeb5519eb0ea69575f1f359") + .expect("valid hash") + { + return Err(BlockError::KnownInvalidExecutionPayload(block_root)); + } + chain .observed_slashable .write() diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index fcdd57abbc8..99199d813bd 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -94,6 +94,8 @@ pub struct ChainConfig { /// The delay in milliseconds applied by the node between sending each blob or data column batch. /// This doesn't apply if the node is the block proposer. 
pub blob_publication_batch_interval: Duration, + pub disable_attesting: bool, + pub sync_tolerance_epochs: u64, } impl Default for ChainConfig { @@ -129,6 +131,8 @@ impl Default for ChainConfig { enable_sampling: false, blob_publication_batches: 4, blob_publication_batch_interval: Duration::from_millis(300), + disable_attesting: false, + sync_tolerance_epochs: 16, } } } diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 2fb3ec06bf9..7bd4807cc99 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -46,11 +46,11 @@ tree_hash = { workspace = true } types = { workspace = true } warp = { workspace = true } warp_utils = { workspace = true } +proto_array = { workspace = true } [dev-dependencies] genesis = { workspace = true } logging = { workspace = true } -proto_array = { workspace = true } serde_json = { workspace = true } [[test]] diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 5d75dc8c9a0..38758fe5c31 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -107,13 +107,6 @@ use warp_utils::{query::multi_key_query, reject::convert_rejection, uor::Unifyin const API_PREFIX: &str = "eth"; -/// If the node is within this many epochs from the head, we declare it to be synced regardless of -/// the network sync state. -/// -/// This helps prevent attacks where nodes can convince us that we're syncing some non-existent -/// finalized head. -const SYNC_TOLERANCE_EPOCHS: u64 = 8; - /// A custom type which allows for both unsecured and TLS-enabled HTTP servers. type HttpServer = (SocketAddr, Pin + Send>>); @@ -157,6 +150,7 @@ pub struct Config { pub duplicate_block_status_code: StatusCode, pub enable_light_client_server: bool, pub target_peers: usize, + pub sync_tolerance_epochs: usize, } impl Default for Config { @@ -173,6 +167,7 @@ impl Default for Config { duplicate_block_status_code: StatusCode::ACCEPTED, enable_light_client_server: true, target_peers: 100, + sync_tolerance_epochs: 16, } } } @@ -473,7 +468,8 @@ pub fn serve( ) })?; - let tolerance = SYNC_TOLERANCE_EPOCHS * T::EthSpec::slots_per_epoch(); + let tolerance = + chain.config.sync_tolerance_epochs * T::EthSpec::slots_per_epoch(); if head_slot + tolerance >= current_slot { Ok(()) @@ -1859,6 +1855,13 @@ pub fn serve( network_tx: UnboundedSender>, reprocess_tx: Option>, log: Logger| async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request("Attesting disabled".to_string()), + )) + .await; + } + let attestations = attestations.into_iter().map(Either::Left).collect(); let result = crate::publish_attestations::publish_attestations( task_spawner, @@ -1891,6 +1894,12 @@ pub fn serve( network_tx: UnboundedSender>, reprocess_tx: Option>, log: Logger| async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request("Attesting disabled".to_string()), + )) + .await; + } let attestations = match crate::publish_attestations::deserialize_attestation_payload::( payload, fork_name, &log, @@ -1937,49 +1946,66 @@ pub fn serve( task_spawner: TaskSpawner, chain: Arc>, query: api_types::AttestationPoolQuery| { - task_spawner.blocking_response_task(Priority::P1, move || { - let query_filter = |data: &AttestationData| { - query.slot.is_none_or(|slot| slot == data.slot) - && query - .committee_index - .is_none_or(|index| index == data.index) - }; + async move { + if chain.config.disable_attesting { + 
return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } + task_spawner + .blocking_response_task(Priority::P1, move || { + let query_filter = |data: &AttestationData| { + query.slot.is_none_or(|slot| slot == data.slot) + && query + .committee_index + .is_none_or(|index| index == data.index) + }; - let mut attestations = chain.op_pool.get_filtered_attestations(query_filter); - attestations.extend( - chain - .naive_aggregation_pool - .read() - .iter() - .filter(|&att| query_filter(att.data())) - .cloned(), - ); - // Use the current slot to find the fork version, and convert all messages to the - // current fork's format. This is to ensure consistent message types matching - // `Eth-Consensus-Version`. - let current_slot = - chain - .slot_clock - .now() - .ok_or(warp_utils::reject::custom_server_error( - "unable to read slot clock".to_string(), - ))?; - let fork_name = chain.spec.fork_name_at_slot::(current_slot); - let attestations = attestations - .into_iter() - .filter(|att| { - (fork_name.electra_enabled() && matches!(att, Attestation::Electra(_))) - || (!fork_name.electra_enabled() - && matches!(att, Attestation::Base(_))) - }) - .collect::>(); + let mut attestations = + chain.op_pool.get_filtered_attestations(query_filter); + attestations.extend( + chain + .naive_aggregation_pool + .read() + .iter() + .filter(|&att| query_filter(att.data())) + .cloned(), + ); + // Use the current slot to find the fork version, and convert all messages to the + // current fork's format. This is to ensure consistent message types matching + // `Eth-Consensus-Version`. + let current_slot = chain.slot_clock.now().ok_or( + warp_utils::reject::custom_server_error( + "unable to read slot clock".to_string(), + ), + )?; + let fork_name = + chain.spec.fork_name_at_slot::(current_slot); + let attestations = attestations + .into_iter() + .filter(|att| { + (fork_name.electra_enabled() + && matches!(att, Attestation::Electra(_))) + || (!fork_name.electra_enabled() + && matches!(att, Attestation::Base(_))) + }) + .collect::>(); - let res = fork_versioned_response(endpoint_version, fork_name, &attestations)?; - Ok(add_consensus_version_header( - warp::reply::json(&res).into_response(), - fork_name, - )) - }) + let res = fork_versioned_response( + endpoint_version, + fork_name, + &attestations, + )?; + Ok(add_consensus_version_header( + warp::reply::json(&res).into_response(), + fork_name, + )) + }) + .await + } }, ); @@ -2200,12 +2226,24 @@ pub fn serve( signatures: Vec, network_tx: UnboundedSender>, log: Logger| { - task_spawner.blocking_json_task(Priority::P0, move || { - sync_committees::process_sync_committee_signatures( - signatures, network_tx, &chain, log, - )?; - Ok(api_types::GenericResponse::from(())) - }) + async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } + task_spawner + .blocking_json_task(Priority::P0, move || { + sync_committees::process_sync_committee_signatures( + signatures, network_tx, &chain, log, + )?; + Ok(api_types::GenericResponse::from(())) + }) + .await + } }, ); @@ -3419,10 +3457,22 @@ pub fn serve( indices: api_types::ValidatorIndexData, task_spawner: TaskSpawner, chain: Arc>| { - task_spawner.blocking_json_task(Priority::P0, move || { - not_synced_filter?; - attester_duties::attester_duties(epoch, &indices.0, &chain) - }) + async move { + if chain.config.disable_attesting { + 
return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } + task_spawner + .blocking_json_task(Priority::P0, move || { + not_synced_filter?; + attester_duties::attester_duties(epoch, &indices.0, &chain) + }) + .await + } }, ); @@ -3447,10 +3497,22 @@ pub fn serve( indices: api_types::ValidatorIndexData, task_spawner: TaskSpawner, chain: Arc>| { - task_spawner.blocking_json_task(Priority::P0, move || { - not_synced_filter?; - sync_committees::sync_committee_duties(epoch, &indices.0, &chain) - }) + async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } + task_spawner + .blocking_json_task(Priority::P0, move || { + not_synced_filter?; + sync_committees::sync_committee_duties(epoch, &indices.0, &chain) + }) + .await + } }, ); @@ -3468,23 +3530,35 @@ pub fn serve( not_synced_filter: Result<(), Rejection>, task_spawner: TaskSpawner, chain: Arc>| { - task_spawner.blocking_json_task(Priority::P0, move || { - not_synced_filter?; - chain - .get_aggregated_sync_committee_contribution(&sync_committee_data) - .map_err(|e| { - warp_utils::reject::custom_bad_request(format!( - "unable to fetch sync contribution: {:?}", - e - )) - })? - .map(api_types::GenericResponse::from) - .ok_or_else(|| { - warp_utils::reject::custom_not_found( - "no matching sync contribution found".to_string(), - ) + async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } + task_spawner + .blocking_json_task(Priority::P0, move || { + not_synced_filter?; + chain + .get_aggregated_sync_committee_contribution(&sync_committee_data) + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "unable to fetch sync contribution: {:?}", + e + )) + })? 
+ .map(api_types::GenericResponse::from) + .ok_or_else(|| { + warp_utils::reject::custom_not_found( + "no matching sync contribution found".to_string(), + ) + }) }) - }) + .await + } }, ); @@ -3509,6 +3583,15 @@ pub fn serve( chain: Arc>, aggregates: Vec>, network_tx: UnboundedSender>, log: Logger| { + async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } task_spawner.blocking_json_task(Priority::P0, move || { not_synced_filter?; let seen_timestamp = timestamp_now(); @@ -3604,7 +3687,8 @@ pub fn serve( } else { Ok(()) } - }) + }).await + } }, ); @@ -3625,36 +3709,58 @@ pub fn serve( contributions: Vec>, network_tx: UnboundedSender>, log: Logger| { - task_spawner.blocking_json_task(Priority::P0, move || { - not_synced_filter?; - sync_committees::process_signed_contribution_and_proofs( - contributions, - network_tx, - &chain, - log, - )?; - Ok(api_types::GenericResponse::from(())) - }) + async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } + task_spawner + .blocking_json_task(Priority::P0, move || { + not_synced_filter?; + sync_committees::process_signed_contribution_and_proofs( + contributions, + network_tx, + &chain, + log, + )?; + Ok(api_types::GenericResponse::from(())) + }) + .await + } }, ); // POST validator/beacon_committee_subscriptions - let post_validator_beacon_committee_subscriptions = eth_v1 - .and(warp::path("validator")) - .and(warp::path("beacon_committee_subscriptions")) - .and(warp::path::end()) - .and(warp_utils::json::json()) - .and(validator_subscription_tx_filter.clone()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .and(log_filter.clone()) - .then( - |subscriptions: Vec, - validator_subscription_tx: Sender, - task_spawner: TaskSpawner, - chain: Arc>, - log: Logger| { - task_spawner.blocking_json_task(Priority::P0, move || { + let post_validator_beacon_committee_subscriptions = + eth_v1 + .and(warp::path("validator")) + .and(warp::path("beacon_committee_subscriptions")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(validator_subscription_tx_filter.clone()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .and(log_filter.clone()) + .then( + |subscriptions: Vec, + validator_subscription_tx: Sender, + task_spawner: TaskSpawner, + chain: Arc>, + log: Logger| { + async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } + task_spawner.blocking_json_task(Priority::P0, move || { let subscriptions: std::collections::BTreeSet<_> = subscriptions .iter() .map(|subscription| { @@ -3686,9 +3792,10 @@ pub fn serve( } Ok(()) - }) - }, - ); + }).await + } + }, + ); // POST validator/prepare_beacon_proposer let post_validator_prepare_beacon_proposer = eth_v1 @@ -3945,6 +4052,15 @@ pub fn serve( chain: Arc>, log: Logger | { + async move { + if chain.config.disable_attesting { + return convert_rejection::>(Err( + warp_utils::reject::custom_bad_request( + "Attesting disabled".to_string(), + ), + )) + .await; + } task_spawner.blocking_json_task(Priority::P0, move || { for subscription in subscriptions { chain @@ -3969,7 +4085,8 @@ pub fn serve( } Ok(()) - }) + }).await + } }, ); @@ -4422,6 +4539,34 @@ pub fn serve( }, ); + let 
post_lighthouse_fork_choice_invalidate = warp::path("lighthouse") + .and(warp::path("fork_choice")) + .and(warp::path("invalidate")) + .and(warp::path::end()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .and(warp_utils::json::json()) + .then( + |task_spawner: TaskSpawner, + chain: Arc>, + block_root: Hash256| { + task_spawner.blocking_json_task(Priority::P0, move || { + let invalidation = + proto_array::InvalidationOperation::InvalidateOne { block_root }; + chain + .canonical_head + .fork_choice_write_lock() + .on_invalid_execution_payload(&invalidation) + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "not invalidated due to error: {e:?}" + )) + })?; + Ok("invalidated") + }) + }, + ); + // GET lighthouse/analysis/block_rewards let get_lighthouse_block_rewards = warp::path("lighthouse") .and(warp::path("analysis")) @@ -4783,6 +4928,7 @@ pub fn serve( .uor(post_validator_liveness_epoch) .uor(post_lighthouse_liveness) .uor(post_lighthouse_database_reconstruct) + .uor(post_lighthouse_fork_choice_invalidate) .uor(post_lighthouse_block_rewards) .uor(post_lighthouse_ui_validator_metrics) .uor(post_lighthouse_ui_validator_info) diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 090b963cbc2..0956c153a68 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1457,6 +1457,7 @@ impl NetworkBeaconProcessor { | Err(e @ BlockError::InconsistentFork(_)) | Err(e @ BlockError::ExecutionPayloadError(_)) | Err(e @ BlockError::ParentExecutionPayloadInvalid { .. }) + | Err(e @ BlockError::KnownInvalidExecutionPayload(_)) | Err(e @ BlockError::GenesisBlock) => { warn!(self.log, "Could not verify block for gossip. Rejecting the block"; "error" => %e); diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 67a15702756..341177e1371 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -17,7 +17,7 @@ use std::collections::{hash_map::Entry, HashMap}; use std::sync::Arc; use tokio_stream::StreamExt; use types::blob_sidecar::BlobIdentifier; -use types::{Epoch, EthSpec, FixedBytesExtended, Hash256, Slot}; +use types::{Epoch, EthSpec, Hash256, Slot}; impl NetworkBeaconProcessor { /* Auxiliary functions */ @@ -93,20 +93,42 @@ impl NetworkBeaconProcessor { // current slot. This could be because they are using a different genesis time, or that // their or our system's clock is incorrect. Some("Different system clocks or genesis time".to_string()) - } else if remote.finalized_epoch <= local.finalized_epoch - && remote.finalized_root != Hash256::zero() - && local.finalized_root != Hash256::zero() - && self - .chain - .block_root_at_slot(start_slot(remote.finalized_epoch), WhenSlotSkipped::Prev) - .map(|root_opt| root_opt != Some(remote.finalized_root))? + } else if (remote.finalized_epoch == local.finalized_epoch + && remote.finalized_root == local.finalized_root) + || remote.finalized_root.is_zero() + || local.finalized_root.is_zero() + || remote.finalized_epoch > local.finalized_epoch { - // The remote's finalized epoch is less than or equal to ours, but the block root is - // different to the one in our chain. 
Therefore, the node is on a different chain and we - // should not communicate with them. - Some("Different finalized chain".to_string()) - } else { + // Fast path. Remote finalized checkpoint is either identical, or genesis, or we are at + // genesis, or they are ahead. In all cases, we should allow this peer to connect to us + // so we can sync from them. None + } else { + // Remote finalized epoch is less than ours. + let remote_finalized_slot = start_slot(remote.finalized_epoch); + if remote_finalized_slot < self.chain.store.get_oldest_block_slot() { + // Peer's finalized checkpoint is older than anything in our DB. We are unlikely + // to be able to help them sync. + Some("Old finality out of range".to_string()) + } else if remote_finalized_slot < self.chain.store.get_split_slot() { + // Peer's finalized slot is in range for a quick block root check in our freezer DB. + // If that block root check fails, reject them as they're on a different finalized + // chain. + if self + .chain + .block_root_at_slot(remote_finalized_slot, WhenSlotSkipped::Prev) + .map(|root_opt| root_opt != Some(remote.finalized_root))? + { + Some("Different finalized chain".to_string()) + } else { + None + } + } else { + // Peer's finality is older than ours, but newer than our split point, making a + // block root check infeasible. This case shouldn't happen particularly often so + // we give the peer the benefit of the doubt and let them connect to us. + None + } }; Ok(irrelevant_reason) @@ -653,86 +675,32 @@ impl NetworkBeaconProcessor { request_id: RequestId, req: BlocksByRangeRequest, ) -> Result<(), (RpcErrorResponse, &'static str)> { + let req_start_slot = *req.start_slot(); + let req_count = *req.count(); + debug!(self.log, "Received BlocksByRange Request"; "peer_id" => %peer_id, - "count" => req.count(), - "start_slot" => req.start_slot(), + "start_slot" => req_start_slot, + "count" => req_count, ); - let forwards_block_root_iter = match self - .chain - .forwards_iter_block_roots(Slot::from(*req.start_slot())) - { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockOutOfRange { - slot, - oldest_block_slot, - }) => { - debug!(self.log, "Range request failed during backfill"; - "requested_slot" => slot, - "oldest_known_slot" => oldest_block_slot - ); - return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); - } - Err(e) => { - error!(self.log, "Unable to obtain root iter"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // Pick out the required blocks, ignoring skip-slots. 
- let mut last_block_root = None; - let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| { - slot.as_u64() < req.start_slot().saturating_add(*req.count()) - }) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .collect::>>() - }); - - let block_roots = match maybe_block_roots { - Ok(block_roots) => block_roots, - Err(e) => { - error!(self.log, "Error during iteration over blocks"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Iteration error")); - } - }; - - // remove all skip slots - let block_roots = block_roots.into_iter().flatten().collect::>(); - + let block_roots = + self.get_block_roots_for_slot_range(req_start_slot, req_count, "BlocksByRange")?; let current_slot = self .chain .slot() .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); - let log_results = |req: BlocksByRangeRequest, peer_id, blocks_sent| { - if blocks_sent < (*req.count() as usize) { + let log_results = |peer_id, blocks_sent| { + if blocks_sent < (req_count as usize) { debug!( self.log, "BlocksByRange outgoing response processed"; "peer" => %peer_id, "msg" => "Failed to return all requested blocks", - "start_slot" => req.start_slot(), + "start_slot" => req_start_slot, "current_slot" => current_slot, - "requested" => req.count(), + "requested" => req_count, "returned" => blocks_sent ); } else { @@ -740,9 +708,9 @@ impl NetworkBeaconProcessor { self.log, "BlocksByRange outgoing response processed"; "peer" => %peer_id, - "start_slot" => req.start_slot(), + "start_slot" => req_start_slot, "current_slot" => current_slot, - "requested" => req.count(), + "requested" => req_count, "returned" => blocks_sent ); } @@ -763,8 +731,7 @@ impl NetworkBeaconProcessor { Ok(Some(block)) => { // Due to skip slots, blocks could be out of the range, we ensure they // are in the range before sending - if block.slot() >= *req.start_slot() - && block.slot() < req.start_slot() + req.count() + if block.slot() >= req_start_slot && block.slot() < req_start_slot + req.count() { blocks_sent += 1; self.send_network_message(NetworkMessage::SendResponse { @@ -783,7 +750,7 @@ impl NetworkBeaconProcessor { "peer" => %peer_id, "request_root" => ?root ); - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); return Err((RpcErrorResponse::ServerError, "Database inconsistency")); } Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { @@ -793,7 +760,7 @@ impl NetworkBeaconProcessor { "block_root" => ?root, "reason" => "execution layer not synced", ); - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); // send the stream terminator return Err(( RpcErrorResponse::ResourceUnavailable, @@ -821,17 +788,121 @@ impl NetworkBeaconProcessor { "error" => ?e ); } - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); // send the stream terminator return Err((RpcErrorResponse::ServerError, "Failed fetching blocks")); } } } - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); Ok(()) } + fn get_block_roots_for_slot_range( + &self, + req_start_slot: u64, + req_count: u64, + req_type: &str, + ) -> Result, (RpcErrorResponse, &'static str)> { + let block_roots_timer = std::time::Instant::now(); + let finalized_slot = self + .chain + .canonical_head + .cached_head() + .finalized_checkpoint() + .epoch + 
.start_slot(T::EthSpec::slots_per_epoch()); + + let (block_roots, block_roots_source) = if req_start_slot >= finalized_slot.as_u64() { + ( + self.chain + .block_roots_from_fork_choice(req_start_slot, req_count), + "fork_choice", + ) + } else { + ( + self.get_block_roots_from_store(req_start_slot, req_count)?, + "store", + ) + }; + + debug!( + self.log, + "Range request block roots retrieved"; + "req_type" => req_type, + "start_slot" => req_start_slot, + "count" => req_count, + "block_roots_count" => block_roots.len(), + "block_roots_source" => block_roots_source, + "elapsed" => ?block_roots_timer.elapsed() + ); + + Ok(block_roots) + } + + /// Get block roots for a `BlocksByRangeRequest` from the store using roots iterator. + fn get_block_roots_from_store( + &self, + start_slot: u64, + count: u64, + ) -> Result, (RpcErrorResponse, &'static str)> { + let forwards_block_root_iter = + match self.chain.forwards_iter_block_roots(Slot::from(start_slot)) { + Ok(iter) => iter, + Err(BeaconChainError::HistoricalBlockOutOfRange { + slot, + oldest_block_slot, + }) => { + debug!(self.log, "Range request failed during backfill"; + "requested_slot" => slot, + "oldest_known_slot" => oldest_block_slot + ); + return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); + } + Err(e) => { + error!(self.log, "Unable to obtain root iter for range request"; + "start_slot" => start_slot, + "count" => count, + "error" => ?e + ); + return Err((RpcErrorResponse::ServerError, "Database error")); + } + }; + + // Pick out the required blocks, ignoring skip-slots. + let mut last_block_root = None; + let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { + iter.take_while(|(_, slot)| slot.as_u64() < start_slot.saturating_add(count)) + // map skip slots to None + .map(|(root, _)| { + let result = if Some(root) == last_block_root { + None + } else { + Some(root) + }; + last_block_root = Some(root); + result + }) + .collect::>>() + }); + + let block_roots = match maybe_block_roots { + Ok(block_roots) => block_roots, + Err(e) => { + error!(self.log, "Error during iteration over blocks for range request"; + "start_slot" => start_slot, + "count" => count, + "error" => ?e + ); + return Err((RpcErrorResponse::ServerError, "Iteration error")); + } + }; + + // remove all skip slots + Ok(block_roots.into_iter().flatten().collect::>()) + } + /// Handle a `BlobsByRange` request from the peer. pub fn handle_blobs_by_range_request( self: Arc, @@ -910,65 +981,8 @@ impl NetworkBeaconProcessor { }; } - let forwards_block_root_iter = - match self.chain.forwards_iter_block_roots(request_start_slot) { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockOutOfRange { - slot, - oldest_block_slot, - }) => { - debug!(self.log, "Range request failed during backfill"; - "requested_slot" => slot, - "oldest_known_slot" => oldest_block_slot - ); - return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); - } - Err(e) => { - error!(self.log, "Unable to obtain root iter"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // Use `WhenSlotSkipped::Prev` to get the most recent block root prior to - // `request_start_slot` in order to check whether the `request_start_slot` is a skip. 
- let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| { - self.chain - .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) - .ok() - .flatten() - }); - - // Pick out the required blocks, ignoring skip-slots. - let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .collect::>>() - }); - - let block_roots = match maybe_block_roots { - Ok(block_roots) => block_roots, - Err(e) => { - error!(self.log, "Error during iteration over blocks"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; + let block_roots = + self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?; let current_slot = self .chain @@ -987,8 +1001,6 @@ impl NetworkBeaconProcessor { ); }; - // remove all skip slots - let block_roots = block_roots.into_iter().flatten(); let mut blobs_sent = 0; for root in block_roots { @@ -1114,68 +1126,8 @@ impl NetworkBeaconProcessor { }; } - let forwards_block_root_iter = - match self.chain.forwards_iter_block_roots(request_start_slot) { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockOutOfRange { - slot, - oldest_block_slot, - }) => { - debug!(self.log, "Range request failed during backfill"; - "requested_slot" => slot, - "oldest_known_slot" => oldest_block_slot - ); - return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); - } - Err(e) => { - error!(self.log, "Unable to obtain root iter"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // Use `WhenSlotSkipped::Prev` to get the most recent block root prior to - // `request_start_slot` in order to check whether the `request_start_slot` is a skip. - let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| { - self.chain - .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) - .ok() - .flatten() - }); - - // Pick out the required blocks, ignoring skip-slots. 
- let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .collect::>>() - }); - - let block_roots = match maybe_block_roots { - Ok(block_roots) => block_roots, - Err(e) => { - error!(self.log, "Error during iteration over blocks"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // remove all skip slots - let block_roots = block_roots.into_iter().flatten(); + let block_roots = + self.get_block_roots_for_slot_range(req.start_slot, req.count, "DataColumnsByRange")?; let mut data_columns_sent = 0; for root in block_roots { diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 338f2bc4c8b..ef4d9cc4c16 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -800,6 +800,18 @@ impl NetworkBeaconProcessor { peer_action: Some(PeerAction::LowToleranceError), }) } + // Penalise peers for sending us banned blocks. + BlockError::KnownInvalidExecutionPayload(block_root) => { + warn!( + self.log, + "Received block known to be invalid"; + "block_root" => ?block_root, + ); + Err(ChainSegmentFailed { + message: format!("Banned block: {block_root:?}"), + peer_action: Some(PeerAction::LowToleranceError), + }) + } other => { debug!( self.log, "Invalid block received"; diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 2c8b271bd25..6411ba366f4 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -553,6 +553,22 @@ pub fn cli_app() -> Command { .action(ArgAction::Set) .display_order(0) ) + .arg( + Arg::new("disable-attesting") + .long("disable-attesting") + .help("Turn off attestation related APIs so that we have some hope of producing \ + blocks") + .action(ArgAction::SetTrue) + .display_order(0) + ) + .arg( + Arg::new("sync-tolerance-epochs") + .long("sync-tolerance-epochs") + .help("If the beacon node is within this many epochs from the head, we declare it to \ + be synced regardless of the network sync state") + .action(ArgAction::Set) + .display_order(0) + ) .arg( Arg::new("http-sse-capacity-multiplier") .long("http-sse-capacity-multiplier") @@ -812,7 +828,7 @@ pub fn cli_app() -> Command { .long("state-cache-size") .value_name("STATE_CACHE_SIZE") .help("Specifies the size of the state cache") - .default_value("128") + .default_value("32") .action(ArgAction::Set) .display_order(0) ) diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 84320762d6c..8690ac390ed 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -191,6 +191,16 @@ pub fn get_config( client_config.chain.enable_light_client_server = false; } + if cli_args.get_flag("disable-attesting") { + client_config.chain.disable_attesting = true; + } + + if let Some(sync_tolerance_epochs) = + clap_utils::parse_optional(cli_args, "sync-tolerance-epochs")? + { + client_config.chain.sync_tolerance_epochs = sync_tolerance_epochs; + } + if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? 
{ client_config.chain.shuffling_cache_size = cache_size; } diff --git a/boot_node/Cargo.toml b/boot_node/Cargo.toml index 94dcfac5e1e..0068e19ce86 100644 --- a/boot_node/Cargo.toml +++ b/boot_node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "boot_node" -version = "7.0.0-beta.0" +version = "7.0.0-beta.1" authors = ["Sigma Prime "] edition = { workspace = true } diff --git a/common/lighthouse_version/src/lib.rs b/common/lighthouse_version/src/lib.rs index cfffdbbb09e..dbb0617fba1 100644 --- a/common/lighthouse_version/src/lib.rs +++ b/common/lighthouse_version/src/lib.rs @@ -17,8 +17,8 @@ pub const VERSION: &str = git_version!( // NOTE: using --match instead of --exclude for compatibility with old Git "--match=thiswillnevermatchlol" ], - prefix = "Lighthouse/v7.0.0-beta.0-", - fallback = "Lighthouse/v7.0.0-beta.0" + prefix = "Lighthouse/v7.0.0-beta.1-", + fallback = "Lighthouse/v7.0.0-beta.1" ); /// Returns the first eight characters of the latest commit hash for this build. @@ -54,7 +54,7 @@ pub fn version_with_platform() -> String { /// /// `1.5.1` pub fn version() -> &'static str { - "7.0.0-beta.0" + "7.0.0-beta.1" } /// Returns the name of the current client running. diff --git a/consensus/proto_array/src/proto_array.rs b/consensus/proto_array/src/proto_array.rs index 5d0bee4c853..25332f4ea63 100644 --- a/consensus/proto_array/src/proto_array.rs +++ b/consensus/proto_array/src/proto_array.rs @@ -531,15 +531,7 @@ impl ProtoArray { || latest_valid_ancestor_is_descendant { match &node.execution_status { - // It's illegal for an execution client to declare that some previously-valid block - // is now invalid. This is a consensus failure on their behalf. - ExecutionStatus::Valid(hash) => { - return Err(Error::ValidExecutionStatusBecameInvalid { - block_root: node.root, - payload_block_hash: *hash, - }) - } - ExecutionStatus::Optimistic(hash) => { + ExecutionStatus::Valid(hash) | ExecutionStatus::Optimistic(hash) => { invalidated_indices.insert(index); node.execution_status = ExecutionStatus::Invalid(*hash); @@ -597,13 +589,9 @@ impl ProtoArray { if let Some(parent_index) = node.parent { if invalidated_indices.contains(&parent_index) { match &node.execution_status { - ExecutionStatus::Valid(hash) => { - return Err(Error::ValidExecutionStatusBecameInvalid { - block_root: node.root, - payload_block_hash: *hash, - }) - } - ExecutionStatus::Optimistic(hash) | ExecutionStatus::Invalid(hash) => { + ExecutionStatus::Valid(hash) + | ExecutionStatus::Optimistic(hash) + | ExecutionStatus::Invalid(hash) => { node.execution_status = ExecutionStatus::Invalid(*hash) } ExecutionStatus::Irrelevant(_) => { diff --git a/consensus/proto_array/src/proto_array_fork_choice.rs b/consensus/proto_array/src/proto_array_fork_choice.rs index 88d46603117..891590f4756 100644 --- a/consensus/proto_array/src/proto_array_fork_choice.rs +++ b/consensus/proto_array/src/proto_array_fork_choice.rs @@ -856,10 +856,18 @@ impl ProtoArrayForkChoice { } /// See `ProtoArray::iter_nodes` - pub fn iter_nodes<'a>(&'a self, block_root: &Hash256) -> Iter<'a> { + pub fn iter_nodes(&self, block_root: &Hash256) -> Iter { self.proto_array.iter_nodes(block_root) } + /// See `ProtoArray::iter_block_roots` + pub fn iter_block_roots( + &self, + block_root: &Hash256, + ) -> impl Iterator + use<'_> { + self.proto_array.iter_block_roots(block_root) + } + pub fn as_bytes(&self) -> Vec { SszContainer::from(self).as_ssz_bytes() } diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml index 74b7ddcb2a3..7b6a38f0ce9 100644 --- a/lcli/Cargo.toml +++ 
b/lcli/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "lcli" description = "Lighthouse CLI (modeled after zcli)" -version = "7.0.0-beta.0" +version = "7.0.0-beta.1" authors = ["Paul Hauner "] edition = { workspace = true } diff --git a/lcli/src/http_sync.rs b/lcli/src/http_sync.rs index 1ef40e63978..e96d0beaab8 100644 --- a/lcli/src/http_sync.rs +++ b/lcli/src/http_sync.rs @@ -2,7 +2,9 @@ use clap::ArgMatches; use clap_utils::{parse_optional, parse_required}; use environment::Environment; use eth2::{ - types::{BlockId, ChainSpec, ForkName, PublishBlockRequest, SignedBlockContents}, + types::{ + BlockId, BroadcastValidation, ChainSpec, ForkName, PublishBlockRequest, SignedBlockContents, + }, BeaconNodeHttpClient, Error, SensitiveUrl, Timeouts, }; use eth2_network_config::Eth2NetworkConfig; @@ -85,7 +87,10 @@ pub async fn run_async( // 2. Apply blocks to target. for (slot, block) in blocks.iter().rev() { println!("posting block at slot {slot}"); - if let Err(e) = target.post_beacon_blocks(block).await { + if let Err(e) = target + .post_beacon_blocks_v2(block, Some(BroadcastValidation::Consensus)) + .await + { if let Error::ServerMessage(ref e) = e { if e.code == 202 { println!("duplicate block detected while posting block at slot {slot}"); @@ -115,7 +120,7 @@ async fn get_block_from_source( let mut f = File::open(&cache_path).unwrap(); let mut bytes = vec![]; f.read_to_end(&mut bytes).unwrap(); - PublishBlockRequest::from_ssz_bytes(&bytes, ForkName::Deneb).unwrap() + PublishBlockRequest::from_ssz_bytes(&bytes, ForkName::Electra).unwrap() } else { let block_from_source = source .get_beacon_blocks_ssz::(block_id, spec) diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index fc73a2cb930..3f676e2326e 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lighthouse" -version = "7.0.0-beta.0" +version = "7.0.0-beta.1" authors = ["Sigma Prime "] edition = { workspace = true } autotests = false diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index da10c2c4bd9..ffd2832d6f0 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1873,7 +1873,7 @@ fn block_cache_size_flag() { fn state_cache_size_default() { CommandLineTest::new() .run_with_zero_port() - .with_config(|config| assert_eq!(config.store.state_cache_size, new_non_zero_usize(128))); + .with_config(|config| assert_eq!(config.store.state_cache_size, new_non_zero_usize(32))); } #[test] fn state_cache_size_flag() { @@ -2579,6 +2579,16 @@ fn light_client_http_server_disabled() { }); } +#[test] +fn disable_attesting() { + CommandLineTest::new() + .flag("disable-attesting", None) + .run_with_zero_port() + .with_config(|config| { + assert!(config.chain.disable_attesting); + }); +} + #[test] fn gui_flag() { CommandLineTest::new() diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index dfcd2064e52..01bbdc99998 100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -97,6 +97,14 @@ pub struct ValidatorClient { )] pub disable_auto_discover: bool, + #[clap( + long, + help = "Disable everything except block proposals", + display_order = 0, + help_heading = FLAG_HEADER + )] + pub disable_attesting: bool, + #[clap( long, help = "If present, the validator client will use longer timeouts for requests \ diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index 2a848e20225..24213e83e65 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -85,6 +85,7 @@ pub 
struct Config { /// Configuration for the initialized validators #[serde(flatten)] pub initialized_validators: InitializedValidatorsConfig, + pub disable_attesting: bool, } impl Default for Config { @@ -126,6 +127,7 @@ impl Default for Config { validator_registration_batch_size: 500, distributed: false, initialized_validators: <_>::default(), + disable_attesting: false, } } } @@ -379,6 +381,8 @@ impl Config { true }; + config.disable_attesting = validator_client_config.disable_attesting; + Ok(config) } } diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 70236d6a3cc..04c8dc431ff 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -325,7 +325,7 @@ impl ProductionValidatorClient { get_validator_block: slot_duration / HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT, } } else { - Timeouts::set_all(slot_duration) + Timeouts::set_all(25 * slot_duration) }; Ok(BeaconNodeHttpClient::from_components( @@ -478,6 +478,7 @@ impl ProductionValidatorClient { context: duties_context, enable_high_validator_count_metrics: config.enable_high_validator_count_metrics, distributed: config.distributed, + disable_attesting: config.disable_attesting, }); // Update the metrics server. @@ -507,6 +508,7 @@ impl ProductionValidatorClient { .validator_store(validator_store.clone()) .beacon_nodes(beacon_nodes.clone()) .runtime_context(context.service_context("attestation".into())) + .disable(config.disable_attesting) .build()?; let preparation_service = PreparationServiceBuilder::new() diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 9a6f94d52bc..961741a9770 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -21,6 +21,7 @@ pub struct AttestationServiceBuilder { slot_clock: Option, beacon_nodes: Option>>, context: Option>, + disable: bool, } impl AttestationServiceBuilder { @@ -31,6 +32,7 @@ impl AttestationServiceBuilder { slot_clock: None, beacon_nodes: None, context: None, + disable: false, } } @@ -59,6 +61,11 @@ impl AttestationServiceBuilder { self } + pub fn disable(mut self, disable: bool) -> Self { + self.disable = disable; + self + } + pub fn build(self) -> Result, String> { Ok(AttestationService { inner: Arc::new(Inner { @@ -77,6 +84,7 @@ impl AttestationServiceBuilder { context: self .context .ok_or("Cannot build AttestationService without runtime_context")?, + disable: self.disable, }), }) } @@ -89,6 +97,7 @@ pub struct Inner { slot_clock: T, beacon_nodes: Arc>, context: RuntimeContext, + disable: bool, } /// Attempts to produce attestations for all known validators 1/3rd of the way through each slot. @@ -120,6 +129,10 @@ impl AttestationService { /// Starts the service which periodically produces attestations. 
     pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
         let log = self.context.log().clone();
+        if self.disable {
+            info!(log, "Attestation service disabled");
+            return Ok(());
+        }
 
         let slot_duration = Duration::from_secs(spec.seconds_per_slot);
         let duration_to_next_slot = self
diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs
index 1c0fd338d27..7437ff8bcf7 100644
--- a/validator_client/validator_services/src/duties_service.rs
+++ b/validator_client/validator_services/src/duties_service.rs
@@ -230,6 +230,7 @@ pub struct DutiesService<T, E: EthSpec> {
     pub enable_high_validator_count_metrics: bool,
     /// If this validator is running in distributed mode.
     pub distributed: bool,
+    pub disable_attesting: bool,
 }
 
 impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
@@ -403,6 +404,11 @@ pub fn start_update_service<T: SlotClock + 'static, E: EthSpec>(
         "duties_service_proposers",
     );
 
+    // Skip starting attestation duties or sync committee services.
+    if core_duties_service.disable_attesting {
+        return;
+    }
+
     /*
      * Spawn the task which keeps track of local attestation duties.
      */
diff --git a/validator_client/validator_services/src/sync_committee_service.rs b/validator_client/validator_services/src/sync_committee_service.rs
index 3ab5b33b6cc..5f84c517f37 100644
--- a/validator_client/validator_services/src/sync_committee_service.rs
+++ b/validator_client/validator_services/src/sync_committee_service.rs
@@ -87,6 +87,11 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
     pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
         let log = self.context.log().clone();
 
+        if self.duties_service.disable_attesting {
+            info!(log, "Sync committee service disabled");
+            return Ok(());
+        }
+
         let slot_duration = Duration::from_secs(spec.seconds_per_slot);
         let duration_to_next_slot = self
             .slot_clock
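The `block_roots_from_fork_choice` helper added to `beacon_chain.rs` in this patch leans on one property of proto-array: `iter_block_roots` walks from the head backwards, so slots arrive strictly descending, and a single reverse at the end yields ascending order. A minimal, self-contained sketch of that walk; the `Hash256` alias and the toy chain in `main` are stand-ins, not Lighthouse's types:

// Stand-in for Lighthouse's 32-byte hash type.
type Hash256 = [u8; 32];

fn block_roots_in_range(
    // (root, slot) pairs yielded head-first, i.e. slots strictly descending.
    descending: impl Iterator<Item = (Hash256, u64)>,
    start_slot: u64,
    count: u64,
) -> Vec<Hash256> {
    let end_slot = start_slot.saturating_add(count);
    let mut roots = vec![];
    for (root, slot) in descending {
        if slot >= start_slot && slot < end_slot {
            roots.push(root);
        }
        if slot < start_slot {
            // Everything further back is older than the range; stop walking.
            break;
        }
    }
    // Collected newest-first; flip to ascending slot order for the response.
    roots.reverse();
    roots
}

fn main() {
    // Head at slot 9 walking back to slot 5; request slots [6, 8).
    let chain = (5u64..=9).rev().map(|slot| ([slot as u8; 32], slot));
    let roots = block_roots_in_range(chain, 6, 2);
    assert_eq!(roots, vec![[6u8; 32], [7u8; 32]]);
    println!("returned {} roots in ascending slot order", roots.len());
}

The early `break` is what makes this cheap: the walk never visits fork-choice nodes older than the requested range.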
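The reworked peer-relevance check in `rpc_methods.rs` replaces a single root lookup with a small decision tree: identical, genesis, or ahead-of-us finality is always accepted; peers finalizing behind us get a root check only when that root is still reachable in the freezer DB; anything between the split slot and the head gets the benefit of the doubt. A simplified model of that classification, where `Checkpoint`, the slot constants, and the `root_at_slot` lookup are all stand-ins for the beacon-chain equivalents:

#[derive(PartialEq, Clone, Copy)]
struct Checkpoint {
    epoch: u64,
    root: [u8; 32],
}

fn irrelevant_reason(
    remote: Checkpoint,
    local: Checkpoint,
    oldest_block_slot: u64,
    split_slot: u64,
    // Lookup of our canonical block root at a slot (None if unreachable).
    root_at_slot: impl Fn(u64) -> Option<[u8; 32]>,
) -> Option<String> {
    const SLOTS_PER_EPOCH: u64 = 32;
    let zero = [0u8; 32];
    if (remote.epoch == local.epoch && remote.root == local.root)
        || remote.root == zero
        || local.root == zero
        || remote.epoch > local.epoch
    {
        // Fast path: identical, genesis, or ahead of us. Keep the peer.
        return None;
    }
    // Remote finalized epoch is behind ours.
    let remote_finalized_slot = remote.epoch * SLOTS_PER_EPOCH;
    if remote_finalized_slot < oldest_block_slot {
        Some("Old finality out of range".to_string())
    } else if remote_finalized_slot < split_slot {
        // In range for a quick root check against our finalized chain.
        (root_at_slot(remote_finalized_slot) != Some(remote.root))
            .then(|| "Different finalized chain".to_string())
    } else {
        // Between split and head: infeasible to check, give benefit of doubt.
        None
    }
}

fn main() {
    let ours = Checkpoint { epoch: 100, root: [7; 32] };
    let behind = Checkpoint { epoch: 90, root: [9; 32] };
    assert!(irrelevant_reason(behind, ours, 0, u64::MAX, |_| Some([9; 32])).is_none());
    assert_eq!(
        irrelevant_reason(behind, ours, 0, u64::MAX, |_| Some([1; 32])).as_deref(),
        Some("Different finalized chain")
    );
}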
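The hard-coded ban added to `block_verification.rs` is just an equality check against a known-bad root, run before any expensive verification. A toy version using plain hex strings instead of `Hash256` (the constant is the root quoted in the patch; the surrounding names are simplified stand-ins):

const BANNED_BLOCK_ROOT_HEX: &str =
    "2db899881ed8546476d0b92c6aa9110bea9a4cd0dbeb5519eb0ea69575f1f359";

#[derive(Debug)]
enum BlockError {
    KnownInvalidExecutionPayload(String),
}

fn check_not_banned(block_root_hex: &str) -> Result<(), BlockError> {
    // Bail out before signature checks, state transition, or payload
    // verification even begin.
    if block_root_hex.eq_ignore_ascii_case(BANNED_BLOCK_ROOT_HEX) {
        return Err(BlockError::KnownInvalidExecutionPayload(
            block_root_hex.to_string(),
        ));
    }
    Ok(())
}

fn main() {
    assert!(check_not_banned("deadbeef").is_ok());
    assert!(check_not_banned(BANNED_BLOCK_ROOT_HEX).is_err());
}

Pairing this with the new `KnownInvalidExecutionPayload` arms in `gossip_methods.rs` and `sync_methods.rs` means peers that keep serving the banned block are downscored rather than silently retried.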
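Two changes above work together: `proto_array.rs` now allows a `Valid` node to be downgraded to `Invalid` (previously a `ValidExecutionStatusBecameInvalid` error), which is what lets the new `POST /lighthouse/fork_choice/invalidate` endpoint force-invalidate an already-imported block. A toy model of the relaxed status transition, with the enum shape approximated from the patch:

#[derive(Debug, Clone, Copy, PartialEq)]
enum ExecutionStatus {
    Valid([u8; 32]),
    Optimistic([u8; 32]),
    Invalid([u8; 32]),
    Irrelevant(bool),
}

fn invalidate(status: ExecutionStatus) -> ExecutionStatus {
    match status {
        // Both valid and optimistic nodes may now become invalid; before this
        // patch the Valid arm was a hard error.
        ExecutionStatus::Valid(hash) | ExecutionStatus::Optimistic(hash) => {
            ExecutionStatus::Invalid(hash)
        }
        other => other,
    }
}

fn main() {
    let hash = [0xab; 32];
    assert_eq!(
        invalidate(ExecutionStatus::Valid(hash)),
        ExecutionStatus::Invalid(hash)
    );
}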
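The `--sync-tolerance-epochs` flag replaces the hard-coded `SYNC_TOLERANCE_EPOCHS` constant of 8 with a configurable value defaulting to 16. The syncing check in the HTTP API is one line of slot arithmetic; assuming mainnet's 32 slots per epoch, the new default tolerance works out to 16 * 32 = 512 slots:

const SLOTS_PER_EPOCH: u64 = 32;

fn is_synced(head_slot: u64, current_slot: u64, sync_tolerance_epochs: u64) -> bool {
    let tolerance = sync_tolerance_epochs * SLOTS_PER_EPOCH;
    head_slot + tolerance >= current_slot
}

fn main() {
    // Head 500 slots behind: synced under the new default (512 slots) but
    // not under the old constant of 8 epochs (256 slots).
    assert!(is_synced(1_000, 1_500, 16));
    assert!(!is_synced(1_000, 1_500, 8));
}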
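On the validator-client side, `--disable-attesting` is threaded through `AttestationServiceBuilder::disable` into the service, whose `start_update_service` now returns early. A stripped-down imitation of that plumbing; the real builder also carries slot clocks, validator stores, and channels:

#[derive(Default)]
struct AttestationServiceBuilder {
    disable: bool,
}

struct AttestationService {
    disable: bool,
}

impl AttestationServiceBuilder {
    fn disable(mut self, disable: bool) -> Self {
        self.disable = disable;
        self
    }

    fn build(self) -> AttestationService {
        AttestationService { disable: self.disable }
    }
}

impl AttestationService {
    fn start_update_service(&self) -> Result<(), String> {
        if self.disable {
            // Mirrors the early return in the patch: log and start nothing.
            println!("Attestation service disabled");
            return Ok(());
        }
        println!("Attestation service started");
        Ok(())
    }
}

fn main() {
    let service = AttestationServiceBuilder::default().disable(true).build();
    service.start_update_service().unwrap();
}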