Skip to content

Commit e2db1c8

Browse files
committed
add stream all message functions
1 parent 9471920 commit e2db1c8

File tree

3 files changed

+56
-14
lines changed

3 files changed

+56
-14
lines changed

library/src/main/java/org/xmtp/android/library/Conversations.kt

+43-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import kotlinx.coroutines.flow.merge
1212
import kotlinx.coroutines.runBlocking
1313
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
1414
import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest
15+
import org.xmtp.android.library.libxmtp.Message
1516
import org.xmtp.android.library.messages.DecryptedMessage
1617
import org.xmtp.android.library.messages.Envelope
1718
import org.xmtp.android.library.messages.EnvelopeBuilder
@@ -39,6 +40,8 @@ import uniffi.xmtpv3.FfiConversationCallback
3940
import uniffi.xmtpv3.FfiConversations
4041
import uniffi.xmtpv3.FfiGroup
4142
import uniffi.xmtpv3.FfiListConversationsOptions
43+
import uniffi.xmtpv3.FfiMessage
44+
import uniffi.xmtpv3.FfiMessageCallback
4245
import uniffi.xmtpv3.GroupPermissions
4346
import java.util.Date
4447
import kotlin.time.Duration.Companion.nanoseconds
@@ -530,12 +533,34 @@ data class Conversations(
530533
awaitClose { stream.end() }
531534
}
532535

536+
fun streamAllGroupMessages(): Flow<DecodedMessage> = callbackFlow {
537+
val messageCallback = object : FfiMessageCallback {
538+
override fun onMessage(message: FfiMessage) {
539+
trySend(Message(client, message).decode())
540+
}
541+
}
542+
val stream = libXMTPConversations?.streamAllMessages(messageCallback)
543+
?: throw XMTPException("Client does not support Groups")
544+
awaitClose { stream.end() }
545+
}
546+
547+
fun streamAllGroupDecryptedMessages(): Flow<DecryptedMessage> = callbackFlow {
548+
val messageCallback = object : FfiMessageCallback {
549+
override fun onMessage(message: FfiMessage) {
550+
trySend(Message(client, message).decrypt())
551+
}
552+
}
553+
val stream = libXMTPConversations?.streamAllMessages(messageCallback)
554+
?: throw XMTPException("Client does not support Groups")
555+
awaitClose { stream.end() }
556+
}
557+
533558
/**
534559
* Get the stream of all messages of the current [Client]
535560
* @return Flow object of [DecodedMessage] that represents all the messages of the
536561
* current [Client] as userInvite and userIntro
537562
*/
538-
fun streamAllMessages(): Flow<DecodedMessage> = flow {
563+
private fun streamAllV2Messages(): Flow<DecodedMessage> = flow {
539564
val topics = mutableListOf(
540565
Topic.userInvite(client.address).description,
541566
Topic.userIntro(client.address).description,
@@ -590,7 +615,23 @@ data class Conversations(
590615
}
591616
}
592617

593-
fun streamAllDecryptedMessages(): Flow<DecryptedMessage> = flow {
618+
fun streamAllMessages(includeGroups: Boolean = false): Flow<DecodedMessage> {
619+
return if (includeGroups) {
620+
merge(streamAllV2Messages(), streamAllGroupMessages())
621+
} else {
622+
streamAllV2Messages()
623+
}
624+
}
625+
626+
fun streamAllDecryptedMessages(includeGroups: Boolean = false): Flow<DecryptedMessage> {
627+
return if (includeGroups) {
628+
merge(streamAllV2DecryptedMessages(), streamAllGroupDecryptedMessages())
629+
} else {
630+
streamAllV2DecryptedMessages()
631+
}
632+
}
633+
634+
private fun streamAllV2DecryptedMessages(): Flow<DecryptedMessage> = flow {
594635
val topics = mutableListOf(
595636
Topic.userInvite(client.address).description,
596637
Topic.userIntro(client.address).description,

library/src/main/java/org/xmtp/android/library/Group.kt

+2-12
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
115115
limit = limit?.toLong()
116116
)
117117
).map {
118-
decrypt(Message(client, it))
118+
Message(client, it).decrypt()
119119
}
120120
when (direction) {
121121
MessageApiOuterClass.SortDirection.SORT_DIRECTION_ASCENDING -> messages
@@ -124,16 +124,6 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
124124
}
125125
}
126126

127-
fun decrypt(message: Message): DecryptedMessage {
128-
return DecryptedMessage(
129-
id = message.id.toHex(),
130-
topic = message.convoId.toHex(),
131-
encodedContent = message.decode().encodedContent,
132-
senderAddress = message.senderAddress,
133-
sentAt = Date()
134-
)
135-
}
136-
137127
fun isActive(): Boolean {
138128
return libXMTPGroup.isActive()
139129
}
@@ -186,7 +176,7 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
186176
fun streamDecryptedMessages(): Flow<DecryptedMessage> = callbackFlow {
187177
val messageCallback = object : FfiMessageCallback {
188178
override fun onMessage(message: FfiMessage) {
189-
trySend(decrypt(Message(client, message)))
179+
trySend(Message(client, message).decrypt())
190180
}
191181
}
192182

library/src/main/java/org/xmtp/android/library/libxmtp/Message.kt

+11
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import org.xmtp.android.library.Client
44
import org.xmtp.android.library.DecodedMessage
55
import org.xmtp.android.library.XMTPException
66
import org.xmtp.android.library.codecs.EncodedContent
7+
import org.xmtp.android.library.messages.DecryptedMessage
78
import org.xmtp.android.library.toHex
89
import uniffi.xmtpv3.FfiMessage
910
import java.util.Date
@@ -35,4 +36,14 @@ data class Message(val client: Client, private val libXMTPMessage: FfiMessage) {
3536
throw XMTPException("Error decoding message", e)
3637
}
3738
}
39+
40+
fun decrypt(): DecryptedMessage {
41+
return DecryptedMessage(
42+
id = id.toHex(),
43+
topic = convoId.toHex(),
44+
encodedContent = decode().encodedContent,
45+
senderAddress = senderAddress,
46+
sentAt = Date()
47+
)
48+
}
3849
}

0 commit comments

Comments
 (0)