diff --git a/Cargo.lock b/Cargo.lock index dd96f5146..a2b0f7e02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4069,6 +4069,7 @@ dependencies = [ "compat", "evm_arithmetization", "futures", + "lru", "mpt_trie", "primitive-types 0.12.2", "prover", diff --git a/Cargo.toml b/Cargo.toml index aff00a946..b64e627fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,6 +101,7 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } uint = "0.9.5" url = "2.5.2" +lru = "0.12.3" # zero-bin related dependencies ops = { path = "zero_bin/ops" } diff --git a/mpt_trie/src/trie_hashing.rs b/mpt_trie/src/trie_hashing.rs index 9502ebe1c..60890aae2 100644 --- a/mpt_trie/src/trie_hashing.rs +++ b/mpt_trie/src/trie_hashing.rs @@ -121,6 +121,7 @@ mod tests { const NUM_INSERTS_FOR_ETH_TRIE_CRATE_MASSIVE_TEST: usize = 1000; const NODES_PER_BRANCH_FOR_HASH_REPLACEMENT_TEST: usize = 200; + #[allow(dead_code)] #[derive(Copy, Clone, Debug)] struct U256Rlpable(U256); diff --git a/zero_bin/leader/src/client.rs b/zero_bin/leader/src/client.rs index 932e195cc..555bc74aa 100644 --- a/zero_bin/leader/src/client.rs +++ b/zero_bin/leader/src/client.rs @@ -34,12 +34,14 @@ pub(crate) async fn client_main( block_interval: BlockInterval, mut params: ProofParams, ) -> Result<()> { + let cached_provider = rpc::provider::CachedProvider::new(build_http_retry_provider( + rpc_params.rpc_url.clone(), + rpc_params.backoff, + rpc_params.max_retries, + )); + let prover_input = rpc::prover_input( - &build_http_retry_provider( - rpc_params.rpc_url, - rpc_params.backoff, - rpc_params.max_retries, - ), + &cached_provider, block_interval, params.checkpoint_block_number.into(), rpc_params.rpc_type, diff --git a/zero_bin/rpc/Cargo.toml b/zero_bin/rpc/Cargo.toml index 0cf1a2d6c..0bb5a2df3 100644 --- a/zero_bin/rpc/Cargo.toml +++ b/zero_bin/rpc/Cargo.toml @@ -23,6 +23,7 @@ futures = { workspace = true } url = { workspace = true } __compat_primitive_types = { workspace = true } tower = { workspace = true, features = ["retry"] } +lru = { workspace = true } # Local dependencies compat = { workspace = true } diff --git a/zero_bin/rpc/src/jerigon.rs b/zero_bin/rpc/src/jerigon.rs index d1d29ed00..6a04c331e 100644 --- a/zero_bin/rpc/src/jerigon.rs +++ b/zero_bin/rpc/src/jerigon.rs @@ -9,6 +9,7 @@ use trace_decoder::trace_protocol::{ }; use super::fetch_other_block_data; +use crate::provider::CachedProvider; /// Transaction traces retrieved from Erigon zeroTracer. #[derive(Debug, Deserialize)] @@ -23,7 +24,7 @@ pub struct ZeroTxResult { pub struct ZeroBlockWitness(TrieCompact); pub async fn block_prover_input( - provider: ProviderT, + cached_provider: &CachedProvider, target_block_id: BlockId, checkpoint_state_trie_root: B256, ) -> anyhow::Result @@ -32,7 +33,8 @@ where TransportT: Transport + Clone, { // Grab trace information - let tx_results = provider + let tx_results = cached_provider + .as_provider() .raw_request::<_, Vec>( "debug_traceBlockByNumber".into(), (target_block_id, json!({"tracer": "zeroTracer"})), @@ -40,12 +42,14 @@ where .await?; // Grab block witness info (packed as combined trie pre-images) - let block_witness = provider + let block_witness = cached_provider + .as_provider() .raw_request::<_, ZeroBlockWitness>("eth_getWitness".into(), vec![target_block_id]) .await?; let other_data = - fetch_other_block_data(provider, target_block_id, checkpoint_state_trie_root).await?; + fetch_other_block_data(cached_provider, target_block_id, checkpoint_state_trie_root) + .await?; // Assemble Ok(BlockProverInput { diff --git a/zero_bin/rpc/src/lib.rs b/zero_bin/rpc/src/lib.rs index 130b791e1..4f3646a45 100644 --- a/zero_bin/rpc/src/lib.rs +++ b/zero_bin/rpc/src/lib.rs @@ -15,8 +15,11 @@ use zero_bin_common::block_interval::BlockInterval; pub mod jerigon; pub mod native; +pub mod provider; pub mod retry; +use crate::provider::CachedProvider; + const PREVIOUS_HASHES_COUNT: usize = 256; /// The RPC type. @@ -28,7 +31,7 @@ pub enum RpcType { /// Obtain the prover input for a given block interval pub async fn prover_input( - provider: &ProviderT, + cached_provider: &CachedProvider, block_interval: BlockInterval, checkpoint_block_id: BlockId, rpc_type: RpcType, @@ -38,10 +41,9 @@ where TransportT: Transport + Clone, { // Grab interval checkpoint block state trie - let checkpoint_state_trie_root = provider + let checkpoint_state_trie_root = cached_provider .get_block(checkpoint_block_id, BlockTransactionsKind::Hashes) .await? - .context("block does not exist")? .header .state_root; @@ -52,10 +54,12 @@ where let block_id = BlockId::Number(BlockNumberOrTag::Number(block_num)); let block_prover_input = match rpc_type { RpcType::Jerigon => { - jerigon::block_prover_input(&provider, block_id, checkpoint_state_trie_root).await? + jerigon::block_prover_input(cached_provider, block_id, checkpoint_state_trie_root) + .await? } RpcType::Native => { - native::block_prover_input(&provider, block_id, checkpoint_state_trie_root).await? + native::block_prover_input(cached_provider, block_id, checkpoint_state_trie_root) + .await? } }; @@ -68,7 +72,7 @@ where /// Fetches other block data async fn fetch_other_block_data( - provider: ProviderT, + cached_provider: &CachedProvider, target_block_id: BlockId, checkpoint_state_trie_root: B256, ) -> anyhow::Result @@ -76,27 +80,35 @@ where ProviderT: Provider, TransportT: Transport + Clone, { - let target_block = provider + let target_block = cached_provider .get_block(target_block_id, BlockTransactionsKind::Hashes) - .await? - .context("target block does not exist")?; + .await?; let target_block_number = target_block .header .number .context("target block is missing field `number`")?; - let chain_id = provider.get_chain_id().await?; + let chain_id = cached_provider.as_provider().get_chain_id().await?; + + // For one block, we will fetch 128 previous blocks to get hashes instead of + // 256. But for two consecutive blocks (odd and even) we would fetch 256 + // previous blocks in total. To overcome this, we add an offset so that we + // always start fetching from an odd index and eventually skip the additional + // block for an even `target_block_number`. + let odd_offset: i128 = target_block_number as i128 % 2; let previous_block_numbers = - std::iter::successors(Some(target_block_number as i128 - 1), |&it| Some(it - 1)) - .take(PREVIOUS_HASHES_COUNT) - .filter(|i| *i >= 0) - .collect::>(); + std::iter::successors(Some(target_block_number as i128 - 1 + odd_offset), |&it| { + Some(it - 1) + }) + .take(PREVIOUS_HASHES_COUNT) + .filter(|i| *i >= 0) + .collect::>(); let concurrency = previous_block_numbers.len(); let collected_hashes = futures::stream::iter( previous_block_numbers .chunks(2) // we get hash for previous and current block with one request .map(|block_numbers| { - let provider = &provider; + let cached_provider = &cached_provider; let block_num = &block_numbers[0]; let previos_block_num = if block_numbers.len() > 1 { Some(block_numbers[1]) @@ -105,11 +117,10 @@ where None }; async move { - let block = provider + let block = cached_provider .get_block((*block_num as u64).into(), BlockTransactionsKind::Hashes) .await - .context("couldn't get block")? - .context("no such block")?; + .context("couldn't get block")?; anyhow::Ok([ (block.header.hash, Some(*block_num)), (Some(block.header.parent_hash), previos_block_num), @@ -126,6 +137,7 @@ where collected_hashes .into_iter() .flatten() + .skip(odd_offset as usize) .for_each(|(hash, block_num)| { if let (Some(hash), Some(block_num)) = (hash, block_num) { // Most recent previous block hash is expected at the end of the array diff --git a/zero_bin/rpc/src/main.rs b/zero_bin/rpc/src/main.rs index 8e26498a6..197e387de 100644 --- a/zero_bin/rpc/src/main.rs +++ b/zero_bin/rpc/src/main.rs @@ -2,6 +2,7 @@ use std::io; use alloy::rpc::types::eth::BlockId; use clap::{Parser, ValueHint}; +use rpc::provider::CachedProvider; use rpc::{retry::build_http_retry_provider, RpcType}; use tracing_subscriber::{prelude::*, EnvFilter}; use url::Url; @@ -53,9 +54,15 @@ impl Cli { checkpoint_block_number.unwrap_or((start_block - 1).into()); let block_interval = BlockInterval::Range(start_block..end_block + 1); + let cached_provider = CachedProvider::new(build_http_retry_provider( + rpc_url.clone(), + backoff, + max_retries, + )); + // Retrieve prover input from the Erigon node let prover_input = rpc::prover_input( - &build_http_retry_provider(rpc_url, backoff, max_retries), + &cached_provider, block_interval, checkpoint_block_number, rpc_type, diff --git a/zero_bin/rpc/src/native/mod.rs b/zero_bin/rpc/src/native/mod.rs index 75de3d5de..7d0af2de4 100644 --- a/zero_bin/rpc/src/native/mod.rs +++ b/zero_bin/rpc/src/native/mod.rs @@ -6,11 +6,12 @@ use alloy::{ rpc::types::eth::{BlockId, BlockTransactionsKind}, transports::Transport, }; -use anyhow::Context as _; use futures::try_join; use prover::BlockProverInput; use trace_decoder::trace_protocol::BlockTrace; +use crate::provider::CachedProvider; + mod state; mod txn; @@ -18,7 +19,7 @@ type CodeDb = HashMap<__compat_primitive_types::H256, Vec>; /// Fetches the prover input for the given BlockId. pub async fn block_prover_input( - provider: &ProviderT, + provider: &CachedProvider, block_number: BlockId, checkpoint_state_trie_root: B256, ) -> anyhow::Result @@ -27,8 +28,8 @@ where TransportT: Transport + Clone, { let (block_trace, other_data) = try_join!( - process_block_trace(&provider, block_number), - crate::fetch_other_block_data(&provider, block_number, checkpoint_state_trie_root,) + process_block_trace(provider, block_number), + crate::fetch_other_block_data(provider, block_number, checkpoint_state_trie_root,) )?; Ok(BlockProverInput { @@ -39,20 +40,20 @@ where /// Processes the block with the given block number and returns the block trace. async fn process_block_trace( - provider: &ProviderT, + cached_provider: &CachedProvider, block_number: BlockId, ) -> anyhow::Result where ProviderT: Provider, TransportT: Transport + Clone, { - let block = provider + let block = cached_provider .get_block(block_number, BlockTransactionsKind::Full) - .await? - .context("target block does not exist")?; + .await?; - let (code_db, txn_info) = txn::process_transactions(&block, provider).await?; - let trie_pre_images = state::process_state_witness(provider, block, &txn_info).await?; + let (code_db, txn_info) = + txn::process_transactions(&block, cached_provider.as_provider()).await?; + let trie_pre_images = state::process_state_witness(cached_provider, block, &txn_info).await?; Ok(BlockTrace { txn_info, diff --git a/zero_bin/rpc/src/native/state.rs b/zero_bin/rpc/src/native/state.rs index d69b48cb9..331647c82 100644 --- a/zero_bin/rpc/src/native/state.rs +++ b/zero_bin/rpc/src/native/state.rs @@ -14,11 +14,12 @@ use trace_decoder::trace_protocol::{ SeparateTriePreImages, TrieDirect, TxnInfo, }; +use crate::provider::CachedProvider; use crate::Compat; /// Processes the state witness for the given block. pub async fn process_state_witness( - provider: &ProviderT, + cached_provider: &CachedProvider, block: Block, txn_infos: &[TxnInfo], ) -> anyhow::Result @@ -32,15 +33,15 @@ where .header .number .context("Block number not returned with block")?; - let prev_state_root = provider + let prev_state_root = cached_provider .get_block((block_number - 1).into(), BlockTransactionsKind::Hashes) .await? - .context("Failed to get previous block")? .header .state_root; let (state, storage_proofs) = - generate_state_witness(prev_state_root, state_access, provider, block_number).await?; + generate_state_witness(prev_state_root, state_access, cached_provider, block_number) + .await?; Ok(BlockTraceTriePreImages::Separate(SeparateTriePreImages { state: SeparateTriePreImage::Direct(TrieDirect(state.build())), @@ -97,7 +98,7 @@ pub fn process_states_access( async fn generate_state_witness( prev_state_root: B256, accounts_state: HashMap>, - provider: &ProviderT, + cached_provider: &CachedProvider, block_number: u64, ) -> anyhow::Result<( PartialTrieBuilder, @@ -111,7 +112,7 @@ where let mut storage_proofs = HashMap::>::new(); let (account_proofs, next_account_proofs) = - fetch_proof_data(accounts_state, provider, block_number).await?; + fetch_proof_data(accounts_state, cached_provider, block_number).await?; // Insert account proofs for (address, proof) in account_proofs.into_iter() { @@ -146,7 +147,7 @@ where /// Fetches the proof data for the given accounts and associated storage keys. async fn fetch_proof_data( accounts_state: HashMap>, - provider: &ProviderT, + provider: &CachedProvider, block_number: u64, ) -> anyhow::Result<( Vec<(Address, EIP1186AccountProofResponse)>, @@ -161,6 +162,7 @@ where .into_iter() .map(|(address, keys)| async move { let proof = provider + .as_provider() .get_proof(address, keys.into_iter().collect()) .block_id((block_number - 1).into()) .await @@ -173,6 +175,7 @@ where .into_iter() .map(|(address, keys)| async move { let proof = provider + .as_provider() .get_proof(address, keys.into_iter().collect()) .block_id(block_number.into()) .await diff --git a/zero_bin/rpc/src/native/txn.rs b/zero_bin/rpc/src/native/txn.rs index 9e1e8721a..40af45523 100644 --- a/zero_bin/rpc/src/native/txn.rs +++ b/zero_bin/rpc/src/native/txn.rs @@ -40,7 +40,7 @@ where .as_transactions() .context("No transactions in block")? .iter() - .map(|tx| super::txn::process_transaction(provider, tx)) + .map(|tx| process_transaction(provider, tx)) .collect::>() .try_fold( (HashMap::new(), Vec::new()), diff --git a/zero_bin/rpc/src/provider.rs b/zero_bin/rpc/src/provider.rs new file mode 100644 index 000000000..fc782ff43 --- /dev/null +++ b/zero_bin/rpc/src/provider.rs @@ -0,0 +1,94 @@ +use std::sync::Arc; + +use alloy::primitives::BlockHash; +use alloy::rpc::types::{Block, BlockId, BlockTransactionsKind}; +use alloy::{providers::Provider, transports::Transport}; +use anyhow::Context; +use tokio::sync::Mutex; + +const CACHE_SIZE: usize = 1024; + +/// Wrapper around alloy provider to cache blocks and other +/// frequently used data. +pub struct CachedProvider { + provider: ProviderT, + blocks_by_number: Arc>>, + blocks_by_hash: Arc>>, + _phantom: std::marker::PhantomData, +} + +impl CachedProvider +where + ProviderT: Provider, + TransportT: Transport + Clone, +{ + pub fn new(provider: ProviderT) -> Self { + Self { + provider, + blocks_by_number: Arc::new(Mutex::new(lru::LruCache::new( + std::num::NonZero::new(CACHE_SIZE).unwrap(), + ))), + blocks_by_hash: Arc::new(Mutex::new(lru::LruCache::new( + std::num::NonZero::new(CACHE_SIZE).unwrap(), + ))), + _phantom: std::marker::PhantomData, + } + } + + pub fn as_mut_provider(&mut self) -> &mut ProviderT { + &mut self.provider + } + + pub fn as_provider(&self) -> &ProviderT { + &self.provider + } + + /// Retrieves block by number or hash, caching it if it's not already + /// cached. + pub async fn get_block( + &self, + id: BlockId, + kind: BlockTransactionsKind, + ) -> anyhow::Result { + let cached_block = match id { + BlockId::Hash(hash) => { + let block_num = self + .blocks_by_hash + .lock() + .await + .get(&hash.block_hash) + .copied(); + if let Some(block_num) = block_num { + self.blocks_by_number.lock().await.get(&block_num).cloned() + } else { + None + } + } + BlockId::Number(alloy::rpc::types::BlockNumberOrTag::Number(number)) => { + self.blocks_by_number.lock().await.get(&number).cloned() + } + _ => None, + }; + + if let Some(block) = cached_block { + Ok(block) + } else { + let block = self + .provider + .get_block(id, kind) + .await? + .context(format!("target block {:?} does not exist", id))?; + + if let Some(block_num) = block.header.number { + self.blocks_by_number + .lock() + .await + .put(block_num, block.clone()); + if let Some(hash) = block.header.hash { + self.blocks_by_hash.lock().await.put(hash, block_num); + } + } + Ok(block) + } + } +}