From 854212ed8d58b4a50ed4c83897d6babb200cbc5e Mon Sep 17 00:00:00 2001 From: armyhaylenko Date: Mon, 10 Feb 2025 15:29:39 +0200 Subject: [PATCH] feat(etl): add stream length checks to prevent overflow --- plerkle/Cargo.toml | 2 +- plerkle_messenger/Cargo.toml | 2 +- .../src/redis/redis_messenger.rs | 7 +++ plerkle_snapshot/Cargo.toml | 2 +- .../src/bin/solana-snapshot-etl/geyser.rs | 44 +++++++++++++++---- .../src/bin/solana-snapshot-etl/main.rs | 7 +-- 6 files changed, 47 insertions(+), 17 deletions(-) diff --git a/plerkle/Cargo.toml b/plerkle/Cargo.toml index 19c67351..eba49dda 100644 --- a/plerkle/Cargo.toml +++ b/plerkle/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "plerkle" description = "Geyser plugin with dynamic config reloading, message bus agnostic abstractions and a whole lot of fun." -version = "1.10.0" +version = "1.11.0" authors = ["Metaplex Developers "] repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin" license = "AGPL-3.0" diff --git a/plerkle_messenger/Cargo.toml b/plerkle_messenger/Cargo.toml index c8aa9400..076f3b7a 100644 --- a/plerkle_messenger/Cargo.toml +++ b/plerkle_messenger/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "plerkle_messenger" description = "Metaplex Messenger trait for Geyser plugin producer/consumer patterns." 
-version = "1.10.0" +version = "1.11.0" authors = ["Metaplex Developers "] repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin" license = "AGPL-3.0" diff --git a/plerkle_messenger/src/redis/redis_messenger.rs b/plerkle_messenger/src/redis/redis_messenger.rs index d3c54edc..69de8046 100644 --- a/plerkle_messenger/src/redis/redis_messenger.rs +++ b/plerkle_messenger/src/redis/redis_messenger.rs @@ -245,6 +245,13 @@ impl RedisMessenger { } Ok(()) } + + pub async fn stream_len(&mut self, stream_key: &'static str) -> Result { + Ok(self.connection.xlen(stream_key).await.map_err(|e| { + error!("Failed to read stream length: {}", e); + MessengerError::ConnectionError { msg: e.to_string() } + })?) + } } #[async_trait] diff --git a/plerkle_snapshot/Cargo.toml b/plerkle_snapshot/Cargo.toml index 11ab767f..60feedc7 100644 --- a/plerkle_snapshot/Cargo.toml +++ b/plerkle_snapshot/Cargo.toml @@ -1,7 +1,7 @@ [package] # Renamed from original "solana-snapshot-etl" name = "plerkle_snapshot" -version = "0.5.0" +version = "0.6.0" edition = "2021" license = "Apache-2.0" documentation = "https://docs.rs/solana-snapshot-etl" diff --git a/plerkle_snapshot/src/bin/solana-snapshot-etl/geyser.rs b/plerkle_snapshot/src/bin/solana-snapshot-etl/geyser.rs index bed148bc..e097d3a7 100644 --- a/plerkle_snapshot/src/bin/solana-snapshot-etl/geyser.rs +++ b/plerkle_snapshot/src/bin/solana-snapshot-etl/geyser.rs @@ -9,14 +9,16 @@ use plerkle_serialization::serializer::serialize_account; use plerkle_snapshot::append_vec::StoredMeta; use solana_sdk::account::{Account, AccountSharedData, ReadableAccount}; use std::error::Error; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; use std::sync::Arc; use tokio::sync::Mutex; use crate::accounts_selector::AccountsSelector; const ACCOUNT_STREAM_KEY: &str = "ACC"; +// the upper limit of accounts stream length for when the snapshot is in progress +const MAX_INTERMEDIATE_STREAM_LEN: u64 = 50_000_000; +// every 
PROCESSED_CHECKPOINT we check the stream length and reset the local stream_counter +const PROCESSED_CHECKPOINT: u64 = 20_000_000; #[derive(Clone)] pub(crate) struct GeyserDumper { @@ -24,7 +26,12 @@ pub(crate) struct GeyserDumper { throttle_nanos: u64, accounts_selector: AccountsSelector, pub accounts_spinner: ProgressBar, - pub accounts_count: Arc, + /// how many accounts were processed in total during the snapshot run. + pub accounts_count: u64, + /// intermediate counter of accounts sent to regulate XLEN checks. + /// the reason for a separate field is that we initialize it as the current + /// stream length, which might be non-zero. + pub stream_counter: u64, } impl GeyserDumper { @@ -61,13 +68,18 @@ impl GeyserDumper { messenger .set_buffer_size(ACCOUNT_STREAM_KEY, 100_000_000) .await; + let initial_stream_len = messenger + .stream_len(&ACCOUNT_STREAM_KEY) + .await + .expect("get initial stream len of accounts"); Self { messenger: Arc::new(Mutex::new(messenger)), accounts_spinner, accounts_selector, - accounts_count: Arc::new(AtomicU64::new(0)), + accounts_count: 0, throttle_nanos, + stream_counter: initial_stream_len, } } @@ -76,6 +88,21 @@ impl GeyserDumper { (meta, account): (StoredMeta, AccountSharedData), slot: u64, ) -> Result<(), Box> { + if self.stream_counter >= PROCESSED_CHECKPOINT { + loop { + let stream_len = self + .messenger + .lock() + .await + .stream_len(ACCOUNT_STREAM_KEY) + .await?; + if stream_len < MAX_INTERMEDIATE_STREAM_LEN { + self.stream_counter = 0; + break; + } + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } if self .accounts_selector .is_account_selected(meta.pubkey.as_ref(), account.owner().as_ref()) @@ -116,18 +143,19 @@ impl GeyserDumper { return Ok(()); } - let prev = self.accounts_count.fetch_add(1, Ordering::Relaxed); - self.accounts_spinner.set_position(prev + 1); + self.accounts_count += 1; + self.accounts_spinner.set_position(self.accounts_count); + self.stream_counter += 1; if self.throttle_nanos > 
0 { tokio::time::sleep(std::time::Duration::from_nanos(self.throttle_nanos)).await; } + Ok(()) } pub async fn force_flush(self) { - self.accounts_spinner - .set_position(self.accounts_count.load(Ordering::Relaxed)); + self.accounts_spinner.set_position(self.accounts_count); self.accounts_spinner .finish_with_message("Finished processing snapshot!"); let messenger_mutex = Arc::into_inner(self.messenger) diff --git a/plerkle_snapshot/src/bin/solana-snapshot-etl/main.rs b/plerkle_snapshot/src/bin/solana-snapshot-etl/main.rs index 5a14aa8e..3f6ca707 100644 --- a/plerkle_snapshot/src/bin/solana-snapshot-etl/main.rs +++ b/plerkle_snapshot/src/bin/solana-snapshot-etl/main.rs @@ -71,12 +71,7 @@ async fn main() -> Result<(), Box> { } } - info!( - "Done! Accounts: {}", - dumper - .accounts_count - .load(std::sync::atomic::Ordering::Relaxed) - ); + info!("Done! Accounts: {}", dumper.accounts_count); dumper.force_flush().await;