Skip to content

Commit

Permalink
Merge remote-tracking branch 'sigp/epoch-single-pass' into tree-states
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Feb 23, 2024
1 parent 20f53e7 commit a5d3408
Show file tree
Hide file tree
Showing 21 changed files with 126 additions and 111 deletions.
54 changes: 36 additions & 18 deletions beacon_node/beacon_chain/src/attestation_rewards.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2::lighthouse::attestation_rewards::{IdealAttestationRewards, TotalAttestationRewards};
use eth2::lighthouse::StandardAttestationRewards;
use participation_cache::ParticipationCache;
use safe_arith::SafeArith;
use serde_utils::quoted_u64::Quoted;
use slog::debug;
Expand All @@ -10,7 +9,7 @@ use state_processing::per_epoch_processing::altair::{
};
use state_processing::{
common::altair::BaseRewardPerIncrement,
per_epoch_processing::altair::{participation_cache, rewards_and_penalties::get_flag_weight},
per_epoch_processing::altair::rewards_and_penalties::get_flag_weight,
};
use std::collections::HashMap;
use store::consts::altair::{
Expand Down Expand Up @@ -134,8 +133,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let spec = &self.spec;

// Calculate ideal_rewards
let participation_cache = ParticipationCache::new(&state, spec)
.map_err(|_| BeaconChainError::AttestationRewardsError)?;
process_justification_and_finalization(&state)?.apply_changes_to_state(&mut state);
process_inactivity_updates_slow(&mut state, spec)?;

Expand All @@ -147,14 +144,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let weight = get_flag_weight(flag_index)
.map_err(|_| BeaconChainError::AttestationRewardsError)?;

let unslashed_participating_balance = participation_cache
.previous_epoch_flag_attesting_balance(flag_index)
.map_err(|_| BeaconChainError::AttestationRewardsError)?;
let unslashed_participating_balance = state
.progressive_balances_cache()
.previous_epoch_flag_attesting_balance(flag_index)?;

let unslashed_participating_increments =
unslashed_participating_balance.safe_div(spec.effective_balance_increment)?;

let total_active_balance = participation_cache.current_epoch_total_active_balance();
let total_active_balance = state.get_total_active_balance()?;

let active_increments =
total_active_balance.safe_div(spec.effective_balance_increment)?;
Expand Down Expand Up @@ -190,15 +187,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut total_rewards: Vec<TotalAttestationRewards> = Vec::new();

let validators = if validators.is_empty() {
participation_cache.eligible_validator_indices().to_vec()
Self::all_eligible_validator_indices(&state, previous_epoch)?
} else {
Self::validators_ids_to_indices(&mut state, validators)?
};

for &validator_index in &validators {
// Return 0s for unknown/inactive validator indices. This is a bit different from stable
// where we error for unknown pubkeys.
let Ok(validator) = participation_cache.get_validator(validator_index) else {
let Ok(validator) = state.get_validator(validator_index) else {
debug!(
self.log,
"No rewards for inactive/unknown validator";
Expand All @@ -215,22 +212,25 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
});
continue;
};
let eligible = validator.is_eligible;
let previous_epoch_participation_flags = state
.previous_epoch_participation()?
.get(validator_index)
.ok_or(BeaconChainError::AttestationRewardsError)?;
let eligible = state.is_eligible_validator(previous_epoch, validator)?;
let mut head_reward = 0i64;
let mut target_reward = 0i64;
let mut source_reward = 0i64;
let mut inactivity_penalty = 0i64;

if eligible {
let effective_balance = validator.effective_balance;
let effective_balance = validator.effective_balance();

for flag_index in 0..PARTICIPATION_FLAG_WEIGHTS.len() {
let (ideal_reward, penalty) = ideal_rewards_hashmap
.get(&(flag_index, effective_balance))
.ok_or(BeaconChainError::AttestationRewardsError)?;
let voted_correctly = validator
.is_unslashed_participating_index(flag_index)
.map_err(|_| BeaconChainError::AttestationRewardsError)?;
let voted_correctly = !validator.slashed()
&& previous_epoch_participation_flags.has_flag(flag_index)?;
if voted_correctly {
if flag_index == TIMELY_HEAD_FLAG_INDEX {
head_reward += *ideal_reward as i64;
Expand All @@ -246,9 +246,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let penalty_numerator = effective_balance
.safe_mul(state.get_inactivity_score(validator_index)?)?;
let penalty_denominator = spec
.inactivity_score_bias
.safe_mul(spec.inactivity_penalty_quotient_for_state(&state))?;
let penalty_denominator = spec.inactivity_score_bias.safe_mul(
spec.inactivity_penalty_quotient_for_fork(state.fork_name_unchecked()),
)?;
inactivity_penalty =
-(penalty_numerator.safe_div(penalty_denominator)? as i64);
} else if flag_index == TIMELY_SOURCE_FLAG_INDEX {
Expand Down Expand Up @@ -314,6 +314,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(max_steps)
}

fn all_eligible_validator_indices(
state: &BeaconState<T::EthSpec>,
previous_epoch: Epoch,
) -> Result<Vec<usize>, BeaconChainError> {
state
.validators()
.iter()
.enumerate()
.filter_map(|(i, validator)| {
state
.is_eligible_validator(previous_epoch, validator)
.map(|eligible| eligible.then_some(i))
.map_err(BeaconChainError::BeaconStateError)
.transpose()
})
.collect()
}

fn validators_ids_to_indices(
state: &mut BeaconState<T::EthSpec>,
validators: Vec<ValidatorId>,
Expand Down
8 changes: 0 additions & 8 deletions beacon_node/http_api/src/attestation_performance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use eth2::lighthouse::{
AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics,
};
use state_processing::{
per_epoch_processing::altair::participation_cache::Error as ParticipationCacheError,
per_epoch_processing::EpochProcessingSummary, BlockReplayError, BlockReplayer,
};
use std::sync::Arc;
Expand All @@ -18,7 +17,6 @@ const BLOCK_ROOT_CHUNK_SIZE: usize = 100;
enum AttestationPerformanceError {
BlockReplay(#[allow(dead_code)] BlockReplayError),
BeaconState(#[allow(dead_code)] BeaconStateError),
ParticipationCache(#[allow(dead_code)] ParticipationCacheError),
UnableToFindValidator(#[allow(dead_code)] usize),
}

Expand All @@ -34,12 +32,6 @@ impl From<BeaconStateError> for AttestationPerformanceError {
}
}

impl From<ParticipationCacheError> for AttestationPerformanceError {
fn from(e: ParticipationCacheError) -> Self {
Self::ParticipationCache(e)
}
}

pub fn get_attestation_performance<T: BeaconChainTypes>(
target: String,
query: AttestationPerformanceQuery,
Expand Down
1 change: 0 additions & 1 deletion beacon_node/http_api/src/validator_inclusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ pub fn global_validator_inclusion_data<T: BeaconChainTypes>(

Ok(GlobalValidatorInclusionData {
current_epoch_active_gwei: summary.current_epoch_total_active_balance(),
previous_epoch_active_gwei: summary.previous_epoch_total_active_balance(),
current_epoch_target_attesting_gwei: summary
.current_epoch_target_attesting_balance()
.map_err(convert_cache_error)?,
Expand Down
2 changes: 0 additions & 2 deletions common/eth2/src/lighthouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ pub struct Peer<T: EthSpec> {
pub struct GlobalValidatorInclusionData {
/// The total effective balance of all active validators during the _current_ epoch.
pub current_epoch_active_gwei: u64,
/// The total effective balance of all active validators during the _previous_ epoch.
pub previous_epoch_active_gwei: u64,
/// The total effective balance of all validators who attested during the _current_ epoch and
/// agreed with the state about the beacon block at the first slot of the _current_ epoch.
pub current_epoch_target_attesting_gwei: u64,
Expand Down
6 changes: 5 additions & 1 deletion consensus/fork_choice/src/fork_choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,11 @@ where
| BeaconBlockRef::Capella(_)
| BeaconBlockRef::Merge(_)
| BeaconBlockRef::Altair(_) => {
// FIXME(sproul): initialize progressive balances
// NOTE: Processing justification & finalization requires the progressive
// balances cache, but we cannot initialize it here as we only have an
// immutable reference. The state *should* have come straight from block
// processing, which initialises the cache, but if we add other `on_block`
// calls in future it could be worth passing a mutable reference.
per_epoch_processing::altair::process_justification_and_finalization(state)?
}
BeaconBlockRef::Base(_) => {
Expand Down
2 changes: 1 addition & 1 deletion consensus/state_processing/src/all_caches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl<E: EthSpec> AllCaches for BeaconState<E> {
fn build_all_caches(&mut self, spec: &ChainSpec) -> Result<(), EpochCacheError> {
self.build_caches(spec)?;
initialize_epoch_cache(self, spec)?;
initialize_progressive_balances_cache(self, None, spec)?;
initialize_progressive_balances_cache(self, spec)?;
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,16 @@ use crate::metrics::{
PARTICIPATION_CURR_EPOCH_TARGET_ATTESTING_GWEI_PROGRESSIVE_TOTAL,
PARTICIPATION_PREV_EPOCH_TARGET_ATTESTING_GWEI_PROGRESSIVE_TOTAL,
};
use crate::per_epoch_processing::altair::ParticipationCache;
use crate::{BlockProcessingError, EpochProcessingError};
use lighthouse_metrics::set_gauge;
use std::borrow::Cow;
use types::{
is_progressive_balances_enabled, BeaconState, BeaconStateError, ChainSpec, Epoch,
EpochTotalBalances, EthSpec, ProgressiveBalancesCache,
EpochTotalBalances, EthSpec, ParticipationFlags, ProgressiveBalancesCache, Validator,
};

/// Initializes the `ProgressiveBalancesCache` cache using balance values from the
/// `ParticipationCache`. If the optional `&ParticipationCache` is not supplied, it will be computed
/// from the `BeaconState`.
/// Initializes the `ProgressiveBalancesCache` if it is unbuilt.
pub fn initialize_progressive_balances_cache<E: EthSpec>(
state: &mut BeaconState<E>,
maybe_participation_cache: Option<&ParticipationCache>,
spec: &ChainSpec,
) -> Result<(), BeaconStateError> {
if !is_progressive_balances_enabled(state)
Expand All @@ -26,29 +21,37 @@ pub fn initialize_progressive_balances_cache<E: EthSpec>(
return Ok(());
}

// FIXME(sproul): simplify the participation cache
let participation_cache = match maybe_participation_cache {
Some(cache) => Cow::Borrowed(cache),
None => {
state.build_total_active_balance_cache_at(state.current_epoch(), spec)?;
Cow::Owned(
ParticipationCache::new(state, spec)
.map_err(|e| BeaconStateError::ParticipationCacheError(format!("{e:?}")))?,
)
// Calculate the total flag balances for previous & current epoch in a single iteration.
// This calculates `get_total_balance(unslashed_participating_indices(..))` for each flag in
// the current and previous epoch.
let current_epoch = state.current_epoch();
let previous_epoch = state.previous_epoch();
let mut previous_epoch_cache = EpochTotalBalances::new(spec);
let mut current_epoch_cache = EpochTotalBalances::new(spec);
for ((validator, current_epoch_flags), previous_epoch_flags) in state
.validators()
.iter()
.zip(state.current_epoch_participation()?)
.zip(state.previous_epoch_participation()?)
{
// Exclude slashed validators. We are calculating *unslashed* participating totals.
if validator.slashed() {
continue;
}
};

let current_epoch = state.current_epoch();
let previous_epoch_cache = EpochTotalBalances {
total_flag_balances: participation_cache
.previous_epoch_participation
.total_flag_balances,
};
let current_epoch_cache = EpochTotalBalances {
total_flag_balances: participation_cache
.current_epoch_participation
.total_flag_balances,
};
// Update current epoch flag balances.
if validator.is_active_at(current_epoch) {
update_flag_total_balances(&mut current_epoch_cache, *current_epoch_flags, validator)?;
}
// Update previous epoch flag balances.
if validator.is_active_at(previous_epoch) {
update_flag_total_balances(
&mut previous_epoch_cache,
*previous_epoch_flags,
validator,
)?;
}
}

state.progressive_balances_cache_mut().initialize(
current_epoch,
Expand All @@ -61,6 +64,26 @@ pub fn initialize_progressive_balances_cache<E: EthSpec>(
Ok(())
}

/// During the initialization of the progressive balances for a single epoch, add
/// `validator.effective_balance` to the flag total, for each flag present in `participation_flags`.
///
/// Pre-conditions:
///
/// - `validator` must not be slashed
/// - the `participation_flags` must be for `validator` in the same epoch as the `total_balances`
fn update_flag_total_balances(
total_balances: &mut EpochTotalBalances,
participation_flags: ParticipationFlags,
validator: &Validator,
) -> Result<(), BeaconStateError> {
for (flag, balance) in total_balances.total_flag_balances.iter_mut().enumerate() {
if participation_flags.has_flag(flag)? {
balance.safe_add_assign(validator.effective_balance())?;
}
}
Ok(())
}

/// Updates the `ProgressiveBalancesCache` when a new target attestation has been processed.
pub fn update_progressive_balances_on_attestation<T: EthSpec>(
state: &mut BeaconState<T>,
Expand Down
2 changes: 1 addition & 1 deletion consensus/state_processing/src/per_block_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub fn per_block_processing<T: EthSpec, Payload: AbstractExecPayload<T>>(

// Build epoch cache if it hasn't already been built, or if it is no longer valid
initialize_epoch_cache(state, spec)?;
initialize_progressive_balances_cache(state, None, spec)?;
initialize_progressive_balances_cache(state, spec)?;
state.build_slashings_cache()?;

let verify_signatures = match block_signature_strategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use crate::{signature_sets::sync_aggregate_signature_set, VerifySignatures};
use safe_arith::SafeArith;
use std::borrow::Cow;
use types::consts::altair::{PROPOSER_WEIGHT, SYNC_REWARD_WEIGHT, WEIGHT_DENOMINATOR};
use types::{BeaconState, ChainSpec, EthSpec, PublicKeyBytes, SyncAggregate, Unsigned};
use types::{
BeaconState, BeaconStateError, ChainSpec, EthSpec, PublicKeyBytes, SyncAggregate, Unsigned,
};

pub fn process_sync_aggregate<T: EthSpec>(
state: &mut BeaconState<T>,
Expand Down Expand Up @@ -47,20 +49,34 @@ pub fn process_sync_aggregate<T: EthSpec>(
// Apply participant and proposer rewards
let committee_indices = state.get_sync_committee_indices(&current_sync_committee)?;

let mut total_proposer_reward = 0;
let proposer_index = proposer_index as usize;
let mut proposer_balance = *state
.balances()
.get(proposer_index)
.ok_or(BeaconStateError::BalancesOutOfBounds(proposer_index))?;

for (participant_index, participation_bit) in committee_indices
.into_iter()
.zip(aggregate.sync_committee_bits.iter())
{
// FIXME(sproul): double-check this for Capella, proposer shouldn't have 0 effective balance
if participation_bit {
increase_balance(state, participant_index, participant_reward)?;
total_proposer_reward.safe_add_assign(proposer_reward)?;
// Accumulate proposer rewards in a temp var in case the proposer has very low balance, is
// part of the sync committee, does not participate and its penalties saturate.
if participant_index == proposer_index {
proposer_balance.safe_add_assign(participant_reward)?;
} else {
increase_balance(state, participant_index, participant_reward)?;
}
proposer_balance.safe_add_assign(proposer_reward)?;
} else if participant_index == proposer_index {
proposer_balance = proposer_balance.saturating_sub(participant_reward);
} else {
decrease_balance(state, participant_index, participant_reward)?;
}
}
increase_balance(state, proposer_index as usize, total_proposer_reward)?;

*state.get_balance_mut(proposer_index)? = proposer_balance;

Ok(())
}
Expand Down
9 changes: 0 additions & 9 deletions consensus/state_processing/src/per_block_processing/errors.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use super::signature_sets::Error as SignatureSetError;
use crate::per_epoch_processing::altair::participation_cache;
use crate::{ContextError, EpochCacheError};
use merkle_proof::MerkleTreeError;
use participation_cache::Error as ParticipationCacheError;
use safe_arith::ArithError;
use ssz::DecodeError;
use types::*;
Expand Down Expand Up @@ -91,7 +89,6 @@ pub enum BlockProcessingError {
found: Hash256,
},
WithdrawalCredentialsInvalid,
ParticipationCacheError(ParticipationCacheError),
}

impl From<BeaconStateError> for BlockProcessingError {
Expand Down Expand Up @@ -161,12 +158,6 @@ impl From<BlockOperationError<HeaderInvalid>> for BlockProcessingError {
}
}

impl From<ParticipationCacheError> for BlockProcessingError {
fn from(e: ParticipationCacheError) -> Self {
BlockProcessingError::ParticipationCacheError(e)
}
}

/// A conversion that consumes `self` and adds an `index` variable to resulting struct.
///
/// Used here to allow converting an error into an upstream error that points to the object that
Expand Down
Loading

0 comments on commit a5d3408

Please sign in to comment.