Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group Message Kind #217

Merged
merged 15 commits into from
Apr 9, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.junit.runner.RunWith
import org.xmtp.android.library.messages.PrivateKey
import org.xmtp.android.library.messages.PrivateKeyBuilder
import org.xmtp.android.library.messages.walletAddress
import uniffi.xmtpv3.org.xmtp.android.library.codecs.ContentTypeGroupMembershipChange
import uniffi.xmtpv3.org.xmtp.android.library.codecs.GroupMembershipChangeCodec
import uniffi.xmtpv3.org.xmtp.android.library.codecs.GroupMembershipChanges

Expand Down Expand Up @@ -99,6 +100,34 @@ class GroupMembershipChangeTest {
assert(content?.membersAddedList.isNullOrEmpty())
}

@Test
fun testRemovesInvalidMessageKind() {
Client.register(codec = GroupMembershipChangeCodec())

val membershipChange = GroupMembershipChanges.newBuilder().build()

val group = runBlocking {
alixClient.conversations.newGroup(
listOf(
bo.walletAddress,
caro.walletAddress
)
)
}
val messages = group.messages()
assertEquals(messages.size, 1)
assertEquals(group.memberAddresses().size, 3)
runBlocking {
group.send(
content = membershipChange,
options = SendOptions(contentType = ContentTypeGroupMembershipChange),
)
group.sync()
}
val updatedMessages = group.messages()
assertEquals(updatedMessages.size, 1)
}

