Skip to content

Commit 9f81f85

Browse files
authored
Stream All Group Messages (#186)
* add dates to example and a test around removing members * add stream all message functions * fix lint issue * adds tests for it * get all the tests working
1 parent 5a3f9d0 commit 9f81f85

File tree

5 files changed

+107
-19
lines changed

5 files changed

+107
-19
lines changed

library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt

+50
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,31 @@ class GroupTest {
320320
}
321321
}
322322

323+
@Test
324+
fun testCanStreamAllGroupMessages() = kotlinx.coroutines.test.runTest {
325+
val group = caroClient.conversations.newGroup(listOf(alix.walletAddress))
326+
alixClient.conversations.syncGroups()
327+
alixClient.conversations.streamAllGroupMessages().test {
328+
group.send("hi")
329+
assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8())
330+
group.send("hi again")
331+
assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8())
332+
}
333+
}
334+
335+
@Test
336+
fun testCanStreamAllMessages() = kotlinx.coroutines.test.runTest {
337+
val group = caroClient.conversations.newGroup(listOf(alix.walletAddress))
338+
val conversation = boClient.conversations.newConversation(alix.walletAddress)
339+
alixClient.conversations.syncGroups()
340+
alixClient.conversations.streamAllMessages(includeGroups = true).test {
341+
group.send("hi")
342+
assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8())
343+
conversation.send("hi again")
344+
assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8())
345+
}
346+
}
347+
323348
@Test
324349
fun testCanStreamDecryptedGroupMessages() = kotlinx.coroutines.test.runTest {
325350
val group = boClient.conversations.newGroup(listOf(alix.walletAddress))
@@ -333,6 +358,31 @@ class GroupTest {
333358
}
334359
}
335360

