Skip to content

Commit

Permalink
feat(etl): add stream length checks to prevent overflow
Browse files Browse the repository at this point in the history
  • Loading branch information
armyhaylenko committed Feb 10, 2025
1 parent bae5e43 commit 854212e
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 17 deletions.
2 changes: 1 addition & 1 deletion plerkle/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "plerkle"
description = "Geyser plugin with dynamic config reloading, message bus agnostic abstractions and a whole lot of fun."
version = "1.10.0"
version = "1.11.0"
authors = ["Metaplex Developers <dev@metaplex.com>"]
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
license = "AGPL-3.0"
Expand Down
2 changes: 1 addition & 1 deletion plerkle_messenger/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "plerkle_messenger"
description = "Metaplex Messenger trait for Geyser plugin producer/consumer patterns."
version = "1.10.0"
version = "1.11.0"
authors = ["Metaplex Developers <dev@metaplex.com>"]
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
license = "AGPL-3.0"
Expand Down
7 changes: 7 additions & 0 deletions plerkle_messenger/src/redis/redis_messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ impl RedisMessenger {
}
Ok(())
}

pub async fn stream_len(&mut self, stream_key: &'static str) -> Result<u64, MessengerError> {
Ok(self.connection.xlen(stream_key).await.map_err(|e| {
error!("Failed to read stream length: {}", e);
MessengerError::ConnectionError { msg: e.to_string() }
})?)
}
}

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion plerkle_snapshot/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
# Renamed from original "solana-snapshot-etl"
name = "plerkle_snapshot"
version = "0.5.0"
version = "0.6.0"
edition = "2021"
license = "Apache-2.0"
documentation = "https://docs.rs/solana-snapshot-etl"
Expand Down
44 changes: 36 additions & 8 deletions plerkle_snapshot/src/bin/solana-snapshot-etl/geyser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,29 @@ use plerkle_serialization::serializer::serialize_account;
use plerkle_snapshot::append_vec::StoredMeta;
use solana_sdk::account::{Account, AccountSharedData, ReadableAccount};
use std::error::Error;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::accounts_selector::AccountsSelector;

const ACCOUNT_STREAM_KEY: &str = "ACC";
// the upper limit of accounts stream length for when the snapshot is in progress
const MAX_INTERMEDIATE_STREAM_LEN: u64 = 50_000_000;
// every PROCESSED_CHECKPOINT we check the stream length and reset the local stream_counter
const PROCESSED_CHECKPOINT: u64 = 20_000_000;

#[derive(Clone)]
pub(crate) struct GeyserDumper {
messenger: Arc<Mutex<RedisMessenger>>,
throttle_nanos: u64,
accounts_selector: AccountsSelector,
pub accounts_spinner: ProgressBar,
pub accounts_count: Arc<AtomicU64>,
/// how many accounts were processed in total during the snapshot run.
pub accounts_count: u64,
/// intermediate counter of accounts sent to regulate XLEN checks.
/// the reason for a separate field is that we initialize it as the current
/// stream length, which might be non-zero.
pub stream_counter: u64,
}

impl GeyserDumper {
Expand Down Expand Up @@ -61,13 +68,18 @@ impl GeyserDumper {
messenger
.set_buffer_size(ACCOUNT_STREAM_KEY, 100_000_000)
.await;
let initial_stream_len = messenger
.stream_len(&ACCOUNT_STREAM_KEY)
.await
.expect("get initial stream len of accounts");

Self {
messenger: Arc::new(Mutex::new(messenger)),
accounts_spinner,
accounts_selector,
accounts_count: Arc::new(AtomicU64::new(0)),
accounts_count: 0,
throttle_nanos,
stream_counter: initial_stream_len,
}
}

Expand All @@ -76,6 +88,21 @@ impl GeyserDumper {
(meta, account): (StoredMeta, AccountSharedData),
slot: u64,
) -> Result<(), Box<dyn Error>> {
if self.stream_counter >= PROCESSED_CHECKPOINT {
loop {
let stream_len = self
.messenger
.lock()
.await
.stream_len(ACCOUNT_STREAM_KEY)
.await?;
if stream_len >= MAX_INTERMEDIATE_STREAM_LEN {
self.stream_counter = 0;
break;
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
if self
.accounts_selector
.is_account_selected(meta.pubkey.as_ref(), account.owner().as_ref())
Expand Down Expand Up @@ -116,18 +143,19 @@ impl GeyserDumper {
return Ok(());
}

let prev = self.accounts_count.fetch_add(1, Ordering::Relaxed);
self.accounts_spinner.set_position(prev + 1);
self.accounts_count += 1;
self.accounts_spinner.set_position(self.accounts_count);
self.stream_counter += 1;

if self.throttle_nanos > 0 {
tokio::time::sleep(std::time::Duration::from_nanos(self.throttle_nanos)).await;
}

Ok(())
}

pub async fn force_flush(self) {
self.accounts_spinner
.set_position(self.accounts_count.load(Ordering::Relaxed));
self.accounts_spinner.set_position(self.accounts_count);
self.accounts_spinner
.finish_with_message("Finished processing snapshot!");
let messenger_mutex = Arc::into_inner(self.messenger)
Expand Down
7 changes: 1 addition & 6 deletions plerkle_snapshot/src/bin/solana-snapshot-etl/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}

info!(
"Done! Accounts: {}",
dumper
.accounts_count
.load(std::sync::atomic::Ordering::Relaxed)
);
info!("Done! Accounts: {}", dumper.accounts_count);

dumper.force_flush().await;

Expand Down

0 comments on commit 854212e

Please sign in to comment.