Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: streaming and threading improvements #191

Merged
merged 11 commits into from
Mar 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.xmtp.android.library

import androidx.test.ext.junit.runners.AndroidJUnit4
import com.google.protobuf.kotlin.toByteStringUtf8
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Test
import org.junit.runner.RunWith
Expand All @@ -27,10 +28,12 @@ class AttachmentTest {
val aliceConversation =
aliceClient.conversations.newConversation(fixtures.bob.walletAddress)

aliceConversation.send(
content = attachment,
options = SendOptions(contentType = ContentTypeAttachment),
)
runBlocking {
aliceConversation.send(
content = attachment,
options = SendOptions(contentType = ContentTypeAttachment),
)
}
val messages = aliceConversation.messages()
assertEquals(messages.size, 1)
if (messages.size == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.xmtp.android.library

import androidx.test.ext.junit.runners.AndroidJUnit4
import com.google.protobuf.kotlin.toByteStringUtf8
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Test
Expand Down Expand Up @@ -60,10 +61,12 @@ class CodecTest {
val aliceClient = fixtures.aliceClient
val aliceConversation =
aliceClient.conversations.newConversation(fixtures.bob.walletAddress)
aliceConversation.send(
content = 3.14,
options = SendOptions(contentType = NumberCodec().contentType),
)
runBlocking {
aliceConversation.send(
content = 3.14,
options = SendOptions(contentType = NumberCodec().contentType),
)
}
val messages = aliceConversation.messages()
assertEquals(messages.size, 1)
if (messages.size == 1) {
Expand All @@ -82,10 +85,12 @@ class CodecTest {
aliceClient.conversations.newConversation(fixtures.bob.walletAddress)
val textContent = TextCodec().encode(content = "hiya")
val source = DecodedComposite(encodedContent = textContent)
aliceConversation.send(
content = source,
options = SendOptions(contentType = CompositeCodec().contentType),
)
runBlocking {
aliceConversation.send(
content = source,
options = SendOptions(contentType = CompositeCodec().contentType),
)
}
val messages = aliceConversation.messages()
val decoded: DecodedComposite? = messages[0].content()
assertEquals("hiya", decoded?.content())
Expand All @@ -107,10 +112,12 @@ class CodecTest {
DecodedComposite(parts = listOf(DecodedComposite(encodedContent = numberContent))),
),
)
aliceConversation.send(
content = source,
options = SendOptions(contentType = CompositeCodec().contentType),
)
runBlocking {
aliceConversation.send(
content = source,
options = SendOptions(contentType = CompositeCodec().contentType),
)
}
val messages = aliceConversation.messages()
val decoded: DecodedComposite? = messages[0].content()
val part1 = decoded!!.parts[0]
Expand All @@ -127,10 +134,12 @@ class CodecTest {
val aliceClient = fixtures.aliceClient!!
val aliceConversation =
aliceClient.conversations.newConversation(fixtures.bob.walletAddress)
aliceConversation.send(
content = 3.14,
options = SendOptions(contentType = codec.contentType),
)
runBlocking {
aliceConversation.send(
content = 3.14,
options = SendOptions(contentType = codec.contentType),
)
}
val messages = aliceConversation.messages()
assert(messages.isNotEmpty())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import app.cash.turbine.test
import com.google.protobuf.kotlin.toByteString
import com.google.protobuf.kotlin.toByteStringUtf8
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.runBlocking
import org.junit.Assert
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
Expand Down Expand Up @@ -108,28 +109,30 @@ class ConversationTest {
// Overwrite contact as legacy
bobClient.publishUserContact(legacy = true)
aliceClient.publishUserContact(legacy = true)
bobClient.publish(
envelopes = listOf(
EnvelopeBuilder.buildFromTopic(
topic = Topic.userIntro(bob.walletAddress),
timestamp = someTimeAgo,
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
),
EnvelopeBuilder.buildFromTopic(
topic = Topic.userIntro(alice.walletAddress),
timestamp = someTimeAgo,
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
),
EnvelopeBuilder.buildFromTopic(
topic = Topic.directMessageV1(
bob.walletAddress,
alice.walletAddress,
runBlocking {
bobClient.publish(
envelopes = listOf(
EnvelopeBuilder.buildFromTopic(
topic = Topic.userIntro(bob.walletAddress),
timestamp = someTimeAgo,
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
),
EnvelopeBuilder.buildFromTopic(
topic = Topic.userIntro(alice.walletAddress),
timestamp = someTimeAgo,
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
),
EnvelopeBuilder.buildFromTopic(
topic = Topic.directMessageV1(
bob.walletAddress,
alice.walletAddress,
),
timestamp = someTimeAgo,
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
),
timestamp = someTimeAgo,
message = MessageBuilder.buildFromMessageV1(v1 = messageV1).toByteArray(),
),
),
)
)
}
var conversation = aliceClient.conversations.newConversation(bob.walletAddress)
assertEquals(conversation.peerAddress, bob.walletAddress)
assertEquals(conversation.createdAt, someTimeAgo)
Expand Down Expand Up @@ -173,8 +176,8 @@ class ConversationTest {
val bobConversation = bobClient.conversations.newConversation(aliceWallet.address)
val aliceConversation = aliceClient.conversations.newConversation(bobWallet.address)

bobConversation.send(content = "hey alice")
bobConversation.send(content = "hey alice again")
runBlocking { bobConversation.send(content = "hey alice") }
runBlocking { bobConversation.send(content = "hey alice again") }
val messages = aliceConversation.messages()
assertEquals(2, messages.size)
assertEquals("hey alice", messages[1].body)
Expand All @@ -192,7 +195,7 @@ class ConversationTest {
bobWallet.address,
InvitationV1ContextBuilder.buildFromConversation("hi"),
)
bobConversation.send(content = "hey alice")
runBlocking { bobConversation.send(content = "hey alice") }
val messages = aliceConversation.messages()
assertEquals(1, messages.size)
assertEquals("hey alice", messages[0].body)
Expand Down Expand Up @@ -248,9 +251,10 @@ class ConversationTest {
val tamperedEnvelope = EnvelopeBuilder.buildFromString(
topic = aliceConversation.topic,
timestamp = Date(),
message = MessageBuilder.buildFromMessageV2(v2 = tamperedMessage.messageV2).toByteArray(),
message = MessageBuilder.buildFromMessageV2(v2 = tamperedMessage.messageV2)
.toByteArray(),
)
aliceClient.publish(envelopes = listOf(tamperedEnvelope))
runBlocking { aliceClient.publish(envelopes = listOf(tamperedEnvelope)) }
val bobConversation = bobClient.conversations.newConversation(
aliceWallet.address,
InvitationV1ContextBuilder.buildFromConversation("hi"),
Expand All @@ -268,10 +272,12 @@ class ConversationTest {
fixtures.publishLegacyContact(client = aliceClient)
val bobConversation = bobClient.conversations.newConversation(aliceWallet.address)
val aliceConversation = aliceClient.conversations.newConversation(bobWallet.address)
bobConversation.send(
text = MutableList(1000) { "A" }.toString(),
sendOptions = SendOptions(compression = EncodedContentCompression.GZIP),
)
runBlocking {
bobConversation.send(
text = MutableList(1000) { "A" }.toString(),
sendOptions = SendOptions(compression = EncodedContentCompression.GZIP),
)
}
val messages = aliceConversation.messages()
assertEquals(1, messages.size)
assertEquals(MutableList(1000) { "A" }.toString(), messages[0].content())
Expand All @@ -283,10 +289,12 @@ class ConversationTest {
fixtures.publishLegacyContact(client = aliceClient)
val bobConversation = bobClient.conversations.newConversation(aliceWallet.address)
val aliceConversation = aliceClient.conversations.newConversation(bobWallet.address)
bobConversation.send(
content = MutableList(1000) { "A" }.toString(),
options = SendOptions(compression = EncodedContentCompression.DEFLATE),
)
runBlocking {
bobConversation.send(
content = MutableList(1000) { "A" }.toString(),
options = SendOptions(compression = EncodedContentCompression.DEFLATE),
)
}
val messages = aliceConversation.messages()
assertEquals(1, messages.size)
assertEquals(MutableList(1000) { "A" }.toString(), messages[0].content())
Expand All @@ -302,10 +310,12 @@ class ConversationTest {
bobWallet.address,
InvitationV1ContextBuilder.buildFromConversation(conversationId = "hi"),
)
bobConversation.send(
text = MutableList(1000) { "A" }.toString(),
sendOptions = SendOptions(compression = EncodedContentCompression.GZIP),
)
runBlocking {
bobConversation.send(
text = MutableList(1000) { "A" }.toString(),
sendOptions = SendOptions(compression = EncodedContentCompression.GZIP),
)
}
val messages = aliceConversation.messages()
assertEquals(1, messages.size)
assertEquals(MutableList(1000) { "A" }.toString(), messages[0].body)
Expand All @@ -322,10 +332,12 @@ class ConversationTest {
bobWallet.address,
InvitationV1ContextBuilder.buildFromConversation(conversationId = "hi"),
)
bobConversation.send(
content = MutableList(1000) { "A" }.toString(),
options = SendOptions(compression = EncodedContentCompression.DEFLATE),
)
runBlocking {
bobConversation.send(
content = MutableList(1000) { "A" }.toString(),
options = SendOptions(compression = EncodedContentCompression.DEFLATE),
)
}
val messages = aliceConversation.messages()
assertEquals(1, messages.size)
assertEquals(MutableList(1000) { "A" }.toString(), messages[0].body)
Expand Down Expand Up @@ -369,7 +381,7 @@ class ConversationTest {
ConversationV2.create(client = client, invitation = invitationv1, header = header)
assertEquals(fakeContactWallet.address, conversation.peerAddress)

conversation.send(content = "hello world")
runBlocking { conversation.send(content = "hello world") }

val conversationList = client.conversations.list()
val recipientConversation = conversationList.lastOrNull()
Expand Down Expand Up @@ -401,9 +413,9 @@ class ConversationTest {

val date = Date()
date.time = date.time - 1000000
bobConversation.send(text = "hey alice 1", sentAt = date)
bobConversation.send(text = "hey alice 2")
bobConversation.send(text = "hey alice 3")
runBlocking { bobConversation.send(text = "hey alice 1", sentAt = date) }
runBlocking { bobConversation.send(text = "hey alice 2") }
runBlocking { bobConversation.send(text = "hey alice 3") }
val messages = aliceConversation.messages(limit = 1)
assertEquals(1, messages.size)
assertEquals("hey alice 3", messages[0].body)
Expand All @@ -422,9 +434,9 @@ class ConversationTest {
)
val date = Date()
date.time = date.time - 1000000
bobConversation.send(text = "hey alice 1", sentAt = date)
bobConversation.send(text = "hey alice 2")
bobConversation.send(text = "hey alice 3")
runBlocking { bobConversation.send(text = "hey alice 1", sentAt = date) }
runBlocking { bobConversation.send(text = "hey alice 2") }
runBlocking { bobConversation.send(text = "hey alice 3") }
val messages = aliceConversation.messages(limit = 1)
assertEquals(1, messages.size)
assertEquals("hey alice 3", messages[0].body)
Expand All @@ -445,9 +457,9 @@ class ConversationTest {
val steveConversation =
aliceClient.conversations.newConversation(fixtures.caro.walletAddress)

bobConversation.send(text = "hey alice 1")
bobConversation.send(text = "hey alice 2")
steveConversation.send(text = "hey alice 3")
runBlocking { bobConversation.send(text = "hey alice 1") }
runBlocking { bobConversation.send(text = "hey alice 2") }
runBlocking { steveConversation.send(text = "hey alice 3") }
val messages = aliceClient.conversations.listBatchMessages(
listOf(
Pair(steveConversation.topic, null),
Expand All @@ -469,9 +481,9 @@ class ConversationTest {
val steveConversation =
aliceClient.conversations.newConversation(fixtures.caro.walletAddress)

bobConversation.send(text = "hey alice 1")
bobConversation.send(text = "hey alice 2")
steveConversation.send(text = "hey alice 3")
runBlocking { bobConversation.send(text = "hey alice 1") }
runBlocking { bobConversation.send(text = "hey alice 2") }
runBlocking { steveConversation.send(text = "hey alice 3") }
val messages = aliceClient.conversations.listBatchDecryptedMessages(
listOf(
Pair(steveConversation.topic, null),
Expand All @@ -493,16 +505,16 @@ class ConversationTest {
val steveConversation =
aliceClient.conversations.newConversation(fixtures.caro.walletAddress)

bobConversation.send(text = "hey alice 1 bob")
steveConversation.send(text = "hey alice 1 steve")
runBlocking { bobConversation.send(text = "hey alice 1 bob") }
runBlocking { steveConversation.send(text = "hey alice 1 steve") }

Thread.sleep(100)
val date = Date()

bobConversation.send(text = "hey alice 2 bob")
bobConversation.send(text = "hey alice 3 bob")
steveConversation.send(text = "hey alice 2 steve")
steveConversation.send(text = "hey alice 3 steve")
runBlocking { bobConversation.send(text = "hey alice 2 bob") }
runBlocking { bobConversation.send(text = "hey alice 3 bob") }
runBlocking { steveConversation.send(text = "hey alice 2 steve") }
runBlocking { steveConversation.send(text = "hey alice 3 steve") }

val messages = aliceClient.conversations.listBatchMessages(
listOf(
Expand Down Expand Up @@ -645,7 +657,7 @@ class ConversationTest {
assertEquals(conversation.version, Conversation.Version.V1)
val preparedMessage = conversation.prepareMessage(content = "hi")
val messageID = preparedMessage.messageId
conversation.send(prepared = preparedMessage)
runBlocking { conversation.send(prepared = preparedMessage) }
val messages = conversation.messages()
val message = messages[0]
assertEquals("hi", message.body)
Expand All @@ -657,7 +669,7 @@ class ConversationTest {
val conversation = aliceClient.conversations.newConversation(bob.walletAddress)
val preparedMessage = conversation.prepareMessage(content = "hi")
val messageID = preparedMessage.messageId
conversation.send(prepared = preparedMessage)
runBlocking { conversation.send(prepared = preparedMessage) }
val messages = conversation.messages()
val message = messages[0]
assertEquals("hi", message.body)
Expand All @@ -672,7 +684,7 @@ class ConversationTest {

// This does not need the `conversation` to `.publish` the message.
// This simulates a background task publishing all pending messages upon connection.
aliceClient.publish(envelopes = preparedMessage.envelopes)
runBlocking { aliceClient.publish(envelopes = preparedMessage.envelopes) }

val messages = conversation.messages()
val message = messages[0]
Expand Down Expand Up @@ -753,7 +765,7 @@ class ConversationTest {
val bobConversation = bobClient.conversations.newConversation(aliceWallet.address)
val aliceConversation = aliceClient.conversations.newConversation(bobWallet.address)
val encodedContent = TextCodec().encode(content = "hi")
bobConversation.send(encodedContent = encodedContent)
runBlocking { bobConversation.send(encodedContent = encodedContent) }
val messages = aliceConversation.messages()
assertEquals(1, messages.size)
assertEquals("hi", messages[0].content())
Expand All @@ -763,7 +775,7 @@ class ConversationTest {
fun testCanSendEncodedContentV2Message() {
val bobConversation = bobClient.conversations.newConversation(aliceWallet.address)
val encodedContent = TextCodec().encode(content = "hi")
bobConversation.send(encodedContent = encodedContent)
runBlocking { bobConversation.send(encodedContent = encodedContent) }
val messages = bobConversation.messages()
assertEquals(1, messages.size)
assertEquals("hi", messages[0].content())
Expand Down Expand Up @@ -821,7 +833,7 @@ class ConversationTest {
// Conversations you receive should start as unknown
assertTrue(isUnknown)

aliceConversation.send(content = "hey bob")
runBlocking { aliceConversation.send(content = "hey bob") }
aliceClient.contacts.refreshConsentList()
val isNowAllowed = aliceConversation.consentState() == ConsentState.ALLOWED

Expand Down
Loading
Loading