Skip to content

Commit

Permalink
fix(event cache): wait for the initial previous-batch token, if there…
Browse files Browse the repository at this point in the history
… wasn't any
  • Loading branch information
bnjbvr committed Feb 26, 2025
1 parent 19df945 commit 14dc0ed
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 4 deletions.
25 changes: 21 additions & 4 deletions crates/matrix-sdk/src/event_cache/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -849,9 +849,16 @@ mod private {
new_first_chunk
}
Ok(None) => {
// No previous chunk: no events to insert. Better, it means we've reached
// the start of the timeline!
return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
// No previous chunk: no events to insert. This means one of two things:
// - either the linked chunk is at the start of the timeline,
// - or we haven't received any back-pagination token yet, and we should
// wait for one.
if self.waited_for_initial_prev_token {
return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
}
// If we haven't waited yet, we request to resolve the gap, once we get the
// previous-batch token from sync.
return Ok(LoadMoreEventsBackwardsOutcome::Gap);
}
Err(err) => {
error!("error when loading the previous chunk of a linked chunk: {err}");
Expand Down Expand Up @@ -1070,7 +1077,6 @@ mod private {
#[instrument(skip_all)]
async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
let updates = self.events.store_updates().take();

self.send_updates_to_store(updates).await
}

Expand Down Expand Up @@ -1182,8 +1188,19 @@ mod private {
func: F,
) -> Result<(O, Vec<VectorDiff<TimelineEvent>>), EventCacheError> {
let output = func(&mut self.events);

self.propagate_changes().await?;

// If we've never waited for an initial previous-batch token, and we now have at
// least one gap in the chunk, no need to wait for a previous-batch token later.
if !self.waited_for_initial_prev_token
&& self.events.chunks().any(|chunk| chunk.is_gap())
{
self.waited_for_initial_prev_token = true;
}

let updates_as_vector_diffs = self.events.updates_as_vector_diffs();

Ok((output, updates_as_vector_diffs))
}
}
Expand Down
64 changes: 64 additions & 0 deletions crates/matrix-sdk/tests/integration/event_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,70 @@ async fn test_limited_timeline_resets_pagination() {
assert!(room_stream.is_empty());
}

#[async_test]
async fn test_persistent_storage_waits_for_pagination_token() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;

let event_cache = client.event_cache();

// Immediately subscribe the event cache to sync updates.
event_cache.subscribe().unwrap();
// TODO: remove this test when persistent storage is enabled by default, as it's
// doing the same as the one above.
event_cache.enable_storage().unwrap();

// If I sync and get informed I've joined The Room, without a previous batch
// token,
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c"));
let room = server.sync_joined_room(&client, room_id).await;

let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

let (events, mut room_stream) = room_event_cache.subscribe().await;

assert!(events.is_empty());
assert!(room_stream.is_empty());

server
.mock_room_messages()
.ok(RoomMessagesResponseTemplate::default()
.events(vec![f.text_msg("hi").event_id(event_id!("$2")).into_raw_timeline()]))
.mock_once()
.mount()
.await;

// At the beginning, the paginator is in the initial state.
let pagination = room_event_cache.pagination();
let mut pagination_status = pagination.status();
assert_eq!(pagination_status.get(), PaginatorState::Initial);

// If we try to back-paginate with a token, it will hit the end of the timeline
// and give us the resulting event.
let BackPaginationOutcome { events, reached_start } =
pagination.run_backwards_once(20).await.unwrap();

assert_eq!(events.len(), 1);
assert!(reached_start);

assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv()
);
assert_eq!(diffs.len(), 1);
assert_matches!(&diffs[0], VectorDiff::Append { values: events } => {
assert_eq!(events.len(), 1);
assert_event_matches_msg(&events[0], "hi");
});

// And the paginator state delivers this as an update, and is internally
// consistent with it:
assert_next_matches_with_timeout!(pagination_status, PaginatorState::Idle);
assert!(pagination.hit_timeline_start());

assert!(room_stream.is_empty());
}

#[async_test]
async fn test_limited_timeline_with_storage() {
let server = MatrixMockServer::new().await;
Expand Down

0 comments on commit 14dc0ed

Please sign in to comment.