@@ -7,8 +7,10 @@ import kotlinx.coroutines.CoroutineScope
7
7
import kotlinx.coroutines.TimeoutCancellationException
8
8
import kotlinx.coroutines.delay
9
9
import kotlinx.coroutines.flow.Flow
10
+ import kotlinx.coroutines.flow.MutableSharedFlow
10
11
import kotlinx.coroutines.flow.callbackFlow
11
12
import kotlinx.coroutines.flow.single
13
+ import kotlinx.coroutines.launch
12
14
import java.util.concurrent.atomic.AtomicInteger
13
15
import java.util.concurrent.atomic.AtomicReference
14
16
import kotlin.time.Duration
@@ -37,14 +39,6 @@ class RpcClient<RequestT, ResponseT>(
37
39
38
40
private val requestProducer = client.producer(topic, key, options, requestType, requestIsNullable)
39
41
private val requestConsumer = client.consumer(topic, key, options, requestType, requestIsNullable) { msg ->
40
- val responseProducer = client.producer(
41
- client.toResponseTopic(topic),
42
- client.toResponseKey(key),
43
- options,
44
- responseType,
45
- responseIsNullable,
46
- )
47
-
48
42
suspend fun sendResponse (response : ResponseT ? , status : RpcStatus , isException : Boolean , isUpdate : Boolean ) {
49
43
val responseMsg = RpcResponseMessage (
50
44
client.toResponseTopic(topic),
@@ -77,15 +71,30 @@ class RpcClient<RequestT, ResponseT>(
77
71
return @consumer
78
72
} catch (ex: Exception ) {
79
73
log.error(
80
- " Uncaught RPC callback error while processing message ${msg.headers.messageId} " +
74
+ " Uncaught RPC callbac#k error while processing message ${msg.headers.messageId} " +
81
75
" with key '$key ' in topic '$topic '" ,
82
76
ex,
83
77
)
84
78
return @consumer
85
- } finally {
86
- responseProducer.destroy()
87
79
}
88
80
}
81
+ private val responseProducer = client.producer(
82
+ client.toResponseTopic(topic),
83
+ client.toResponseKey(key),
84
+ options,
85
+ responseType,
86
+ responseIsNullable,
87
+ )
88
+ private val responseFlow = MutableSharedFlow <BaseBrokerMessage <ResponseT >>()
89
+ private val responseConsumer = client.consumer(
90
+ client.toResponseTopic(topic),
91
+ client.toResponseKey(key),
92
+ options,
93
+ responseType,
94
+ responseIsNullable,
95
+ ) {
96
+ responseFlow.emit(it)
97
+ }
89
98
90
99
suspend fun call (
91
100
request : RequestT ,
@@ -110,36 +119,29 @@ class RpcClient<RequestT, ResponseT>(
110
119
require(maxResponses > 0 ) { " maxResponses must be at least 1" }
111
120
}
112
121
return callbackFlow {
122
+ val cbFlow = this
113
123
val responseCounter = AtomicInteger (0 )
114
124
val timeoutLatch = maxResponses?.let { SuspendingCountDownLatch (it) }
115
125
val messageId = AtomicReference <String ?>(null )
116
126
117
- val responseConsumer = client.consumer(
118
- client.toResponseTopic(topic),
119
- client.toResponseKey(key),
120
- options,
121
- responseType,
122
- responseIsNullable,
123
- ) {
124
- val msg = it.toRpcResponseMessage()
125
- if (msg.headers.inReplyTo != messageId.get()) {
126
- return @consumer
127
+ launch { // Asynchronously consume responses; gets cancelled with callbackFlow
128
+ responseFlow.collect {
129
+ val msg = it.toRpcResponseMessage()
130
+ if (msg.headers.inReplyTo != messageId.get()) {
131
+ return @collect
132
+ }
133
+ // Close the callbackFlow if we receive an exception
134
+ if (msg.headers.isException) {
135
+ cbFlow.close(RpcException (msg.headers.status))
136
+ return @collect
137
+ }
138
+ cbFlow.send(msg)
139
+ timeoutLatch?.countDown()
140
+ val count = responseCounter.incrementAndGet()
141
+ if (maxResponses != null && count >= maxResponses) {
142
+ cbFlow.close()
143
+ }
127
144
}
128
- // Close the flow if we receive an exception
129
- if (msg.headers.isException) {
130
- close(RpcException (msg.headers.status))
131
- return @consumer
132
- }
133
- send(msg)
134
- timeoutLatch?.countDown()
135
- val count = responseCounter.incrementAndGet()
136
- if (maxResponses != null && count >= maxResponses) {
137
- close()
138
- }
139
- }
140
-
141
- invokeOnClose {
142
- responseConsumer.destroy()
143
145
}
144
146
145
147
messageId.set(requestProducer.send(request, services, instances))
@@ -161,6 +163,8 @@ class RpcClient<RequestT, ResponseT>(
161
163
override fun doDestroy () {
162
164
requestProducer.destroy()
163
165
requestConsumer.destroy()
166
+ responseProducer.destroy()
167
+ responseConsumer.destroy()
164
168
}
165
169
166
170
}
0 commit comments