From 2cb71e2211e7a778407c17ab7a6704a93966bca2 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Fri, 28 Feb 2025 17:30:15 +1100 Subject: [PATCH 1/2] Load block roots from fork choice where possible to avoid loading state from disk when serving block by range requests. --- beacon_node/beacon_chain/src/beacon_chain.rs | 33 +++++ .../network_beacon_processor/rpc_methods.rs | 134 +++++++++++------- .../src/proto_array_fork_choice.rs | 10 +- 3 files changed, 122 insertions(+), 55 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index ebd0ef186a9..90c0be5e422 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -7282,6 +7282,39 @@ impl BeaconChain { Ok(None) } + + /// Retrieves block roots within some slot range from fork choice. + /// Returns `None` if the provided slot is not found. + pub fn block_roots_from_fork_choice( + &self, + start_slot: u64, + count: u64, + ) -> Option> { + let head_block_root = self.canonical_head.cached_head().head_block_root(); + let fork_choice_read_lock = self.canonical_head.fork_choice_read_lock(); + let block_roots_iter = fork_choice_read_lock + .proto_array() + .iter_block_roots(&head_block_root); + let end_slot = start_slot.saturating_add(count); + let mut roots = vec![]; + + for (root, slot) in block_roots_iter { + if slot < end_slot && slot >= start_slot { + roots.push(root); + } + if slot < start_slot { + break; + } + } + + if roots.is_empty() { + None + } else { + // return in ascending slot order + roots.reverse(); + Some(roots) + } + } } impl Drop for BeaconChain { diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 857fc266dae..d78abe59285 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -681,64 +681,24 @@ impl NetworkBeaconProcessor { "start_slot" => req.start_slot(), ); - let forwards_block_root_iter = match self + let block_roots_timer = std::time::Instant::now(); + let (block_roots, block_roots_source) = if let Some(block_roots) = self .chain - .forwards_iter_block_roots(Slot::from(*req.start_slot())) + .block_roots_from_fork_choice(*req.start_slot(), *req.count()) { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockOutOfRange { - slot, - oldest_block_slot, - }) => { - debug!(self.log, "Range request failed during backfill"; - "requested_slot" => slot, - "oldest_known_slot" => oldest_block_slot - ); - return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); - } - Err(e) => { - error!(self.log, "Unable to obtain root iter"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // Pick out the required blocks, ignoring skip-slots. - let mut last_block_root = None; - let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| { - slot.as_u64() < req.start_slot().saturating_add(*req.count()) - }) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .collect::>>() - }); - - let block_roots = match maybe_block_roots { - Ok(block_roots) => block_roots, - Err(e) => { - error!(self.log, "Error during iteration over blocks"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Iteration error")); - } + (block_roots, "fork choice") + } else { + (self.get_block_roots_from_store(&req, &peer_id)?, "store") }; - // remove all skip slots - let block_roots = block_roots.into_iter().flatten().collect::>(); + debug!( + self.log, + "BlocksByRange block roots retrieved"; + "start_slot" => req.start_slot(), + "block_roots_count" => block_roots.len(), + "block_roots_source" => block_roots_source, + "elapsed" => ?block_roots_timer.elapsed() + ); let current_slot = self .chain @@ -854,6 +814,72 @@ impl NetworkBeaconProcessor { Ok(()) } + /// Get block roots for a `BlocksByRangeRequest` from the store using roots iterator. + fn get_block_roots_from_store( + &self, + req: &BlocksByRangeRequest, + peer_id: &PeerId, + ) -> Result, (RpcErrorResponse, &'static str)> { + let forwards_block_root_iter = match self + .chain + .forwards_iter_block_roots(Slot::from(*req.start_slot())) + { + Ok(iter) => iter, + Err(BeaconChainError::HistoricalBlockOutOfRange { + slot, + oldest_block_slot, + }) => { + debug!(self.log, "Range request failed during backfill"; + "requested_slot" => slot, + "oldest_known_slot" => oldest_block_slot + ); + return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); + } + Err(e) => { + error!(self.log, "Unable to obtain root iter"; + "request" => ?req, + "peer" => %peer_id, + "error" => ?e + ); + return Err((RpcErrorResponse::ServerError, "Database error")); + } + }; + + // Pick out the required blocks, ignoring skip-slots. + let mut last_block_root = None; + let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { + iter.take_while(|(_, slot)| { + slot.as_u64() < req.start_slot().saturating_add(*req.count()) + }) + // map skip slots to None + .map(|(root, _)| { + let result = if Some(root) == last_block_root { + None + } else { + Some(root) + }; + last_block_root = Some(root); + result + }) + .collect::>>() + }); + + let block_roots = match maybe_block_roots { + Ok(block_roots) => block_roots, + Err(e) => { + error!(self.log, "Error during iteration over blocks"; + "request" => ?req, + "peer" => %peer_id, + "error" => ?e + ); + return Err((RpcErrorResponse::ServerError, "Iteration error")); + } + }; + + // remove all skip slots + Ok(block_roots.into_iter().flatten().collect::>()) + } + /// Handle a `BlobsByRange` request from the peer. pub fn handle_blobs_by_range_request( self: Arc, diff --git a/consensus/proto_array/src/proto_array_fork_choice.rs b/consensus/proto_array/src/proto_array_fork_choice.rs index 88d46603117..891590f4756 100644 --- a/consensus/proto_array/src/proto_array_fork_choice.rs +++ b/consensus/proto_array/src/proto_array_fork_choice.rs @@ -856,10 +856,18 @@ impl ProtoArrayForkChoice { } /// See `ProtoArray::iter_nodes` - pub fn iter_nodes<'a>(&'a self, block_root: &Hash256) -> Iter<'a> { + pub fn iter_nodes(&self, block_root: &Hash256) -> Iter { self.proto_array.iter_nodes(block_root) } + /// See `ProtoArray::iter_block_roots` + pub fn iter_block_roots( + &self, + block_root: &Hash256, + ) -> impl Iterator + use<'_> { + self.proto_array.iter_block_roots(block_root) + } + pub fn as_bytes(&self) -> Vec { SszContainer::from(self).as_ssz_bytes() } From bd093d95337ea7e1b0b010bc01994775f4be03c8 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Sat, 1 Mar 2025 00:23:22 +1100 Subject: [PATCH 2/2] Check if the start slot is newer than finalization (`start_slot >= finalized_slot`), and use fork choice in that case. --- beacon_node/beacon_chain/src/beacon_chain.rs | 20 +- .../network_beacon_processor/rpc_methods.rs | 302 ++++++------------ 2 files changed, 109 insertions(+), 213 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 90c0be5e422..f7f186704f5 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -7283,13 +7283,8 @@ impl BeaconChain { Ok(None) } - /// Retrieves block roots within some slot range from fork choice. - /// Returns `None` if the provided slot is not found. - pub fn block_roots_from_fork_choice( - &self, - start_slot: u64, - count: u64, - ) -> Option> { + /// Retrieves block roots (in ascending slot order) within some slot range from fork choice. + pub fn block_roots_from_fork_choice(&self, start_slot: u64, count: u64) -> Vec { let head_block_root = self.canonical_head.cached_head().head_block_root(); let fork_choice_read_lock = self.canonical_head.fork_choice_read_lock(); let block_roots_iter = fork_choice_read_lock @@ -7307,13 +7302,10 @@ impl BeaconChain { } } - if roots.is_empty() { - None - } else { - // return in ascending slot order - roots.reverse(); - Some(roots) - } + drop(fork_choice_read_lock); + // return in ascending slot order + roots.reverse(); + roots } } diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index d78abe59285..341177e1371 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -675,46 +675,32 @@ impl NetworkBeaconProcessor { request_id: RequestId, req: BlocksByRangeRequest, ) -> Result<(), (RpcErrorResponse, &'static str)> { + let req_start_slot = *req.start_slot(); + let req_count = *req.count(); + debug!(self.log, "Received BlocksByRange Request"; "peer_id" => %peer_id, - "count" => req.count(), - "start_slot" => req.start_slot(), - ); - - let block_roots_timer = std::time::Instant::now(); - let (block_roots, block_roots_source) = if let Some(block_roots) = self - .chain - .block_roots_from_fork_choice(*req.start_slot(), *req.count()) - { - (block_roots, "fork choice") - } else { - (self.get_block_roots_from_store(&req, &peer_id)?, "store") - }; - - debug!( - self.log, - "BlocksByRange block roots retrieved"; - "start_slot" => req.start_slot(), - "block_roots_count" => block_roots.len(), - "block_roots_source" => block_roots_source, - "elapsed" => ?block_roots_timer.elapsed() + "start_slot" => req_start_slot, + "count" => req_count, ); + let block_roots = + self.get_block_roots_for_slot_range(req_start_slot, req_count, "BlocksByRange")?; let current_slot = self .chain .slot() .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); - let log_results = |req: BlocksByRangeRequest, peer_id, blocks_sent| { - if blocks_sent < (*req.count() as usize) { + let log_results = |peer_id, blocks_sent| { + if blocks_sent < (req_count as usize) { debug!( self.log, "BlocksByRange outgoing response processed"; "peer" => %peer_id, "msg" => "Failed to return all requested blocks", - "start_slot" => req.start_slot(), + "start_slot" => req_start_slot, "current_slot" => current_slot, - "requested" => req.count(), + "requested" => req_count, "returned" => blocks_sent ); } else { @@ -722,9 +708,9 @@ impl NetworkBeaconProcessor { self.log, "BlocksByRange outgoing response processed"; "peer" => %peer_id, - "start_slot" => req.start_slot(), + "start_slot" => req_start_slot, "current_slot" => current_slot, - "requested" => req.count(), + "requested" => req_count, "returned" => blocks_sent ); } @@ -745,8 +731,7 @@ impl NetworkBeaconProcessor { Ok(Some(block)) => { // Due to skip slots, blocks could be out of the range, we ensure they // are in the range before sending - if block.slot() >= *req.start_slot() - && block.slot() < req.start_slot() + req.count() + if block.slot() >= req_start_slot && block.slot() < req_start_slot + req.count() { blocks_sent += 1; self.send_network_message(NetworkMessage::SendResponse { @@ -765,7 +750,7 @@ impl NetworkBeaconProcessor { "peer" => %peer_id, "request_root" => ?root ); - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); return Err((RpcErrorResponse::ServerError, "Database inconsistency")); } Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { @@ -775,7 +760,7 @@ impl NetworkBeaconProcessor { "block_root" => ?root, "reason" => "execution layer not synced", ); - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); // send the stream terminator return Err(( RpcErrorResponse::ResourceUnavailable, @@ -803,73 +788,111 @@ impl NetworkBeaconProcessor { "error" => ?e ); } - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); // send the stream terminator return Err((RpcErrorResponse::ServerError, "Failed fetching blocks")); } } } - log_results(req, peer_id, blocks_sent); + log_results(peer_id, blocks_sent); Ok(()) } - /// Get block roots for a `BlocksByRangeRequest` from the store using roots iterator. - fn get_block_roots_from_store( + fn get_block_roots_for_slot_range( &self, - req: &BlocksByRangeRequest, - peer_id: &PeerId, + req_start_slot: u64, + req_count: u64, + req_type: &str, ) -> Result, (RpcErrorResponse, &'static str)> { - let forwards_block_root_iter = match self + let block_roots_timer = std::time::Instant::now(); + let finalized_slot = self .chain - .forwards_iter_block_roots(Slot::from(*req.start_slot())) - { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockOutOfRange { - slot, - oldest_block_slot, - }) => { - debug!(self.log, "Range request failed during backfill"; - "requested_slot" => slot, - "oldest_known_slot" => oldest_block_slot - ); - return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); - } - Err(e) => { - error!(self.log, "Unable to obtain root iter"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } + .canonical_head + .cached_head() + .finalized_checkpoint() + .epoch + .start_slot(T::EthSpec::slots_per_epoch()); + + let (block_roots, block_roots_source) = if req_start_slot >= finalized_slot.as_u64() { + ( + self.chain + .block_roots_from_fork_choice(req_start_slot, req_count), + "fork_choice", + ) + } else { + ( + self.get_block_roots_from_store(req_start_slot, req_count)?, + "store", + ) }; + debug!( + self.log, + "Range request block roots retrieved"; + "req_type" => req_type, + "start_slot" => req_start_slot, + "count" => req_count, + "block_roots_count" => block_roots.len(), + "block_roots_source" => block_roots_source, + "elapsed" => ?block_roots_timer.elapsed() + ); + + Ok(block_roots) + } + + /// Get block roots for a `BlocksByRangeRequest` from the store using roots iterator. + fn get_block_roots_from_store( + &self, + start_slot: u64, + count: u64, + ) -> Result, (RpcErrorResponse, &'static str)> { + let forwards_block_root_iter = + match self.chain.forwards_iter_block_roots(Slot::from(start_slot)) { + Ok(iter) => iter, + Err(BeaconChainError::HistoricalBlockOutOfRange { + slot, + oldest_block_slot, + }) => { + debug!(self.log, "Range request failed during backfill"; + "requested_slot" => slot, + "oldest_known_slot" => oldest_block_slot + ); + return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); + } + Err(e) => { + error!(self.log, "Unable to obtain root iter for range request"; + "start_slot" => start_slot, + "count" => count, + "error" => ?e + ); + return Err((RpcErrorResponse::ServerError, "Database error")); + } + }; + // Pick out the required blocks, ignoring skip-slots. let mut last_block_root = None; let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| { - slot.as_u64() < req.start_slot().saturating_add(*req.count()) - }) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .collect::>>() + iter.take_while(|(_, slot)| slot.as_u64() < start_slot.saturating_add(count)) + // map skip slots to None + .map(|(root, _)| { + let result = if Some(root) == last_block_root { + None + } else { + Some(root) + }; + last_block_root = Some(root); + result + }) + .collect::>>() }); let block_roots = match maybe_block_roots { Ok(block_roots) => block_roots, Err(e) => { - error!(self.log, "Error during iteration over blocks"; - "request" => ?req, - "peer" => %peer_id, + error!(self.log, "Error during iteration over blocks for range request"; + "start_slot" => start_slot, + "count" => count, "error" => ?e ); return Err((RpcErrorResponse::ServerError, "Iteration error")); @@ -958,65 +981,8 @@ impl NetworkBeaconProcessor { }; } - let forwards_block_root_iter = - match self.chain.forwards_iter_block_roots(request_start_slot) { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockOutOfRange { - slot, - oldest_block_slot, - }) => { - debug!(self.log, "Range request failed during backfill"; - "requested_slot" => slot, - "oldest_known_slot" => oldest_block_slot - ); - return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); - } - Err(e) => { - error!(self.log, "Unable to obtain root iter"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // Use `WhenSlotSkipped::Prev` to get the most recent block root prior to - // `request_start_slot` in order to check whether the `request_start_slot` is a skip. - let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| { - self.chain - .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) - .ok() - .flatten() - }); - - // Pick out the required blocks, ignoring skip-slots. - let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .collect::>>() - }); - - let block_roots = match maybe_block_roots { - Ok(block_roots) => block_roots, - Err(e) => { - error!(self.log, "Error during iteration over blocks"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; + let block_roots = + self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?; let current_slot = self .chain @@ -1035,8 +1001,6 @@ impl NetworkBeaconProcessor { ); }; - // remove all skip slots - let block_roots = block_roots.into_iter().flatten(); let mut blobs_sent = 0; for root in block_roots { @@ -1162,68 +1126,8 @@ impl NetworkBeaconProcessor { }; } - let forwards_block_root_iter = - match self.chain.forwards_iter_block_roots(request_start_slot) { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockOutOfRange { - slot, - oldest_block_slot, - }) => { - debug!(self.log, "Range request failed during backfill"; - "requested_slot" => slot, - "oldest_known_slot" => oldest_block_slot - ); - return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); - } - Err(e) => { - error!(self.log, "Unable to obtain root iter"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // Use `WhenSlotSkipped::Prev` to get the most recent block root prior to - // `request_start_slot` in order to check whether the `request_start_slot` is a skip. - let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| { - self.chain - .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) - .ok() - .flatten() - }); - - // Pick out the required blocks, ignoring skip-slots. - let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .collect::>>() - }); - - let block_roots = match maybe_block_roots { - Ok(block_roots) => block_roots, - Err(e) => { - error!(self.log, "Error during iteration over blocks"; - "request" => ?req, - "peer" => %peer_id, - "error" => ?e - ); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // remove all skip slots - let block_roots = block_roots.into_iter().flatten(); + let block_roots = + self.get_block_roots_for_slot_range(req.start_slot, req.count, "DataColumnsByRange")?; let mut data_columns_sent = 0; for root in block_roots {