-
Notifications
You must be signed in to change notification settings - Fork 810
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Load block roots from fork choice where possible when serving BlocksByRange
requests
#7058
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -681,64 +681,24 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> { | |||||||||||||||||||||||||||
"start_slot" => req.start_slot(), | ||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
let forwards_block_root_iter = match self | ||||||||||||||||||||||||||||
let block_roots_timer = std::time::Instant::now(); | ||||||||||||||||||||||||||||
let (block_roots, block_roots_source) = if let Some(block_roots) = self | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rather than attempting to access fork choice and seeing if it works, I think we should check if the start slot is newer than finalization ( This will be more efficient, as it fixes the issue with skipped slots. If a peer requests a range of unfinalized slots that is empty, we can just return nothing. Currently, we will try fork choice, get lighthouse/beacon_node/network/src/network_beacon_processor/rpc_methods.rs Lines 746 to 758 in 2cb71e2
In other words, the new fork choice impl allows us to avoid looking up blocks from skipped slots and then discarding them. Something that this code change is not currently handling is requests that span across the finalized slot. If Longer term, we should use some kind of hybrid iterator so we can easily span the finalized slot without any mucking around. For now I think we don't need to worry about this though, because these requests should be infrequent. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the explanation. I've applied the change to check for finalization in bd093d9 |
||||||||||||||||||||||||||||
.chain | ||||||||||||||||||||||||||||
.forwards_iter_block_roots(Slot::from(*req.start_slot())) | ||||||||||||||||||||||||||||
.block_roots_from_fork_choice(*req.start_slot(), *req.count()) | ||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||
Ok(iter) => iter, | ||||||||||||||||||||||||||||
Err(BeaconChainError::HistoricalBlockOutOfRange { | ||||||||||||||||||||||||||||
slot, | ||||||||||||||||||||||||||||
oldest_block_slot, | ||||||||||||||||||||||||||||
}) => { | ||||||||||||||||||||||||||||
debug!(self.log, "Range request failed during backfill"; | ||||||||||||||||||||||||||||
"requested_slot" => slot, | ||||||||||||||||||||||||||||
"oldest_known_slot" => oldest_block_slot | ||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
Err(e) => { | ||||||||||||||||||||||||||||
error!(self.log, "Unable to obtain root iter"; | ||||||||||||||||||||||||||||
"request" => ?req, | ||||||||||||||||||||||||||||
"peer" => %peer_id, | ||||||||||||||||||||||||||||
"error" => ?e | ||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||
return Err((RpcErrorResponse::ServerError, "Database error")); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// Pick out the required blocks, ignoring skip-slots. | ||||||||||||||||||||||||||||
let mut last_block_root = None; | ||||||||||||||||||||||||||||
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { | ||||||||||||||||||||||||||||
iter.take_while(|(_, slot)| { | ||||||||||||||||||||||||||||
slot.as_u64() < req.start_slot().saturating_add(*req.count()) | ||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||
// map skip slots to None | ||||||||||||||||||||||||||||
.map(|(root, _)| { | ||||||||||||||||||||||||||||
let result = if Some(root) == last_block_root { | ||||||||||||||||||||||||||||
None | ||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||
Some(root) | ||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||
last_block_root = Some(root); | ||||||||||||||||||||||||||||
result | ||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||
.collect::<Vec<Option<Hash256>>>() | ||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
let block_roots = match maybe_block_roots { | ||||||||||||||||||||||||||||
Ok(block_roots) => block_roots, | ||||||||||||||||||||||||||||
Err(e) => { | ||||||||||||||||||||||||||||
error!(self.log, "Error during iteration over blocks"; | ||||||||||||||||||||||||||||
"request" => ?req, | ||||||||||||||||||||||||||||
"peer" => %peer_id, | ||||||||||||||||||||||||||||
"error" => ?e | ||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||
return Err((RpcErrorResponse::ServerError, "Iteration error")); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
(block_roots, "fork choice") | ||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||
(self.get_block_roots_from_store(&req, &peer_id)?, "store") | ||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// remove all skip slots | ||||||||||||||||||||||||||||
let block_roots = block_roots.into_iter().flatten().collect::<Vec<_>>(); | ||||||||||||||||||||||||||||
debug!( | ||||||||||||||||||||||||||||
self.log, | ||||||||||||||||||||||||||||
"BlocksByRange block roots retrieved"; | ||||||||||||||||||||||||||||
"start_slot" => req.start_slot(), | ||||||||||||||||||||||||||||
"block_roots_count" => block_roots.len(), | ||||||||||||||||||||||||||||
"block_roots_source" => block_roots_source, | ||||||||||||||||||||||||||||
"elapsed" => ?block_roots_timer.elapsed() | ||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
let current_slot = self | ||||||||||||||||||||||||||||
.chain | ||||||||||||||||||||||||||||
|
@@ -854,6 +814,72 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> { | |||||||||||||||||||||||||||
Ok(()) | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
/// Get block roots for a `BlocksByRangeRequest` from the store using roots iterator. | ||||||||||||||||||||||||||||
fn get_block_roots_from_store( | ||||||||||||||||||||||||||||
&self, | ||||||||||||||||||||||||||||
req: &BlocksByRangeRequest, | ||||||||||||||||||||||||||||
peer_id: &PeerId, | ||||||||||||||||||||||||||||
) -> Result<Vec<Hash256>, (RpcErrorResponse, &'static str)> { | ||||||||||||||||||||||||||||
let forwards_block_root_iter = match self | ||||||||||||||||||||||||||||
.chain | ||||||||||||||||||||||||||||
.forwards_iter_block_roots(Slot::from(*req.start_slot())) | ||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||
Ok(iter) => iter, | ||||||||||||||||||||||||||||
Err(BeaconChainError::HistoricalBlockOutOfRange { | ||||||||||||||||||||||||||||
slot, | ||||||||||||||||||||||||||||
oldest_block_slot, | ||||||||||||||||||||||||||||
}) => { | ||||||||||||||||||||||||||||
debug!(self.log, "Range request failed during backfill"; | ||||||||||||||||||||||||||||
"requested_slot" => slot, | ||||||||||||||||||||||||||||
"oldest_known_slot" => oldest_block_slot | ||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||
return Err((RpcErrorResponse::ResourceUnavailable, "Backfilling")); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
Err(e) => { | ||||||||||||||||||||||||||||
error!(self.log, "Unable to obtain root iter"; | ||||||||||||||||||||||||||||
"request" => ?req, | ||||||||||||||||||||||||||||
"peer" => %peer_id, | ||||||||||||||||||||||||||||
"error" => ?e | ||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||
return Err((RpcErrorResponse::ServerError, "Database error")); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// Pick out the required blocks, ignoring skip-slots. | ||||||||||||||||||||||||||||
let mut last_block_root = None; | ||||||||||||||||||||||||||||
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { | ||||||||||||||||||||||||||||
iter.take_while(|(_, slot)| { | ||||||||||||||||||||||||||||
slot.as_u64() < req.start_slot().saturating_add(*req.count()) | ||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||
// map skip slots to None | ||||||||||||||||||||||||||||
.map(|(root, _)| { | ||||||||||||||||||||||||||||
let result = if Some(root) == last_block_root { | ||||||||||||||||||||||||||||
None | ||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||
Some(root) | ||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||
last_block_root = Some(root); | ||||||||||||||||||||||||||||
result | ||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||
.collect::<Vec<Option<Hash256>>>() | ||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
let block_roots = match maybe_block_roots { | ||||||||||||||||||||||||||||
Ok(block_roots) => block_roots, | ||||||||||||||||||||||||||||
Err(e) => { | ||||||||||||||||||||||||||||
error!(self.log, "Error during iteration over blocks"; | ||||||||||||||||||||||||||||
"request" => ?req, | ||||||||||||||||||||||||||||
"peer" => %peer_id, | ||||||||||||||||||||||||||||
"error" => ?e | ||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||
return Err((RpcErrorResponse::ServerError, "Iteration error")); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// remove all skip slots | ||||||||||||||||||||||||||||
Ok(block_roots.into_iter().flatten().collect::<Vec<_>>()) | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
/// Handle a `BlobsByRange` request from the peer. | ||||||||||||||||||||||||||||
pub fn handle_blobs_by_range_request( | ||||||||||||||||||||||||||||
self: Arc<Self>, | ||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically we should probably avoid locking fork choice from an async task, but atm I think we have bigger fish to fry. It's nowhere near as bad as doing 10s of state loads 😱