
Commit

save receipts_and_outcomes split by chunks
kobayurii committed Feb 25, 2025
1 parent d19c142 commit 10e15e5
Showing 3 changed files with 51 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions tx-indexer/Cargo.toml
@@ -18,6 +18,7 @@ clap = { version = "4.5.16", features = ["derive"] }
futures = "0.3.5"
futures-locks = "0.7.1"
humantime = "2.1.0"
itertools = "0.13.0"
lazy_static = "1.4.0"
prometheus = "0.13.1"
tokio = { version = "1.36.0", features = [
50 changes: 49 additions & 1 deletion tx-indexer/src/collector.rs
@@ -1,8 +1,11 @@
use near_indexer_primitives::IndexerTransactionWithOutcome;
use itertools::EitherOrBoth::{Both, Left, Right};
use itertools::Itertools;

use futures::{FutureExt, StreamExt};
use tokio_retry::{strategy::FixedInterval, Retry};

use near_indexer_primitives::IndexerTransactionWithOutcome;

use crate::metrics;
use crate::storage;

@@ -98,12 +101,57 @@ async fn save_outcomes_and_receipts(
Ok(())
}

/// Save receipts and outcomes to the DB.
/// Splits the receipts and outcomes into chunks of 1500 records and saves them chunk by chunk.
/// Splitting into chunks is necessary because Scylla has a per-batch size limit.
/// We use 1500 records per chunk to be sure that each batch stays below 50 MB.
#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
async fn save_receipts_and_outcomes_details(
tx_details_storage: &std::sync::Arc<crate::TxDetailsStorage>,
tx_collecting_storage: &std::sync::Arc<crate::storage::CacheStorage>,
receipts: Vec<readnode_primitives::ReceiptRecord>,
outcomes: Vec<readnode_primitives::OutcomeRecord>,
) {
let chunks: Vec<_> = receipts
.chunks(1500)
.zip_longest(outcomes.chunks(1500))
.collect();

let mut tasks = futures::stream::FuturesUnordered::new();

for pair in chunks {
let task = match pair {
Both(receipts, outcomes) => save_chunks_receipts_and_outcomes_details(
tx_details_storage,
tx_collecting_storage,
receipts.to_vec(),
outcomes.to_vec(),
),
Left(receipts) => save_chunks_receipts_and_outcomes_details(
tx_details_storage,
tx_collecting_storage,
receipts.to_vec(),
vec![],
),
Right(outcomes) => save_chunks_receipts_and_outcomes_details(
tx_details_storage,
tx_collecting_storage,
vec![],
outcomes.to_vec(),
),
};
tasks.push(task);
}

while tasks.next().await.is_some() {}
}

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
async fn save_chunks_receipts_and_outcomes_details(
tx_details_storage: &std::sync::Arc<crate::TxDetailsStorage>,
tx_collecting_storage: &std::sync::Arc<crate::storage::CacheStorage>,
receipts: Vec<readnode_primitives::ReceiptRecord>,
outcomes: Vec<readnode_primitives::OutcomeRecord>,
) {
match save_outcome_and_receipt_to_scylla(tx_details_storage, receipts.clone(), outcomes.clone())
.await
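For readers unfamiliar with itertools' zip_longest, the sketch below (not part of this commit) illustrates the chunk-pairing pattern used in save_receipts_and_outcomes_details: two lists of unequal length are split into 1500-record batches, leftover batches surface as Left/Right, and the resulting save futures are drained through FuturesUnordered. The u64 record type, BATCH_SIZE constant, and save_batch helper are placeholders assumed for illustration; the example only depends on itertools, futures, and tokio (with its macros and rt-multi-thread features).

use futures::StreamExt;
use itertools::EitherOrBoth::{Both, Left, Right};
use itertools::Itertools;

const BATCH_SIZE: usize = 1500;

// Stand-in for the real Scylla batch write.
async fn save_batch(receipts: Vec<u64>, outcomes: Vec<u64>) {
    println!("saving {} receipts and {} outcomes", receipts.len(), outcomes.len());
}

#[tokio::main]
async fn main() {
    // Unequal lengths: 3200 receipts vs. 1700 outcomes.
    let receipts: Vec<u64> = (0..3200).collect();
    let outcomes: Vec<u64> = (0..1700).collect();

    let mut tasks = futures::stream::FuturesUnordered::new();

    // zip_longest keeps yielding pairs until *both* iterators are exhausted,
    // producing Left/Right once one side runs out of chunks.
    for pair in receipts.chunks(BATCH_SIZE).zip_longest(outcomes.chunks(BATCH_SIZE)) {
        let task = match pair {
            Both(r, o) => save_batch(r.to_vec(), o.to_vec()),
            Left(r) => save_batch(r.to_vec(), vec![]),
            Right(o) => save_batch(vec![], o.to_vec()),
        };
        tasks.push(task);
    }

    // Drain the stream: polls all pending saves concurrently until every one finishes.
    while tasks.next().await.is_some() {}
}

With the lengths above, zip_longest yields Both(1500, 1500), Both(1500, 200), and finally Left(200), so no batch ever exceeds the 1500-record cap on either side.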