@Test
fun testIfNotRegisteredReturnsFallback() {
val group = runBlocking {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import org.xmtp.android.library.messages.PrivateKey
import org.xmtp.android.library.messages.PrivateKeyBuilder
import org.xmtp.android.library.messages.walletAddress
import uniffi.xmtpv3.GroupPermissions
import uniffi.xmtpv3.org.xmtp.android.library.codecs.ContentTypeGroupMembershipChange
import uniffi.xmtpv3.org.xmtp.android.library.codecs.GroupMembershipChangeCodec
import uniffi.xmtpv3.org.xmtp.android.library.codecs.GroupMembershipChanges

@RunWith(AndroidJUnit4::class)
class GroupTest {
Expand Down Expand Up @@ -360,12 +363,19 @@ class GroupTest {

@Test
fun testCanStreamGroupMessages() = kotlinx.coroutines.test.runTest {
Client.register(codec = GroupMembershipChangeCodec())
val membershipChange = GroupMembershipChanges.newBuilder().build()

val group = boClient.conversations.newGroup(listOf(alix.walletAddress.lowercase()))
alixClient.conversations.syncGroups()
val alixGroup = alixClient.conversations.listGroups().first()
group.streamMessages().test {
alixGroup.send("hi")
assertEquals("hi", awaitItem().body)
alixGroup.send(
content = membershipChange,
options = SendOptions(contentType = ContentTypeGroupMembershipChange),
)
alixGroup.send("hi again")
assertEquals("hi again", awaitItem().body)
}
Expand Down Expand Up @@ -432,6 +442,8 @@ class GroupTest {

@Test
fun testCanStreamAllDecryptedGroupMessages() = kotlinx.coroutines.test.runTest {
Client.register(codec = GroupMembershipChangeCodec())
val membershipChange = GroupMembershipChanges.newBuilder().build()
val group = caroClient.conversations.newGroup(listOf(alix.walletAddress))
alixClient.conversations.syncGroups()

Expand All @@ -448,6 +460,10 @@ class GroupTest {
}

group.send("hi 1")
group.send(
content = membershipChange,
options = SendOptions(contentType = ContentTypeGroupMembershipChange),
)
group.send("hi 2")

job.join()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class FakeApiClient : ApiClient {
MessageApiOuterClass.SortDirection.SORT_DIRECTION_ASCENDING -> {
result = result.reversed().toMutableList()
}

else -> Unit
}
}
Expand All @@ -191,6 +192,7 @@ class FakeApiClient : ApiClient {
published.addAll(envelopes)
return PublishResponse.newBuilder().build()
}

override suspend fun subscribe(request: Flow<MessageApiOuterClass.SubscribeRequest>): Flow<MessageApiOuterClass.Envelope> {
val env = stream.counts().first()

Expand All @@ -205,7 +207,9 @@ data class Fixtures(
val aliceAccount: PrivateKeyBuilder,
val bobAccount: PrivateKeyBuilder,
val caroAccount: PrivateKeyBuilder,
val clientOptions: ClientOptions? = null
val clientOptions: ClientOptions? = ClientOptions(
ClientOptions.Api(XMTPEnvironment.LOCAL, isSecure = false)
),
) {
var fakeApiClient: FakeApiClient = FakeApiClient()
var alice: PrivateKey = aliceAccount.getPrivateKey()
Expand All @@ -214,6 +218,7 @@ data class Fixtures(
var bobClient: Client = Client().create(account = bobAccount, options = clientOptions)
var caro: PrivateKey = caroAccount.getPrivateKey()
var caroClient: Client = Client().create(account = caroAccount, options = clientOptions)

constructor(clientOptions: ClientOptions?) : this(
aliceAccount = PrivateKeyBuilder(),
bobAccount = PrivateKeyBuilder(),
Expand Down
3 changes: 3 additions & 0 deletions library/src/main/java/libxmtp-version.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Version: 4068715
Branch: main
Date: 2024-04-06 04:27:39 +0000
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import android.util.Log
import com.google.protobuf.kotlin.toByteString
import kotlinx.coroutines.flow.Flow
import org.xmtp.android.library.codecs.EncodedContent
import org.xmtp.android.library.libxmtp.Message
import org.xmtp.android.library.libxmtp.MessageV3
import org.xmtp.android.library.messages.DecryptedMessage
import org.xmtp.android.library.messages.Envelope
import org.xmtp.android.library.messages.PagingInfoSortDirection
Expand Down Expand Up @@ -121,7 +121,7 @@ sealed class Conversation {
}
}

fun decode(envelope: Envelope, message: Message? = null): DecodedMessage {
fun decode(envelope: Envelope, message: MessageV3? = null): DecodedMessage {
return when (this) {
is V1 -> conversationV1.decode(envelope)
is V2 -> conversationV2.decodeEnvelope(envelope)
Expand Down Expand Up @@ -277,7 +277,7 @@ sealed class Conversation {

fun decrypt(
envelope: Envelope,
message: Message? = null,
message: MessageV3? = null,
): DecryptedMessage {
return when (this) {
is V1 -> conversationV1.decrypt(envelope)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest
import org.xmtp.android.library.libxmtp.Message
import org.xmtp.android.library.libxmtp.MessageV3
import org.xmtp.android.library.messages.DecryptedMessage
import org.xmtp.android.library.messages.Envelope
import org.xmtp.android.library.messages.EnvelopeBuilder
Expand Down Expand Up @@ -576,7 +576,10 @@ data class Conversations(
fun streamAllGroupMessages(): Flow<DecodedMessage> = callbackFlow {
val messageCallback = object : FfiMessageCallback {
override fun onMessage(message: FfiMessage) {
trySend(Message(client, message).decode())
val decodedMessage = MessageV3(client, message).decodeOrNull()
decodedMessage?.let {
trySend(it)
}
}
}
val stream = libXMTPConversations?.streamAllMessages(messageCallback)
Expand All @@ -587,7 +590,10 @@ data class Conversations(
fun streamAllGroupDecryptedMessages(): Flow<DecryptedMessage> = callbackFlow {
val messageCallback = object : FfiMessageCallback {
override fun onMessage(message: FfiMessage) {
trySend(Message(client, message).decrypt())
val decryptedMessage = MessageV3(client, message).decryptOrNull()
decryptedMessage?.let {
trySend(it)
}
}
}
val stream = libXMTPConversations?.streamAllMessages(messageCallback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data class DecodedMessage(
var topic: String,
var encodedContent: Content.EncodedContent,
var senderAddress: String,
var sent: Date
var sent: Date,
) {
companion object {
fun preview(client: Client, topic: String, body: String, senderAddress: String, sent: Date): DecodedMessage {
Expand Down
26 changes: 17 additions & 9 deletions library/src/main/java/org/xmtp/android/library/Group.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import kotlinx.coroutines.flow.callbackFlow
import org.xmtp.android.library.codecs.ContentCodec
import org.xmtp.android.library.codecs.EncodedContent
import org.xmtp.android.library.codecs.compress
import org.xmtp.android.library.libxmtp.Message
import org.xmtp.android.library.libxmtp.MessageV3
import org.xmtp.android.library.messages.DecryptedMessage
import org.xmtp.android.library.messages.PagingInfoSortDirection
import org.xmtp.android.library.messages.Topic
Expand Down Expand Up @@ -93,9 +93,10 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
sentAfterNs = after?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS),
limit = limit?.toLong()
)
).map {
Message(client, it).decode()
).mapNotNull {
MessageV3(client, it).decodeOrNull()
}

return when (direction) {
MessageApiOuterClass.SortDirection.SORT_DIRECTION_ASCENDING -> messages
else -> messages.reversed()
Expand All @@ -114,18 +115,19 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
sentAfterNs = after?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS),
limit = limit?.toLong()
)
).map {
Message(client, it).decrypt()
).mapNotNull {
MessageV3(client, it).decryptOrNull()
}

return when (direction) {
MessageApiOuterClass.SortDirection.SORT_DIRECTION_ASCENDING -> messages
else -> messages.reversed()
}
}

suspend fun processMessage(envelopeBytes: ByteArray): Message {
suspend fun processMessage(envelopeBytes: ByteArray): MessageV3 {
val message = libXMTPGroup.processStreamedGroupMessage(envelopeBytes)
return Message(client, message)
return MessageV3(client, message)
}

fun isActive(): Boolean {
Expand Down Expand Up @@ -173,7 +175,10 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
fun streamMessages(): Flow<DecodedMessage> = callbackFlow {
val messageCallback = object : FfiMessageCallback {
override fun onMessage(message: FfiMessage) {
trySend(Message(client, message).decode())
val decodedMessage = MessageV3(client, message).decodeOrNull()
decodedMessage?.let {
trySend(it)
}
}
}

Expand All @@ -184,7 +189,10 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
fun streamDecryptedMessages(): Flow<DecryptedMessage> = callbackFlow {
val messageCallback = object : FfiMessageCallback {
override fun onMessage(message: FfiMessage) {
trySend(Message(client, message).decrypt())
val decryptedMessage = MessageV3(client, message).decryptOrNull()
decryptedMessage?.let {
trySend(it)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package org.xmtp.android.library.libxmtp

import android.util.Log
import org.xmtp.android.library.Client
import org.xmtp.android.library.DecodedMessage
import org.xmtp.android.library.XMTPException
import org.xmtp.android.library.codecs.EncodedContent
import org.xmtp.android.library.messages.DecryptedMessage
import org.xmtp.android.library.messages.Topic
import org.xmtp.android.library.toHex
import uniffi.xmtpv3.FfiGroupMessageKind
import uniffi.xmtpv3.FfiMessage
import uniffi.xmtpv3.org.xmtp.android.library.codecs.ContentTypeGroupMembershipChange
import java.util.Date

data class Message(val client: Client, private val libXMTPMessage: FfiMessage) {
data class MessageV3(val client: Client, private val libXMTPMessage: FfiMessage) {

val id: ByteArray
get() = libXMTPMessage.id

Expand All @@ -25,26 +29,48 @@ data class Message(val client: Client, private val libXMTPMessage: FfiMessage) {

fun decode(): DecodedMessage {
try {
return DecodedMessage(
val decodedMessage = DecodedMessage(
id = id.toHex(),
client = client,
topic = Topic.groupMessage(convoId.toHex()).description,
encodedContent = EncodedContent.parseFrom(libXMTPMessage.content),
senderAddress = senderAddress,
sent = sentAt
sent = sentAt,
)
if (decodedMessage.encodedContent.type == ContentTypeGroupMembershipChange && libXMTPMessage.kind != FfiGroupMessageKind.MEMBERSHIP_CHANGE) {
throw XMTPException("Error decoding group membership change")
}
return decodedMessage
} catch (e: Exception) {
throw XMTPException("Error decoding message", e)
}
}

fun decodeOrNull(): DecodedMessage? {
return try {
decode()
} catch (e: Exception) {
Log.d("MESSAGE_V3", "discarding message that failed to decode", e)
null
}
}

fun decryptOrNull(): DecryptedMessage? {
return try {
decrypt()
} catch (e: Exception) {
Log.d("MESSAGE_V3", "discarding message that failed to decrypt", e)
null
}
}

fun decrypt(): DecryptedMessage {
return DecryptedMessage(
id = id.toHex(),
topic = Topic.groupMessage(convoId.toHex()).description,
encodedContent = decode().encodedContent,
senderAddress = senderAddress,
sentAt = Date()
sentAt = Date(),
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ data class DecryptedMessage(
var encodedContent: EncodedContent,
var senderAddress: String,
var sentAt: Date,
var topic: String = ""
var topic: String = "",
)
Loading
Loading