Skip to content

Commit 9561552

Browse files
authored
Fix: Stream All Messages not correctly continuing flow (#123)
* fix: stream all messages * fix up the lint issues * replaced with the new test * move it out of the while for performance * Update docker-compose.yml * work around no longer needed
1 parent 5cd45da commit 9561552

File tree

5 files changed

+86
-35
lines changed

5 files changed

+86
-35
lines changed

.github/workflows/test.yml

-6
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,6 @@ jobs:
4444
uses: gradle/gradle-build-action@v2
4545
- name: Validate Gradle Wrapper
4646
uses: gradle/wrapper-validation-action@v1
47-
# Workaround for https://github.com/actions/runner-images/issues/8104
48-
- name: Fix Qemu Error
49-
run: |
50-
brew remove --ignore-dependencies qemu
51-
curl -o ./qemu.rb https://raw.githubusercontent.com/Homebrew/homebrew-core/f88e30b3a23ef3735580f9b05535ce5a0a03c9e3/Formula/qemu.rb
52-
brew install ./qemu.rb
5347
- name: Set up Docker
5448
run: brew install docker docker-compose
5549
- name: Start Colima

dev/local/docker-compose.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,4 @@ services:
3030
depends_on:
3131
wakunode:
3232
condition: service_healthy
33-
build: ./test
33+
build: ./test

library/src/androidTest/java/org/xmtp/android/library/ConversationTest.kt

-12
Original file line numberDiff line numberDiff line change
@@ -557,18 +557,6 @@ class ConversationTest {
557557
}
558558
}
559559

560-
@Test
561-
fun testStreamAllMessagesGetsMessageFromKnownConversation() = kotlinx.coroutines.test.runTest {
562-
val fixtures = fixtures()
563-
val client = fixtures.aliceClient
564-
val bobConversation = fixtures.bobClient.conversations.newConversation(client.address)
565-
client.conversations.streamAllMessages().test {
566-
bobConversation.send(text = "hi")
567-
assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8())
568-
awaitComplete()
569-
}
570-
}
571-
572560
@Test
573561
fun testV2RejectsSpoofedContactBundles() {
574562
val topic = "/xmtp/0/m-Gdb7oj5nNdfZ3MJFLAcS4WTABgr6al1hePy6JV1-QUE/proto"

library/src/androidTest/java/org/xmtp/android/library/ConversationsTest.kt

+66
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package org.xmtp.android.library
22

33
import androidx.test.ext.junit.runners.AndroidJUnit4
4+
import kotlinx.coroutines.CoroutineScope
5+
import kotlinx.coroutines.Dispatchers
6+
import kotlinx.coroutines.launch
7+
import kotlinx.coroutines.runBlocking
48
import org.junit.Assert.assertEquals
59
import org.junit.Test
610
import org.junit.runner.RunWith
@@ -16,6 +20,7 @@ import org.xmtp.android.library.messages.createDeterministic
1620
import org.xmtp.android.library.messages.getPublicKeyBundle
1721
import org.xmtp.android.library.messages.toPublicKeyBundle
1822
import org.xmtp.android.library.messages.walletAddress
23+
import java.lang.Thread.sleep
1924
import java.util.Date
2025

2126
@RunWith(AndroidJUnit4::class)
@@ -71,4 +76,65 @@ class ConversationsTest {
7176
assertEquals(conversation.peerAddress, newWallet.address)
7277
assertEquals(conversation.createdAt.time, created.time)
7378
}
79+
80+
@Test
81+
fun testStreamAllMessages() = runBlocking {
82+
val bo = PrivateKeyBuilder()
83+
val alix = PrivateKeyBuilder()
84+
val clientOptions =
85+
ClientOptions(api = ClientOptions.Api(env = XMTPEnvironment.LOCAL, isSecure = false))
86+
val boClient = Client().create(bo, clientOptions)
87+
val alixClient = Client().create(alix, clientOptions)
88+
val boConversation = boClient.conversations.newConversation(alixClient.address)
89+
90+
// Record message stream across all conversations
91+
val allMessages = mutableListOf<DecodedMessage>()
92+
93+
val job = CoroutineScope(Dispatchers.IO).launch {
94+
try {
95+
alixClient.conversations.streamAllMessages().collect { message ->
96+
allMessages.add(message)
97+
}
98+
} catch (e: Exception) {}
99+
}
100+
sleep(2500)
101+
102+
for (i in 0 until 5) {
103+
boConversation.send(text = "Message $i")
104+
sleep(1000)
105+
}
106+
assertEquals(allMessages.size, 5)
107+
108+
val caro = PrivateKeyBuilder()
109+
val caroClient = Client().create(caro, clientOptions)
110+
val caroConversation = caroClient.conversations.newConversation(alixClient.address)
111+
112+
sleep(2500)
113+
114+
for (i in 0 until 5) {
115+
caroConversation.send(text = "Message $i")
116+
sleep(1000)
117+
}
118+
119+
assertEquals(allMessages.size, 10)
120+
121+
job.cancel()
122+
123+
CoroutineScope(Dispatchers.IO).launch {
124+
try {
125+
alixClient.conversations.streamAllMessages().collect { message ->
126+
allMessages.add(message)
127+
}
128+
} catch (e: Exception) {
129+
}
130+
}
131+
sleep(2500)
132+
133+
for (i in 0 until 5) {
134+
boConversation.send(text = "Message $i")
135+
sleep(1000)
136+
}
137+
138+
assertEquals(allMessages.size, 15)
139+
}
74140
}

