From b05a0ecb3b7ccec5fef2403ae2b8195cabad01b1 Mon Sep 17 00:00:00 2001 From: Andy Balaam Date: Fri, 12 Jan 2024 14:45:05 +0000 Subject: [PATCH] indexeddb: Batch DB access in get_inbound_group_sessions Signed-off-by: Andy Balaam --- .../src/crypto_store/mod.rs | 112 ++++++++++++++++-- 1 file changed, 104 insertions(+), 8 deletions(-) diff --git a/crates/matrix-sdk-indexeddb/src/crypto_store/mod.rs b/crates/matrix-sdk-indexeddb/src/crypto_store/mod.rs index 3f895e32eea..6092a568f94 100644 --- a/crates/matrix-sdk-indexeddb/src/crypto_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/crypto_store/mod.rs @@ -41,6 +41,7 @@ use ruma::{ use tokio::sync::Mutex; use tracing::{debug, warn}; use wasm_bindgen::JsValue; +use web_sys::IdbKeyRange; use crate::crypto_store::{ indexeddb_serializer::IndexeddbSerializer, migrations::open_and_upgrade_db, @@ -781,18 +782,22 @@ impl_crypto_store! { } async fn get_inbound_group_sessions(&self) -> Result> { - Ok(self + const INBOUND_GROUP_SESSIONS_BATCH_SIZE: usize = 1000; + + let transaction = self .inner .transaction_on_one_with_mode( keys::INBOUND_GROUP_SESSIONS_V2, IdbTransactionMode::Readonly, - )? - .object_store(keys::INBOUND_GROUP_SESSIONS_V2)? - .get_all()? - .await? - .iter() - .filter_map(|v| self.deserialize_inbound_group_session(v).ok()) - .collect()) + )?; + + let object_store = transaction.object_store(keys::INBOUND_GROUP_SESSIONS_V2)?; + + fetch_from_object_store_batched( + object_store, + |value| self.deserialize_inbound_group_session(value), + INBOUND_GROUP_SESSIONS_BATCH_SIZE + ).await } async fn inbound_group_session_counts(&self) -> Result { @@ -1189,6 +1194,97 @@ impl Drop for IndexeddbCryptoStore { } } +/// Fetch items from an object store in batches, transform each item using +/// the supplied function, and stuff the transformed items into a single +/// vector to return. +async fn fetch_from_object_store_batched( + object_store: IdbObjectStore<'_>, + f: F, + batch_size: usize, +) -> Result> +where + F: Fn(JsValue) -> Result, +{ + let mut result = Vec::new(); + let mut batch_n = 0; + + // The empty string is before all keys in Indexed DB - first batch starts there. + let mut latest_key: JsValue = "".into(); + + loop { + debug!("Fetching Indexed DB records starting from {}", batch_n * batch_size); + + // See https://github.com/Alorel/rust-indexed-db/issues/31 - we + // would like to use `get_all_with_key_and_limit` if it ever exists + // but for now we use a cursor and manually limit batch size. + + // Get hold of a cursor for this batch. (This should not panic in expect() + // because we always use "", or the result of cursor.key(), both of + // which are valid keys.) + let after_latest_key = + IdbKeyRange::lower_bound_with_open(&latest_key, true).expect("Key was not valid!"); + let cursor = object_store.open_cursor_with_range(&after_latest_key)?.await?; + + // Fetch batch_size records into result + let next_key = fetch_batch(cursor, batch_size, &f, &mut result).await?; + if let Some(next_key) = next_key { + latest_key = next_key; + } else { + break; + } + + batch_n += 1; + } + + Ok(result) +} + +/// Fetch batch_size records from the supplied cursor, +/// and return the last key we processed, or None if +/// we reached the end of the cursor. +async fn fetch_batch( + cursor: Option>, + batch_size: usize, + f: &F, + result: &mut Vec, +) -> Result> +where + F: Fn(JsValue) -> Result, + Q: IdbQuerySource, +{ + let Some(cursor) = cursor else { + // Cursor was None - there are no more records + return Ok(None); + }; + + let mut latest_key = None; + + for _ in 0..batch_size { + // Process the record + let processed = f(cursor.value()); + if let Ok(processed) = processed { + result.push(processed); + } + // else processing failed: don't return this record at all + + // Remember that we have processed this record, so if we hit + // the end of the batch, the next batch can start after this one + if let Some(key) = cursor.key() { + latest_key = Some(key); + } + + // Move on to the next record + let more_records = cursor.continue_cursor()?.await?; + if !more_records { + return Ok(None); + } + } + + // We finished the batch but there are more records - + // return the key of the last one we processed + Ok(latest_key) +} + /// The objects we store in the gossip_requests indexeddb object store #[derive(Debug, serde::Serialize, serde::Deserialize)] struct GossipRequestIndexedDbObject {