Skip to content

Commit cda4f13

Browse files
committed
get message streaming working and start on group streaming
1 parent 457331b commit cda4f13

File tree

3 files changed

+57
-40
lines changed

3 files changed

+57
-40
lines changed

library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt

+30-10
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package org.xmtp.android.library
22

3+
import android.util.Log
34
import androidx.test.ext.junit.runners.AndroidJUnit4
45
import androidx.test.platform.app.InstrumentationRegistry
56
import app.cash.turbine.test
67
import kotlinx.coroutines.ExperimentalCoroutinesApi
8+
import kotlinx.coroutines.coroutineScope
79
import kotlinx.coroutines.flow.collect
10+
import kotlinx.coroutines.flow.mapLatest
11+
import kotlinx.coroutines.flow.toList
812
import kotlinx.coroutines.launch
913
import kotlinx.coroutines.runBlocking
1014
import org.junit.Assert.assertEquals
@@ -20,6 +24,8 @@ import org.xmtp.android.library.codecs.ReactionSchema
2024
import org.xmtp.android.library.messages.PrivateKey
2125
import org.xmtp.android.library.messages.PrivateKeyBuilder
2226
import org.xmtp.android.library.messages.walletAddress
27+
import java.lang.Thread.sleep
28+
import java.util.Date
2329
import kotlin.time.Duration
2430

2531
@RunWith(AndroidJUnit4::class)
@@ -211,31 +217,45 @@ class GroupTest {
211217
group.streamMessages().test {
212218
group.send("hi")
213219
assertEquals("hi", awaitItem().body)
214-
awaitComplete()
220+
group.send("hi again")
221+
assertEquals("hi again", awaitItem().body)
222+
}
223+
}
224+
225+
@OptIn(ExperimentalCoroutinesApi::class)
226+
@Test
227+
fun testCanStreamDecryptedGroupMessages() = kotlinx.coroutines.test.runTest {
228+
val group = boClient.conversations.newGroup(listOf(alix.walletAddress))
229+
230+
group.streamDecryptedMessages().test {
231+
group.send("hi")
232+
assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8())
233+
group.send("hi again")
234+
assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8())
215235
}
216236
}
217237

218238
@OptIn(ExperimentalCoroutinesApi::class)
219239
@Test
220240
fun testCanStreamGroups() = kotlinx.coroutines.test.runTest {
221241
boClient.conversations.streamGroups().test {
222-
val conversation =
223-
boClient.conversations.newGroup(listOf(alix.walletAddress.lowercase()))
224-
conversation.send(content = "hi")
242+
val group =
243+
boClient.conversations.newGroup(listOf(alix.walletAddress))
244+
group.send("hi")
225245
assertEquals("hi", awaitItem().messages().first().body)
226-
awaitComplete()
227246
}
228247
}
229248

249+
@OptIn(ExperimentalCoroutinesApi::class)
230250
@Test
231251
fun testCanStreamGroupsAndConversations() = kotlinx.coroutines.test.runTest {
232252
boClient.conversations.stream(includeGroups = true).test {
233-
val group =
234-
boClient.conversations.newGroup(listOf(alix.walletAddress.lowercase()))
235253
val conversation =
236-
boClient.conversations.newConversation(alix.walletAddress.lowercase())
237-
assertEquals("hi", awaitItem().messages().first().body)
238-
awaitComplete()
254+
boClient.conversations.newConversation(alix.walletAddress)
255+
assertEquals(conversation.topic, awaitItem().topic)
256+
val group =
257+
boClient.conversations.newGroup(listOf(alix.walletAddress))
258+
assertEquals(group.id.toHex(), awaitItem().topic)
239259
}
240260
}
241261
}

library/src/main/java/org/xmtp/android/library/Conversations.kt

