Skip to content

Commit 0935df3

Browse files
committed
some subscribe tweaks
1 parent 895820b commit 0935df3

File tree

2 files changed

+25
-30
lines changed

2 files changed

+25
-30
lines changed

library/src/androidTest/java/org/xmtp/android/library/ConversationTest.kt

+15-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import androidx.test.ext.junit.runners.AndroidJUnit4
44
import app.cash.turbine.test
55
import com.google.protobuf.kotlin.toByteString
66
import com.google.protobuf.kotlin.toByteStringUtf8
7+
import kotlinx.coroutines.cancel
8+
import kotlinx.coroutines.flow.catch
9+
import kotlinx.coroutines.launch
710
import kotlinx.coroutines.runBlocking
811
import org.junit.Assert.assertEquals
912
import org.junit.Assert.assertFalse
@@ -588,11 +591,19 @@ class ConversationTest {
588591

589592
@Test
590593
fun testCanStreamConversationsV2() = kotlinx.coroutines.test.runTest {
591-
bobClient.conversations.stream().test {
592-
val conversation = bobClient.conversations.newConversation(alice.walletAddress)
593-
conversation.send(content = "hi")
594-
assertEquals("hi", awaitItem().messages(limit = 1).first().body)
594+
val flow = bobClient.conversations.stream()
595+
val job = launch {
596+
flow.catch { e ->
597+
throw Exception("Error collecting flow: $e")
598+
}.collect { convo ->
599+
assert(convo.topic.isNotEmpty())
600+
this.cancel()
601+
}
595602
}
603+
604+
bobClient.conversations.newConversation(alice.walletAddress)
605+
606+
job.join()
596607
}
597608

598609
@Test

library/src/main/java/org/xmtp/android/library/ApiClient.kt

+10-26
Original file line numberDiff line numberDiff line change
@@ -149,39 +149,23 @@ data class GRPCApiClient(
149149
rustV2Client.publish(request = request, authToken = authToken ?: "")
150150
}
151151

152-
override suspend fun subscribe(topics: List<String>): Flow<Envelope> = callbackFlow {
153-
val request = SubscribeRequest.newBuilder().apply {
154-
addAllContentTopics(topics)
155-
}.build()
156-
val subscription = rustV2Client.subscribe(
157-
FfiV2SubscribeRequest(contentTopics = request.contentTopicsList)
158-
)
152+
override suspend fun subscribe(topics: List<String>): Flow<Envelope> = flow {
159153
try {
160-
// Launch a separate coroutine for subscription
161-
val job = launch {
162-
try {
163-
// Continuously emit envelopes received from the subscription.
164-
while (true) {
165-
val nextEnvelope = envelopeFromFFi(subscription.next())
166-
trySend(nextEnvelope).isSuccess
167-
}
168-
} catch (e: Exception) {
169-
if (e !is CancellationException) {
170-
subscription.close()
171-
throw XMTPException("ApiClientError.subscribeError: ${e.message}", e)
172-
}
154+
val subscription = rustV2Client.subscribe(FfiV2SubscribeRequest(topics))
155+
try {
156+
while (true) {
157+
val nextEnvelope = subscription.next()
158+
emit(envelopeFromFFi(nextEnvelope))
173159
}
174-
}
175-
awaitClose {
176-
job.cancel()
177-
subscription.close()
160+
} catch (e: Exception) {
161+
throw e
162+
} finally {
163+
subscription.end()
178164
}
179165
} catch (e: Exception) {
180-
subscription.close()
181166
throw e
182167
}
183168
}
184-
185169
override fun close() {
186170
rustV2Client.close()
187171
}

0 commit comments

Comments
 (0)