Skip to content

Commit 5bed136

Browse files
committed
fix lots of the threading issues
1 parent 10df0c4 commit 5bed136

File tree

9 files changed

+98
-56
lines changed

9 files changed

+98
-56
lines changed

library/src/androidTest/java/org/xmtp/android/library/ConversationTest.kt

+27-25
Original file line numberDiff line numberDiff line change
@@ -109,28 +109,30 @@ class ConversationTest {
109109
// Overwrite contact as legacy
110110
bobClient.publishUserContact(legacy = true)
111111
aliceClient.publishUserContact(legacy = true)
112-
bobClient.publish(
113-
envelopes = listOf(
114-
EnvelopeBuilder.buildFromTopic(
115-
topic = Topic.userIntro(bob.walletAddress),
116-
timestamp = someTimeAgo,
117-
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
118-
),
119-
EnvelopeBuilder.buildFromTopic(
120-
topic = Topic.userIntro(alice.walletAddress),
121-
timestamp = someTimeAgo,
122-
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
123-
),
124-
EnvelopeBuilder.buildFromTopic(
125-
topic = Topic.directMessageV1(
126-
bob.walletAddress,
127-
alice.walletAddress,
112+
runBlocking {
113+
bobClient.publish(
114+
envelopes = listOf(
115+
EnvelopeBuilder.buildFromTopic(
116+
topic = Topic.userIntro(bob.walletAddress),
117+
timestamp = someTimeAgo,
118+
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
119+
),
120+
EnvelopeBuilder.buildFromTopic(
121+
topic = Topic.userIntro(alice.walletAddress),
122+
timestamp = someTimeAgo,
123+
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
124+
),
125+
EnvelopeBuilder.buildFromTopic(
126+
topic = Topic.directMessageV1(
127+
bob.walletAddress,
128+
alice.walletAddress,
129+
),
130+
timestamp = someTimeAgo,
131+
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
128132
),
129-
timestamp = someTimeAgo,
130-
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
131133
),
132-
),
133-
)
134+
)
135+
}
134136
var conversation = aliceClient.conversations.newConversation(bob.walletAddress)
135137
assertEquals(conversation.peerAddress, bob.walletAddress)
136138
assertEquals(conversation.createdAt, someTimeAgo)
@@ -251,7 +253,7 @@ class ConversationTest {
251253
timestamp = Date(),
252254
message = MessageBuilder.buildFromMessageV2(v2 = tamperedMessage.messageV2).toByteArray(),
253255
)
254-
aliceClient.publish(envelopes = listOf(tamperedEnvelope))
256+
runBlocking { aliceClient.publish(envelopes = listOf(tamperedEnvelope)) }
255257
val bobConversation = bobClient.conversations.newConversation(
256258
aliceWallet.address,
257259
InvitationV1ContextBuilder.buildFromConversation("hi"),
@@ -370,7 +372,7 @@ class ConversationTest {
370372
ConversationV2.create(client = client, invitation = invitationv1, header = header)
371373
assertEquals(fakeContactWallet.address, conversation.peerAddress)
372374

373-
conversation.send(content = "hello world")
375+
runBlocking { conversation.send(content = "hello world") }
374376

375377
val conversationList = client.conversations.list()
376378
val recipientConversation = conversationList.lastOrNull()
@@ -646,7 +648,7 @@ class ConversationTest {
646648
assertEquals(conversation.version, Conversation.Version.V1)
647649
val preparedMessage = conversation.prepareMessage(content = "hi")
648650
val messageID = preparedMessage.messageId
649-
conversation.send(prepared = preparedMessage)
651+
runBlocking { conversation.send(prepared = preparedMessage) }
650652
val messages = conversation.messages()
651653
val message = messages[0]
652654
assertEquals("hi", message.body)
@@ -658,7 +660,7 @@ class ConversationTest {
658660
val conversation = aliceClient.conversations.newConversation(bob.walletAddress)
659661
val preparedMessage = conversation.prepareMessage(content = "hi")
660662
val messageID = preparedMessage.messageId
661-
conversation.send(prepared = preparedMessage)
663+
runBlocking { conversation.send(prepared = preparedMessage) }
662664
val messages = conversation.messages()
663665
val message = messages[0]
664666
assertEquals("hi", message.body)
@@ -673,7 +675,7 @@ class ConversationTest {
673675

674676
// This does not need the `conversation` to `.publish` the message.
675677
// This simulates a background task publishing all pending messages upon connection.
676-
aliceClient.publish(envelopes = preparedMessage.envelopes)
678+
runBlocking { aliceClient.publish(envelopes = preparedMessage.envelopes) }
677679

678680
val messages = conversation.messages()
679681
val message = messages[0]

library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt

+55-16
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,12 @@ class GroupTest {
319319
schema = ReactionSchema.Unicode
320320
)
321321

322-
runBlocking { group.send(content = reaction, options = SendOptions(contentType = ContentTypeReaction)) }
322+
runBlocking {
323+
group.send(
324+
content = reaction,
325+
options = SendOptions(contentType = ContentTypeReaction)
326+
)
327+
}
323328
runBlocking { group.sync() }
324329

325330
val messages = group.messages()
@@ -348,25 +353,47 @@ class GroupTest {
348353
fun testCanStreamAllGroupMessages() = kotlinx.coroutines.test.runTest {
349354
val group = caroClient.conversations.newGroup(listOf(alix.walletAddress))
350355
alixClient.conversations.syncGroups()
351-
alixClient.conversations.streamAllGroupMessages().test {
352-
group.send("hi")
353-
assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8())
354-
group.send("hi again")
355-
assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8())
356+
val flow = alixClient.conversations.streamAllGroupMessages()
357+
var counter = 0
358+
val job = launch {
359+
flow.catch { e ->
360+
throw Exception("Error collecting flow: $e")
361+
}.collect { message ->
362+
counter++
363+
assertEquals("hi $counter", message.encodedContent.content.toStringUtf8())
364+
if (counter == 2) this.cancel()
365+
}
356366
}
367+
368+
group.send("hi 1")
369+
group.send("hi 2")
370+
371+
job.join()
357372
}
358373

359374
@Test
360375
fun testCanStreamAllMessages() = kotlinx.coroutines.test.runTest {
361376
val group = caroClient.conversations.newGroup(listOf(alix.walletAddress))
362377
val conversation = boClient.conversations.newConversation(alix.walletAddress)
363378
alixClient.conversations.syncGroups()
364-
alixClient.conversations.streamAllMessages(includeGroups = true).test {
365-
group.send("hi")
366-
assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8())
367-
conversation.send("hi again")
368-
assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8())
379+
380+
val flow = alixClient.conversations.streamAllMessages(includeGroups = true)
381+
var counter = 0
382+
val job = launch {
383+
flow.catch { e ->
384+
throw Exception("Error collecting flow: $e")
385+
}.collect { message ->
386+
counter++
387+
assertEquals("hi $counter", message.encodedContent.content.toStringUtf8())
388+
if (counter == 2) this.cancel()
389+
}
369390
}
391+
392+
group.send("hi 1")
393+
Thread.sleep(1000)
394+
conversation.send("hi 2")
395+
396+
job.join()
370397
}
371398

372399
@Test
@@ -410,12 +437,24 @@ class GroupTest {
410437
val group = caroClient.conversations.newGroup(listOf(alix.walletAddress))
411438
val conversation = boClient.conversations.newConversation(alix.walletAddress)
412439
alixClient.conversations.syncGroups()
413-
alixClient.conversations.streamAllDecryptedMessages(includeGroups = true).test {
414-
group.send("hi")
415-
assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8())
416-
conversation.send("hi again")
417-
assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8())
440+
441+
val flow = alixClient.conversations.streamAllDecryptedMessages(includeGroups = true)
442+
var counter = 0
443+
val job = launch {
444+
flow.catch { e ->
445+
throw Exception("Error collecting flow: $e")
446+
}.collect { message ->
447+
counter++
448+
assertEquals("hi $counter", message.encodedContent.content.toStringUtf8())
449+
if (counter == 2) this.cancel()
450+
}
418451
}
452+
453+
group.send("hi 1")
454+
Thread.sleep(1000)
455+
conversation.send("hi 2")
456+
457+
job.join()
419458
}
420459

421460
@Test

library/src/androidTest/java/org/xmtp/android/library/MessageTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ class MessageTest {
245245
peerAddress = "0xf4BF19Ed562651837bc11ff975472ABd239D35B5",
246246
sentAt = Date(),
247247
)
248-
convo.send(text = "hello from kotlin")
248+
runBlocking { convo.send(text = "hello from kotlin") }
249249
val messages = convo.messages()
250250
assertEquals(1, messages.size)
251251
assertEquals("hello from kotlin", messages[0].body)

library/src/androidTest/java/org/xmtp/android/library/TestHelpers.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import kotlinx.coroutines.flow.Flow
44
import kotlinx.coroutines.flow.MutableSharedFlow
55
import kotlinx.coroutines.flow.first
66
import kotlinx.coroutines.flow.flowOf
7+
import kotlinx.coroutines.runBlocking
78
import org.junit.Assert.assertEquals
89
import org.xmtp.android.library.codecs.Fetcher
910
import org.xmtp.android.library.messages.ContactBundle
@@ -242,7 +243,7 @@ data class Fixtures(
242243
message = contactBundle.toByteString()
243244
}.build()
244245

245-
client.publish(envelopes = listOf(envelope))
246+
runBlocking { client.publish(envelopes = listOf(envelope)) }
246247
}
247248
}
248249

library/src/main/java/org/xmtp/android/library/Client.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ class Client() {
497497
}
498498
}
499499

500-
fun publish(envelopes: List<Envelope>): PublishResponse {
500+
suspend fun publish(envelopes: List<Envelope>): PublishResponse {
501501
val authorized = AuthorizedIdentity(
502502
address = address,
503503
authorized = privateKeyBundleV1.identityKey.publicKey,
@@ -506,7 +506,7 @@ class Client() {
506506
val authToken = authorized.createAuthToken()
507507
apiClient.setAuthToken(authToken)
508508

509-
return runBlocking { apiClient.publish(envelopes = envelopes) }
509+
return apiClient.publish(envelopes = envelopes)
510510
}
511511

512512
fun ensureUserContactPublished() {

library/src/main/java/org/xmtp/android/library/Contacts.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ class ConsentList(val client: Client) {
152152
ByteArray(message.size) { message[it] },
153153
)
154154

155-
client.publish(listOf(envelope))
155+
runBlocking { client.publish(listOf(envelope)) }
156156
}
157157

158158
fun allow(address: String): ConsentListEntry {

library/src/main/java/org/xmtp/android/library/Conversation.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ sealed class Conversation {
169169
}
170170
}
171171

172-
fun send(prepared: PreparedMessage): String {
172+
suspend fun send(prepared: PreparedMessage): String {
173173
return when (this) {
174174
is V1 -> conversationV1.send(prepared = prepared)
175175
is V2 -> conversationV2.send(prepared = prepared)

library/src/main/java/org/xmtp/android/library/ConversationV1.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -164,11 +164,11 @@ data class ConversationV1(
164164
}
165165
}
166166

167-
fun send(text: String, options: SendOptions? = null): String {
167+
suspend fun send(text: String, options: SendOptions? = null): String {
168168
return send(text = text, sendOptions = options, sentAt = null)
169169
}
170170

171-
internal fun send(
171+
internal suspend fun send(
172172
text: String,
173173
sendOptions: SendOptions? = null,
174174
sentAt: Date? = null,
@@ -177,17 +177,17 @@ data class ConversationV1(
177177
return send(preparedMessage)
178178
}
179179

180-
fun <T> send(content: T, options: SendOptions? = null): String {
180+
suspend fun <T> send(content: T, options: SendOptions? = null): String {
181181
val preparedMessage = prepareMessage(content = content, options = options)
182182
return send(preparedMessage)
183183
}
184184

185-
fun send(encodedContent: EncodedContent, options: SendOptions? = null): String {
185+
suspend fun send(encodedContent: EncodedContent, options: SendOptions? = null): String {
186186
val preparedMessage = prepareMessage(encodedContent = encodedContent, options = options)
187187
return send(preparedMessage)
188188
}
189189

190-
fun send(prepared: PreparedMessage): String {
190+
suspend fun send(prepared: PreparedMessage): String {
191191
client.publish(envelopes = prepared.envelopes)
192192
if (client.contacts.consentList.state(address = peerAddress) == ConsentState.UNKNOWN) {
193193
client.contacts.allow(addresses = listOf(peerAddress))

library/src/main/java/org/xmtp/android/library/ConversationV2.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -173,22 +173,22 @@ data class ConversationV2(
173173
}
174174
}
175175

176-
fun <T> send(content: T, options: SendOptions? = null): String {
176+
suspend fun <T> send(content: T, options: SendOptions? = null): String {
177177
val preparedMessage = prepareMessage(content = content, options = options)
178178
return send(preparedMessage)
179179
}
180180

181-
fun send(text: String, options: SendOptions? = null, sentAt: Date? = null): String {
181+
suspend fun send(text: String, options: SendOptions? = null, sentAt: Date? = null): String {
182182
val preparedMessage = prepareMessage(content = text, options = options)
183183
return send(preparedMessage)
184184
}
185185

186-
fun send(encodedContent: EncodedContent, options: SendOptions?): String {
186+
suspend fun send(encodedContent: EncodedContent, options: SendOptions?): String {
187187
val preparedMessage = prepareMessage(encodedContent = encodedContent, options = options)
188188
return send(preparedMessage)
189189
}
190190

191-
fun send(prepared: PreparedMessage): String {
191+
suspend fun send(prepared: PreparedMessage): String {
192192
client.publish(envelopes = prepared.envelopes)
193193
if (client.contacts.consentList.state(address = peerAddress) == ConsentState.UNKNOWN) {
194194
client.contacts.allow(addresses = listOf(peerAddress))

0 commit comments

Comments
 (0)