-
Notifications
You must be signed in to change notification settings - Fork 222
Notification events resolving and rendering in batches #4722
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from 6 commits
e2724c7
8778a1b
77d63f9
5167305
ddc046c
735a88e
6b517e1
e5735c4
ad25233
d9bb5b7
b5ece84
bd27b50
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,10 +11,10 @@ import android.content.Context | |
import android.net.Uri | ||
import androidx.core.content.FileProvider | ||
import com.squareup.anvil.annotations.ContributesBinding | ||
import io.element.android.libraries.core.extensions.flatMap | ||
import io.element.android.libraries.core.log.logger.LoggerTag | ||
import io.element.android.libraries.di.AppScope | ||
import io.element.android.libraries.di.ApplicationContext | ||
import io.element.android.libraries.di.SingleIn | ||
import io.element.android.libraries.matrix.api.MatrixClient | ||
import io.element.android.libraries.matrix.api.MatrixClientProvider | ||
import io.element.android.libraries.matrix.api.core.EventId | ||
|
@@ -61,10 +61,14 @@ private val loggerTag = LoggerTag("DefaultNotifiableEventResolver", LoggerTag.No | |
* this pattern allow decoupling between the object responsible of displaying notifications and the matrix sdk. | ||
*/ | ||
interface NotifiableEventResolver { | ||
suspend fun resolveEvent(sessionId: SessionId, roomId: RoomId, eventId: EventId): Result<ResolvedPushEvent> | ||
suspend fun resolveEvents( | ||
sessionId: SessionId, | ||
notificationEventRequests: List<NotificationEventRequest> | ||
): Result<Map<EventId, ResolvedPushEvent?>> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same type of remark, why don't we have a return type like: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here it might actually make sense to do that for the Maybe it would even make sense to use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
} | ||
|
||
@ContributesBinding(AppScope::class) | ||
@SingleIn(AppScope::class) | ||
class DefaultNotifiableEventResolver @Inject constructor( | ||
private val stringProvider: StringProvider, | ||
private val clock: SystemClock, | ||
|
@@ -75,27 +79,21 @@ class DefaultNotifiableEventResolver @Inject constructor( | |
private val callNotificationEventResolver: CallNotificationEventResolver, | ||
private val appPreferencesStore: AppPreferencesStore, | ||
) : NotifiableEventResolver { | ||
override suspend fun resolveEvent(sessionId: SessionId, roomId: RoomId, eventId: EventId): Result<ResolvedPushEvent> { | ||
// Restore session | ||
val client = matrixClientProvider.getOrRestore(sessionId).getOrNull() ?: return Result.failure( | ||
ResolvingException("Unable to restore session for $sessionId") | ||
) | ||
val notificationService = client.notificationService() | ||
val notificationData = notificationService.getNotification( | ||
roomId = roomId, | ||
eventId = eventId, | ||
).onFailure { | ||
Timber.tag(loggerTag.value).e(it, "Unable to resolve event: $eventId.") | ||
override suspend fun resolveEvents( | ||
sessionId: SessionId, | ||
notificationEventRequests: List<NotificationEventRequest> | ||
): Result<Map<EventId, ResolvedPushEvent?>> { | ||
Timber.d("Queueing notifications: $notificationEventRequests") | ||
val client = matrixClientProvider.getOrRestore(sessionId).getOrElse { | ||
return Result.failure(IllegalStateException("Couldn't get or restore client for session $sessionId")) | ||
} | ||
val ids = notificationEventRequests.groupBy { it.roomId }.mapValues { (_, value) -> value.map { it.eventId } } | ||
val notifications = client.notificationService().getNotifications(sessionId, ids) | ||
|
||
// TODO this notificationData is not always valid at the moment, sometimes the Rust SDK can't fetch the matching event | ||
return notificationData.flatMap { | ||
if (it == null) { | ||
Timber.tag(loggerTag.value).d("No notification data found for event $eventId") | ||
return@flatMap Result.failure(ResolvingException("Unable to resolve event $eventId")) | ||
} else { | ||
Timber.tag(loggerTag.value).d("Found notification item for $eventId") | ||
it.asNotifiableEvent(client, sessionId) | ||
return notifications.mapCatching { map -> | ||
map.mapValues { (_, notificationData) -> | ||
notificationData?.asNotifiableEvent(client, sessionId)?.getOrNull() | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,8 +30,9 @@ class DefaultOnMissedCallNotificationHandler @Inject constructor( | |
// Resolve the event and add a notification for it, at this point it should no longer be a ringing one | ||
val notificationData = matrixClientProvider.getOrRestore(sessionId).getOrNull() | ||
?.notificationService() | ||
?.getNotification(roomId, eventId) | ||
?.getNotifications(sessionId, mapOf(roomId to listOf(eventId))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks weird to have to provide a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. True, but There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes please. We're doing it for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Never mind, I looked at the wrong component. I have a severe case of the Mondays, it seems. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
?.getOrNull() | ||
?.get(eventId) | ||
?: return | ||
|
||
val notifiableEvent = callNotificationEventResolver.resolveEvent( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
/* | ||
* Copyright 2025 New Vector Ltd. | ||
* | ||
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial | ||
* Please see LICENSE files in the repository root for full details. | ||
*/ | ||
|
||
package io.element.android.libraries.push.impl.notifications | ||
|
||
import io.element.android.libraries.di.AppScope | ||
import io.element.android.libraries.di.SingleIn | ||
import io.element.android.libraries.matrix.api.core.EventId | ||
import io.element.android.libraries.matrix.api.core.RoomId | ||
import io.element.android.libraries.matrix.api.core.SessionId | ||
import io.element.android.libraries.push.impl.notifications.model.ResolvedPushEvent | ||
import kotlinx.coroutines.CoroutineScope | ||
import kotlinx.coroutines.ExperimentalCoroutinesApi | ||
import kotlinx.coroutines.channels.Channel | ||
import kotlinx.coroutines.delay | ||
import kotlinx.coroutines.flow.MutableSharedFlow | ||
import kotlinx.coroutines.flow.SharedFlow | ||
import kotlinx.coroutines.isActive | ||
import kotlinx.coroutines.launch | ||
import timber.log.Timber | ||
import javax.inject.Inject | ||
import kotlin.time.Duration.Companion.milliseconds | ||
|
||
@OptIn(ExperimentalCoroutinesApi::class) | ||
@SingleIn(AppScope::class) | ||
class NotificationResolverQueue @Inject constructor( | ||
private val notifiableEventResolver: NotifiableEventResolver, | ||
private val coroutineScope: CoroutineScope, | ||
) { | ||
companion object { | ||
private const val BATCH_WINDOW_MS = 250L | ||
} | ||
private val requestQueue = Channel<NotificationEventRequest>(capacity = 100) | ||
|
||
val results: SharedFlow<Pair<List<NotificationEventRequest>, List<ResolvedPushEvent>>> = MutableSharedFlow() | ||
|
||
init { | ||
coroutineScope.launch { | ||
while (coroutineScope.isActive) { | ||
// Wait for a batch of requests to be received in a specified time window | ||
delay(BATCH_WINDOW_MS.milliseconds) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you sure this code will behave as a first throttler? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesn't have to act as a throttle operation, but as a window one, grouping events that happen in an interval in batches. Those batches won't always be optimal, since as you say, if you receive one after one window closes it'll be processed on the next one. If you think it's worth it implementing it more in a throttle/debounce way (without discarding items as those operators usually do) I can try to do that. There is the possibility that this means we chain lots of 'received item, wait for X ms' operations and end up delaying the processing for longer, but it's not really likely to happen in the real world except maybe for when you have several pushes in firebase that couldn't be delivered, the device reconnects and it starts receiving the pending pushes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in bd27b50, it seems to work quite well. |
||
|
||
val groupedRequestsById = buildList { | ||
while (!requestQueue.isEmpty) { | ||
requestQueue.receiveCatching().getOrNull()?.let(this::add) | ||
} | ||
}.groupBy { it.sessionId } | ||
|
||
val sessionIds = groupedRequestsById.keys | ||
for (sessionId in sessionIds) { | ||
val requests = groupedRequestsById[sessionId].orEmpty() | ||
Timber.d("Fetching notifications for $sessionId: $requests. Pending requests: ${!requestQueue.isEmpty}") | ||
|
||
launch { | ||
// No need for a Mutex since the SDK already has one internally | ||
val notifications = notifiableEventResolver.resolveEvents(sessionId, requests).getOrNull().orEmpty() | ||
(results as MutableSharedFlow).emit(requests to notifications.values.filterNotNull()) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
suspend fun enqueue(request: NotificationEventRequest) { | ||
requestQueue.send(request) | ||
} | ||
} | ||
|
||
data class NotificationEventRequest( | ||
val sessionId: SessionId, | ||
val roomId: RoomId, | ||
val eventId: EventId, | ||
val providerInfo: String, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At first sight, it's strange that the return type is not
Result<Map<RoomId, List<NotificationData>>>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was like that originally, but where it's immediately used later (here) having the notifications grouped by room id doesn't really help, we'd have to first search for the room, then use
notifications.find { it.eventId == eventId }
. Using having the id -> data mapping is more optimised.