+26-21
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@ package org.xmtp.android.library
33
import android.util.Log
44
import io.grpc.StatusException
55
import kotlinx.coroutines.CancellationException
6+
import kotlinx.coroutines.channels.awaitClose
67
import kotlinx.coroutines.coroutineScope
78
import kotlinx.coroutines.flow.Flow
89
import kotlinx.coroutines.flow.MutableStateFlow
10+
import kotlinx.coroutines.flow.callbackFlow
911
import kotlinx.coroutines.flow.flow
1012
import kotlinx.coroutines.launch
1113
import kotlinx.coroutines.runBlocking
1214
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
1315
import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest
16+
import org.xmtp.android.library.libxmtp.Message
1417
import org.xmtp.android.library.messages.DecryptedMessage
1518
import org.xmtp.android.library.messages.Envelope
1619
import org.xmtp.android.library.messages.EnvelopeBuilder
@@ -41,6 +44,9 @@ import uniffi.xmtpv3.FfiConversationCallback
4144
import uniffi.xmtpv3.FfiConversations
4245
import uniffi.xmtpv3.FfiGroup
4346
import uniffi.xmtpv3.FfiListConversationsOptions
47+
import uniffi.xmtpv3.FfiMessage
48+
import uniffi.xmtpv3.FfiMessageCallback
49+
import uniffi.xmtpv3.FfiStreamCloser
4450
import uniffi.xmtpv3.org.xmtp.android.library.libxmtp.GroupEmitter
4551
import java.util.Date
4652
import kotlin.time.Duration.Companion.nanoseconds
@@ -93,6 +99,7 @@ data class Conversations(
9399
),
94100
)
95101
}
102+
96103
fun newGroup(accountAddresses: List<String>): Group {
97104
if (accountAddresses.isEmpty()) {
98105
throw XMTPException("Cannot start an empty group chat.")
@@ -479,56 +486,54 @@ data class Conversations(
479486
* of the information of those conversations according to the topics
480487
* @return Stream of data information for the conversations
481488
*/
482-
fun stream(includeGroups: Boolean = false): Flow<Conversation> = flow {
489+
fun stream(includeGroups: Boolean = false): Flow<Conversation> = callbackFlow {
490+
var stream: FfiStreamCloser? = null
483491
if (includeGroups) {
484-
val groupEmitter = GroupEmitter()
485-
486-
coroutineScope {
487-
launch {
488-
groupEmitter.groups.collect { group ->
489-
emit(Conversation.Group(Group(client, group)))
490-
}
492+
val groupCallback = object : FfiConversationCallback {
493+
override fun onConversation(conversation: FfiGroup) {
494+
Log.e("LOPI", "callback called")
495+
trySend(Conversation.Group(Group(client, conversation)))
491496
}
492497
}
493498

494-
libXMTPConversations?.stream(groupEmitter.callback)
499+
Log.e("LOPI", "starting stream ${libXMTPConversations.toString()}")
500+
stream = libXMTPConversations?.stream(groupCallback)
495501
}
496502

497503
val streamedConversationTopics: MutableSet<String> = mutableSetOf()
498504
client.subscribeTopic(
499505
listOf(Topic.userIntro(client.address), Topic.userInvite(client.address)),
500506
).collect { envelope ->
501-
502507
if (envelope.contentTopic == Topic.userIntro(client.address).description) {
503508
val conversationV1 = fromIntro(envelope = envelope)
504509
if (!streamedConversationTopics.contains(conversationV1.topic)) {
505510
streamedConversationTopics.add(conversationV1.topic)
506-
emit(conversationV1)
511+
send(conversationV1)
507512
}
508513
}
509514

510515
if (envelope.contentTopic == Topic.userInvite(client.address).description) {
511516
val conversationV2 = fromInvite(envelope = envelope)
512517
if (!streamedConversationTopics.contains(conversationV2.topic)) {
513518
streamedConversationTopics.add(conversationV2.topic)
514-
emit(conversationV2)
519+
send(conversationV2)
515520
}
516521
}
517522
}
523+
awaitClose { stream?.end() }
518524
}
519525

520-
fun streamGroups(): Flow<Group> = flow {
521-
val groupEmitter = GroupEmitter()
522-
523-
coroutineScope {
524-
launch {
525-
groupEmitter.groups.collect { group ->
526-
emit(Group(client, group))
527-
}
526+
fun streamGroups(): Flow<Group> = callbackFlow {
527+
val groupCallback = object : FfiConversationCallback {
528+
override fun onConversation(conversation: FfiGroup) {
529+
Log.e("LOPI", "callback called")
530+
trySend(Group(client, conversation))
528531
}
529532
}
530533

531-
libXMTPConversations?.stream(groupEmitter.callback)
534+
Log.e("LOPI", "starting stream ${libXMTPConversations.toString()}")
535+
val stream = libXMTPConversations?.stream(groupCallback)
536+
awaitClose { stream?.end() }
532537
}
533538

534539
/**

library/src/main/java/org/xmtp/android/library/Group.kt

+1-9
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
4040
runBlocking {
4141
libXMTPGroup.send(contentBytes = encodedContent.toByteArray())
4242
}
43-
return id.toString()
43+
return id.toHex()
4444
}
4545

4646
fun <T> prepareMessage(content: T, options: SendOptions?): EncodedContent {
@@ -146,10 +146,6 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
146146
fun streamMessages(): Flow<DecodedMessage> = callbackFlow {
147147
val messageCallback = object : FfiMessageCallback {
148148
override fun onMessage(message: FfiMessage) {
149-
Log.e(
150-
"LOPI",
151-
"INFO Callback - Message callback with ID: " + message.id.toHex() + ", members: " + message.addrFrom
152-
)
153149
trySend(Message(client, message).decode())
154150
}
155151
}
@@ -161,10 +157,6 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
161157
fun streamDecryptedMessages(): Flow<DecryptedMessage> = callbackFlow {
162158
val messageCallback = object : FfiMessageCallback {
163159
override fun onMessage(message: FfiMessage) {
164-
Log.e(
165-
"LOPI",
166-
"INFO Callback - Message callback with ID: " + message.id.toHex() + ", members: " + message.addrFrom
167-
)
168160
trySend(decrypt(Message(client, message)))
169161
}
170162
}

0 commit comments

Comments
 (0)