Skip to content

Commit c181b95

Browse files
committed
Merge branch '846-sse-support'
2 parents add78ff + ba8c677 commit c181b95

13 files changed

+300
-13
lines changed

bot/connector-web/pom.xml

+9
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,15 @@
3333
<artifactId>tock-bot-engine-jackson</artifactId>
3434
</dependency>
3535

36+
<dependency>
37+
<groupId>org.litote.kmongo</groupId>
38+
<artifactId>kmongo</artifactId>
39+
</dependency>
40+
<dependency>
41+
<groupId>org.litote.kmongo</groupId>
42+
<artifactId>kmongo-async</artifactId>
43+
</dependency>
44+
3645
<dependency>
3746
<groupId>ai.tock</groupId>
3847
<artifactId>tock-shared</artifactId>

bot/connector-web/src/main/kotlin/WebConnector.kt

+47-3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import ai.tock.bot.connector.media.MediaCard
2525
import ai.tock.bot.connector.media.MediaCarousel
2626
import ai.tock.bot.connector.media.MediaFile
2727
import ai.tock.bot.connector.media.MediaMessage
28+
import ai.tock.bot.connector.web.channel.ChannelMongoDAO
29+
import ai.tock.bot.connector.web.channel.Channels
2830
import ai.tock.bot.connector.web.send.UrlButton
2931
import ai.tock.bot.connector.web.send.WebCard
3032
import ai.tock.bot.connector.web.send.WebCarousel
@@ -36,21 +38,27 @@ import ai.tock.bot.engine.event.Event
3638
import ai.tock.bot.engine.user.PlayerId
3739
import ai.tock.bot.engine.user.UserPreferences
3840
import ai.tock.shared.Executor
41+
import ai.tock.shared.booleanProperty
3942
import ai.tock.shared.injector
4043
import ai.tock.shared.jackson.mapper
4144
import ai.tock.shared.provide
45+
import com.fasterxml.jackson.databind.module.SimpleModule
46+
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer
4247
import com.fasterxml.jackson.module.kotlin.readValue
4348
import io.vertx.core.http.HttpMethod
4449
import io.vertx.ext.web.RoutingContext
4550
import io.vertx.ext.web.handler.CorsHandler
4651
import mu.KotlinLogging
4752

4853
internal const val WEB_CONNECTOR_ID = "web"
54+
4955
/**
5056
* The web (REST) connector type.
5157
*/
5258
val webConnectorType = ConnectorType(WEB_CONNECTOR_ID)
5359

