Skip to content

Commit 23e1afc

Browse files
authored
fixes #1557 web_connector: fix dropped messages after network failure (#1808)
* fixes #1557 web_connector: fix dropped messages after network failure
1 parent c4d5d0c commit 23e1afc

File tree

6 files changed

+50
-13
lines changed

6 files changed

+50
-13
lines changed

bot/connector-web/README.md

+11
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,17 @@ and attempts to send them if and when the SSE connection is re-established.
156156
The `tock_web_sse_keepalive_delay` optional property can be used to configure the number of seconds between
157157
two SSE pings (default: 10).
158158

159+
The `tock_web_sse_messages_ttl_days` optional property can be used to configure the number of days after which
160+
enqueued messages get deleted. Note: this expiration only works on MongoDB v7.1.0 and up. Set to a
161+
negative value to disable (default: -1).
162+
163+
The `tock_web_sse_message_queue_max_count` optional property can be used to configure the maximum number of enqueued
164+
messages in the database. When a new message is enqueued past this limit, the oldest message gets deleted (default: 50000).
165+
166+
The `tock_web_sse_message_queue_max_size_kb` optional property can be used to configure the maximum size in kilobytes
167+
of the message queue in the database. When a new message is enqueued past this limit, the oldest messages get deleted
168+
(default: `2 * tock_web_sse_message_queue_max_count`).
169+
159170
#### Push messages
160171

161172
When SSE is enabled, the web connector allows sending push messages through the

bot/connector-web/src/main/kotlin/channel/ChannelCallback.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,6 @@
1616
package ai.tock.bot.connector.web.channel
1717

1818
import ai.tock.bot.connector.web.WebConnectorResponse
19+
import io.vertx.core.Future
1920

20-
internal typealias ChannelCallback = (webConnectorResponse: WebConnectorResponse) -> Unit
21+
internal typealias ChannelCallback = (webConnectorResponse: WebConnectorResponse) -> Future<Void>

bot/connector-web/src/main/kotlin/channel/ChannelEvent.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package ai.tock.bot.connector.web.channel
1717

1818
import ai.tock.bot.connector.web.WebConnectorResponse
1919
import com.fasterxml.jackson.annotation.JsonValue
20+
import io.vertx.core.Future
2021
import java.time.Instant
2122
import org.litote.kmongo.Id
2223
import org.litote.kmongo.newId
@@ -42,6 +43,6 @@ internal data class ChannelEvent(
4243
/**
4344
* @return `true` if the event has been handled successfully
4445
*/
45-
operator fun invoke(event: ChannelEvent): Boolean
46+
operator fun invoke(event: ChannelEvent): Future<Boolean>
4647
}
4748
}

bot/connector-web/src/main/kotlin/channel/ChannelMongoDAO.kt

