Skip to content

Commit

Permalink
Load block roots from fork choice where possible to avoid loading sta…
Browse files Browse the repository at this point in the history
…te from disk when serving block by range requests.
  • Loading branch information
jimmygchen committed Feb 28, 2025
1 parent 9f15f31 commit d961ca9
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 55 deletions.
31 changes: 31 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7282,6 +7282,37 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

Ok(None)
}

/// Retrieves block roots within some slot range from fork choice.
/// Returns `None` if the provided slot is not found.
pub fn block_roots_from_fork_choice(
&self,
start_slot: u64,
count: u64,
) -> Option<Vec<Hash256>> {
let head_block_root = self.canonical_head.cached_head().head_block_root();
let fork_choice_read_lock = self.canonical_head.fork_choice_read_lock();
let block_roots_iter = fork_choice_read_lock
.proto_array()
.iter_block_roots(&head_block_root);
let end_slot = start_slot.saturating_add(count);
let mut roots = vec![];

for (root, slot) in block_roots_iter {
if slot < end_slot && slot >= start_slot {
roots.push(root);
}
if slot < start_slot {
break;
}
}

if roots.is_empty() {
None
} else {
Some(roots)
}
}
}

impl<T: BeaconChainTypes> Drop for BeaconChain<T> {
Expand Down
134 changes: 80 additions & 54 deletions beacon_node/network/src/network_beacon_processor/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,64 +681,24 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"start_slot" => req.start_slot(),
);

let forwards_block_root_iter = match self
let block_roots_timer = std::time::Instant::now();
let (block_roots, block_roots_source) = if let Some(block_roots) = self
.chain
.forwards_iter_block_roots(Slot::from(*req.start_slot()))
.block_roots_from_fork_choice(*req.start_slot(), *req.count())
{
Ok(iter) => iter,
Err(BeaconChainError::HistoricalBlockOutOfRange {
slot,
oldest_block_slot,
}) => {
debug!(self.log, "Range request failed during backfill";
"requested_slot" => slot,
"oldest_known_slot" => oldest_block_slot
);
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling"));
}
Err(e) => {
error!(self.log, "Unable to obtain root iter";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RpcErrorResponse::ServerError, "Database error"));
}
};

// Pick out the required blocks, ignoring skip-slots.
let mut last_block_root = None;
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
iter.take_while(|(_, slot)| {
slot.as_u64() < req.start_slot().saturating_add(*req.count())
})
// map skip slots to None
.map(|(root, _)| {
let result = if Some(root) == last_block_root {
None
} else {
Some(root)
};
last_block_root = Some(root);
result
})
.collect::<Vec<Option<Hash256>>>()
});

let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => {
error!(self.log, "Error during iteration over blocks";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RpcErrorResponse::ServerError, "Iteration error"));
}
(block_roots, "fork choice")
} else {
(self.get_block_roots_from_store(&req, &peer_id)?, "store")
};

// remove all skip slots
let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
debug!(
self.log,
"BlocksByRange block roots retrieved";
"start_slot" => req.start_slot(),
"count" => req.count(),
"block_roots_source" => block_roots_source,
"elapsed" => ?block_roots_timer.elapsed()
);

let current_slot = self
.chain
Expand Down Expand Up @@ -854,6 +814,72 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Ok(())
}

/// Get block roots for a `BlocksByRangeRequest` from the store using roots iterator.
fn get_block_roots_from_store(
&self,
req: &BlocksByRangeRequest,
peer_id: &PeerId,
) -> Result<Vec<Hash256>, (RpcErrorResponse, &'static str)> {
let forwards_block_root_iter = match self
.chain
.forwards_iter_block_roots(Slot::from(*req.start_slot()))
{
Ok(iter) => iter,
Err(BeaconChainError::HistoricalBlockOutOfRange {
slot,
oldest_block_slot,
}) => {
debug!(self.log, "Range request failed during backfill";
"requested_slot" => slot,
"oldest_known_slot" => oldest_block_slot
);
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling"));
}
Err(e) => {
error!(self.log, "Unable to obtain root iter";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RpcErrorResponse::ServerError, "Database error"));
}
};

// Pick out the required blocks, ignoring skip-slots.
let mut last_block_root = None;
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
iter.take_while(|(_, slot)| {
slot.as_u64() < req.start_slot().saturating_add(*req.count())
})
// map skip slots to None
.map(|(root, _)| {
let result = if Some(root) == last_block_root {
None
} else {
Some(root)
};
last_block_root = Some(root);
result
})
.collect::<Vec<Option<Hash256>>>()
});

let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => {
error!(self.log, "Error during iteration over blocks";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RpcErrorResponse::ServerError, "Iteration error"));
}
};

// remove all skip slots
Ok(block_roots.into_iter().flatten().collect::<Vec<_>>())
}

/// Handle a `BlobsByRange` request from the peer.
pub fn handle_blobs_by_range_request(
self: Arc<Self>,
Expand Down
10 changes: 9 additions & 1 deletion consensus/proto_array/src/proto_array_fork_choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -856,10 +856,18 @@ impl ProtoArrayForkChoice {
}

/// See `ProtoArray::iter_nodes`
pub fn iter_nodes<'a>(&'a self, block_root: &Hash256) -> Iter<'a> {
pub fn iter_nodes(&self, block_root: &Hash256) -> Iter {
self.proto_array.iter_nodes(block_root)
}

/// See `ProtoArray::iter_block_roots`
pub fn iter_block_roots(
&self,
block_root: &Hash256,
) -> impl Iterator<Item = (Hash256, Slot)> + use<'_> {
self.proto_array.iter_block_roots(block_root)
}

pub fn as_bytes(&self) -> Vec<u8> {
SszContainer::from(self).as_ssz_bytes()
}
Expand Down

0 comments on commit d961ca9

Please sign in to comment.