Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexeddb: Attempt to fix export crashes by batching DB operations in get_inbound_group_sessions #3012

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 104 additions & 8 deletions crates/matrix-sdk-indexeddb/src/crypto_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use ruma::{
use tokio::sync::Mutex;
use tracing::{debug, warn};
use wasm_bindgen::JsValue;
use web_sys::IdbKeyRange;

use crate::crypto_store::{
indexeddb_serializer::IndexeddbSerializer, migrations::open_and_upgrade_db,
Expand Down Expand Up @@ -781,18 +782,22 @@ impl_crypto_store! {
}

async fn get_inbound_group_sessions(&self) -> Result<Vec<InboundGroupSession>> {
Ok(self
const INBOUND_GROUP_SESSIONS_BATCH_SIZE: usize = 1000;

let transaction = self
.inner
.transaction_on_one_with_mode(
keys::INBOUND_GROUP_SESSIONS_V2,
IdbTransactionMode::Readonly,
)?
.object_store(keys::INBOUND_GROUP_SESSIONS_V2)?
.get_all()?
.await?
.iter()
.filter_map(|v| self.deserialize_inbound_group_session(v).ok())
.collect())
)?;

let object_store = transaction.object_store(keys::INBOUND_GROUP_SESSIONS_V2)?;

fetch_from_object_store_batched(
object_store,
|value| self.deserialize_inbound_group_session(value),
INBOUND_GROUP_SESSIONS_BATCH_SIZE
).await
}

async fn inbound_group_session_counts(&self) -> Result<RoomKeyCounts> {
Expand Down Expand Up @@ -1189,6 +1194,97 @@ impl Drop for IndexeddbCryptoStore {
}
}

/// Fetch items from an object store in batches, transform each item using
/// the supplied function, and stuff the transformed items into a single
/// vector to return.
async fn fetch_from_object_store_batched<R, F>(
object_store: IdbObjectStore<'_>,
f: F,
batch_size: usize,
) -> Result<Vec<R>>
where
F: Fn(JsValue) -> Result<R>,
{
let mut result = Vec::new();
let mut batch_n = 0;

// The empty string is before all keys in Indexed DB - first batch starts there.
let mut latest_key: JsValue = "".into();

loop {
debug!("Fetching Indexed DB records starting from {}", batch_n * batch_size);

// See https://github.com/Alorel/rust-indexed-db/issues/31 - we
// would like to use `get_all_with_key_and_limit` if it ever exists
// but for now we use a cursor and manually limit batch size.

// Get hold of a cursor for this batch. (This should not panic in expect()
// because we always use "", or the result of cursor.key(), both of
// which are valid keys.)
let after_latest_key =
IdbKeyRange::lower_bound_with_open(&latest_key, true).expect("Key was not valid!");
let cursor = object_store.open_cursor_with_range(&after_latest_key)?.await?;

// Fetch batch_size records into result
let next_key = fetch_batch(cursor, batch_size, &f, &mut result).await?;
if let Some(next_key) = next_key {
latest_key = next_key;
} else {
break;
}

batch_n += 1;
}

Ok(result)
}

/// Fetch batch_size records from the supplied cursor,
/// and return the last key we processed, or None if
/// we reached the end of the cursor.
async fn fetch_batch<R, F, Q>(
cursor: Option<IdbCursorWithValue<'_, Q>>,
batch_size: usize,
f: &F,
result: &mut Vec<R>,
) -> Result<Option<JsValue>>
where
F: Fn(JsValue) -> Result<R>,
Q: IdbQuerySource,
{
let Some(cursor) = cursor else {
// Cursor was None - there are no more records
return Ok(None);
};

let mut latest_key = None;

for _ in 0..batch_size {
// Process the record
let processed = f(cursor.value());
if let Ok(processed) = processed {
result.push(processed);
}
// else processing failed: don't return this record at all

// Remember that we have processed this record, so if we hit
// the end of the batch, the next batch can start after this one
if let Some(key) = cursor.key() {
latest_key = Some(key);
}

// Move on to the next record
let more_records = cursor.continue_cursor()?.await?;
if !more_records {
return Ok(None);
}
}

// We finished the batch but there are more records -
// return the key of the last one we processed
Ok(latest_key)
}

/// The objects we store in the gossip_requests indexeddb object store
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct GossipRequestIndexedDbObject {
Expand Down