Skip to content

Commit

Permalink
change exceptions handling
Browse files Browse the repository at this point in the history
  • Loading branch information
xfqwdsj committed Apr 19, 2024
1 parent 9af2116 commit 3e0ddd9
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 27 deletions.
5 changes: 4 additions & 1 deletion client-py/src/nativeMain/kotlin/Callbacks.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ package xyz.xfqlittlefan.fhraise.py
import kotlinx.cinterop.*

@ExperimentalForeignApi
typealias OnError = CPointer<CFunction<(throwablePtr: ThrowableVar) -> Unit>>
typealias OnError = CPointer<CFunction<(throwable: ThrowableVar) -> Unit>>

@ExperimentalForeignApi
typealias OnMessage = CPointer<CFunction<(type: CPointer<ByteVar>, ref: COpaquePointer) -> CPointer<*>>>

@ExperimentalForeignApi
typealias OnClose = CPointer<CFunction<() -> Unit>>
40 changes: 24 additions & 16 deletions client-py/src/nativeMain/kotlin/Client.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ import kotlin.experimental.ExperimentalNativeApi

class Client(private val host: String, private val port: UShort) {
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val messageChannel = Channel<Message?>(3)
private val messageChannel = Channel<Message>(3)

@OptIn(ExperimentalForeignApi::class)
private val messageErrorChannel = Channel<ThrowableVar>(3)
private val resultChannel = Channel<Message>(3)

@OptIn(ExperimentalSerializationApi::class)
Expand All @@ -45,20 +43,33 @@ class Client(private val host: String, private val port: UShort) {
}

@OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class)
fun connect(onError: OnError) = runBlocking {
fun connect(onError: OnError, onClose: OnClose) = runBlocking {
logger.debug("Connecting to $host:$port.")
suspendCancellableCoroutine { continuation ->
logger.debug("Launching coroutine.")
scope.launch(Dispatchers.IO) {
runCatching(onError) {
runCatchingC(onError) {
logger.debug("Starting connection.")
client.webSocket(host = host, port = port.toInt(), path = pyWsPath) {
continuation.resume(true)
logger.debug("Connected.")
while (true) {
if (!isActive) {
logger.info("Connection closed.")
onClose()
break
}
logger.debug("Waiting for message...")
val receiveResult =
runCatching(onError) { messageChannel.send(receiveDeserialized<Message>()) }
val receiveResult = runCatching {
val message = receiveDeserialized<Message>()
logger.debug("Received message.")
messageChannel.send(message)
}.onFailure { throwable ->
throwable.logger.debug("Caught throwable. Callback address: ${onError.rawValue}.")
throwable.cThrowable {
onError(it)
}
}
if (receiveResult.isFailure) continue
logger.debug("Waiting for result...")
sendSerialized<Message>(resultChannel.receive())
Expand All @@ -80,21 +91,18 @@ class Client(private val host: String, private val port: UShort) {
@OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class)
fun receive(onMessage: OnMessage, onError: OnError): Boolean {
val message = runBlocking { messageChannel.receive() }
if (message == null) {
return false
}

logger.debug("Sending message to C.")

return runBlocking {
runCatching(onError) {
val ref = StableRef.create(message)
runCatchingC(onError) {
memScoped {
val type = message::class.qualifiedName!!.cstr.ptr
val ref = StableRef.create(message)

resultChannel.send(onMessage(type, ref.asCPointer()).asStableRef<Message>().get())

ref.dispose()
logger.debug("Received result from C.")
}
}.isSuccess
}.isSuccess.also { ref.dispose() }
}
}
}
4 changes: 3 additions & 1 deletion client-py/src/nativeMain/kotlin/Logger.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package xyz.xfqlittlefan.fhraise.py

import kotlinx.datetime.Clock
import kotlinx.datetime.TimeZone
import kotlinx.datetime.toLocalDateTime

class Logger internal constructor(private val tag: Any) {
@Deprecated("This constructor is for calling from C code only.", level = DeprecationLevel.HIDDEN)
Expand All @@ -42,7 +44,7 @@ class Logger internal constructor(private val tag: Any) {

private fun println(level: String, message: String) {
message.split("\n").forEach {
println("${Clock.System.now()} [$tag] $level: $it")
println("${Clock.System.now().toLocalDateTime(TimeZone.currentSystemDefault())} [$tag] $level: $it")
}
}
}
Expand Down
26 changes: 17 additions & 9 deletions client-py/src/nativeMain/kotlin/Throwable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -59,27 +59,35 @@ class ThrowableVar(rawPtr: NativePtr) : CStructVar(rawPtr) {

@ExperimentalNativeApi
@ExperimentalForeignApi
internal inline fun <R> Throwable.sendToC(block: (ThrowableVar) -> R) = memScoped {
logger.debug("Wrapping throwable.")
logger.warn(getStackTrace().joinToString("\n"))
internal inline fun <R> Throwable.cThrowable(block: (ThrowableVar) -> R) = memScoped {
this@cThrowable.logger.debug("Sending throwable to C.")

val throwable = alloc<ThrowableVar>()
val ref = StableRef.create(this)

throwable.type = this::class.qualifiedName?.cstr?.ptr
throwable.ref = ref.asCPointer()
throwable.message = this@sendToC.message?.cstr?.ptr
val stacktraceList = this@sendToC.getStackTrace().map { it.cstr.ptr.pointed }
throwable.message = this@cThrowable.message?.cstr?.ptr
val stacktraceList = this@cThrowable.getStackTrace().map { it.cstr.ptr.pointed }
val stacktraceArray = allocArrayOfPointersTo(stacktraceList)
throwable.stacktrace = stacktraceArray
throwable.stacktraceSize = stacktraceList.size
block(throwable).also { ref.dispose() }
block(throwable).also {
ref.dispose()
this@cThrowable.logger.debug("Throwable sent.")
}
}

@ExperimentalNativeApi
@ExperimentalForeignApi
internal inline fun <R> runCatching(
onError: OnError, block: () -> R
internal inline fun <E, R> runCatchingC(
onError: (ThrowableVar) -> E, block: () -> R
) = runCatching(block).onFailure { throwable ->
throwable.sendToC { onError(it) }
throwable.cThrowable(onError)
}

@ExperimentalNativeApi
@ExperimentalForeignApi
internal inline fun <R> runCatchingC(
onError: OnError, block: () -> R
) = runCatchingC({ onError(it) }, block)

0 comments on commit 3e0ddd9

Please sign in to comment.