Skip to content

Commit 0134852

Browse files
authored
feat: streaming and threading improvements (#191)
* bump the binaries * make all send functions suspend * fix up the tests to not lock on send * Revert "make all send functions suspend" This reverts commit e4152d8. * Revert "Revert "make all send functions suspend"" This reverts commit 125d1ac. * fix lots of the threading issues * fix up the linter errors * bump to the latest rust library * make delay longer
1 parent 0e1e0bc commit 0134852

23 files changed

+708
-197
lines changed

library/src/androidTest/java/org/xmtp/android/library/AttachmentTest.kt

+7-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package org.xmtp.android.library
22

33
import androidx.test.ext.junit.runners.AndroidJUnit4
44
import com.google.protobuf.kotlin.toByteStringUtf8
5+
import kotlinx.coroutines.runBlocking
56
import org.junit.Assert.assertEquals
67
import org.junit.Test
78
import org.junit.runner.RunWith
@@ -27,10 +28,12 @@ class AttachmentTest {
2728
val aliceConversation =
2829
aliceClient.conversations.newConversation(fixtures.bob.walletAddress)
2930

30-
aliceConversation.send(
31-
content = attachment,
32-
options = SendOptions(contentType = ContentTypeAttachment),
33-
)
31+
runBlocking {
32+
aliceConversation.send(
33+
content = attachment,
34+
options = SendOptions(contentType = ContentTypeAttachment),
35+
)
36+
}
3437
val messages = aliceConversation.messages()
3538
assertEquals(messages.size, 1)
3639
if (messages.size == 1) {

library/src/androidTest/java/org/xmtp/android/library/CodecTest.kt

+25-16
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package org.xmtp.android.library
22

33
import androidx.test.ext.junit.runners.AndroidJUnit4
44
import com.google.protobuf.kotlin.toByteStringUtf8
5+
import kotlinx.coroutines.runBlocking
56
import org.junit.Assert.assertEquals
67
import org.junit.Assert.assertTrue
78
import org.junit.Test
@@ -60,10 +61,12 @@ class CodecTest {
6061
val aliceClient = fixtures.aliceClient
6162
val aliceConversation =
6263
aliceClient.conversations.newConversation(fixtures.bob.walletAddress)
63-
aliceConversation.send(
64-
content = 3.14,
65-
options = SendOptions(contentType = NumberCodec().contentType),
66-
)
64+
runBlocking {
65+
aliceConversation.send(
66+
content = 3.14,
67+
options = SendOptions(contentType = NumberCodec().contentType),
68+
)
69+
}
6770
val messages = aliceConversation.messages()
6871
assertEquals(messages.size, 1)
6972
if (messages.size == 1) {
@@ -82,10 +85,12 @@ class CodecTest {
8285
aliceClient.conversations.newConversation(fixtures.bob.walletAddress)
8386
val textContent = TextCodec().encode(content = "hiya")
8487
val source = DecodedComposite(encodedContent = textContent)
85-
aliceConversation.send(
86-
content = source,
87-
options = SendOptions(contentType = CompositeCodec().contentType),
88-
)
88+
runBlocking {
89+
aliceConversation.send(
90+
content = source,
91+
options = SendOptions(contentType = CompositeCodec().contentType),
92+
)
93+
}
8994
val messages = aliceConversation.messages()
9095
val decoded: DecodedComposite? = messages[0].content()
9196
assertEquals("hiya", decoded?.content())
@@ -107,10 +112,12 @@ class CodecTest {
107112
DecodedComposite(parts = listOf(DecodedComposite(encodedContent = numberContent))),
108113
),
109114
)
110-
aliceConversation.send(
111-
content = source,
112-
options = SendOptions(contentType = CompositeCodec().contentType),
113-
)
115+
runBlocking {
116+
aliceConversation.send(
117+
content = source,
118+
options = SendOptions(contentType = CompositeCodec().contentType),
119+
)
120+
}
114121
val messages = aliceConversation.messages()
115122
val decoded: DecodedComposite? = messages[0].content()
116123
val part1 = decoded!!.parts[0]
@@ -127,10 +134,12 @@ class CodecTest {
127134
val aliceClient = fixtures.aliceClient!!
128135
val aliceConversation =
129136
aliceClient.conversations.newConversation(fixtures.bob.walletAddress)
130-
aliceConversation.send(
131-
content = 3.14,
132-
options = SendOptions(contentType = codec.contentType),
133-
)
137+
runBlocking {
138+
aliceConversation.send(
139+
content = 3.14,
140+
options = SendOptions(contentType = codec.contentType),
141+
)
142+
}
134143
val messages = aliceConversation.messages()
135144
assert(messages.isNotEmpty())
136145

library/src/androidTest/java/org/xmtp/android/library/ConversationTest.kt

+78-66
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import app.cash.turbine.test
55
import com.google.protobuf.kotlin.toByteString
66
import com.google.protobuf.kotlin.toByteStringUtf8
77
import kotlinx.coroutines.ExperimentalCoroutinesApi
8+
import kotlinx.coroutines.runBlocking
89
import org.junit.Assert
910
import org.junit.Assert.assertEquals
1011
import org.junit.Assert.assertFalse
@@ -108,28 +109,30 @@ class ConversationTest {
108109
// Overwrite contact as legacy
109110
bobClient.publishUserContact(legacy = true)
110111
aliceClient.publishUserContact(legacy = true)
111-
bobClient.publish(
112-
envelopes = listOf(
113-
EnvelopeBuilder.buildFromTopic(
114-
topic = Topic.userIntro(bob.walletAddress),
115-
timestamp = someTimeAgo,
116-
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
117-
),
118-
EnvelopeBuilder.buildFromTopic(
119-
topic = Topic.userIntro(alice.walletAddress),
120-
timestamp = someTimeAgo,
121-
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
122-
),
123-
EnvelopeBuilder.buildFromTopic(
124-
topic = Topic.directMessageV1(
125-
bob.walletAddress,
126-
alice.walletAddress,
112+
runBlocking {
113+
bobClient.publish(
114+
envelopes = listOf(
115+
EnvelopeBuilder.buildFromTopic(
116+
topic = Topic.userIntro(bob.walletAddress),
117+
timestamp = someTimeAgo,
118+
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
119+
),
120+
EnvelopeBuilder.buildFromTopic(
121+
topic = Topic.userIntro(alice.walletAddress),
122+
timestamp = someTimeAgo,
123+
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
124+
),
125+
EnvelopeBuilder.buildFromTopic(
126+
topic = Topic.directMessageV1(
127+
bob.walletAddress,
128+
alice.walletAddress,
129+
),
130+
timestamp = someTimeAgo,
131+
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
127132
),
128-
timestamp = someTimeAgo,
129-
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
130133
),
131-
),
132-
)
134+
)
135+
}
133136
var conversation = aliceClient.conversations.newConversation(bob.walletAddress)
134137
assertEquals(conversation.peerAddress, bob.walletAddress)
135138
assertEquals(conversation.createdAt, someTimeAgo)
@@ -173,8 +176,8 @@ class ConversationTest {
173176
val bobConversation = bobClient.conversations.newConversation(aliceWallet.address)
174177
val aliceConversation = aliceClient.conversations.newConversation(bobWallet.address)
175178

176-
bobConversation.send(content = "hey alice")
177-
bobConversation.send(content = "hey alice again")
179+
runBlocking { bobConversation.send(content = "hey alice") }
180+
runBlocking { bobConversation.send(content = "hey alice again") }
178181
val messages = aliceConversation.messages()
179182
assertEquals(2, messages.size)
180183
assertEquals("hey alice", messages[1].body)
@@ -192,7 +195,7 @@ class ConversationTest {
192195
bobWallet.address,
193196
InvitationV1ContextBuilder.buildFromConversation("hi"),
194197
)
195-
bobConversation.send(content = "hey alice")
198+
runBlocking { bobConversation.send(content = "hey alice") }
196199
val messages = aliceConversation.messages()
197200
assertEquals(1, messages.size)
198201
assertEquals("hey alice", messages[0].body)
@@ -248,9 +251,10 @@ class ConversationTest {
248251
val tamperedEnvelope = EnvelopeBuilder.buildFromString(
249252
topic = aliceConversation.topic,
250253
timestamp = Date(),
251-
message = MessageBuilder.buildFromMessageV2(v2 = tamperedMessage.messageV2).toByteArray(),
254+
message = MessageBuilder.buildFromMessageV2(v2 = tamperedMessage.messageV2)
255+
.toByteArray(),
252256
)
253-
aliceClient.publish(envelopes = listOf(tamperedEnvelope))
257+
runBlocking { aliceClient.publish(envelopes = listOf(tamperedEnvelope)) }
254258
val bobConversation = bobClient.conversations.newConversation(
255259
aliceWallet.address,
256260
InvitationV1ContextBuilder.buildFromConversation("hi"),
@@ -268,10 +272,12 @@ class ConversationTest {
268272
fixtures.publishLegacyContact(client = aliceClient)
269273
val bobConversation = bobClient.conversations.newConversation(aliceWallet.address)
270274
val aliceConversation = aliceClient.conversations.newConversation(bobWallet.address)
271-
bobConversation.send(
272-
text = MutableList(1000) { "A" }.toString(),
273-
sendOptions = SendOptions(compression = EncodedContentCompression.GZIP),
274-
)
275+
runBlocking {
276+
bobConversation.send(
277+
text = MutableList(1000) { "A" }.toString(),
278+
sendOptions = SendOptions(compression = EncodedContentCompression.GZIP),
279+
)
280+
}
275281
val messages = aliceConversation.messages()
276282
assertEquals(1, messages.size)
277283
assertEquals(MutableList(1000) { "A" }.toString(), messages[0].content())
@@ -283,10 +289,12 @@ class ConversationTest {
283289
fixtures.publishLegacyContact(client = aliceClient)
284290
val bobConversation = bobClient.conversations.newConversation(aliceWallet.address)
285291
val aliceConversation = aliceClient.conversations.newConversation(bobWallet.address)
286-
bobConversation.send(
287-
content = MutableList(1000) { "A" }.toString(),
288-
options = SendOptions(compression = EncodedContentCompression.DEFLATE),
289-
)
292+
runBlocking {
293+
bobConversation.send(
294+
content = MutableList(1000) { "A" }.toString(),
295+
options = SendOptions(compression = EncodedContentCompression.DEFLATE),
296+
)
297+
}
290298
val messages = aliceConversation.messages()
291299
assertEquals(1, messages.size)
292300
assertEquals(MutableList(1000) { "A" }.toString(), messages[0].content())
@@ -302,10 +310,12 @@ class ConversationTest {
302310
bobWallet.address,
303311
InvitationV1ContextBuilder.buildFromConversation(conversationId = "hi"),
304312
)
305-
bobConversation.send(
306-
text = MutableList(1000) { "A" }.toString(),
307-
sendOptions = SendOptions(compression = EncodedContentCompression.GZIP),
308-
)
313+
runBlocking {
314+
bobConversation.send(
315+
text = MutableList(1000) { "A" }.toString(),
316+
sendOptions = SendOptions(compression = EncodedContentCompression.GZIP),
317+
)
318+
}
309319
val messages = aliceConversation.messages()
310320
assertEquals(1, messages.size)
311321
assertEquals(MutableList(1000) { "A" }.toString(), messages[0].body)
@@ -322,10 +332,12 @@ class ConversationTest {
322332
bobWallet.address,
323333
InvitationV1ContextBuilder.buildFromConversation(conversationId = "hi"),
324334
)
325-
bobConversation.send(
326-
content = MutableList(1000) { "A" }.toString(),
327-
options = SendOptions(compression = EncodedContentCompression.DEFLATE),
328-
)
335+
runBlocking {
336+
bobConversation.send(
337+
content = MutableList(1000) { "A" }.toString(),
338+
options = SendOptions(compression = EncodedContentCompression.DEFLATE),
339+
)
340+
}
329341
val messages = aliceConversation.messages()
330342
assertEquals(1, messages.size)
331343
assertEquals(MutableList(1000) { "A" }.toString(), messages[0].body)
@@ -369,7 +381,7 @@ class ConversationTest {
369381
ConversationV2.create(client = client, invitation = invitationv1, header = header)
370382
assertEquals(fakeContactWallet.address, conversation.peerAddress)
371383

372-
conversation.send(content = "hello world")
384+
runBlocking { conversation.send(content = "hello world") }
373385

374386
val conversationList = client.conversations.list()
375387
val recipientConversation = conversationList.lastOrNull()
@@ -401,9 +413,9 @@ class ConversationTest {
401413

402414
val date = Date()
403415
date.time = date.time - 1000000
404-
bobConversation.send(text = "hey alice 1", sentAt = date)
405-
bobConversation.send(text = "hey alice 2")
406-
bobConversation.send(text = "hey alice 3")
416+
runBlocking { bobConversation.send(text = "hey alice 1", sentAt = date) }
417+
runBlocking { bobConversation.send(text = "hey alice 2") }
418+
runBlocking { bobConversation.send(text = "hey alice 3") }
407419
val messages = aliceConversation.messages(limit = 1)
408420
assertEquals(1, messages.size)
409421
assertEquals("hey alice 3", messages[0].body)
@@ -422,9 +434,9 @@ class ConversationTest {
422434
)
423435
val date = Date()
424436
date.time = date.time - 1000000
425-
bobConversation.send(text = "hey alice 1", sentAt = date)
426-
bobConversation.send(text = "hey alice 2")
427-
bobConversation.send(text = "hey alice 3")
437+
runBlocking { bobConversation.send(text = "hey alice 1", sentAt = date) }
438+
runBlocking { bobConversation.send(text = "hey alice 2") }
439+
runBlocking { bobConversation.send(text = "hey alice 3") }
428440
val messages = aliceConversation.messages(limit = 1)
429441
assertEquals(1, messages.size)
430442
assertEquals("hey alice 3", messages[0].body)
@@ -445,9 +457,9 @@ class ConversationTest {
445457
val steveConversation =
446458
aliceClient.conversations.newConversation(fixtures.caro.walletAddress)
447459

448-
bobConversation.send(text = "hey alice 1")
449-
bobConversation.send(text = "hey alice 2")
450-
steveConversation.send(text = "hey alice 3")
460+
runBlocking { bobConversation.send(text = "hey alice 1") }
461+
runBlocking { bobConversation.send(text = "hey alice 2") }
462+
runBlocking { steveConversation.send(text = "hey alice 3") }
451463
val messages = aliceClient.conversations.listBatchMessages(
452464
listOf(
453465
Pair(steveConversation.topic, null),
@@ -469,9 +481,9 @@ class ConversationTest {
469481
val steveConversation =
470482
aliceClient.conversations.newConversation(fixtures.caro.walletAddress)
471483

472-
bobConversation.send(text = "hey alice 1")
473-
bobConversation.send(text = "hey alice 2")
474-
steveConversation.send(text = "hey alice 3")
484+
runBlocking { bobConversation.send(text = "hey alice 1") }
485+
runBlocking { bobConversation.send(text = "hey alice 2") }
486+
runBlocking { steveConversation.send(text = "hey alice 3") }
475487
val messages = aliceClient.conversations.listBatchDecryptedMessages(
476488
listOf(
477489
Pair(steveConversation.topic, null),
@@ -493,16 +505,16 @@ class ConversationTest {
493505
val steveConversation =
494506
aliceClient.conversations.newConversation(fixtures.caro.walletAddress)
495507

496-
bobConversation.send(text = "hey alice 1 bob")
497-
steveConversation.send(text = "hey alice 1 steve")
508+
runBlocking { bobConversation.send(text = "hey alice 1 bob") }
509+
runBlocking { steveConversation.send(text = "hey alice 1 steve") }
498510

499511
Thread.sleep(100)
500512
val date = Date()
501513

502-
bobConversation.send(text = "hey alice 2 bob")
503-
bobConversation.send(text = "hey alice 3 bob")
504-
steveConversation.send(text = "hey alice 2 steve")
505-
steveConversation.send(text = "hey alice 3 steve")
514+
runBlocking { bobConversation.send(text = "hey alice 2 bob") }
515+
runBlocking { bobConversation.send(text = "hey alice 3 bob") }
516+
runBlocking { steveConversation.send(text = "hey alice 2 steve") }
517+
runBlocking { steveConversation.send(text = "hey alice 3 steve") }
506518

507519
val messages = aliceClient.conversations.listBatchMessages(
508520
listOf(
@@ -645,7 +657,7 @@ class ConversationTest {
645657
assertEquals(conversation.version, Conversation.Version.V1)
646658
val preparedMessage = conversation.prepareMessage(content = "hi")
647659
val messageID = preparedMessage.messageId
648-
conversation.send(prepared = preparedMessage)
660+
runBlocking { conversation.send(prepared = preparedMessage) }
649661
val messages = conversation.messages()
650662
val message = messages[0]
651663
assertEquals("hi", message.body)
@@ -657,7 +669,7 @@ class ConversationTest {
657669
val conversation = aliceClient.conversations.newConversation(bob.walletAddress)
658670
val preparedMessage = conversation.prepareMessage(content = "hi")
659671
val messageID = preparedMessage.messageId
660-
conversation.send(prepared = preparedMessage)
672+
runBlocking { conversation.send(prepared = preparedMessage) }
661673
val messages = conversation.messages()
662674
val message = messages[0]
663675
assertEquals("hi", message.body)
@@ -672,7 +684,7 @@ class ConversationTest {
672684

673685
// This does not need the `conversation` to `.publish` the message.
674686
// This simulates a background task publishing all pending messages upon connection.
675-
aliceClient.publish(envelopes = preparedMessage.envelopes)
687+
runBlocking { aliceClient.publish(envelopes = preparedMessage.envelopes) }
676688

677689
val messages = conversation.messages()
678690
val message = messages[0]
@@ -753,7 +765,7 @@ class ConversationTest {
753765
val bobConversation = bobClient.conversations.newConversation(aliceWallet.address)
754766
val aliceConversation = aliceClient.conversations.newConversation(bobWallet.address)
755767
val encodedContent = TextCodec().encode(content = "hi")
756-
bobConversation.send(encodedContent = encodedContent)
768+
runBlocking { bobConversation.send(encodedContent = encodedContent) }
757769
val messages = aliceConversation.messages()
758770
assertEquals(1, messages.size)
759771
assertEquals("hi", messages[0].content())
@@ -763,7 +775,7 @@ class ConversationTest {
763775
fun testCanSendEncodedContentV2Message() {
764776
val bobConversation = bobClient.conversations.newConversation(aliceWallet.address)
765777
val encodedContent = TextCodec().encode(content = "hi")
766-
bobConversation.send(encodedContent = encodedContent)
778+
runBlocking { bobConversation.send(encodedContent = encodedContent) }
767779
val messages = bobConversation.messages()
768780
assertEquals(1, messages.size)
769781
assertEquals("hi", messages[0].content())
@@ -821,7 +833,7 @@ class ConversationTest {
821833
// Conversations you receive should start as unknown
822834
assertTrue(isUnknown)
823835

824-
aliceConversation.send(content = "hey bob")
836+
runBlocking { aliceConversation.send(content = "hey bob") }
825837
aliceClient.contacts.refreshConsentList()
826838
val isNowAllowed = aliceConversation.consentState() == ConsentState.ALLOWED
827839

0 commit comments

Comments
 (0)