diff --git a/Cargo.lock b/Cargo.lock index 25a423cc..97420aca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8956,6 +8956,7 @@ dependencies = [ "futures", "futures-locks", "humantime", + "itertools 0.13.0", "lazy_static", "near-indexer-primitives", "near-jsonrpc-client 0.15.1", diff --git a/tx-indexer/Cargo.toml b/tx-indexer/Cargo.toml index 32ce3497..0a1a805e 100644 --- a/tx-indexer/Cargo.toml +++ b/tx-indexer/Cargo.toml @@ -18,6 +18,7 @@ clap = { version = "4.5.16", features = ["derive"] } futures = "0.3.5" futures-locks = "0.7.1" humantime = "2.1.0" +itertools = "0.13.0" lazy_static = "1.4.0" prometheus = "0.13.1" tokio = { version = "1.36.0", features = [ diff --git a/tx-indexer/src/collector.rs b/tx-indexer/src/collector.rs index 4ef29c75..b8505bb4 100644 --- a/tx-indexer/src/collector.rs +++ b/tx-indexer/src/collector.rs @@ -1,8 +1,11 @@ -use near_indexer_primitives::IndexerTransactionWithOutcome; +use itertools::EitherOrBoth::{Both, Left, Right}; +use itertools::Itertools; use futures::{FutureExt, StreamExt}; use tokio_retry::{strategy::FixedInterval, Retry}; +use near_indexer_primitives::IndexerTransactionWithOutcome; + use crate::metrics; use crate::storage; @@ -98,12 +101,57 @@ async fn save_outcomes_and_receipts( Ok(()) } +/// Save receipts and outcomes to the DB +/// Split the receipts and outcomes into chunks of 1500 records and save them +/// It is necessary to split the records into chunks because Scylla has a limit per batch +/// We use 1500 records because we should be sure that the batch size is less than 50MB #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))] async fn save_receipts_and_outcomes_details( tx_details_storage: &std::sync::Arc, tx_collecting_storage: &std::sync::Arc, receipts: Vec, outcomes: Vec, +) { + let chunks: Vec<_> = receipts + .chunks(1500) + .zip_longest(outcomes.chunks(1500)) + .collect(); + + let mut tasks = futures::stream::FuturesUnordered::new(); + + for pair in chunks { + let task = match pair { + Both(receipts, outcomes) => save_chunks_receipts_and_outcomes_details( + tx_details_storage, + tx_collecting_storage, + receipts.to_vec(), + outcomes.to_vec(), + ), + Left(receipts) => save_chunks_receipts_and_outcomes_details( + tx_details_storage, + tx_collecting_storage, + receipts.to_vec(), + vec![], + ), + Right(outcomes) => save_chunks_receipts_and_outcomes_details( + tx_details_storage, + tx_collecting_storage, + vec![], + outcomes.to_vec(), + ), + }; + tasks.push(task); + } + + while tasks.next().await.is_some() {} +} + +#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))] +async fn save_chunks_receipts_and_outcomes_details( + tx_details_storage: &std::sync::Arc, + tx_collecting_storage: &std::sync::Arc, + receipts: Vec, + outcomes: Vec, ) { match save_outcome_and_receipt_to_scylla(tx_details_storage, receipts.clone(), outcomes.clone()) .await