361+
@Test
362+
fun testCanStreamAllDecryptedGroupMessages() = kotlinx.coroutines.test.runTest {
363+
val group = caroClient.conversations.newGroup(listOf(alix.walletAddress))
364+
alixClient.conversations.syncGroups()
365+
alixClient.conversations.streamAllGroupDecryptedMessages().test {
366+
group.send("hi")
367+
assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8())
368+
group.send("hi again")
369+
assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8())
370+
}
371+
}
372+
373+
@Test
374+
fun testCanStreamAllDecryptedMessages() = kotlinx.coroutines.test.runTest {
375+
val group = caroClient.conversations.newGroup(listOf(alix.walletAddress))
376+
val conversation = boClient.conversations.newConversation(alix.walletAddress)
377+
alixClient.conversations.syncGroups()
378+
alixClient.conversations.streamAllDecryptedMessages(includeGroups = true).test {
379+
group.send("hi")
380+
assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8())
381+
conversation.send("hi again")
382+
assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8())
383+
}
384+
}
385+
336386
@Test
337387
fun testCanStreamGroups() = kotlinx.coroutines.test.runTest {
338388
boClient.conversations.streamGroups().test {

library/src/main/java/org/xmtp/android/library/Conversation.kt

+1-5
Original file line numberDiff line numberDiff line change
@@ -291,11 +291,7 @@ sealed class Conversation {
291291
is V1 -> conversationV1.decrypt(envelope)
292292
is V2 -> conversationV2.decrypt(envelope)
293293
is Group -> {
294-
if (message == null) {
295-
throw XMTPException("Groups require message be passed")
296-
} else {
297-
group.decrypt(message)
298-
}
294+
message?.decrypt() ?: throw XMTPException("Groups require message be passed")
299295
}
300296
}
301297
}

library/src/main/java/org/xmtp/android/library/Conversations.kt

+43-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import kotlinx.coroutines.flow.merge
1212
import kotlinx.coroutines.runBlocking
1313
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
1414
import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest
15+
import org.xmtp.android.library.libxmtp.Message
1516
import org.xmtp.android.library.messages.DecryptedMessage
1617
import org.xmtp.android.library.messages.Envelope
1718
import org.xmtp.android.library.messages.EnvelopeBuilder
@@ -39,6 +40,8 @@ import uniffi.xmtpv3.FfiConversationCallback
3940
import uniffi.xmtpv3.FfiConversations
4041
import uniffi.xmtpv3.FfiGroup
4142
import uniffi.xmtpv3.FfiListConversationsOptions
43+
import uniffi.xmtpv3.FfiMessage
44+
import uniffi.xmtpv3.FfiMessageCallback
4245
import uniffi.xmtpv3.GroupPermissions
4346
import java.util.Date
4447
import kotlin.time.Duration.Companion.nanoseconds
@@ -530,12 +533,34 @@ data class Conversations(
530533
awaitClose { stream.end() }
531534
}
532535

536+
fun streamAllGroupMessages(): Flow<DecodedMessage> = callbackFlow {
537+
val messageCallback = object : FfiMessageCallback {
538+
override fun onMessage(message: FfiMessage) {
539+
trySend(Message(client, message).decode())
540+
}
541+
}
542+
val stream = libXMTPConversations?.streamAllMessages(messageCallback)
543+
?: throw XMTPException("Client does not support Groups")
544+
awaitClose { stream.end() }
545+
}
546+
547+
fun streamAllGroupDecryptedMessages(): Flow<DecryptedMessage> = callbackFlow {
548+
val messageCallback = object : FfiMessageCallback {
549+
override fun onMessage(message: FfiMessage) {
550+
trySend(Message(client, message).decrypt())
551+
}
552+
}
553+
val stream = libXMTPConversations?.streamAllMessages(messageCallback)
554+
?: throw XMTPException("Client does not support Groups")
555+
awaitClose { stream.end() }
556+
}
557+
533558
/**
534559
* Get the stream of all messages of the current [Client]
535560
* @return Flow object of [DecodedMessage] that represents all the messages of the
536561
* current [Client] as userInvite and userIntro
537562
*/
538-
fun streamAllMessages(): Flow<DecodedMessage> = flow {
563+
private fun streamAllV2Messages(): Flow<DecodedMessage> = flow {
539564
val topics = mutableListOf(
540565
Topic.userInvite(client.address).description,
541566
Topic.userIntro(client.address).description,
@@ -590,7 +615,23 @@ data class Conversations(
590615
}
591616
}
592617

593-
fun streamAllDecryptedMessages(): Flow<DecryptedMessage> = flow {
618+
fun streamAllMessages(includeGroups: Boolean = false): Flow<DecodedMessage> {
619+
return if (includeGroups) {
620+
merge(streamAllV2Messages(), streamAllGroupMessages())
621+
} else {
622+
streamAllV2Messages()
623+
}
624+
}
625+
626+
fun streamAllDecryptedMessages(includeGroups: Boolean = false): Flow<DecryptedMessage> {
627+
return if (includeGroups) {
628+
merge(streamAllV2DecryptedMessages(), streamAllGroupDecryptedMessages())
629+
} else {
630+
streamAllV2DecryptedMessages()
631+
}
632+
}
633+
634+
private fun streamAllV2DecryptedMessages(): Flow<DecryptedMessage> = flow {
594635
val topics = mutableListOf(
595636
Topic.userInvite(client.address).description,
596637
Topic.userIntro(client.address).description,

library/src/main/java/org/xmtp/android/library/Group.kt

+2-12
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
115115
limit = limit?.toLong()
116116
)
117117
).map {
118-
decrypt(Message(client, it))
118+
Message(client, it).decrypt()
119119
}
120120
when (direction) {
121121
MessageApiOuterClass.SortDirection.SORT_DIRECTION_ASCENDING -> messages
@@ -124,16 +124,6 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
124124
}
125125
}
126126

127-
fun decrypt(message: Message): DecryptedMessage {
128-
return DecryptedMessage(
129-
id = message.id.toHex(),
130-
topic = message.convoId.toHex(),
131-
encodedContent = message.decode().encodedContent,
132-
senderAddress = message.senderAddress,
133-
sentAt = Date()
134-
)
135-
}
136-
137127
fun isActive(): Boolean {
138128
return libXMTPGroup.isActive()
139129
}
@@ -186,7 +176,7 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
186176
fun streamDecryptedMessages(): Flow<DecryptedMessage> = callbackFlow {
187177
val messageCallback = object : FfiMessageCallback {
188178
override fun onMessage(message: FfiMessage) {
189-
trySend(decrypt(Message(client, message)))
179+
trySend(Message(client, message).decrypt())
190180
}
191181
}
192182

library/src/main/java/org/xmtp/android/library/libxmtp/Message.kt

+11
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import org.xmtp.android.library.Client
44
import org.xmtp.android.library.DecodedMessage
55
import org.xmtp.android.library.XMTPException
66
import org.xmtp.android.library.codecs.EncodedContent
7+
import org.xmtp.android.library.messages.DecryptedMessage
78
import org.xmtp.android.library.toHex
89
import uniffi.xmtpv3.FfiMessage
910
import java.util.Date
@@ -35,4 +36,14 @@ data class Message(val client: Client, private val libXMTPMessage: FfiMessage) {
3536
throw XMTPException("Error decoding message", e)
3637
}
3738
}
39+
40+
fun decrypt(): DecryptedMessage {
41+
return DecryptedMessage(
42+
id = id.toHex(),
43+
topic = convoId.toHex(),
44+
encodedContent = decode().encodedContent,
45+
senderAddress = senderAddress,
46+
sentAt = Date()
47+
)
48+
}
3849
}

0 commit comments

Comments
 (0)