Skip to content

Notification events resolving and rendering in batches #4722

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ package io.element.android.libraries.matrix.api.notification

import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.matrix.api.core.ThreadId
import io.element.android.libraries.matrix.api.core.UserId
import io.element.android.libraries.matrix.api.room.RoomMembershipState
import io.element.android.libraries.matrix.api.timeline.item.event.MessageType

data class NotificationData(
val sessionId: SessionId,
val eventId: EventId,
val threadId: ThreadId?,
val roomId: RoomId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ package io.element.android.libraries.matrix.api.notification

import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId

interface NotificationService {
suspend fun getNotification(roomId: RoomId, eventId: EventId): Result<NotificationData?>
suspend fun getNotifications(sessionId: SessionId, ids: Map<RoomId, List<EventId>>): Result<Map<EventId, NotificationData?>>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first sight, it's strange that the return type is not
Result<Map<RoomId, List<NotificationData>>>

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was like that originally, but where it's immediately used later (here) having the notifications grouped by room id doesn't really help, we'd have to first search for the room, then use notifications.find { it.eventId == eventId }. Using having the id -> data mapping is more optimised.

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package io.element.android.libraries.matrix.impl.notification
import io.element.android.libraries.core.bool.orFalse
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.matrix.api.core.UserId
import io.element.android.libraries.matrix.api.notification.NotificationContent
import io.element.android.libraries.matrix.api.notification.NotificationData
Expand All @@ -25,6 +26,7 @@ class NotificationMapper(
private val notificationContentMapper = NotificationContentMapper()

fun map(
sessionId: SessionId,
eventId: EventId,
roomId: RoomId,
notificationItem: NotificationItem
Expand All @@ -35,6 +37,7 @@ class NotificationMapper(
activeMembersCount = item.roomInfo.joinedMembersCount.toInt(),
)
NotificationData(
sessionId = sessionId,
eventId = eventId,
// FIXME once the `NotificationItem` in the SDK returns the thread id
threadId = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
import io.element.android.libraries.core.coroutine.CoroutineDispatchers
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.matrix.api.notification.NotificationData
import io.element.android.libraries.matrix.api.notification.NotificationService
import io.element.android.services.toolbox.api.systemclock.SystemClock
import kotlinx.coroutines.withContext
import org.matrix.rustcomponents.sdk.NotificationClient
import org.matrix.rustcomponents.sdk.use
import org.matrix.rustcomponents.sdk.NotificationItemsRequest
import timber.log.Timber

class RustNotificationService(
private val notificationClient: NotificationClient,
Expand All @@ -24,14 +26,29 @@
) : NotificationService {
private val notificationMapper: NotificationMapper = NotificationMapper(clock)

override suspend fun getNotification(
roomId: RoomId,
eventId: EventId,
): Result<NotificationData?> = withContext(dispatchers.io) {
override suspend fun getNotifications(
sessionId: SessionId,
ids: Map<RoomId, List<EventId>>
): Result<Map<EventId, NotificationData?>> = withContext(dispatchers.io) {
runCatching {
val item = notificationClient.getNotification(roomId.value, eventId.value)
item?.use {
notificationMapper.map(eventId, roomId, it)
val requests = ids.map { (roomId, eventIds) ->
NotificationItemsRequest(
roomId = roomId.value,
eventIds = eventIds.map { it.value }
)
}
val items = notificationClient.getNotifications(requests)
buildMap {
val eventIds = requests.flatMap { it.eventIds }
for (eventId in eventIds) {
val item = items[eventId]
if (item != null) {
val roomId = RoomId(requests.find { it.eventIds.contains(eventId) }?.roomId!!)
put(EventId(eventId), notificationMapper.map(sessionId, EventId(eventId), roomId, item))
} else {
Timber.e("Could not retrieve event for notification with $eventId")

Check warning on line 49 in libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/notification/RustNotificationService.kt

View check run for this annotation

Codecov / codecov/patch

libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/notification/RustNotificationService.kt#L49

Added line #L49 was not covered by tests
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@

package io.element.android.libraries.matrix.impl.fixtures.fakes

import io.element.android.tests.testutils.simulateLongTask
import org.matrix.rustcomponents.sdk.NoPointer
import org.matrix.rustcomponents.sdk.NotificationClient
import org.matrix.rustcomponents.sdk.NotificationItem
import org.matrix.rustcomponents.sdk.NotificationItemsRequest

class FakeRustNotificationClient(
var notificationItemResult: NotificationItem? = null
var notificationItemResult: Map<String, NotificationItem> = emptyMap(),
) : NotificationClient(NoPointer) {
override suspend fun getNotification(roomId: String, eventId: String): NotificationItem? = simulateLongTask {
notificationItemResult
override suspend fun getNotifications(requests: List<NotificationItemsRequest>): Map<String, NotificationItem> {
return notificationItemResult
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import io.element.android.libraries.matrix.impl.fixtures.fakes.FakeRustNotificat
import io.element.android.libraries.matrix.test.AN_EVENT_ID
import io.element.android.libraries.matrix.test.A_MESSAGE
import io.element.android.libraries.matrix.test.A_ROOM_ID
import io.element.android.libraries.matrix.test.A_SESSION_ID
import io.element.android.libraries.matrix.test.A_USER_ID_2
import io.element.android.services.toolbox.api.systemclock.SystemClock
import io.element.android.services.toolbox.test.systemclock.FakeSystemClock
Expand All @@ -28,12 +29,12 @@ class RustNotificationServiceTest {
@Test
fun test() = runTest {
val notificationClient = FakeRustNotificationClient(
notificationItemResult = aRustNotificationItem(),
notificationItemResult = mapOf(AN_EVENT_ID.value to aRustNotificationItem()),
)
val sut = createRustNotificationService(
notificationClient = notificationClient,
)
val result = sut.getNotification(A_ROOM_ID, AN_EVENT_ID).getOrThrow()!!
val result = sut.getNotifications(A_SESSION_ID, mapOf(A_ROOM_ID to listOf(AN_EVENT_ID))).getOrThrow()[AN_EVENT_ID]!!
assertThat(result.isEncrypted).isTrue()
assertThat(result.content).isEqualTo(
NotificationContent.MessageLike.RoomMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,18 @@ package io.element.android.libraries.matrix.test.notification

import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.matrix.api.notification.NotificationData
import io.element.android.libraries.matrix.api.notification.NotificationService

class FakeNotificationService : NotificationService {
private var getNotificationResult: Result<NotificationData?> = Result.success(null)
private var getNotificationsResult: Result<Map<EventId, NotificationData?>> = Result.success(emptyMap())

fun givenGetNotificationResult(result: Result<NotificationData?>) {
getNotificationResult = result
fun givenGetNotificationsResult(result: Result<Map<EventId, NotificationData?>>) {
getNotificationsResult = result
}

override suspend fun getNotification(
roomId: RoomId,
eventId: EventId,
): Result<NotificationData?> {
return getNotificationResult
override suspend fun getNotifications(sessionId: SessionId, ids: Map<RoomId, List<EventId>>): Result<Map<EventId, NotificationData?>> {
return getNotificationsResult
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import io.element.android.libraries.matrix.api.notification.NotificationData
import io.element.android.libraries.matrix.test.AN_EVENT_ID
import io.element.android.libraries.matrix.test.A_ROOM_ID
import io.element.android.libraries.matrix.test.A_ROOM_NAME
import io.element.android.libraries.matrix.test.A_SESSION_ID
import io.element.android.libraries.matrix.test.A_TIMESTAMP
import io.element.android.libraries.matrix.test.A_USER_NAME_2

Expand All @@ -27,6 +28,7 @@ fun aNotificationData(
roomDisplayName: String? = A_ROOM_NAME
): NotificationData {
return NotificationData(
sessionId = A_SESSION_ID,
eventId = AN_EVENT_ID,
threadId = threadId,
roomId = A_ROOM_ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import android.content.Context
import android.net.Uri
import androidx.core.content.FileProvider
import com.squareup.anvil.annotations.ContributesBinding
import io.element.android.libraries.core.extensions.flatMap
import io.element.android.libraries.core.log.logger.LoggerTag
import io.element.android.libraries.di.AppScope
import io.element.android.libraries.di.ApplicationContext
import io.element.android.libraries.di.SingleIn
import io.element.android.libraries.matrix.api.MatrixClient
import io.element.android.libraries.matrix.api.MatrixClientProvider
import io.element.android.libraries.matrix.api.core.EventId
Expand Down Expand Up @@ -61,10 +61,14 @@ private val loggerTag = LoggerTag("DefaultNotifiableEventResolver", LoggerTag.No
* this pattern allow decoupling between the object responsible of displaying notifications and the matrix sdk.
*/
interface NotifiableEventResolver {
suspend fun resolveEvent(sessionId: SessionId, roomId: RoomId, eventId: EventId): Result<ResolvedPushEvent>
suspend fun resolveEvents(
sessionId: SessionId,
notificationEventRequests: List<NotificationEventRequest>
): Result<Map<EventId, ResolvedPushEvent?>>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same type of remark, why don't we have a return type like:
Map<RoomId, List<ResolvedPushEvent>>?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here it might actually make sense to do that for the pushHistoryService.onSuccess and failure callbacks, searching might be a bit faster even if later we need to either iterate by room then event id or just flatten everything.

Maybe it would even make sense to use the NotificationEventRequest as the key itself? It has the session id, room and event ids to make fetching a single operation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

@ContributesBinding(AppScope::class)
@SingleIn(AppScope::class)
class DefaultNotifiableEventResolver @Inject constructor(
private val stringProvider: StringProvider,
private val clock: SystemClock,
Expand All @@ -75,27 +79,21 @@ class DefaultNotifiableEventResolver @Inject constructor(
private val callNotificationEventResolver: CallNotificationEventResolver,
private val appPreferencesStore: AppPreferencesStore,
) : NotifiableEventResolver {
override suspend fun resolveEvent(sessionId: SessionId, roomId: RoomId, eventId: EventId): Result<ResolvedPushEvent> {
// Restore session
val client = matrixClientProvider.getOrRestore(sessionId).getOrNull() ?: return Result.failure(
ResolvingException("Unable to restore session for $sessionId")
)
val notificationService = client.notificationService()
val notificationData = notificationService.getNotification(
roomId = roomId,
eventId = eventId,
).onFailure {
Timber.tag(loggerTag.value).e(it, "Unable to resolve event: $eventId.")
override suspend fun resolveEvents(
sessionId: SessionId,
notificationEventRequests: List<NotificationEventRequest>
): Result<Map<EventId, ResolvedPushEvent?>> {
Timber.d("Queueing notifications: $notificationEventRequests")
val client = matrixClientProvider.getOrRestore(sessionId).getOrElse {
return Result.failure(IllegalStateException("Couldn't get or restore client for session $sessionId"))
}
val ids = notificationEventRequests.groupBy { it.roomId }.mapValues { (_, value) -> value.map { it.eventId } }
val notifications = client.notificationService().getNotifications(sessionId, ids)

// TODO this notificationData is not always valid at the moment, sometimes the Rust SDK can't fetch the matching event
return notificationData.flatMap {
if (it == null) {
Timber.tag(loggerTag.value).d("No notification data found for event $eventId")
return@flatMap Result.failure(ResolvingException("Unable to resolve event $eventId"))
} else {
Timber.tag(loggerTag.value).d("Found notification item for $eventId")
it.asNotifiableEvent(client, sessionId)
return notifications.mapCatching { map ->
map.mapValues { (_, notificationData) ->
notificationData?.asNotifiableEvent(client, sessionId)?.getOrNull()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@
renderEvents(listOf(notifiableEvent))
}

suspend fun onNotifiableEventsReceived(notifiableEvents: List<NotifiableEvent>) {
val eventsToNotify = notifiableEvents.filter { !it.shouldIgnoreEventInRoom(appNavigationStateService.appNavigationState.value) }
renderEvents(eventsToNotify)

Check warning on line 118 in libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/notifications/DefaultNotificationDrawerManager.kt

View check run for this annotation

Codecov / codecov/patch

libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/notifications/DefaultNotificationDrawerManager.kt#L117-L118

Added lines #L117 - L118 were not covered by tests
}

/**
* Clear all known message events for a [sessionId].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ class DefaultOnMissedCallNotificationHandler @Inject constructor(
// Resolve the event and add a notification for it, at this point it should no longer be a ringing one
val notificationData = matrixClientProvider.getOrRestore(sessionId).getOrNull()
?.notificationService()
?.getNotification(roomId, eventId)
?.getNotifications(sessionId, mapOf(roomId to listOf(eventId)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks weird to have to provide a sessionId here, since we are using a method from a service of a matrix client.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but RustNotificationService doesn't have a reference to the matrix client. Maybe I should just add a SessionId as a constructor param for it. WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I should just add a SessionId as a constructor param for it. WDYT?

Yes please. We're doing it for RustRoomFactory already for instance.

Copy link
Member Author

@jmartinesp jmartinesp May 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RustRoomFactory doesn't use injection though. Adding it here would mean creating a factory abstraction.

Never mind, I looked at the wrong component. I have a severe case of the Mondays, it seems.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?.getOrNull()
?.get(eventId)
?: return

val notifiableEvent = callNotificationEventResolver.resolveEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@
roomIsDm = room.isDm(),
outGoingMessage = true,
)
onNotifiableEventReceived.onNotifiableEventReceived(notifiableMessageEvent)
onNotifiableEventReceived.onNotifiableEventsReceived(listOf(notifiableMessageEvent))

if (threadId != null && replyToEventId != null) {
room.liveTimeline.replyMessage(
Expand All @@ -177,9 +177,11 @@
)
}.onFailure {
Timber.e(it, "Failed to send smart reply message")
onNotifiableEventReceived.onNotifiableEventReceived(
notifiableMessageEvent.copy(
outGoingMessageFailed = true
onNotifiableEventReceived.onNotifiableEventsReceived(
listOf(
notifiableMessageEvent.copy(
outGoingMessageFailed = true

Check warning on line 183 in libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/notifications/NotificationBroadcastReceiverHandler.kt

View check run for this annotation

Codecov / codecov/patch

libraries/push/impl/src/main/kotlin/io/element/android/libraries/push/impl/notifications/NotificationBroadcastReceiverHandler.kt#L180-L183

Added lines #L180 - L183 were not covered by tests
)
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class DefaultNotificationDisplayer @Inject constructor(
return false
}
notificationManager.notify(tag, id, notification)
Timber.d("Notifying with tag: $tag, id: $id")
return true
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2025 New Vector Ltd.
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
* Please see LICENSE files in the repository root for full details.
*/

package io.element.android.libraries.push.impl.notifications

import io.element.android.libraries.di.AppScope
import io.element.android.libraries.di.SingleIn
import io.element.android.libraries.matrix.api.core.EventId
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.push.impl.notifications.model.ResolvedPushEvent
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import timber.log.Timber
import javax.inject.Inject
import kotlin.time.Duration.Companion.milliseconds

@OptIn(ExperimentalCoroutinesApi::class)
@SingleIn(AppScope::class)
class NotificationResolverQueue @Inject constructor(
private val notifiableEventResolver: NotifiableEventResolver,
private val coroutineScope: CoroutineScope,
) {
companion object {
private const val BATCH_WINDOW_MS = 250L
}
private val requestQueue = Channel<NotificationEventRequest>(capacity = 100)

val results: SharedFlow<Pair<List<NotificationEventRequest>, List<ResolvedPushEvent>>> = MutableSharedFlow()

init {
coroutineScope.launch {
while (coroutineScope.isActive) {
// Wait for a batch of requests to be received in a specified time window
delay(BATCH_WINDOW_MS.milliseconds)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure this code will behave as a first throttler?
If we get a Push just before the delay is ended, it will trigger the request without waiting for any other potential push. I have not checked in practice how much time we have between mutliple pending push received from Firebase though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't have to act as a throttle operation, but as a window one, grouping events that happen in an interval in batches. Those batches won't always be optimal, since as you say, if you receive one after one window closes it'll be processed on the next one.

If you think it's worth it implementing it more in a throttle/debounce way (without discarding items as those operators usually do) I can try to do that. There is the possibility that this means we chain lots of 'received item, wait for X ms' operations and end up delaying the processing for longer, but it's not really likely to happen in the real world except maybe for when you have several pushes in firebase that couldn't be delivered, the device reconnects and it starts receiving the pending pushes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in bd27b50, it seems to work quite well.


val groupedRequestsById = buildList {
while (!requestQueue.isEmpty) {
requestQueue.receiveCatching().getOrNull()?.let(this::add)
}
}.groupBy { it.sessionId }

val sessionIds = groupedRequestsById.keys
for (sessionId in sessionIds) {
val requests = groupedRequestsById[sessionId].orEmpty()
Timber.d("Fetching notifications for $sessionId: $requests. Pending requests: ${!requestQueue.isEmpty}")

launch {
// No need for a Mutex since the SDK already has one internally
val notifications = notifiableEventResolver.resolveEvents(sessionId, requests).getOrNull().orEmpty()
(results as MutableSharedFlow).emit(requests to notifications.values.filterNotNull())
}
}
}
}
}

suspend fun enqueue(request: NotificationEventRequest) {
requestQueue.send(request)
}
}

data class NotificationEventRequest(
val sessionId: SessionId,
val roomId: RoomId,
val eventId: EventId,
val providerInfo: String,
)
Loading
Loading