Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream All Group Messages #186

Merged
merged 9 commits into from
Feb 21, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,31 @@ class GroupTest {
}
}

@Test
fun testCanStreamAllGroupMessages() = kotlinx.coroutines.test.runTest {
val group = caroClient.conversations.newGroup(listOf(alix.walletAddress))
alixClient.conversations.syncGroups()
alixClient.conversations.streamAllGroupMessages().test {
group.send("hi")
assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8())
group.send("hi again")
assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8())
}
}

@Test
fun testCanStreamAllMessages() = kotlinx.coroutines.test.runTest {
val group = caroClient.conversations.newGroup(listOf(alix.walletAddress))
val conversation = boClient.conversations.newConversation(alix.walletAddress)
alixClient.conversations.syncGroups()
alixClient.conversations.streamAllMessages(includeGroups = true).test {
group.send("hi")
assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8())
conversation.send("hi again")
assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8())
}
}

@Test
fun testCanStreamDecryptedGroupMessages() = kotlinx.coroutines.test.runTest {
val group = boClient.conversations.newGroup(listOf(alix.walletAddress))
Expand All @@ -333,6 +358,31 @@ class GroupTest {
}
}

@Test
fun testCanStreamAllDecryptedGroupMessages() = kotlinx.coroutines.test.runTest {
val group = caroClient.conversations.newGroup(listOf(alix.walletAddress))
alixClient.conversations.syncGroups()
alixClient.conversations.streamAllGroupDecryptedMessages().test {
group.send("hi")
assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8())
group.send("hi again")
assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8())
}
}

@Test
fun testCanStreamAllDecryptedMessages() = kotlinx.coroutines.test.runTest {
val group = caroClient.conversations.newGroup(listOf(alix.walletAddress))
val conversation = boClient.conversations.newConversation(alix.walletAddress)
alixClient.conversations.syncGroups()
alixClient.conversations.streamAllDecryptedMessages(includeGroups = true).test {
group.send("hi")
assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8())
conversation.send("hi again")
assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8())
}
}

@Test
fun testCanStreamGroups() = kotlinx.coroutines.test.runTest {
boClient.conversations.streamGroups().test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,7 @@ sealed class Conversation {
is V1 -> conversationV1.decrypt(envelope)
is V2 -> conversationV2.decrypt(envelope)
is Group -> {
if (message == null) {
throw XMTPException("Groups require message be passed")
} else {
group.decrypt(message)
}
message?.decrypt() ?: throw XMTPException("Groups require message be passed")
}
}
}
Expand Down
45 changes: 43 additions & 2 deletions library/src/main/java/org/xmtp/android/library/Conversations.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.runBlocking
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest
import org.xmtp.android.library.libxmtp.Message
import org.xmtp.android.library.messages.DecryptedMessage
import org.xmtp.android.library.messages.Envelope
import org.xmtp.android.library.messages.EnvelopeBuilder
Expand Down Expand Up @@ -39,6 +40,8 @@ import uniffi.xmtpv3.FfiConversationCallback
import uniffi.xmtpv3.FfiConversations
import uniffi.xmtpv3.FfiGroup
import uniffi.xmtpv3.FfiListConversationsOptions
import uniffi.xmtpv3.FfiMessage
import uniffi.xmtpv3.FfiMessageCallback
import uniffi.xmtpv3.GroupPermissions
import java.util.Date
import kotlin.time.Duration.Companion.nanoseconds
Expand Down Expand Up @@ -530,12 +533,34 @@ data class Conversations(
awaitClose { stream.end() }
}

fun streamAllGroupMessages(): Flow<DecodedMessage> = callbackFlow {
val messageCallback = object : FfiMessageCallback {
override fun onMessage(message: FfiMessage) {
trySend(Message(client, message).decode())
}
}
val stream = libXMTPConversations?.streamAllMessages(messageCallback)
?: throw XMTPException("Client does not support Groups")
awaitClose { stream.end() }
}

fun streamAllGroupDecryptedMessages(): Flow<DecryptedMessage> = callbackFlow {
val messageCallback = object : FfiMessageCallback {
override fun onMessage(message: FfiMessage) {
trySend(Message(client, message).decrypt())
}
}
val stream = libXMTPConversations?.streamAllMessages(messageCallback)
?: throw XMTPException("Client does not support Groups")
awaitClose { stream.end() }
}

/**
* Get the stream of all messages of the current [Client]
* @return Flow object of [DecodedMessage] that represents all the messages of the
* current [Client] as userInvite and userIntro
*/
fun streamAllMessages(): Flow<DecodedMessage> = flow {
private fun streamAllV2Messages(): Flow<DecodedMessage> = flow {
val topics = mutableListOf(
Topic.userInvite(client.address).description,
Topic.userIntro(client.address).description,
Expand Down Expand Up @@ -590,7 +615,23 @@ data class Conversations(
}
}

fun streamAllDecryptedMessages(): Flow<DecryptedMessage> = flow {
fun streamAllMessages(includeGroups: Boolean = false): Flow<DecodedMessage> {
return if (includeGroups) {
merge(streamAllV2Messages(), streamAllGroupMessages())
} else {
streamAllV2Messages()
}
}

fun streamAllDecryptedMessages(includeGroups: Boolean = false): Flow<DecryptedMessage> {
return if (includeGroups) {
merge(streamAllV2DecryptedMessages(), streamAllGroupDecryptedMessages())
} else {
streamAllV2DecryptedMessages()
}
}

private fun streamAllV2DecryptedMessages(): Flow<DecryptedMessage> = flow {
val topics = mutableListOf(
Topic.userInvite(client.address).description,
Topic.userIntro(client.address).description,
Expand Down
14 changes: 2 additions & 12 deletions library/src/main/java/org/xmtp/android/library/Group.kt
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
limit = limit?.toLong()
)
).map {
decrypt(Message(client, it))
Message(client, it).decrypt()
}
when (direction) {
MessageApiOuterClass.SortDirection.SORT_DIRECTION_ASCENDING -> messages
Expand All @@ -124,16 +124,6 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
}
}

fun decrypt(message: Message): DecryptedMessage {
return DecryptedMessage(
id = message.id.toHex(),
topic = message.convoId.toHex(),
encodedContent = message.decode().encodedContent,
senderAddress = message.senderAddress,
sentAt = Date()
)
}

fun isActive(): Boolean {
return libXMTPGroup.isActive()
}
Expand Down Expand Up @@ -186,7 +176,7 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
fun streamDecryptedMessages(): Flow<DecryptedMessage> = callbackFlow {
val messageCallback = object : FfiMessageCallback {
override fun onMessage(message: FfiMessage) {
trySend(decrypt(Message(client, message)))
trySend(Message(client, message).decrypt())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import org.xmtp.android.library.Client
import org.xmtp.android.library.DecodedMessage
import org.xmtp.android.library.XMTPException
import org.xmtp.android.library.codecs.EncodedContent
import org.xmtp.android.library.messages.DecryptedMessage
import org.xmtp.android.library.toHex
import uniffi.xmtpv3.FfiMessage
import java.util.Date
Expand Down Expand Up @@ -35,4 +36,14 @@ data class Message(val client: Client, private val libXMTPMessage: FfiMessage) {
throw XMTPException("Error decoding message", e)
}
}

fun decrypt(): DecryptedMessage {
return DecryptedMessage(
id = id.toHex(),
topic = convoId.toHex(),
encodedContent = decode().encodedContent,
senderAddress = senderAddress,
sentAt = Date()
)
}
}
Loading