Skip to content

Commit 2bbeb13

Browse files
committed
revert subscription work
1 parent 40181b7 commit 2bbeb13

File tree

7 files changed

+87
-100
lines changed

7 files changed

+87
-100
lines changed

library/src/androidTest/java/org/xmtp/android/library/ConversationTest.kt

+12-40
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package org.xmtp.android.library
22

33
import androidx.test.ext.junit.runners.AndroidJUnit4
4+
import app.cash.turbine.test
45
import com.google.protobuf.kotlin.toByteString
56
import com.google.protobuf.kotlin.toByteStringUtf8
6-
import kotlinx.coroutines.cancel
7-
import kotlinx.coroutines.flow.catch
8-
import kotlinx.coroutines.launch
97
import kotlinx.coroutines.runBlocking
108
import org.junit.Assert.assertEquals
119
import org.junit.Assert.assertFalse
@@ -590,19 +588,11 @@ class ConversationTest {
590588

591589
@Test
592590
fun testCanStreamConversationsV2() = kotlinx.coroutines.test.runTest {
593-
val flow = bobClient.conversations.stream()
594-
val job = launch {
595-
flow.catch { e ->
596-
throw Exception("Error collecting flow: $e")
597-
}.collect { convo ->
598-
assert(convo.topic.isNotEmpty())
599-
this.cancel()
600-
}
591+
bobClient.conversations.stream().test {
592+
val conversation = bobClient.conversations.newConversation(alice.walletAddress)
593+
conversation.send(content = "hi")
594+
assertEquals("hi", awaitItem().messages(limit = 1).first().body)
601595
}
602-
603-
bobClient.conversations.newConversation(alice.walletAddress)
604-
605-
job.join()
606596
}
607597

608598
@Test
@@ -611,37 +601,19 @@ class ConversationTest {
611601
fixtures.publishLegacyContact(client = bobClient)
612602
fixtures.publishLegacyContact(client = aliceClient)
613603
val conversation = aliceClient.conversations.newConversation(bob.walletAddress)
614-
615-
val flow = conversation.streamMessages()
616-
val job = launch {
617-
flow.catch { e ->
618-
throw Exception("Error collecting flow: $e")
619-
}.collect { message ->
620-
assertEquals("hi alice", message.encodedContent.content.toStringUtf8())
621-
this.cancel()
622-
}
604+
conversation.streamMessages().test {
605+
conversation.send("hi alice")
606+
assertEquals("hi alice", awaitItem().encodedContent.content.toStringUtf8())
623607
}
624-
625-
conversation.send("hi alice")
626-
627-
job.join()
628608
}
609+
629610
@Test
630611
fun testStreamingMessagesFromV2Conversations() = kotlinx.coroutines.test.runTest {
631612
val conversation = aliceClient.conversations.newConversation(bob.walletAddress)
632-
val flow = conversation.streamMessages()
633-
val job = launch {
634-
flow.catch { e ->
635-
throw Exception("Error collecting flow: $e")
636-
}.collect { message ->
637-
assertEquals("hi alice", message.encodedContent.content.toStringUtf8())
638-
this.cancel()
639-
}
613+
conversation.streamMessages().test {
614+
conversation.send("hi alice")
615+
assertEquals("hi alice", awaitItem().encodedContent.content.toStringUtf8())
640616
}
641-
642-
conversation.send("hi alice")
643-
644-
job.join()
645617
}
646618

647619
@Test

library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt

+7-25
Original file line numberDiff line numberDiff line change
@@ -507,32 +507,14 @@ class GroupTest {
507507

508508
@Test
509509
fun testCanStreamGroupsAndConversations() = kotlinx.coroutines.test.runTest {
510-
val flow = boClient.conversations.streamAll()
511-
var counter = 0
512-
val job = launch {
513-
flow.catch { e ->
514-
throw Exception("Error collecting flow: $e")
515-
}.collect { convo ->
516-
counter++
517-
when (convo) {
518-
is Conversation.Group -> {
519-
assert(true)
520-
}
521-
is Conversation.V2 -> {
522-
assert(true)
523-
}
524-
else -> {
525-
assert(false)
526-
}
527-
}
528-
if (counter == 2) this.cancel()
529-
}
510+
boClient.conversations.streamAll().test {
511+
val group =
512+
caroClient.conversations.newGroup(listOf(bo.walletAddress))
513+
assertEquals(group.topic, awaitItem().topic)
514+
val conversation =
515+
alixClient.conversations.newConversation(bo.walletAddress)
516+
assertEquals(conversation.topic, awaitItem().topic)
530517
}
531-
532-
caroClient.conversations.newGroup(listOf(bo.walletAddress))
533-
alixClient.conversations.newConversation(bo.walletAddress)
534-
535-
job.join()
536518
}
537519

538520
@Test

library/src/androidTest/java/org/xmtp/android/library/TestHelpers.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -193,10 +193,10 @@ class FakeApiClient : ApiClient {
193193
PublishResponse.newBuilder().build()
194194
}
195195

196-
override suspend fun subscribe(topics: List<String>): Flow<MessageApiOuterClass.Envelope> {
196+
override suspend fun subscribe(request: Flow<MessageApiOuterClass.SubscribeRequest>): Flow<MessageApiOuterClass.Envelope> {
197197
val env = stream.counts().first()
198198

199-
if (topics.contains(env.contentTopic)) {
199+
if (request.first().contentTopicsList.contains(env.contentTopic)) {
200200
return flowOf(env)
201201
}
202202
return flowOf()

library/src/main/java/org/xmtp/android/library/ApiClient.kt

+45-22
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
package org.xmtp.android.library
22

33
import com.google.protobuf.kotlin.toByteString
4+
import io.grpc.Grpc
5+
import io.grpc.InsecureChannelCredentials
6+
import io.grpc.ManagedChannel
47
import kotlinx.coroutines.flow.Flow
58
import kotlinx.coroutines.flow.flow
69
import org.xmtp.android.library.messages.Pagination
710
import org.xmtp.android.library.messages.Topic
8-
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.BatchQueryResponse
9-
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.Cursor
10-
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.Envelope
11-
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.PagingInfo
12-
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.QueryRequest
11+
import io.grpc.Metadata
12+
import io.grpc.TlsChannelCredentials
13+
import org.xmtp.proto.message.api.v1.MessageApiGrpcKt
14+
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.*
1315
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.QueryResponse
14-
import org.xmtp.proto.message.api.v1.MessageApiOuterClass.SortDirection
1516
import uniffi.xmtpv3.FfiCursor
1617
import uniffi.xmtpv3.FfiEnvelope
1718
import uniffi.xmtpv3.FfiPagingInfo
@@ -24,6 +25,7 @@ import uniffi.xmtpv3.FfiV2QueryRequest
2425
import uniffi.xmtpv3.FfiV2QueryResponse
2526
import uniffi.xmtpv3.FfiV2SubscribeRequest
2627
import java.io.Closeable
28+
import java.util.concurrent.TimeUnit
2729

2830
interface ApiClient {
2931
val environment: XMTPEnvironment
@@ -38,7 +40,7 @@ interface ApiClient {
3840
suspend fun batchQuery(requests: List<QueryRequest>): BatchQueryResponse
3941
suspend fun envelopes(topic: String, pagination: Pagination? = null): List<Envelope>
4042
suspend fun publish(envelopes: List<Envelope>)
41-
suspend fun subscribe(topics: List<String>): Flow<Envelope>
43+
suspend fun subscribe(request: Flow<SubscribeRequest>): Flow<Envelope>
4244
}
4345

4446
data class GRPCApiClient(
@@ -48,6 +50,13 @@ data class GRPCApiClient(
4850
) :
4951
ApiClient, Closeable {
5052
companion object {
53+
54+
val CLIENT_VERSION_HEADER_KEY: Metadata.Key<String> =
55+
Metadata.Key.of("X-Client-Version", Metadata.ASCII_STRING_MARSHALLER)
56+
57+
val APP_VERSION_HEADER_KEY: Metadata.Key<String> =
58+
Metadata.Key.of("X-App-Version", Metadata.ASCII_STRING_MARSHALLER)
59+
5160
fun makeQueryRequest(
5261
topic: String,
5362
pagination: Pagination? = null,
@@ -76,6 +85,26 @@ data class GRPCApiClient(
7685
}.build()
7786
}
7887
}.build()
88+
89+
fun makeSubscribeRequest(
90+
topics: List<String>,
91+
): SubscribeRequest = SubscribeRequest.newBuilder().addAllContentTopics(topics).build()
92+
}
93+
94+
private val channel: ManagedChannel by lazy {
95+
Grpc.newChannelBuilderForAddress(
96+
environment.getValue(),
97+
if (environment == XMTPEnvironment.LOCAL) 5556 else 443,
98+
if (environment != XMTPEnvironment.LOCAL) {
99+
TlsChannelCredentials.create()
100+
} else {
101+
InsecureChannelCredentials.create()
102+
},
103+
).build()
104+
}
105+
106+
private val client: MessageApiGrpcKt.MessageApiCoroutineStub by lazy {
107+
MessageApiGrpcKt.MessageApiCoroutineStub(channel)
79108
}
80109

81110
private var authToken: String? = null
@@ -133,25 +162,19 @@ data class GRPCApiClient(
133162
rustV2Client.publish(request = request, authToken = authToken ?: "")
134163
}
135164

136-
override suspend fun subscribe(topics: List<String>): Flow<Envelope> = flow {
137-
try {
138-
val subscription = rustV2Client.subscribe(FfiV2SubscribeRequest(topics))
139-
try {
140-
while (true) {
141-
val nextEnvelope = subscription.next()
142-
emit(envelopeFromFFi(nextEnvelope))
143-
}
144-
} catch (e: Exception) {
145-
throw e
146-
} finally {
147-
subscription.end()
148-
}
149-
} catch (e: Exception) {
150-
throw e
165+
override suspend fun subscribe(request: Flow<SubscribeRequest>): Flow<Envelope> {
166+
val headers = Metadata()
167+
headers.put(CLIENT_VERSION_HEADER_KEY, Constants.VERSION)
168+
if (appVersion != null) {
169+
headers.put(APP_VERSION_HEADER_KEY, appVersion)
151170
}
171+
172+
return client.subscribe2(request, headers)
152173
}
174+
153175
override fun close() {
154176
rustV2Client.close()
177+
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS)
155178
}
156179

157180
private fun envelopeToFFi(envelope: Envelope): FfiEnvelope {

library/src/main/java/org/xmtp/android/library/Client.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ import com.google.crypto.tink.subtle.Base64
99
import com.google.gson.GsonBuilder
1010
import kotlinx.coroutines.Dispatchers
1111
import kotlinx.coroutines.flow.Flow
12+
import kotlinx.coroutines.flow.flowOf
1213
import kotlinx.coroutines.runBlocking
1314
import kotlinx.coroutines.withContext
1415
import org.web3j.crypto.Keys
1516
import org.web3j.crypto.Keys.toChecksumAddress
17+
import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest
1618
import org.xmtp.android.library.codecs.ContentCodec
1719
import org.xmtp.android.library.codecs.TextCodec
1820
import org.xmtp.android.library.libxmtp.XMTPLogger
@@ -516,7 +518,11 @@ class Client() {
516518
}
517519

518520
suspend fun subscribe(topics: List<String>): Flow<Envelope> {
519-
return apiClient.subscribe(topics)
521+
return subscribe2(flowOf(makeSubscribeRequest(topics)))
522+
}
523+
524+
suspend fun subscribe2(request: Flow<MessageApiOuterClass.SubscribeRequest>): Flow<Envelope> {
525+
return apiClient.subscribe(request = request)
520526
}
521527

522528
suspend fun fetchConversation(topic: String?, includeGroups: Boolean = false): Conversation? {

library/src/main/java/org/xmtp/android/library/Conversations.kt

+12-8
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ import com.google.protobuf.kotlin.toByteString
55
import io.grpc.StatusException
66
import kotlinx.coroutines.CancellationException
77
import kotlinx.coroutines.channels.awaitClose
8-
import kotlinx.coroutines.currentCoroutineContext
98
import kotlinx.coroutines.flow.Flow
9+
import kotlinx.coroutines.flow.MutableStateFlow
1010
import kotlinx.coroutines.flow.callbackFlow
1111
import kotlinx.coroutines.flow.flow
1212
import kotlinx.coroutines.flow.merge
13-
import kotlinx.coroutines.job
1413
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
14+
import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest
1515
import org.xmtp.android.library.libxmtp.MessageV3
1616
import org.xmtp.android.library.messages.DecryptedMessage
1717
import org.xmtp.android.library.messages.Envelope
@@ -616,9 +616,11 @@ data class Conversations(
616616
topics.add(conversation.topic)
617617
}
618618

619+
val subscribeFlow = MutableStateFlow(makeSubscribeRequest(topics))
620+
619621
while (true) {
620622
try {
621-
client.subscribe(topics).collect { envelope ->
623+
client.subscribe2(request = subscribeFlow).collect { envelope ->
622624
when {
623625
conversationsByTopic.containsKey(envelope.contentTopic) -> {
624626
val conversation = conversationsByTopic[envelope.contentTopic]
@@ -630,7 +632,7 @@ data class Conversations(
630632
val conversation = fromInvite(envelope = envelope)
631633
conversationsByTopic[conversation.topic] = conversation
632634
topics.add(conversation.topic)
633-
currentCoroutineContext().job.cancel()
635+
subscribeFlow.value = makeSubscribeRequest(topics)
634636
}
635637

636638
envelope.contentTopic.startsWith("/xmtp/0/intro-") -> {
@@ -639,7 +641,7 @@ data class Conversations(
639641
val decoded = conversation.decode(envelope)
640642
emit(decoded)
641643
topics.add(conversation.topic)
642-
currentCoroutineContext().job.cancel()
644+
subscribeFlow.value = makeSubscribeRequest(topics)
643645
}
644646

645647
else -> {}
@@ -685,9 +687,11 @@ data class Conversations(
685687
topics.add(conversation.topic)
686688
}
687689

690+
val subscribeFlow = MutableStateFlow(makeSubscribeRequest(topics))
691+
688692
while (true) {
689693
try {
690-
client.subscribe(topics).collect { envelope ->
694+
client.subscribe2(request = subscribeFlow).collect { envelope ->
691695
when {
692696
conversationsByTopic.containsKey(envelope.contentTopic) -> {
693697
val conversation = conversationsByTopic[envelope.contentTopic]
@@ -699,7 +703,7 @@ data class Conversations(
699703
val conversation = fromInvite(envelope = envelope)
700704
conversationsByTopic[conversation.topic] = conversation
701705
topics.add(conversation.topic)
702-
currentCoroutineContext().job.cancel()
706+
subscribeFlow.value = makeSubscribeRequest(topics)
703707
}
704708

705709
envelope.contentTopic.startsWith("/xmtp/0/intro-") -> {
@@ -708,7 +712,7 @@ data class Conversations(
708712
val decrypted = conversation.decrypt(envelope)
709713
emit(decrypted)
710714
topics.add(conversation.topic)
711-
currentCoroutineContext().job.cancel()
715+
subscribeFlow.value = makeSubscribeRequest(topics)
712716
}
713717

714718
else -> {}

library/src/test/java/org/xmtp/android/library/TestHelpers.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -186,10 +186,10 @@ class FakeApiClient : ApiClient {
186186
published.addAll(envelopes)
187187
}
188188

189-
override suspend fun subscribe(topics: List<String>): Flow<MessageApiOuterClass.Envelope> {
189+
override suspend fun subscribe(request: Flow<MessageApiOuterClass.SubscribeRequest>): Flow<MessageApiOuterClass.Envelope> {
190190
val env = stream.counts().first()
191191

192-
if (topics.contains(env.contentTopic)) {
192+
if (request.first().contentTopicsList.contains(env.contentTopic)) {
193193
return flowOf(env)
194194
}
195195
return flowOf()

0 commit comments

Comments
 (0)