library/src/main/java/org/xmtp/android/library/Conversations.kt

+19-16
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package org.xmtp.android.library
22

33
import android.util.Log
4+
import kotlinx.coroutines.currentCoroutineContext
45
import kotlinx.coroutines.flow.Flow
56
import kotlinx.coroutines.flow.flow
7+
import kotlinx.coroutines.job
68
import kotlinx.coroutines.runBlocking
79
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
810
import org.xmtp.android.library.messages.Envelope
@@ -354,16 +356,15 @@ data class Conversations(
354356
}
355357

356358
fun streamAllMessages(): Flow<DecodedMessage> = flow {
357-
while (true) {
358-
val topics = mutableListOf(
359-
Topic.userInvite(client.address).description,
360-
Topic.userIntro(client.address).description
361-
)
362-
363-
for (conversation in list()) {
364-
topics.add(conversation.topic)
365-
}
359+
val topics = mutableListOf(
360+
Topic.userInvite(client.address).description,
361+
Topic.userIntro(client.address).description
362+
)
366363

364+
for (conversation in list()) {
365+
topics.add(conversation.topic)
366+
}
367+
while (true) {
367368
try {
368369
client.subscribe(topics = topics).collect { envelope ->
369370
when {
@@ -374,24 +375,26 @@ data class Conversations(
374375
}
375376

376377
envelope.contentTopic.startsWith("/xmtp/0/invite-") -> {
377-
val conversation = fromInvite(envelope)
378+
val conversation = fromInvite(envelope = envelope)
378379
conversationsByTopic[conversation.topic] = conversation
379-
// Break so we can resubscribe with the new conversation
380-
return@collect
380+
topics.add(conversation.topic)
381+
currentCoroutineContext().job.cancel()
381382
}
382383

383384
envelope.contentTopic.startsWith("/xmtp/0/intro-") -> {
384-
val conversation = fromIntro(envelope)
385+
val conversation = fromIntro(envelope = envelope)
385386
conversationsByTopic[conversation.topic] = conversation
386387
val decoded = conversation.decode(envelope)
387388
emit(decoded)
388-
// Break so we can resubscribe with the new conversation
389-
return@collect
389+
topics.add(conversation.topic)
390+
currentCoroutineContext().job.cancel()
390391
}
392+
393+
else -> {}
391394
}
392395
}
393396
} catch (error: Exception) {
394-
throw error
397+
continue
395398
}
396399
}
397400
}

0 commit comments

Comments
 (0)