Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load block roots from fork choice where possible when serving BlocksByRange requests #7058

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7282,6 +7282,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

Ok(None)
}

/// Retrieves block roots within some slot range from fork choice.
/// Returns `None` if the provided slot is not found.
pub fn block_roots_from_fork_choice(
&self,
start_slot: u64,
count: u64,
) -> Option<Vec<Hash256>> {
let head_block_root = self.canonical_head.cached_head().head_block_root();
let fork_choice_read_lock = self.canonical_head.fork_choice_read_lock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically we should probably avoid locking fork choice from an async task, but atm I think we have bigger fish to fry. It's nowhere near as bad as doing 10s of state loads 😱

let block_roots_iter = fork_choice_read_lock
.proto_array()
.iter_block_roots(&head_block_root);
let end_slot = start_slot.saturating_add(count);
let mut roots = vec![];

for (root, slot) in block_roots_iter {
if slot < end_slot && slot >= start_slot {
roots.push(root);
}
if slot < start_slot {
break;
}
}

if roots.is_empty() {
None
} else {
// return in ascending slot order
roots.reverse();
Some(roots)
}
}
}

impl<T: BeaconChainTypes> Drop for BeaconChain<T> {
Expand Down
134 changes: 80 additions & 54 deletions beacon_node/network/src/network_beacon_processor/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,64 +681,24 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"start_slot" => req.start_slot(),
);

let forwards_block_root_iter = match self
let block_roots_timer = std::time::Instant::now();
let (block_roots, block_roots_source) = if let Some(block_roots) = self
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than attempting to access fork choice and seeing if it works, I think we should check if the start slot is newer than finalization (start_slot >= finalized_slot), and use fork choice in that case. Fork choice has to have every chain newer than finalization.

This will be more efficient, as it fixes the issue with skipped slots. If a peer requests a range of unfinalized slots that is empty, we can just return nothing. Currently, we will try fork choice, get None, and then try get_block_roots_from_store and get the block roots from the skipped slots. This is not actually helpful, as later in the function we'll drop the blocks that are out of range anyway:

// Due to skip slots, blocks could be out of the range, we ensure they
// are in the range before sending
if block.slot() >= *req.start_slot()
&& block.slot() < req.start_slot() + req.count()
{
blocks_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
request_id,
response: Response::BlocksByRange(Some(block.clone())),
id: (connection_id, substream_id),
});
}

In other words, the new fork choice impl allows us to avoid looking up blocks from skipped slots and then discarding them.

Something that this code change is not currently handling is requests that span across the finalized slot. If start_slot < finalized_slot and end_slot > finalized_slot, then the current impl will return partial data (all the blocks newer than finalization). Instead of this, short-term I think it would be better to fallback on the old disk-based iterator, which is accomplished by the same code change I suggested above: only use fork choice if the start slot is newer than finalization.

Longer term, we should use some kind of hybrid iterator so we can easily span the finalized slot without any mucking around. For now I think we don't need to worry about this though, because these requests should be infrequent.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. I've applied the change to check for finalization in bd093d9

.chain
.forwards_iter_block_roots(Slot::from(*req.start_slot()))
.block_roots_from_fork_choice(*req.start_slot(), *req.count())
{
Ok(iter) => iter,
Err(BeaconChainError::HistoricalBlockOutOfRange {
slot,
oldest_block_slot,
}) => {
debug!(self.log, "Range request failed during backfill";
"requested_slot" => slot,
"oldest_known_slot" => oldest_block_slot
);
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling"));
}
Err(e) => {
error!(self.log, "Unable to obtain root iter";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RpcErrorResponse::ServerError, "Database error"));
}
};

// Pick out the required blocks, ignoring skip-slots.
let mut last_block_root = None;
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
iter.take_while(|(_, slot)| {
slot.as_u64() < req.start_slot().saturating_add(*req.count())
})
// map skip slots to None
.map(|(root, _)| {
let result = if Some(root) == last_block_root {
None
} else {
Some(root)
};
last_block_root = Some(root);
result
})
.collect::<Vec<Option<Hash256>>>()
});

let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => {
error!(self.log, "Error during iteration over blocks";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RpcErrorResponse::ServerError, "Iteration error"));
}
(block_roots, "fork choice")
} else {
(self.get_block_roots_from_store(&req, &peer_id)?, "store")
};

// remove all skip slots
let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>();
debug!(
self.log,
"BlocksByRange block roots retrieved";
"start_slot" => req.start_slot(),
"block_roots_count" => block_roots.len(),
"block_roots_source" => block_roots_source,
"elapsed" => ?block_roots_timer.elapsed()
);

let current_slot = self
.chain
Expand Down Expand Up @@ -854,6 +814,72 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Ok(())
}

/// Get block roots for a `BlocksByRangeRequest` from the store using roots iterator.
fn get_block_roots_from_store(
&self,
req: &BlocksByRangeRequest,
peer_id: &PeerId,
) -> Result<Vec<Hash256>, (RpcErrorResponse, &'static str)> {
let forwards_block_root_iter = match self
.chain
.forwards_iter_block_roots(Slot::from(*req.start_slot()))
{
Ok(iter) => iter,
Err(BeaconChainError::HistoricalBlockOutOfRange {
slot,
oldest_block_slot,
}) => {
debug!(self.log, "Range request failed during backfill";
"requested_slot" => slot,
"oldest_known_slot" => oldest_block_slot
);
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling"));
}
Err(e) => {
error!(self.log, "Unable to obtain root iter";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RpcErrorResponse::ServerError, "Database error"));
}
};

// Pick out the required blocks, ignoring skip-slots.
let mut last_block_root = None;
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
iter.take_while(|(_, slot)| {
slot.as_u64() < req.start_slot().saturating_add(*req.count())
})
// map skip slots to None
.map(|(root, _)| {
let result = if Some(root) == last_block_root {
None
} else {
Some(root)
};
last_block_root = Some(root);
result
})
.collect::<Vec<Option<Hash256>>>()
});

let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => {
error!(self.log, "Error during iteration over blocks";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RpcErrorResponse::ServerError, "Iteration error"));
}
};

// remove all skip slots
Ok(block_roots.into_iter().flatten().collect::<Vec<_>>())
}

/// Handle a `BlobsByRange` request from the peer.
pub fn handle_blobs_by_range_request(
self: Arc<Self>,
Expand Down
10 changes: 9 additions & 1 deletion consensus/proto_array/src/proto_array_fork_choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -856,10 +856,18 @@ impl ProtoArrayForkChoice {
}

/// See `ProtoArray::iter_nodes`
pub fn iter_nodes<'a>(&'a self, block_root: &Hash256) -> Iter<'a> {
pub fn iter_nodes(&self, block_root: &Hash256) -> Iter {
self.proto_array.iter_nodes(block_root)
}

/// See `ProtoArray::iter_block_roots`
pub fn iter_block_roots(
&self,
block_root: &Hash256,
) -> impl Iterator<Item = (Hash256, Slot)> + use<'_> {
self.proto_array.iter_block_roots(block_root)
}

pub fn as_bytes(&self) -> Vec<u8> {
SszContainer::from(self).as_ssz_bytes()
}
Expand Down
Loading