60+
private val sseEnabled = booleanProperty("tock_web_sse", false)
61+
5462
class WebConnector internal constructor(
5563
val applicationId: String,
5664
val path: String
@@ -62,18 +70,53 @@ class WebConnector internal constructor(
6270

6371
private val executor: Executor get() = injector.provide()
6472

73+
private val channels = Channels(ChannelMongoDAO)
74+
75+
private val webMapper = mapper.copy().registerModule(
76+
SimpleModule().apply {
77+
//fallback for serializing CharSequence
78+
addSerializer(CharSequence::class.java, ToStringSerializer())
79+
}
80+
)
81+
6582
override fun register(controller: ConnectorController) {
83+
6684
controller.registerServices(path) { router ->
6785
logger.debug("deploy web connector services for root path $path ")
6886

6987
router.route(path)
7088
.handler(
7189
CorsHandler.create("*")
7290
.allowedMethod(HttpMethod.POST)
91+
.run {
92+
if (sseEnabled) allowedMethod(HttpMethod.GET) else this
93+
}
7394
.allowedHeader("Access-Control-Allow-Origin")
7495
.allowedHeader("Content-Type")
7596
.allowedHeader("X-Requested-With")
7697
)
98+
if (sseEnabled) {
99+
router.route(path + "/sse")
100+
.handler { context ->
101+
try {
102+
val userId = context.queryParams()["userId"]
103+
val response = context.response()
104+
response.isChunked = true
105+
response.headers().add("Content-Type", "text/event-stream;charset=UTF-8")
106+
response.headers().add("Connection", "keep-alive")
107+
response.headers().add("Cache-Control", "no-cache")
108+
val channelId = channels.register(userId) { webConnectorResponse ->
109+
response.write("event: message\n")
110+
response.write("data: ${webMapper.writeValueAsString(webConnectorResponse)}\n\n")
111+
}
112+
response.closeHandler {
113+
channels.unregister(channelId)
114+
}
115+
} catch (t: Throwable) {
116+
context.fail(t)
117+
}
118+
}
119+
}
77120
router.post(path)
78121
.handler { context ->
79122
try {
@@ -96,7 +139,7 @@ class WebConnector internal constructor(
96139
try {
97140
logger.debug { "Web request input : $body" }
98141
val request: WebConnectorRequest = mapper.readValue(body)
99-
val callback = WebConnectorCallback(applicationId, request.locale, context)
142+
val callback = WebConnectorCallback(applicationId = applicationId, locale = request.locale, context = context, webMapper = webMapper)
100143
controller.handle(request.toEvent(applicationId), ConnectorData(callback))
101144
} catch (t: Throwable) {
102145
BotRepository.requestTimer.throwable(t, timerData)
@@ -110,6 +153,7 @@ class WebConnector internal constructor(
110153
val c = callback as? WebConnectorCallback
111154
c?.addAction(event)
112155
if (event is Action) {
156+
channels.send(event)
113157
if (event.metadata.lastAnswer) {
114158
c?.sendResponse()
115159
}
@@ -141,7 +185,7 @@ class WebConnector internal constructor(
141185
WebMessage(card = WebCard(
142186
title = message.title,
143187
subTitle = message.subTitle,
144-
file = message.file?.url?.let{MediaFile(message.file?.url as String, message.file?.name as String)},
188+
file = message.file?.url?.let { MediaFile(message.file?.url as String, message.file?.name as String) },
145189
buttons = message.actions.map { UrlButton(it.title.toString(), it.url.toString()) }
146190
))
147191
}
@@ -150,7 +194,7 @@ class WebConnector internal constructor(
150194
WebCard(
151195
title = mediaCard.title,
152196
subTitle = mediaCard.subTitle,
153-
file = mediaCard.file?.url?.let{MediaFile(mediaCard.file?.url as String, mediaCard.file?.name as String)},
197+
file = mediaCard.file?.url?.let { MediaFile(mediaCard.file?.url as String, mediaCard.file?.name as String) },
154198
buttons = mediaCard.actions.map { button -> UrlButton(button.title.toString(), button.url.toString()) }
155199
)
156200
}))

bot/connector-web/src/main/kotlin/WebConnectorCallback.kt

+3-10
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@
1616

1717
package ai.tock.bot.connector.web
1818

19-
import com.fasterxml.jackson.databind.module.SimpleModule
20-
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer
2119
import ai.tock.bot.connector.ConnectorCallbackBase
2220
import ai.tock.bot.engine.action.Action
2321
import ai.tock.bot.engine.action.SendSentence
2422
import ai.tock.bot.engine.event.Event
25-
import ai.tock.shared.jackson.mapper
23+
import com.fasterxml.jackson.databind.ObjectMapper
2624
import io.vertx.ext.web.RoutingContext
2725
import mu.KotlinLogging
2826
import java.util.Locale
@@ -32,16 +30,11 @@ internal class WebConnectorCallback(
3230
applicationId: String,
3331
val locale: Locale,
3432
private val context: RoutingContext,
35-
private val actions: MutableList<Action> = CopyOnWriteArrayList()
33+
private val actions: MutableList<Action> = CopyOnWriteArrayList(),
34+
private val webMapper: ObjectMapper
3635
) : ConnectorCallbackBase(applicationId, webConnectorType) {
3736

3837
private val logger = KotlinLogging.logger {}
39-
private val webMapper = mapper.copy().registerModule(
40-
SimpleModule().apply {
41-
//fallback for serializing CharSequence
42-
addSerializer(CharSequence::class.java, ToStringSerializer())
43-
}
44-
)
4538

4639
fun addAction(event: Event) {
4740
if (event is Action) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright (C) 2017/2020 e-voyageurs technologies
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package ai.tock.bot.connector.web.channel
17+
18+
import java.util.UUID
19+
20+
internal data class Channel(val uuid: UUID, val userId: String, val onAction: ChannelCallback)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright (C) 2017/2020 e-voyageurs technologies
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package ai.tock.bot.connector.web.channel
17+
18+
import ai.tock.bot.connector.web.WebConnectorResponse
19+
20+
internal typealias ChannelCallback = (webConnectorResponse: WebConnectorResponse) -> Unit
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright (C) 2017/2020 e-voyageurs technologies
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package ai.tock.bot.connector.web.channel
17+
18+
internal interface ChannelDAO {
19+
fun listenChanges(listener: (channelEvent: ChannelEvent) -> Unit)
20+
fun save(channelEvent: ChannelEvent)
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright (C) 2017/2020 e-voyageurs technologies
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package ai.tock.bot.connector.web.channel
17+
18+
import ai.tock.bot.connector.web.WebConnectorResponse
19+
20+
internal data class ChannelEvent(val recipientId: String, val webConnectorResponse: WebConnectorResponse)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright (C) 2017/2020 e-voyageurs technologies
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package ai.tock.bot.connector.web.channel
17+
18+
import ai.tock.shared.TOCK_BOT_DATABASE
19+
import ai.tock.shared.error
20+
import ai.tock.shared.injector
21+
import ai.tock.shared.watch
22+
import com.github.salomonbrys.kodein.instance
23+
import com.mongodb.client.MongoCollection
24+
import com.mongodb.client.MongoDatabase
25+
import com.mongodb.client.model.CreateCollectionOptions
26+
import mu.KotlinLogging
27+
import org.litote.kmongo.getCollection
28+
import org.litote.kmongo.reactivestreams.getCollectionOfName
29+
import org.litote.kmongo.save
30+
31+
internal object ChannelMongoDAO : ChannelDAO {
32+
private val collectionName = "web_channel_event"
33+
private val asyncDatabase: com.mongodb.reactivestreams.client.MongoDatabase by injector.instance(
34+
TOCK_BOT_DATABASE
35+
)
36+
private val database: MongoDatabase by injector.instance(
37+
TOCK_BOT_DATABASE
38+
)
39+
private val logger = KotlinLogging.logger {}
40+
private val asyncWebChannelResponseCol: com.mongodb.reactivestreams.client.MongoCollection<ChannelEvent>
41+
private val webChannelResponseCol: MongoCollection<ChannelEvent>
42+
43+
private fun MongoDatabase.collectionExists(collectionName: String): Boolean =
44+
listCollectionNames().contains(collectionName)
45+
46+
init {
47+
if(!database.collectionExists(collectionName)) {
48+
try {
49+
database
50+
.createCollection(
51+
collectionName,
52+
CreateCollectionOptions()
53+
.capped(true)
54+
.sizeInBytes(100000000)
55+
.maxDocuments(50000)
56+
)
57+
} catch (ex: Exception) {
58+
logger.error(ex)
59+
}
60+
}
61+
asyncWebChannelResponseCol = asyncDatabase.getCollectionOfName(collectionName)
62+
webChannelResponseCol = database.getCollection<ChannelEvent>(collectionName)
63+
}
64+
65+
override fun listenChanges(listener: (channelEvent: ChannelEvent) -> Unit) {
66+
asyncWebChannelResponseCol.watch {
67+
val channelEvent = it.fullDocument
68+
if (channelEvent != null)
69+
listener(channelEvent)
70+
}
71+
}
72+
73+
override fun save(channelEvent: ChannelEvent) {
74+
webChannelResponseCol.save(channelEvent)
75+
}
76+
77+
}

0 commit comments

Comments
 (0)