Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

M3l4 websocket #3

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ springDependencyManagementVersion=1.1.0
pluginSpringVersion=1.8.10
pluginJpa=1.8.10
springdocOpenapiUiVersion=2.2.0

pluginShadow=7.1.2
yandexCloudSdkVersion=2.5.1
4 changes: 4 additions & 0 deletions ok-marketplace-app-ktor/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ plugins {
kotlin("multiplatform")
id("io.ktor.plugin")
}
dependencies {
implementation("io.ktor:ktor-server-core-jvm:2.2.4")
implementation("io.ktor:ktor-server-websockets-jvm:2.2.4")
}

repositories {
maven { url = uri("https://maven.pkg.jetbrains.space/public/p/ktor/eap") }
Expand Down
16 changes: 12 additions & 4 deletions ok-marketplace-app-ktor/src/commonMain/kotlin/Application.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,18 @@ import io.ktor.server.engine.*
import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import ru.otus.otuskotlin.marketplace.api.v2.apiV2Mapper
import ru.otus.otuskotlin.marketplace.app.v2.v2Ad
import ru.otus.otuskotlin.marketplace.app.v2.v2Offer
import ru.otus.otuskotlin.marketplace.app.v2.wsHandlerV2
import ru.otus.otuskotlin.marketplace.biz.MkplAdProcessor

fun Application.module(processor: MkplAdProcessor = MkplAdProcessor()) {
fun Application.module(processor: MkplAdProcessor = MkplAdProcessor(), installPlugins: Boolean = true) {
if (installPlugins) {
install(WebSockets)
}

routing {
get("/") {
call.respondText("Hello, world!")
Expand All @@ -25,11 +31,13 @@ fun Application.module(processor: MkplAdProcessor = MkplAdProcessor()) {
v2Ad(processor)
v2Offer(processor)
}

webSocket("/ws/v2") {
wsHandlerV2(processor)
}
}
}

fun main() {
embeddedServer(CIO, port = 8080) {
module()
}.start(wait = true)
embeddedServer(CIO, port = 8080, module = Application::module).start(wait = true)
}
68 changes: 68 additions & 0 deletions ok-marketplace-app-ktor/src/commonMain/kotlin/v2/WsController.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package ru.otus.otuskotlin.marketplace.app.v2

import io.ktor.websocket.*
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.serialization.decodeFromString
import ru.otus.otuskotlin.marketplace.api.v2.apiV2Mapper
import ru.otus.otuskotlin.marketplace.api.v2.apiV2ResponseSerialize
import ru.otus.otuskotlin.marketplace.api.v2.models.IRequest
import ru.otus.otuskotlin.marketplace.biz.MkplAdProcessor
import ru.otus.otuskotlin.marketplace.common.MkplContext
import ru.otus.otuskotlin.marketplace.common.helpers.addError
import ru.otus.otuskotlin.marketplace.common.helpers.asMkplError
import ru.otus.otuskotlin.marketplace.common.helpers.isUpdatableCommand
import ru.otus.otuskotlin.marketplace.common.models.MkplWorkMode
import ru.otus.otuskotlin.marketplace.mappers.v2.fromTransport
import ru.otus.otuskotlin.marketplace.mappers.v2.toTransportAd
import ru.otus.otuskotlin.marketplace.mappers.v2.toTransportInit
import ru.otus.otuskotlin.marketplace.stubs.MkplAdStub

val sessions = mutableSetOf<WebSocketSession>()

suspend fun WebSocketSession.wsHandlerV2(processor: MkplAdProcessor) {
sessions.add(this)

// Handle init request
val ctx = MkplContext()
ctx.workMode = MkplWorkMode.STUB
processor.exec(ctx)

val init = apiV2ResponseSerialize(ctx.toTransportInit())
outgoing.send(Frame.Text(init))

// Handle flow
incoming.receiveAsFlow().mapNotNull { it ->
val frame = it as? Frame.Text ?: return@mapNotNull

val jsonStr = frame.readText()
val context = MkplContext()

// Handle without flow destruction
try {
val request = apiV2Mapper.decodeFromString<IRequest>(jsonStr)
context.fromTransport(request)
processor.exec(context)

val result = apiV2ResponseSerialize(context.toTransportAd())

// If change request, response is sent to everyone
if (context.isUpdatableCommand()) {
sessions.forEach {
it.send(Frame.Text(result))
}
} else {
outgoing.send(Frame.Text(result))
}
} catch (_: ClosedReceiveChannelException) {
sessions.clear()
} catch (t: Throwable) {
context.addError(t.asMkplError())

val result = apiV2ResponseSerialize(context.toTransportInit())
outgoing.send(Frame.Text(result))
}
}.collect()
}
19 changes: 18 additions & 1 deletion ok-marketplace-app-ktor/src/jvmMain/kotlin/ApplicationJvm.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,26 @@ import org.slf4j.event.Level
import ru.otus.otuskotlin.marketplace.api.v1.apiV1Mapper
import ru.otus.otuskotlin.marketplace.app.v1.v1Ad
import ru.otus.otuskotlin.marketplace.app.v1.v1Offer
import ru.otus.otuskotlin.marketplace.app.v1.wsHandlerV1
import ru.otus.otuskotlin.marketplace.app.v2.wsHandlerV2
import ru.otus.otuskotlin.marketplace.biz.MkplAdProcessor
import java.time.Duration
import ru.otus.otuskotlin.marketplace.app.module as commonModule

// function with config (application.conf)
fun main(args: Array<String>): Unit = io.ktor.server.cio.EngineMain.main(args)

@Suppress("unused") // Referenced in application.conf
fun Application.moduleJvm() {
install(WebSockets) {
pingPeriod = Duration.ofSeconds(15)
timeout = Duration.ofSeconds(15)
maxFrameSize = Long.MAX_VALUE
masking = false
}

val processor = MkplAdProcessor()
commonModule(processor)
commonModule(processor, false)

install(CachingHeaders)
install(DefaultHeaders)
Expand Down Expand Up @@ -68,6 +78,13 @@ fun Application.moduleJvm() {
v1Offer(processor)
}

webSocket("/ws/v1") {
wsHandlerV1(processor)
}
webSocket("/ws/v2") {
wsHandlerV2(processor)
}

static("static") {
resources("static")
}
Expand Down
64 changes: 64 additions & 0 deletions ok-marketplace-app-ktor/src/jvmMain/kotlin/v1/WsController.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package ru.otus.otuskotlin.marketplace.app.v1

import com.fasterxml.jackson.module.kotlin.readValue
import io.ktor.websocket.*
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.receiveAsFlow
import ru.otus.otuskotlin.marketplace.api.v1.apiV1Mapper
import ru.otus.otuskotlin.marketplace.api.v1.models.IRequest
import ru.otus.otuskotlin.marketplace.biz.MkplAdProcessor
import ru.otus.otuskotlin.marketplace.common.MkplContext
import ru.otus.otuskotlin.marketplace.common.helpers.addError
import ru.otus.otuskotlin.marketplace.common.helpers.asMkplError
import ru.otus.otuskotlin.marketplace.common.helpers.isUpdatableCommand
import ru.otus.otuskotlin.marketplace.common.models.MkplWorkMode
import ru.otus.otuskotlin.marketplace.mappers.v1.*
import ru.otus.otuskotlin.marketplace.stubs.MkplAdStub

val sessions = mutableSetOf<WebSocketSession>()

suspend fun WebSocketSession.wsHandlerV1(processor: MkplAdProcessor) {
sessions.add(this)

// Handle init request
val ctx = MkplContext()
ctx.workMode = MkplWorkMode.STUB
processor.exec(ctx)
val init = apiV1Mapper.writeValueAsString(ctx.toTransportInit())
outgoing.send(Frame.Text(init))

// Handle flow
incoming.receiveAsFlow().mapNotNull { it ->
val frame = it as? Frame.Text ?: return@mapNotNull

val jsonStr = frame.readText()
val context = MkplContext()

// Handle without flow destruction
try {
val request = apiV1Mapper.readValue<IRequest>(jsonStr)
context.fromTransport(request)
processor.exec(context)

val result = apiV1Mapper.writeValueAsString(context.toTransportAd())

// If change request, response is sent to everyone
if (context.isUpdatableCommand()) {
sessions.forEach {
it.send(Frame.Text(result))
}
} else {
outgoing.send(Frame.Text(result))
}
} catch (_: ClosedReceiveChannelException) {
sessions.clear()
} catch (t: Throwable) {
context.addError(t.asMkplError())

val result = apiV1Mapper.writeValueAsString(context.toTransportInit())
outgoing.send(Frame.Text(result))
}
}.collect()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package ru.otus.otuskotlin.marketplace.app.stubs

import io.ktor.client.plugins.websocket.*
import io.ktor.server.testing.*
import io.ktor.websocket.*
import kotlinx.coroutines.withTimeout
import ru.otus.otuskotlin.marketplace.api.v1.apiV1Mapper
import ru.otus.otuskotlin.marketplace.api.v1.models.*
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs

class V1WebsocketStubTest {

@Test
fun createStub() {
val request = AdCreateRequest(
requestId = "12345",
ad = AdCreateObject(
title = "Болт",
description = "КРУТЕЙШИЙ",
adType = DealSide.DEMAND,
visibility = AdVisibility.PUBLIC,
),
debug = AdDebug(
mode = AdRequestDebugMode.STUB,
stub = AdRequestDebugStubs.SUCCESS
)
)

testMethod<IResponse>(request) {
assertEquals("12345", it.requestId)
}
}

@Test
fun readStub() {
val request = AdReadRequest(
requestId = "12345",
ad = AdReadObject("666"),
debug = AdDebug(
mode = AdRequestDebugMode.STUB,
stub = AdRequestDebugStubs.SUCCESS
)
)

testMethod<IResponse>(request) {
assertEquals("12345", it.requestId)
}
}

@Test
fun updateStub() {
val request = AdUpdateRequest(
requestId = "12345",
ad = AdUpdateObject(
id = "666",
title = "Болт",
description = "КРУТЕЙШИЙ",
adType = DealSide.DEMAND,
visibility = AdVisibility.PUBLIC,
),
debug = AdDebug(
mode = AdRequestDebugMode.STUB,
stub = AdRequestDebugStubs.SUCCESS
)
)

testMethod<IResponse>(request) {
assertEquals("12345", it.requestId)
}
}

@Test
fun deleteStub() {
val request = AdDeleteRequest(
requestId = "12345",
ad = AdDeleteObject(
id = "666",
),
debug = AdDebug(
mode = AdRequestDebugMode.STUB,
stub = AdRequestDebugStubs.SUCCESS
)
)

testMethod<IResponse>(request) {
assertEquals("12345", it.requestId)
}
}

@Test
fun searchStub() {
val request = AdSearchRequest(
requestId = "12345",
adFilter = AdSearchFilter(),
debug = AdDebug(
mode = AdRequestDebugMode.STUB,
stub = AdRequestDebugStubs.SUCCESS
)
)

testMethod<IResponse>(request) {
assertEquals("12345", it.requestId)
}
}

@Test
fun offersStub() {
val request = AdOffersRequest(
requestId = "12345",
ad = AdReadObject(
id = "666",
),
debug = AdDebug(
mode = AdRequestDebugMode.STUB,
stub = AdRequestDebugStubs.SUCCESS
)
)

testMethod<IResponse>(request) {
assertEquals("12345", it.requestId)
}
}

private inline fun <reified T> testMethod(
request: Any,
crossinline assertBlock: (T) -> Unit
) = testApplication {
val client = createClient {
install(WebSockets)
}

client.webSocket("/ws/v1") {
withTimeout(3000) {
val incame = incoming.receive() as Frame.Text
val response = apiV1Mapper.readValue(incame.readText(), T::class.java)
assertIs<AdInitResponse>(response)
}
send(Frame.Text(apiV1Mapper.writeValueAsString(request)))
withTimeout(3000) {
val incame = incoming.receive() as Frame.Text
val response = apiV1Mapper.readValue(incame.readText(), T::class.java)

assertBlock(response)
}
}
}
}
Loading