Skip to content

Commit e041fa8

Browse files
nplastererneekolas
andauthored
Group Chat Streaming (#166)
* first pass at all the pieces needed for threading * a few more places * make signing key extend inboxOwner * get it decoding messages * dump the latest v3 code * write a test for creating a v3 client * use created At * write test for creating libxmtp client and confirm it works * move these change to a different branch * dont pass a conversation * fix linter * point to local not dev * feature flag the client creating of libxmtp while in alpha * change to local * fix up the test helper * feat: fix up the example app * fix up the 22 compat issue * Revert "move these change to a different branch" This reverts commit 8998d13. * try and get some tests running * setup local database * have it create correctly * write tests for functionality * test sending * send encoded content * add updates to the v3 bindings * add updates to the v3 bindings * store in a keystore * move to preferences * fix lint * Fix build issues * new libxmtp updates * dump the latest schema * update to the latest client creation flow * get the create working again * use the keystore because its more secure * fix up linter compat again * flaky test * get the tests all passing * get the example working with groups * create a group with two addresses * more tweaks to the example app to get groups working * add streaming messages to groups * a few example UI tweaks * fix the lowercasing issue in the example app * dump the schema again * implement all the conversation functionality * add new codec for membership changes * write tests for it * fix up the tests a bit' * add more tests and group streaming * get the new codec working as expected * add pagination to messages * fix up the library linting issues * fix up flaky test * fix up min sdk version issue again * update the example app * remove the saved wallet stuff from the demo * get groups working again with signer improvements and membership changes * fix linter * remove syncs so the client will need to manage * add pagination to group listing * dont return self for peers and add erroring to new group creation * update the syncing in the tests * remove all the streams work and move to another PR * Revert "remove all the streams work and move to another PR" This reverts commit 76bfcc7. * update test * undo all the bad merge items * attempt at a few different stream techiniques * get message streaming working and start on group streaming * remove unneeded class * get group streaming working * fix streaming for both * fix the linter * throw error if client doesnt support groups --------- Co-authored-by: Nicholas Molnar <65710+neekolas@users.noreply.github.com>
1 parent c6fe0ea commit e041fa8

File tree

5 files changed

+105
-5
lines changed

5 files changed

+105
-5
lines changed

example/src/main/java/org/xmtp/android/example/MainViewModel.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class MainViewModel : ViewModel() {
7777
val stream: StateFlow<MainListItem?> =
7878
stateFlow(viewModelScope, null) { subscriptionCount ->
7979
if (ClientManager.clientState.value is ClientManager.ClientState.Ready) {
80-
ClientManager.client.conversations.stream()
80+
ClientManager.client.conversations.streamAll()
8181
.flowWhileShared(
8282
subscriptionCount,
8383
SharingStarted.WhileSubscribed(1000L)

library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt

+54
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package org.xmtp.android.library
22

33
import androidx.test.ext.junit.runners.AndroidJUnit4
44
import androidx.test.platform.app.InstrumentationRegistry
5+
import app.cash.turbine.test
6+
import kotlinx.coroutines.ExperimentalCoroutinesApi
57
import kotlinx.coroutines.runBlocking
68
import org.junit.Assert.assertEquals
79
import org.junit.Assert.assertThrows
@@ -197,4 +199,56 @@ class GroupTest {
197199
assertEquals(ReactionAction.Added, content?.action)
198200
assertEquals(ReactionSchema.Unicode, content?.schema)
199201
}
202+
203+
@OptIn(ExperimentalCoroutinesApi::class)
204+
@Test
205+
fun testCanStreamGroupMessages() = kotlinx.coroutines.test.runTest {
206+
val group = boClient.conversations.newGroup(listOf(alix.walletAddress.lowercase()))
207+
208+
group.streamMessages().test {
209+
group.send("hi")
210+
assertEquals("hi", awaitItem().body)
211+
group.send("hi again")
212+
assertEquals("hi again", awaitItem().body)
213+
}
214+
}
215+
216+
@OptIn(ExperimentalCoroutinesApi::class)
217+
@Test
218+
fun testCanStreamDecryptedGroupMessages() = kotlinx.coroutines.test.runTest {
219+
val group = boClient.conversations.newGroup(listOf(alix.walletAddress))
220+
221+
group.streamDecryptedMessages().test {
222+
group.send("hi")
223+
assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8())
224+
group.send("hi again")
225+
assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8())
226+
}
227+
}
228+
229+
@OptIn(ExperimentalCoroutinesApi::class)
230+
@Test
231+
fun testCanStreamGroups() = kotlinx.coroutines.test.runTest {
232+
boClient.conversations.streamGroups().test {
233+
val group =
234+
alixClient.conversations.newGroup(listOf(bo.walletAddress))
235+
assertEquals(group.id.toHex(), awaitItem().topic)
236+
val group2 =
237+
caroClient.conversations.newGroup(listOf(bo.walletAddress))
238+
assertEquals(group2.id.toHex(), awaitItem().topic)
239+
}
240+
}
241+
242+
@OptIn(ExperimentalCoroutinesApi::class)
243+
@Test
244+
fun testCanStreamGroupsAndConversations() = kotlinx.coroutines.test.runTest {
245+
boClient.conversations.streamAll().test {
246+
val group =
247+
caroClient.conversations.newGroup(listOf(bo.walletAddress))
248+
assertEquals(group.id.toHex(), awaitItem().topic)
249+
val conversation =
250+
boClient.conversations.newConversation(alix.walletAddress)
251+
assertEquals(conversation.topic, awaitItem().topic)
252+
}
253+
}
200254
}

library/src/main/java/org/xmtp/android/library/Conversation.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -318,15 +318,15 @@ sealed class Conversation {
318318
return when (this) {
319319
is V1 -> conversationV1.streamMessages()
320320
is V2 -> conversationV2.streamMessages()
321-
is Group -> throw XMTPException("Coming follow up PR")
321+
is Group -> group.streamMessages()
322322
}
323323
}
324324

325325
fun streamDecryptedMessages(): Flow<DecryptedMessage> {
326326
return when (this) {
327327
is V1 -> conversationV1.streamDecryptedMessages()
328328
is V2 -> conversationV2.streamDecryptedMessages()
329-
is Group -> throw XMTPException("Coming follow up PR")
329+
is Group -> group.streamDecryptedMessages()
330330
}
331331
}
332332

library/src/main/java/org/xmtp/android/library/Conversations.kt

+20-1
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@ package org.xmtp.android.library
33
import android.util.Log
44
import io.grpc.StatusException
55
import kotlinx.coroutines.CancellationException
6+
import kotlinx.coroutines.channels.awaitClose
67
import kotlinx.coroutines.flow.Flow
78
import kotlinx.coroutines.flow.MutableStateFlow
9+
import kotlinx.coroutines.flow.callbackFlow
810
import kotlinx.coroutines.flow.flow
11+
import kotlinx.coroutines.flow.merge
912
import kotlinx.coroutines.runBlocking
1013
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
1114
import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest
@@ -32,7 +35,9 @@ import org.xmtp.android.library.messages.walletAddress
3235
import org.xmtp.proto.keystore.api.v1.Keystore.TopicMap.TopicData
3336
import org.xmtp.proto.message.contents.Contact
3437
import org.xmtp.proto.message.contents.Invitation
38+
import uniffi.xmtpv3.FfiConversationCallback
3539
import uniffi.xmtpv3.FfiConversations
40+
import uniffi.xmtpv3.FfiGroup
3641
import uniffi.xmtpv3.FfiListConversationsOptions
3742
import java.util.Date
3843
import kotlin.time.Duration.Companion.nanoseconds
@@ -477,7 +482,6 @@ data class Conversations(
477482
client.subscribeTopic(
478483
listOf(Topic.userIntro(client.address), Topic.userInvite(client.address)),
479484
).collect { envelope ->
480-
481485
if (envelope.contentTopic == Topic.userIntro(client.address).description) {
482486
val conversationV1 = fromIntro(envelope = envelope)
483487
if (!streamedConversationTopics.contains(conversationV1.topic)) {
@@ -496,6 +500,21 @@ data class Conversations(
496500
}
497501
}
498502

503+
fun streamAll(): Flow<Conversation> {
504+
return merge(streamGroups(), stream())
505+
}
506+
507+
fun streamGroups(): Flow<Conversation> = callbackFlow {
508+
val groupCallback = object : FfiConversationCallback {
509+
override fun onConversation(conversation: FfiGroup) {
510+
trySend(Conversation.Group(Group(client, conversation)))
511+
}
512+
}
513+
val stream = libXMTPConversations?.stream(groupCallback)
514+
?: throw XMTPException("Client does not support Groups")
515+
awaitClose { stream.end() }
516+
}
517+
499518
/**
500519
* Get the stream of all messages of the current [Client]
501520
* @return Flow object of [DecodedMessage] that represents all the messages of the

library/src/main/java/org/xmtp/android/library/Group.kt

+28-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package org.xmtp.android.library
22

3+
import kotlinx.coroutines.channels.awaitClose
4+
import kotlinx.coroutines.flow.Flow
5+
import kotlinx.coroutines.flow.callbackFlow
36
import kotlinx.coroutines.runBlocking
47
import org.xmtp.android.library.codecs.ContentCodec
58
import org.xmtp.android.library.codecs.EncodedContent
@@ -10,6 +13,8 @@ import org.xmtp.android.library.messages.PagingInfoSortDirection
1013
import org.xmtp.proto.message.api.v1.MessageApiOuterClass
1114
import uniffi.xmtpv3.FfiGroup
1215
import uniffi.xmtpv3.FfiListMessagesOptions
16+
import uniffi.xmtpv3.FfiMessage
17+
import uniffi.xmtpv3.FfiMessageCallback
1318
import java.util.Date
1419
import kotlin.time.Duration.Companion.nanoseconds
1520
import kotlin.time.DurationUnit
@@ -34,7 +39,7 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
3439
runBlocking {
3540
libXMTPGroup.send(contentBytes = encodedContent.toByteArray())
3641
}
37-
return id.toString()
42+
return id.toHex()
3843
}
3944

4045
fun <T> prepareMessage(content: T, options: SendOptions?): EncodedContent {
@@ -136,4 +141,26 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
136141
libXMTPGroup.listMembers().map { it.accountAddress }
137142
}
138143
}
144+
145+
fun streamMessages(): Flow<DecodedMessage> = callbackFlow {
146+
val messageCallback = object : FfiMessageCallback {
147+
override fun onMessage(message: FfiMessage) {
148+
trySend(Message(client, message).decode())
149+
}
150+
}
151+
152+
val stream = libXMTPGroup.stream(messageCallback)
153+
awaitClose { stream.end() }
154+
}
155+
156+
fun streamDecryptedMessages(): Flow<DecryptedMessage> = callbackFlow {
157+
val messageCallback = object : FfiMessageCallback {
158+
override fun onMessage(message: FfiMessage) {
159+
trySend(decrypt(Message(client, message)))
160+
}
161+
}
162+
163+
val stream = libXMTPGroup.stream(messageCallback)
164+
awaitClose { stream.end() }
165+
}
139166
}

0 commit comments

Comments
 (0)