Skip to content

Commit

Permalink
Merge pull request #3012 from matrix-org/andybalaam/batch-indexeddb-fetch-for-get_inbound_group_sessions
Browse files Browse the repository at this point in the history

indexeddb: Attempt to fix export crashes by batching DB operations in get_inbound_group_sessions
  • Loading branch information
andybalaam authored Jan 18, 2024
2 parents 784c745 + b05a0ec commit ff38760
Showing 1 changed file with 104 additions and 8 deletions.
112 changes: 104 additions & 8 deletions crates/matrix-sdk-indexeddb/src/crypto_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use ruma::{
use tokio::sync::Mutex;
use tracing::{debug, warn};
use wasm_bindgen::JsValue;
use web_sys::IdbKeyRange;

use crate::crypto_store::{
indexeddb_serializer::IndexeddbSerializer, migrations::open_and_upgrade_db,
Expand Down Expand Up @@ -781,18 +782,22 @@ impl_crypto_store! {
}

async fn get_inbound_group_sessions(&self) -> Result<Vec<InboundGroupSession>> {
Ok(self
const INBOUND_GROUP_SESSIONS_BATCH_SIZE: usize = 1000;

let transaction = self
.inner
.transaction_on_one_with_mode(
keys::INBOUND_GROUP_SESSIONS_V2,
IdbTransactionMode::Readonly,
)?
.object_store(keys::INBOUND_GROUP_SESSIONS_V2)?
.get_all()?
.await?
.iter()
.filter_map(|v| self.deserialize_inbound_group_session(v).ok())
.collect())
)?;

let object_store = transaction.object_store(keys::INBOUND_GROUP_SESSIONS_V2)?;

fetch_from_object_store_batched(
object_store,
|value| self.deserialize_inbound_group_session(value),
INBOUND_GROUP_SESSIONS_BATCH_SIZE
).await
}

async fn inbound_group_session_counts(&self) -> Result<RoomKeyCounts> {
Expand Down Expand Up @@ -1189,6 +1194,97 @@ impl Drop for IndexeddbCryptoStore {
}
}

/// Fetch items from an object store in batches, transform each item using
/// the supplied function, and stuff the transformed items into a single
/// vector to return.
/// Fetch every record from `object_store` in batches of at most
/// `batch_size`, map each raw JS value through `f`, and collect the
/// successful results into a single vector.
async fn fetch_from_object_store_batched<R, F>(
    object_store: IdbObjectStore<'_>,
    f: F,
    batch_size: usize,
) -> Result<Vec<R>>
where
    F: Fn(JsValue) -> Result<R>,
{
    // Results accumulated across all batches.
    let mut collected = Vec::new();
    let mut batch_index = 0;

    // The empty string sorts before every other *string* key in Indexed DB,
    // so the first batch starts there.
    // NOTE(review): this assumes all keys in the store are strings — numeric
    // or date keys would sort before "" and be skipped; confirm against the
    // serializer's key format.
    let mut resume_key: JsValue = "".into();

    loop {
        debug!("Fetching Indexed DB records starting from {}", batch_index * batch_size);

        // See https://github.com/Alorel/rust-indexed-db/issues/31 - we
        // would like to use `get_all_with_key_and_limit` if it ever exists
        // but for now we use a cursor and manually limit batch size.

        // Build a range strictly after the last key we handled. The
        // `expect` cannot fire: the bound is either "" or a key previously
        // returned by the cursor, both of which are valid keys.
        let range = IdbKeyRange::lower_bound_with_open(&resume_key, true)
            .expect("Key was not valid!");
        let cursor = object_store.open_cursor_with_range(&range)?.await?;

        // Pull up to `batch_size` records; `None` means the store is done.
        match fetch_batch(cursor, batch_size, &f, &mut collected).await? {
            Some(key) => resume_key = key,
            None => break,
        }

        batch_index += 1;
    }

    Ok(collected)
}

/// Fetch batch_size records from the supplied cursor,
/// and return the last key we processed, or None if
/// we reached the end of the cursor.
/// Drain at most `batch_size` records from `cursor` into `result` via `f`.
///
/// Returns the key of the last record visited, or `None` once the cursor
/// is exhausted (either it was `None` to begin with, or we hit the end of
/// the store while iterating).
async fn fetch_batch<R, F, Q>(
    cursor: Option<IdbCursorWithValue<'_, Q>>,
    batch_size: usize,
    f: &F,
    result: &mut Vec<R>,
) -> Result<Option<JsValue>>
where
    F: Fn(JsValue) -> Result<R>,
    Q: IdbQuerySource,
{
    // No cursor at all means the requested range holds no records.
    let Some(cursor) = cursor else {
        return Ok(None);
    };

    let mut last_seen_key = None;

    for _ in 0..batch_size {
        // Transform the current record. A failed transform is deliberately
        // dropped rather than aborting the whole fetch, so one corrupt
        // record cannot poison the export.
        if let Ok(item) = f(cursor.value()) {
            result.push(item);
        }

        // Track the key of this record so the caller can resume the next
        // batch strictly after it.
        if let Some(key) = cursor.key() {
            last_seen_key = Some(key);
        }

        // Advance the cursor; `false` means there is nothing left.
        if !cursor.continue_cursor()?.await? {
            return Ok(None);
        }
    }

    // The batch filled up but more records remain - hand back the resume
    // point for the next batch.
    Ok(last_seen_key)
}

/// The objects we store in the gossip_requests indexeddb object store
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct GossipRequestIndexedDbObject {
Expand Down

0 comments on commit ff38760

Please sign in to comment.