Skip to content

Commit 6f1d807

Browse files
authored
Updates to Stream All Messages (#131)
* first pass on upgrading to the subscribe2 functions * update to use the new subscibe2 and make sure all the tests pass * fix up linter issue * try to fix the grpc error * fix up the tests * do todos for tests
1 parent 3ac6990 commit 6f1d807

File tree

7 files changed

+60
-13
lines changed

7 files changed

+60
-13
lines changed

library/src/androidTest/java/org/xmtp/android/library/ConversationsTest.kt

+2-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import androidx.test.ext.junit.runners.AndroidJUnit4
44
import kotlinx.coroutines.CoroutineScope
55
import kotlinx.coroutines.Dispatchers
66
import kotlinx.coroutines.launch
7-
import kotlinx.coroutines.runBlocking
87
import org.junit.Assert.assertEquals
98
import org.junit.Ignore
109
import org.junit.Test
@@ -79,8 +78,8 @@ class ConversationsTest {
7978
}
8079

8180
@Test
82-
@Ignore("Flaky Test")
83-
fun testStreamAllMessages() = runBlocking {
81+
@Ignore("CI Issues")
82+
fun testStreamAllMessages() {
8483
val bo = PrivateKeyBuilder()
8584
val alix = PrivateKeyBuilder()
8685
val clientOptions =
@@ -110,7 +109,6 @@ class ConversationsTest {
110109
val caro = PrivateKeyBuilder()
111110
val caroClient = Client().create(caro, clientOptions)
112111
val caroConversation = caroClient.conversations.newConversation(alixClient.address)
113-
114112
sleep(2500)
115113

116114
for (i in 0 until 5) {

library/src/androidTest/java/org/xmtp/android/library/LocalInstrumentedTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.xmtp.proto.message.contents.PrivateKeyOuterClass.PrivateKeyBundle
3030
import java.util.Date
3131

3232
@RunWith(AndroidJUnit4::class)
33-
@Ignore("All Flaky")
33+
@Ignore("CI Issues")
3434
class LocalInstrumentedTest {
3535
@Test
3636
fun testPublishingAndFetchingContactBundlesWithWhileGeneratingKeys() {

library/src/androidTest/java/org/xmtp/android/library/TestHelpers.kt

+9
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,15 @@ class FakeApiClient : ApiClient {
199199
}
200200
return flowOf()
201201
}
202+
203+
override suspend fun subscribe2(request: Flow<MessageApiOuterClass.SubscribeRequest>): Flow<MessageApiOuterClass.Envelope> {
204+
val env = stream.counts().first()
205+
206+
if (request.first().contentTopicsList.contains(env.contentTopic)) {
207+
return flowOf(env)
208+
}
209+
return flowOf()
210+
}
202211
}
203212

204213
data class Fixtures(

library/src/main/java/org/xmtp/android/library/ApiClient.kt

+17-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import kotlinx.coroutines.flow.Flow
99
import org.xmtp.android.library.messages.Pagination
1010
import org.xmtp.android.library.messages.Topic
1111
import org.xmtp.proto.message.api.v1.MessageApiGrpcKt
12-
import org.xmtp.proto.message.api.v1.MessageApiOuterClass
1312
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.BatchQueryRequest
1413
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.BatchQueryResponse
1514
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.Cursor
@@ -18,6 +17,7 @@ import org.xmtp.proto.message.api.v1.MessageApiOuterClass.PublishRequest
1817
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.PublishResponse
1918
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.QueryRequest
2019
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.QueryResponse
20+
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.SubscribeRequest
2121
import java.io.Closeable
2222
import java.util.concurrent.TimeUnit
2323

@@ -35,6 +35,7 @@ interface ApiClient {
3535
suspend fun envelopes(topic: String, pagination: Pagination? = null): List<Envelope>
3636
suspend fun publish(envelopes: List<Envelope>): PublishResponse
3737
suspend fun subscribe(topics: List<String>): Flow<Envelope>
38+
suspend fun subscribe2(request: Flow<SubscribeRequest>): Flow<Envelope>
3839
}
3940

4041
data class GRPCApiClient(
@@ -81,6 +82,10 @@ data class GRPCApiClient(
8182
}.build()
8283
}
8384
}.build()
85+
86+
fun makeSubscribeRequest(
87+
topics: List<String>,
88+
): SubscribeRequest = SubscribeRequest.newBuilder().addAllContentTopics(topics).build()
8489
}
8590

8691
private val channel: ManagedChannel =
@@ -174,8 +179,7 @@ data class GRPCApiClient(
174179
}
175180

176181
override suspend fun subscribe(topics: List<String>): Flow<Envelope> {
177-
val request =
178-
MessageApiOuterClass.SubscribeRequest.newBuilder().addAllContentTopics(topics).build()
182+
val request = makeSubscribeRequest(topics)
179183
val headers = Metadata()
180184

181185
headers.put(CLIENT_VERSION_HEADER_KEY, Constants.VERSION)
@@ -185,6 +189,16 @@ data class GRPCApiClient(
185189

186190
return client.subscribe(request, headers)
187191
}
192+
override suspend fun subscribe2(request: Flow<SubscribeRequest>): Flow<Envelope> {
193+
val headers = Metadata()
194+
195+
headers.put(CLIENT_VERSION_HEADER_KEY, Constants.VERSION)
196+
if (appVersion != null) {
197+
headers.put(APP_VERSION_HEADER_KEY, appVersion)
198+
}
199+
200+
return client.subscribe2(request, headers)
201+
}
188202

189203
override fun close() {
190204
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS)

library/src/main/java/org/xmtp/android/library/Client.kt

+4
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,10 @@ class Client() {
255255
return apiClient.subscribe(topics = topics)
256256
}
257257

258+
suspend fun subscribe2(request: Flow<MessageApiOuterClass.SubscribeRequest>): Flow<Envelope> {
259+
return apiClient.subscribe2(request = request)
260+
}
261+
258262
suspend fun subscribeTopic(topics: List<Topic>): Flow<Envelope> {
259263
return subscribe(topics.map { it.description })
260264
}

library/src/main/java/org/xmtp/android/library/Conversations.kt

+18-5
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package org.xmtp.android.library
22

33
import android.util.Log
4-
import kotlinx.coroutines.currentCoroutineContext
4+
import io.grpc.StatusException
5+
import kotlinx.coroutines.CancellationException
56
import kotlinx.coroutines.flow.Flow
7+
import kotlinx.coroutines.flow.MutableStateFlow
68
import kotlinx.coroutines.flow.flow
7-
import kotlinx.coroutines.job
89
import kotlinx.coroutines.runBlocking
910
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
11+
import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest
1012
import org.xmtp.android.library.messages.Envelope
1113
import org.xmtp.android.library.messages.EnvelopeBuilder
1214
import org.xmtp.android.library.messages.InvitationV1
@@ -365,9 +367,12 @@ data class Conversations(
365367
for (conversation in list()) {
366368
topics.add(conversation.topic)
367369
}
370+
371+
val subscribeFlow = MutableStateFlow(makeSubscribeRequest(topics))
372+
368373
while (true) {
369374
try {
370-
client.subscribe(topics = topics).collect { envelope ->
375+
client.subscribe2(request = subscribeFlow).collect { envelope ->
371376
when {
372377
conversationsByTopic.containsKey(envelope.contentTopic) -> {
373378
val conversation = conversationsByTopic[envelope.contentTopic]
@@ -379,7 +384,7 @@ data class Conversations(
379384
val conversation = fromInvite(envelope = envelope)
380385
conversationsByTopic[conversation.topic] = conversation
381386
topics.add(conversation.topic)
382-
currentCoroutineContext().job.cancel()
387+
subscribeFlow.value = makeSubscribeRequest(topics)
383388
}
384389

385390
envelope.contentTopic.startsWith("/xmtp/0/intro-") -> {
@@ -388,12 +393,20 @@ data class Conversations(
388393
val decoded = conversation.decode(envelope)
389394
emit(decoded)
390395
topics.add(conversation.topic)
391-
currentCoroutineContext().job.cancel()
396+
subscribeFlow.value = makeSubscribeRequest(topics)
392397
}
393398

394399
else -> {}
395400
}
396401
}
402+
} catch (error: CancellationException) {
403+
break
404+
} catch (error: StatusException) {
405+
if (error.status.code == io.grpc.Status.Code.UNAVAILABLE) {
406+
continue
407+
} else {
408+
break
409+
}
397410
} catch (error: Exception) {
398411
continue
399412
}

library/src/test/java/org/xmtp/android/library/TestHelpers.kt

+9
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,15 @@ class FakeApiClient : ApiClient {
195195
}
196196
return flowOf()
197197
}
198+
199+
override suspend fun subscribe2(request: Flow<MessageApiOuterClass.SubscribeRequest>): Flow<MessageApiOuterClass.Envelope> {
200+
val env = stream.counts().first()
201+
202+
if (request.first().contentTopicsList.contains(env.contentTopic)) {
203+
return flowOf(env)
204+
}
205+
return flowOf()
206+
}
198207
}
199208

200209
data class Fixtures(val aliceAccount: PrivateKeyBuilder, val bobAccount: PrivateKeyBuilder) {

0 commit comments

Comments
 (0)