Store data columns in early attester cache and blobs db.
jimmygchen committed Feb 8, 2024
1 parent c55e959 commit 64061e2
Showing 17 changed files with 341 additions and 51 deletions.
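The core of this change is a data-column analogue of the existing blob lookup path: the early attester cache is consulted first, the blobs database is the fallback, and a missing entry is treated as an empty list. Below is a minimal, self-contained sketch of that lookup order using placeholder types (`Hash256` as a raw 32-byte array, `DataColumn` as plain bytes, simplified `EarlyAttesterCache` and `Store`); it is not the Lighthouse API, only an illustration of the pattern the diff adds.

use std::collections::HashMap;

type Hash256 = [u8; 32];
type DataColumn = Vec<u8>;
type DataColumnList = Vec<DataColumn>;

struct EarlyAttesterCache {
    data_columns: HashMap<Hash256, DataColumnList>,
}

impl EarlyAttesterCache {
    // Cache hit returns the columns for a block that is currently attestable.
    fn get_data_columns(&self, block_root: Hash256) -> Option<DataColumnList> {
        self.data_columns.get(&block_root).cloned()
    }
}

struct Store {
    data_columns: HashMap<Hash256, DataColumnList>,
}

impl Store {
    // Database lookup; `None` means nothing has been persisted for this root.
    fn get_data_columns(&self, block_root: &Hash256) -> Result<Option<DataColumnList>, String> {
        Ok(self.data_columns.get(block_root).cloned())
    }
}

struct BeaconChain {
    early_attester_cache: EarlyAttesterCache,
    store: Store,
}

impl BeaconChain {
    // Check the early attester cache first and fall back to the database,
    // mirroring `get_data_columns_checking_early_attester_cache` in the diff.
    fn get_data_columns_checking_early_attester_cache(
        &self,
        block_root: &Hash256,
    ) -> Result<DataColumnList, String> {
        self.early_attester_cache
            .get_data_columns(*block_root)
            .map_or_else(|| self.get_data_columns(block_root), Ok)
    }

    // A missing entry is treated as an empty list rather than an error,
    // matching `get_data_columns` in the diff.
    fn get_data_columns(&self, block_root: &Hash256) -> Result<DataColumnList, String> {
        match self.store.get_data_columns(block_root)? {
            Some(data_columns) => Ok(data_columns),
            None => Ok(DataColumnList::default()),
        }
    }
}

fn main() {
    let chain = BeaconChain {
        early_attester_cache: EarlyAttesterCache { data_columns: HashMap::new() },
        store: Store { data_columns: HashMap::new() },
    };
    let block_root = [0u8; 32];
    let columns = chain
        .get_data_columns_checking_early_attester_cache(&block_root)
        .unwrap();
    assert!(columns.is_empty());
}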
37 changes: 36 additions & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
@@ -123,6 +123,7 @@ use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::beacon_state::CloneConfig;
use types::blob_sidecar::{BlobSidecarList, FixedBlobSidecarList};
use types::data_column_sidecar::DataColumnSidecarList;
use types::payload::BlockProductionVersion;
use types::*;

@@ -1179,6 +1180,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_or_else(|| self.get_blobs(block_root), Ok)
}

pub fn get_data_columns_checking_early_attester_cache(
&self,
block_root: &Hash256,
) -> Result<DataColumnSidecarList<T::EthSpec>, Error> {
self.early_attester_cache
.get_data_columns(*block_root)
.map_or_else(|| self.get_data_columns(block_root), Ok)
}

/// Returns the block at the given root, if any.
///
/// ## Errors
@@ -1254,6 +1264,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

/// Returns the data columns at the given root, if any.
///
/// ## Errors
/// May return a database error.
pub fn get_data_columns(
&self,
block_root: &Hash256,
) -> Result<DataColumnSidecarList<T::EthSpec>, Error> {
match self.store.get_data_columns(block_root)? {
Some(data_columns) => Ok(data_columns),
None => Ok(DataColumnSidecarList::default()),
}
}

pub fn get_blinded_block(
&self,
block_root: &Hash256,
@@ -3515,7 +3539,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If the write fails, revert fork choice to the version from disk, else we can
// end up with blocks in fork choice that are missing from disk.
// See https://github.com/sigp/lighthouse/issues/2028
let (_, signed_block, blobs) = signed_block.deconstruct();
let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
let block = signed_block.message();
ops.extend(
confirmed_state_roots
@@ -3536,6 +3560,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

if let Some(data_columns) = data_columns {
if !data_columns.is_empty() {
debug!(
self.log, "Writing data_columns to store";
"block_root" => %block_root,
"count" => data_columns.len(),
);
ops.push(StoreOp::PutDataColumns(block_root, data_columns));
}
}

let txn_lock = self.store.hot_db.begin_rw_transaction();

if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
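For context, the write path above collects per-block store operations into one batch and commits them together, which is why the new `StoreOp::PutDataColumns` op is pushed alongside the existing block and blob ops before the atomic write under the transaction lock. Here is a rough sketch of that batching idea with stand-in types; the `Store`, `StoreOp`, and `do_atomically` below are assumptions for illustration, not the real store API.

use std::collections::HashMap;

type Hash256 = [u8; 32];
type Blob = Vec<u8>;
type DataColumn = Vec<u8>;

// Tiny stand-in for the store's batched write operations.
enum StoreOp {
    PutBlobs(Hash256, Vec<Blob>),
    PutDataColumns(Hash256, Vec<DataColumn>),
}

#[derive(Default)]
struct Store {
    blobs: HashMap<Hash256, Vec<Blob>>,
    data_columns: HashMap<Hash256, Vec<DataColumn>>,
}

impl Store {
    // Apply the whole batch together; the real store wraps this in a single
    // database transaction so block, blobs and columns are persisted atomically.
    fn do_atomically(&mut self, ops: Vec<StoreOp>) {
        for op in ops {
            match op {
                StoreOp::PutBlobs(root, blobs) => {
                    self.blobs.insert(root, blobs);
                }
                StoreOp::PutDataColumns(root, columns) => {
                    self.data_columns.insert(root, columns);
                }
            }
        }
    }
}

fn main() {
    let mut store = Store::default();
    let block_root = [0u8; 32];
    let blobs: Vec<Blob> = vec![vec![1u8; 4]];
    let data_columns: Vec<DataColumn> = vec![vec![2u8; 4]];

    let mut ops = Vec::new();
    ops.push(StoreOp::PutBlobs(block_root, blobs));
    // As in the diff, the data column op is only pushed when there is something to write.
    if !data_columns.is_empty() {
        ops.push(StoreOp::PutDataColumns(block_root, data_columns));
    }
    store.do_atomically(ops);

    assert!(store.blobs.contains_key(&block_root));
    assert!(store.data_columns.contains_key(&block_root));
}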
33 changes: 26 additions & 7 deletions beacon_node/beacon_chain/src/block_verification_types.rs
@@ -9,6 +9,7 @@ use ssz_types::VariableList;
use state_processing::ConsensusContext;
use std::sync::Arc;
use types::blob_sidecar::{BlobIdentifier, BlobSidecarError, FixedBlobSidecarList};
use types::data_column_sidecar::DataColumnSidecarList;
use types::{
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, Epoch, EthSpec, Hash256,
SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
@@ -43,13 +44,15 @@ impl<E: EthSpec> RpcBlock<E> {
match &self.block {
RpcBlockInner::Block(block) => block,
RpcBlockInner::BlockAndBlobs(block, _) => block,
RpcBlockInner::BlockAndDataColumns(block, _) => block,
}
}

pub fn blobs(&self) -> Option<&BlobSidecarList<E>> {
match &self.block {
RpcBlockInner::Block(_) => None,
RpcBlockInner::BlockAndBlobs(_, blobs) => Some(blobs),
RpcBlockInner::BlockAndDataColumns(_, _) => None,
}
}
}
@@ -65,6 +68,9 @@ enum RpcBlockInner<E: EthSpec> {
/// This variant is used with parent lookups and by-range responses. It should have all blobs
/// ordered, all block roots matching, and the correct number of blobs for this block.
BlockAndBlobs(Arc<SignedBeaconBlock<E>>, BlobSidecarList<E>),
/// This variant is used with parent lookups and by-range responses. It should have all data
/// columns ordered, all block roots matching, and the correct number of data columns for this
/// block.
BlockAndDataColumns(Arc<SignedBeaconBlock<E>>, DataColumnSidecarList<E>),
}

impl<E: EthSpec> RpcBlock<E> {
@@ -140,19 +146,29 @@ impl<E: EthSpec> RpcBlock<E> {
Hash256,
Arc<SignedBeaconBlock<E>>,
Option<BlobSidecarList<E>>,
Option<DataColumnSidecarList<E>>,
) {
let block_root = self.block_root();
match self.block {
RpcBlockInner::Block(block) => (block_root, block, None),
RpcBlockInner::BlockAndBlobs(block, blobs) => (block_root, block, Some(blobs)),
RpcBlockInner::Block(block) => (block_root, block, None, None),
RpcBlockInner::BlockAndBlobs(block, blobs) => (block_root, block, Some(blobs), None),
RpcBlockInner::BlockAndDataColumns(block, data_columns) => {
(block_root, block, None, Some(data_columns))
}
}
}
pub fn n_blobs(&self) -> usize {
match &self.block {
RpcBlockInner::Block(_) => 0,
RpcBlockInner::Block(_) | RpcBlockInner::BlockAndDataColumns(_, _) => 0,
RpcBlockInner::BlockAndBlobs(_, blobs) => blobs.len(),
}
}
pub fn n_data_columns(&self) -> usize {
match &self.block {
RpcBlockInner::Block(_) | RpcBlockInner::BlockAndBlobs(_, _) => 0,
RpcBlockInner::BlockAndDataColumns(_, data_columns) => data_columns.len(),
}
}
}

/// A block that has gone through all pre-deneb block processing checks including block processing
@@ -478,12 +494,13 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
}

fn into_rpc_block(self) -> RpcBlock<E> {
let (block_root, block, blobs_opt) = self.deconstruct();
let (block_root, block, blobs_opt, data_columns_opt) = self.deconstruct();
// Circumvent the constructor here, because an Available block will have already had
// consistency checks performed.
let inner = match blobs_opt {
None => RpcBlockInner::Block(block),
Some(blobs) => RpcBlockInner::BlockAndBlobs(block, blobs),
let inner = match (blobs_opt, data_columns_opt) {
(None, None) => RpcBlockInner::Block(block),
(Some(blobs), _) => RpcBlockInner::BlockAndBlobs(block, blobs),
(_, Some(data_columns)) => RpcBlockInner::BlockAndDataColumns(block, data_columns),
};
RpcBlock {
block_root,
@@ -515,12 +532,14 @@ impl<E: EthSpec> AsBlock<E> for RpcBlock<E> {
match &self.block {
RpcBlockInner::Block(block) => block,
RpcBlockInner::BlockAndBlobs(block, _) => block,
RpcBlockInner::BlockAndDataColumns(block, _) => block,
}
}
fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
match &self.block {
RpcBlockInner::Block(block) => block.clone(),
RpcBlockInner::BlockAndBlobs(block, _) => block.clone(),
RpcBlockInner::BlockAndDataColumns(block, _) => block.clone(),
}
}
fn canonical_root(&self) -> Hash256 {
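The shape `RpcBlock` takes after this change can be summarised as a three-variant enum whose `deconstruct` now yields a four-element tuple, with at most one of the two sidecar lists populated. A simplified sketch with placeholder types (`Block` as a `String`, plain `Vec`s instead of the real SSZ list types):

type Hash256 = [u8; 32];
type Blob = Vec<u8>;
type DataColumn = Vec<u8>;

// Placeholder block type; the real one is Arc<SignedBeaconBlock<E>>.
type Block = String;

enum RpcBlockInner {
    // Block with no sidecars attached.
    Block(Block),
    // Block plus its blob sidecars.
    BlockAndBlobs(Block, Vec<Blob>),
    // Block plus its data column sidecars (new in this commit).
    BlockAndDataColumns(Block, Vec<DataColumn>),
}

struct RpcBlock {
    block_root: Hash256,
    block: RpcBlockInner,
}

impl RpcBlock {
    // Deconstruct into a four-element tuple: at most one of the two sidecar
    // lists is `Some`, depending on the variant.
    fn deconstruct(self) -> (Hash256, Block, Option<Vec<Blob>>, Option<Vec<DataColumn>>) {
        match self.block {
            RpcBlockInner::Block(block) => (self.block_root, block, None, None),
            RpcBlockInner::BlockAndBlobs(block, blobs) => {
                (self.block_root, block, Some(blobs), None)
            }
            RpcBlockInner::BlockAndDataColumns(block, columns) => {
                (self.block_root, block, None, Some(columns))
            }
        }
    }
}

fn main() {
    let rpc_block = RpcBlock {
        block_root: [0u8; 32],
        block: RpcBlockInner::BlockAndDataColumns("block".to_string(), vec![vec![1, 2, 3]]),
    };
    let (_root, _block, blobs, data_columns) = rpc_block.deconstruct();
    assert!(blobs.is_none());
    assert_eq!(data_columns.map(|columns| columns.len()), Some(1));
}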
77 changes: 50 additions & 27 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -33,9 +33,9 @@ mod overflow_lru_cache;
mod processing_cache;
mod state_lru_cache;

use crate::data_column_verification::GossipVerifiedDataColumn;
use crate::data_column_verification::{verify_kzg_for_data_column_list, GossipVerifiedDataColumn};
pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
use types::data_column_sidecar::DataColumnIdentifier;
use types::data_column_sidecar::{DataColumnIdentifier, DataColumnSidecarList};
use types::non_zero_usize::new_non_zero_usize;

/// The LRU Cache stores `PendingComponents` which can store up to
@@ -269,35 +269,45 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
&self,
block: RpcBlock<T::EthSpec>,
) -> Result<MaybeAvailableBlock<T::EthSpec>, AvailabilityCheckError> {
let (block_root, block, blobs) = block.deconstruct();
match blobs {
None => {
let (block_root, block, blobs, data_columns) = block.deconstruct();
match (blobs, data_columns) {
(None, None) => {
if self.blobs_required_for_block(&block) {
Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block })
} else {
Ok(MaybeAvailableBlock::Available(AvailableBlock {
block_root,
block,
blobs: None,
data_columns: None,
}))
}
}
Some(blob_list) => {
let verified_blobs = if self.blobs_required_for_block(&block) {
let kzg = self
.kzg
.as_ref()
.ok_or(AvailabilityCheckError::KzgNotInitialized)?;
verify_kzg_for_blob_list(blob_list.iter(), kzg)
.map_err(AvailabilityCheckError::Kzg)?;
Some(blob_list)
} else {
None
};
(maybe_blob_list, maybe_data_column_list) => {
let (verified_blobs, verified_data_column) =
if self.blobs_required_for_block(&block) {
let kzg = self
.kzg
.as_ref()
.ok_or(AvailabilityCheckError::KzgNotInitialized)?;

if let Some(blob_list) = maybe_blob_list.as_ref() {
verify_kzg_for_blob_list(blob_list.iter(), kzg)
.map_err(AvailabilityCheckError::Kzg)?;
}
if let Some(data_column_list) = maybe_data_column_list.as_ref() {
verify_kzg_for_data_column_list(data_column_list.iter(), kzg)
.map_err(AvailabilityCheckError::Kzg)?;
}
(maybe_blob_list, maybe_data_column_list)
} else {
(None, None)
};
Ok(MaybeAvailableBlock::Available(AvailableBlock {
block_root,
block,
blobs: verified_blobs,
data_columns: verified_data_column,
}))
}
}
@@ -333,30 +343,33 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
}

for block in blocks {
let (block_root, block, blobs) = block.deconstruct();
match blobs {
None => {
let (block_root, block, blobs, data_columns) = block.deconstruct();
match (blobs, data_columns) {
(None, None) => {
if self.blobs_required_for_block(&block) {
results.push(MaybeAvailableBlock::AvailabilityPending { block_root, block })
} else {
results.push(MaybeAvailableBlock::Available(AvailableBlock {
block_root,
block,
blobs: None,
data_columns: None,
}))
}
}
Some(blob_list) => {
let verified_blobs = if self.blobs_required_for_block(&block) {
Some(blob_list)
} else {
None
};
(maybe_blob_list, maybe_data_column_list) => {
let (verified_blobs, verified_data_column) =
if self.blobs_required_for_block(&block) {
(maybe_blob_list, maybe_data_column_list)
} else {
(None, None)
};
// already verified kzg for all blobs
results.push(MaybeAvailableBlock::Available(AvailableBlock {
block_root,
block,
blobs: verified_blobs,
data_columns: verified_data_column,
}))
}
}
@@ -569,18 +582,21 @@ pub struct AvailableBlock<E: EthSpec> {
block_root: Hash256,
block: Arc<SignedBeaconBlock<E>>,
blobs: Option<BlobSidecarList<E>>,
data_columns: Option<DataColumnSidecarList<E>>,
}

impl<E: EthSpec> AvailableBlock<E> {
pub fn __new_for_testing(
block_root: Hash256,
block: Arc<SignedBeaconBlock<E>>,
blobs: Option<BlobSidecarList<E>>,
data_columns: Option<DataColumnSidecarList<E>>,
) -> Self {
Self {
block_root,
block,
blobs,
data_columns,
}
}

@@ -595,19 +611,26 @@ impl<E: EthSpec> AvailableBlock<E> {
self.blobs.as_ref()
}

pub fn data_columns(&self) -> Option<&DataColumnSidecarList<E>> {
self.data_columns.as_ref()
}

#[allow(clippy::type_complexity)]
pub fn deconstruct(
self,
) -> (
Hash256,
Arc<SignedBeaconBlock<E>>,
Option<BlobSidecarList<E>>,
Option<DataColumnSidecarList<E>>,
) {
let AvailableBlock {
block_root,
block,
blobs,
data_columns,
} = self;
(block_root, block, blobs)
(block_root, block, blobs, data_columns)
}
}

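The availability check now matches on the blob list and data column list as a pair: if neither is present and the block requires blobs, the block stays pending; otherwise whichever lists are present are KZG-verified (only when blobs are required) and carried onto the `AvailableBlock`. A condensed sketch of that decision, with hypothetical stand-ins for the real types and KZG verification:

type Blob = Vec<u8>;
type DataColumn = Vec<u8>;

struct AvailableBlock {
    blobs: Option<Vec<Blob>>,
    data_columns: Option<Vec<DataColumn>>,
}

enum MaybeAvailableBlock {
    Available(AvailableBlock),
    AvailabilityPending,
}

// Stand-in for verify_kzg_for_blob_list / verify_kzg_for_data_column_list.
fn verify_kzg(_items: &[Vec<u8>]) -> Result<(), String> {
    Ok(())
}

fn check_availability(
    blobs_required: bool,
    blobs: Option<Vec<Blob>>,
    data_columns: Option<Vec<DataColumn>>,
) -> Result<MaybeAvailableBlock, String> {
    match (blobs, data_columns) {
        // No sidecars yet, but the block needs them: keep waiting.
        (None, None) if blobs_required => Ok(MaybeAvailableBlock::AvailabilityPending),
        // No sidecars and none required: available as-is.
        (None, None) => Ok(MaybeAvailableBlock::Available(AvailableBlock {
            blobs: None,
            data_columns: None,
        })),
        // At least one sidecar list present: verify whichever lists exist,
        // but only when the block actually requires blobs.
        (maybe_blobs, maybe_columns) => {
            let (blobs, data_columns) = if blobs_required {
                if let Some(blobs) = maybe_blobs.as_ref() {
                    verify_kzg(blobs)?;
                }
                if let Some(columns) = maybe_columns.as_ref() {
                    verify_kzg(columns)?;
                }
                (maybe_blobs, maybe_columns)
            } else {
                (None, None)
            };
            Ok(MaybeAvailableBlock::Available(AvailableBlock {
                blobs,
                data_columns,
            }))
        }
    }
}

fn main() {
    let result = check_availability(true, Some(vec![vec![0u8]]), None).unwrap();
    if let MaybeAvailableBlock::Available(block) = result {
        assert!(block.blobs.is_some());
        assert!(block.data_columns.is_none());
    }
}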
@@ -19,11 +19,14 @@ pub struct ChildComponents<E: EthSpec> {

impl<E: EthSpec> From<RpcBlock<E>> for ChildComponents<E> {
fn from(value: RpcBlock<E>) -> Self {
let (block_root, block, blobs) = value.deconstruct();
let (block_root, block, blobs, data_columns) = value.deconstruct();
let fixed_blobs = blobs.map(|blobs| {
FixedBlobSidecarList::from(blobs.into_iter().map(Some).collect::<Vec<_>>())
});
Self::new(block_root, Some(block), fixed_blobs)
let fixed_data_columns = data_columns.map(|data_columns| {
FixedDataColumnSidecarList::from(data_columns.into_iter().map(Some).collect::<Vec<_>>())
});
Self::new(block_root, Some(block), fixed_blobs, fixed_data_columns)
}
}

@@ -40,6 +43,7 @@ impl<E: EthSpec> ChildComponents<E> {
block_root: Hash256,
block: Option<Arc<SignedBeaconBlock<E>>>,
blobs: Option<FixedBlobSidecarList<E>>,
data_columns: Option<FixedDataColumnSidecarList<E>>,
) -> Self {
let mut cache = Self::empty(block_root);
if let Some(block) = block {
@@ -48,6 +52,9 @@ impl<E: EthSpec> ChildComponents<E> {
if let Some(blobs) = blobs {
cache.merge_blobs(blobs);
}
if let Some(data_columns) = data_columns {
cache.merge_data_columns(data_columns);
}
cache
}