+29-7
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ import ai.tock.shared.TOCK_BOT_DATABASE
2020
import ai.tock.shared.ensureIndex
2121
import ai.tock.shared.error
2222
import ai.tock.shared.injector
23+
import ai.tock.shared.longProperty
2324
import ai.tock.shared.watch
2425
import com.github.salomonbrys.kodein.instance
2526
import com.mongodb.client.MongoCollection
2627
import com.mongodb.client.MongoDatabase
2728
import com.mongodb.client.model.CreateCollectionOptions
29+
import com.mongodb.client.model.IndexOptions
30+
import java.util.concurrent.TimeUnit
2831
import mu.KotlinLogging
2932
import org.litote.kmongo.and
3033
import org.litote.kmongo.eq
@@ -49,6 +52,10 @@ internal object ChannelMongoDAO : ChannelDAO {
4952
private fun MongoDatabase.collectionExists(collectionName: String): Boolean =
5053
listCollectionNames().contains(collectionName)
5154

55+
private val messageQueueTtl = longProperty("tock_web_sse_message_queue_ttl_days", -1)
56+
private val messageQueueMaxCount = longProperty("tock_web_sse_message_queue_max_count", 50000)
57+
private val messageQueueMaxSize = longProperty("tock_web_sse_message_queue_max_size_kb", 2 * messageQueueMaxCount)
58+
5259
init {
5360
if (!database.collectionExists(COLLECTION_NAME)) {
5461
try {
@@ -57,8 +64,8 @@ internal object ChannelMongoDAO : ChannelDAO {
5764
COLLECTION_NAME,
5865
CreateCollectionOptions()
5966
.capped(true)
60-
.sizeInBytes(100000000)
61-
.maxDocuments(50000)
67+
.sizeInBytes(messageQueueMaxSize * 1000)
68+
.maxDocuments(messageQueueMaxCount)
6269
)
6370
} catch (e: Exception) {
6471
logger.error(e)
@@ -68,7 +75,14 @@ internal object ChannelMongoDAO : ChannelDAO {
6875
webChannelResponseCol = database.getCollection<ChannelEvent>(COLLECTION_NAME)
6976
try {
7077
webChannelResponseCol.ensureIndex(ChannelEvent::appId, ChannelEvent::recipientId, ChannelEvent::status)
71-
// TODO add an index with TTL on ChannelEvent::enqueuedAt once MongoDB supports it (cf. https://jira.mongodb.org/browse/SERVER-77586)
78+
if (messageQueueTtl > 0) {
79+
webChannelResponseCol.ensureIndex(
80+
ChannelEvent::enqueuedAt, indexOptions = IndexOptions().expireAfter(
81+
messageQueueTtl,
82+
TimeUnit.DAYS
83+
)
84+
)
85+
}
7286
} catch (e: Exception) {
7387
logger.error(e)
7488
}
@@ -101,11 +115,19 @@ internal object ChannelMongoDAO : ChannelDAO {
101115

102116
private fun process(event: ChannelEvent, handler: ChannelEvent.Handler) {
103117
try {
104-
if (handler(event)) {
105-
webChannelResponseCol.updateOneById(event._id, ChannelEvent::status setTo ChannelEvent.Status.PROCESSED)
106-
}
118+
handler(event).onComplete({ processed ->
119+
if (processed) {
120+
webChannelResponseCol.updateOneById(event._id, ChannelEvent::status setTo ChannelEvent.Status.PROCESSED)
121+
}
122+
}, { e ->
123+
logger.error(e) {
124+
"Failed to send SSE message"
125+
}
126+
})
107127
} catch (e: Exception) {
108-
logger.error(e)
128+
logger.error(e) {
129+
"Failed to send SSE message"
130+
}
109131
}
110132
}
111133

bot/connector-web/src/main/kotlin/channel/Channels.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import ai.tock.bot.connector.web.WebConnectorResponse
1919
import ai.tock.bot.engine.user.PlayerId
2020
import ai.tock.shared.injector
2121
import ai.tock.shared.provide
22+
import io.vertx.core.Future
2223
import java.util.UUID
2324
import java.util.concurrent.ConcurrentHashMap
2425
import java.util.concurrent.CopyOnWriteArrayList
@@ -30,9 +31,9 @@ internal class Channels {
3031

3132
init {
3233
channelDAO.listenChanges { (appId, recipientId, response) ->
33-
channelsByUser[recipientId]?.filter { it.appId == appId }?.onEach { channel ->
34+
Future.all((channelsByUser[recipientId] ?: emptyList()).filter { it.appId == appId }.map { channel ->
3435
channel.onAction(response)
35-
}?.isNotEmpty() ?: false
36+
}).map { it.list<Unit?>().isNotEmpty() }
3637
}
3738
}
3839

@@ -43,8 +44,7 @@ internal class Channels {
4344
val channel = Channel(appId, UUID.randomUUID(), userId, onAction)
4445
channels.add(channel)
4546
channelDAO.handleMissedEvents(appId, userId) { (_, _, response) ->
46-
channel.onAction(response)
47-
true
47+
channel.onAction(response).map { true }
4848
}
4949
return channel
5050
}

bot/connector-web/src/test/kotlin/ChannelsTest.kt

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import io.mockk.just
2828
import io.mockk.mockk
2929
import io.mockk.runs
3030
import io.mockk.slot
31+
import io.vertx.core.Future
3132
import org.junit.jupiter.api.Assertions.assertEquals
3233
import org.junit.jupiter.api.BeforeEach
3334
import org.junit.jupiter.api.Test
@@ -66,6 +67,7 @@ internal class ChannelsTest {
6667
val responses = mutableListOf<WebConnectorResponse>()
6768
channels.register(appId, recipientId) {
6869
responses.add(it)
70+
Future.succeededFuture()
6971
}
7072
assertEquals(expectedMissedResponses, responses)
7173
expectedNewResponses.forEach {

0 commit comments

Comments
 (0)