diff --git a/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt b/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt index da11e676..dc4e3141 100644 --- a/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt +++ b/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt @@ -320,6 +320,31 @@ class GroupTest { } } + @Test + fun testCanStreamAllGroupMessages() = kotlinx.coroutines.test.runTest { + val group = caroClient.conversations.newGroup(listOf(alix.walletAddress)) + alixClient.conversations.syncGroups() + alixClient.conversations.streamAllGroupMessages().test { + group.send("hi") + assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8()) + group.send("hi again") + assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8()) + } + } + + @Test + fun testCanStreamAllMessages() = kotlinx.coroutines.test.runTest { + val group = caroClient.conversations.newGroup(listOf(alix.walletAddress)) + val conversation = boClient.conversations.newConversation(alix.walletAddress) + alixClient.conversations.syncGroups() + alixClient.conversations.streamAllMessages(includeGroups = true).test { + group.send("hi") + assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8()) + conversation.send("hi again") + assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8()) + } + } + @Test fun testCanStreamDecryptedGroupMessages() = kotlinx.coroutines.test.runTest { val group = boClient.conversations.newGroup(listOf(alix.walletAddress)) @@ -333,6 +358,31 @@ class GroupTest { } } + @Test + fun testCanStreamAllDecryptedGroupMessages() = kotlinx.coroutines.test.runTest { + val group = caroClient.conversations.newGroup(listOf(alix.walletAddress)) + alixClient.conversations.syncGroups() + alixClient.conversations.streamAllGroupDecryptedMessages().test { + group.send("hi") + assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8()) + group.send("hi again") + assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8()) + } + } + + @Test + fun testCanStreamAllDecryptedMessages() = kotlinx.coroutines.test.runTest { + val group = caroClient.conversations.newGroup(listOf(alix.walletAddress)) + val conversation = boClient.conversations.newConversation(alix.walletAddress) + alixClient.conversations.syncGroups() + alixClient.conversations.streamAllDecryptedMessages(includeGroups = true).test { + group.send("hi") + assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8()) + conversation.send("hi again") + assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8()) + } + } + @Test fun testCanStreamGroups() = kotlinx.coroutines.test.runTest { boClient.conversations.streamGroups().test { diff --git a/library/src/main/java/org/xmtp/android/library/Conversation.kt b/library/src/main/java/org/xmtp/android/library/Conversation.kt index 534e42a0..bacf2506 100644 --- a/library/src/main/java/org/xmtp/android/library/Conversation.kt +++ b/library/src/main/java/org/xmtp/android/library/Conversation.kt @@ -291,11 +291,7 @@ sealed class Conversation { is V1 -> conversationV1.decrypt(envelope) is V2 -> conversationV2.decrypt(envelope) is Group -> { - if (message == null) { - throw XMTPException("Groups require message be passed") - } else { - group.decrypt(message) - } + message?.decrypt() ?: throw XMTPException("Groups require message be passed") } } } diff --git a/library/src/main/java/org/xmtp/android/library/Conversations.kt b/library/src/main/java/org/xmtp/android/library/Conversations.kt index 42881591..dca86282 100644 --- a/library/src/main/java/org/xmtp/android/library/Conversations.kt +++ b/library/src/main/java/org/xmtp/android/library/Conversations.kt @@ -12,6 +12,7 @@ import kotlinx.coroutines.flow.merge import kotlinx.coroutines.runBlocking import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest +import org.xmtp.android.library.libxmtp.Message import org.xmtp.android.library.messages.DecryptedMessage import org.xmtp.android.library.messages.Envelope import org.xmtp.android.library.messages.EnvelopeBuilder @@ -39,6 +40,8 @@ import uniffi.xmtpv3.FfiConversationCallback import uniffi.xmtpv3.FfiConversations import uniffi.xmtpv3.FfiGroup import uniffi.xmtpv3.FfiListConversationsOptions +import uniffi.xmtpv3.FfiMessage +import uniffi.xmtpv3.FfiMessageCallback import uniffi.xmtpv3.GroupPermissions import java.util.Date import kotlin.time.Duration.Companion.nanoseconds @@ -530,12 +533,34 @@ data class Conversations( awaitClose { stream.end() } } + fun streamAllGroupMessages(): Flow = callbackFlow { + val messageCallback = object : FfiMessageCallback { + override fun onMessage(message: FfiMessage) { + trySend(Message(client, message).decode()) + } + } + val stream = libXMTPConversations?.streamAllMessages(messageCallback) + ?: throw XMTPException("Client does not support Groups") + awaitClose { stream.end() } + } + + fun streamAllGroupDecryptedMessages(): Flow = callbackFlow { + val messageCallback = object : FfiMessageCallback { + override fun onMessage(message: FfiMessage) { + trySend(Message(client, message).decrypt()) + } + } + val stream = libXMTPConversations?.streamAllMessages(messageCallback) + ?: throw XMTPException("Client does not support Groups") + awaitClose { stream.end() } + } + /** * Get the stream of all messages of the current [Client] * @return Flow object of [DecodedMessage] that represents all the messages of the * current [Client] as userInvite and userIntro */ - fun streamAllMessages(): Flow = flow { + private fun streamAllV2Messages(): Flow = flow { val topics = mutableListOf( Topic.userInvite(client.address).description, Topic.userIntro(client.address).description, @@ -590,7 +615,23 @@ data class Conversations( } } - fun streamAllDecryptedMessages(): Flow = flow { + fun streamAllMessages(includeGroups: Boolean = false): Flow { + return if (includeGroups) { + merge(streamAllV2Messages(), streamAllGroupMessages()) + } else { + streamAllV2Messages() + } + } + + fun streamAllDecryptedMessages(includeGroups: Boolean = false): Flow { + return if (includeGroups) { + merge(streamAllV2DecryptedMessages(), streamAllGroupDecryptedMessages()) + } else { + streamAllV2DecryptedMessages() + } + } + + private fun streamAllV2DecryptedMessages(): Flow = flow { val topics = mutableListOf( Topic.userInvite(client.address).description, Topic.userIntro(client.address).description, diff --git a/library/src/main/java/org/xmtp/android/library/Group.kt b/library/src/main/java/org/xmtp/android/library/Group.kt index 695e5fc7..60ec6ec5 100644 --- a/library/src/main/java/org/xmtp/android/library/Group.kt +++ b/library/src/main/java/org/xmtp/android/library/Group.kt @@ -115,7 +115,7 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) { limit = limit?.toLong() ) ).map { - decrypt(Message(client, it)) + Message(client, it).decrypt() } when (direction) { MessageApiOuterClass.SortDirection.SORT_DIRECTION_ASCENDING -> messages @@ -124,16 +124,6 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) { } } - fun decrypt(message: Message): DecryptedMessage { - return DecryptedMessage( - id = message.id.toHex(), - topic = message.convoId.toHex(), - encodedContent = message.decode().encodedContent, - senderAddress = message.senderAddress, - sentAt = Date() - ) - } - fun isActive(): Boolean { return libXMTPGroup.isActive() } @@ -186,7 +176,7 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) { fun streamDecryptedMessages(): Flow = callbackFlow { val messageCallback = object : FfiMessageCallback { override fun onMessage(message: FfiMessage) { - trySend(decrypt(Message(client, message))) + trySend(Message(client, message).decrypt()) } } diff --git a/library/src/main/java/org/xmtp/android/library/libxmtp/Message.kt b/library/src/main/java/org/xmtp/android/library/libxmtp/Message.kt index 6b072cc6..4c828a1e 100644 --- a/library/src/main/java/org/xmtp/android/library/libxmtp/Message.kt +++ b/library/src/main/java/org/xmtp/android/library/libxmtp/Message.kt @@ -4,6 +4,7 @@ import org.xmtp.android.library.Client import org.xmtp.android.library.DecodedMessage import org.xmtp.android.library.XMTPException import org.xmtp.android.library.codecs.EncodedContent +import org.xmtp.android.library.messages.DecryptedMessage import org.xmtp.android.library.toHex import uniffi.xmtpv3.FfiMessage import java.util.Date @@ -35,4 +36,14 @@ data class Message(val client: Client, private val libXMTPMessage: FfiMessage) { throw XMTPException("Error decoding message", e) } } + + fun decrypt(): DecryptedMessage { + return DecryptedMessage( + id = id.toHex(), + topic = convoId.toHex(), + encodedContent = decode().encodedContent, + senderAddress = senderAddress, + sentAt = Date() + ) + } }