Skip to content

Commit e5b8995

Browse files
authored
Decrypted message streaming (#139)
* add decrypt streaming functions * add the stream all decrypted message function as well * remove unused imports * remove the bad package name * fix up the linter error
1 parent 65a3ed1 commit e5b8995

File tree

8 files changed

+157
-13
lines changed

8 files changed

+157
-13
lines changed

library/src/androidTest/java/org/xmtp/android/library/ConversationTest.kt

+24
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,30 @@ class ConversationTest {
448448
assertTrue(isSteveOrBobConversation(messages[2].topic))
449449
}
450450

451+
@Test
452+
fun testListBatchDecryptedMessages() {
453+
val bobConversation = aliceClient.conversations.newConversation(bob.walletAddress)
454+
val steveConversation =
455+
aliceClient.conversations.newConversation(fixtures.steve.walletAddress)
456+
457+
bobConversation.send(text = "hey alice 1")
458+
bobConversation.send(text = "hey alice 2")
459+
steveConversation.send(text = "hey alice 3")
460+
val messages = aliceClient.conversations.listBatchDecryptedMessages(
461+
listOf(
462+
Pair(steveConversation.topic, null),
463+
Pair(bobConversation.topic, null),
464+
),
465+
)
466+
val isSteveOrBobConversation = { topic: String ->
467+
(topic.equals(steveConversation.topic) || topic.equals(bobConversation.topic))
468+
}
469+
assertEquals(3, messages.size)
470+
assertTrue(isSteveOrBobConversation(messages[0].topic))
471+
assertTrue(isSteveOrBobConversation(messages[1].topic))
472+
assertTrue(isSteveOrBobConversation(messages[2].topic))
473+
}
474+
451475
@Test
452476
fun testListBatchMessagesWithPagination() {
453477
val bobConversation = aliceClient.conversations.newConversation(bob.walletAddress)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[*.{kt,kts}]
2+
disabled_rules = import-ordering

library/src/main/java/org/xmtp/android/library/Conversation.kt

+17-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import org.xmtp.proto.keystore.api.v1.Keystore.TopicMap.TopicData
1010
import org.xmtp.proto.message.api.v1.MessageApiOuterClass
1111
import org.xmtp.proto.message.contents.Invitation
1212
import org.xmtp.proto.message.contents.Invitation.InvitationV1.Aes256gcmHkdfsha256
13-
import uniffi.xmtp_dh.org.xmtp.android.library.messages.DecryptedMessage
13+
import org.xmtp.android.library.messages.DecryptedMessage
1414
import java.util.Date
1515

1616
sealed class Conversation {
@@ -193,6 +193,15 @@ sealed class Conversation {
193193
}
194194
}
195195

196+
fun decrypt(
197+
envelope: Envelope,
198+
): DecryptedMessage {
199+
return when (this) {
200+
is V1 -> conversationV1.decrypt(envelope)
201+
is V2 -> conversationV2.decrypt(envelope)
202+
}
203+
}
204+
196205
val client: Client
197206
get() {
198207
return when (this) {
@@ -208,6 +217,13 @@ sealed class Conversation {
208217
}
209218
}
210219

220+
fun streamDecryptedMessages(): Flow<DecryptedMessage> {
221+
return when (this) {
222+
is V1 -> conversationV1.streamDecryptedMessages()
223+
is V2 -> conversationV2.streamDecryptedMessages()
224+
}
225+
}
226+
211227
fun streamEphemeral(): Flow<Envelope> {
212228
return when (this) {
213229
is V1 -> return conversationV1.streamEphemeral()

library/src/main/java/org/xmtp/android/library/ConversationV1.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import org.xmtp.android.library.messages.sentAt
2222
import org.xmtp.android.library.messages.toPublicKeyBundle
2323
import org.xmtp.android.library.messages.walletAddress
2424
import org.xmtp.proto.message.api.v1.MessageApiOuterClass
25-
import uniffi.xmtp_dh.org.xmtp.android.library.messages.DecryptedMessage
25+
import org.xmtp.android.library.messages.DecryptedMessage
2626
import java.util.Date
2727

2828
data class ConversationV1(
@@ -233,4 +233,10 @@ data class ConversationV1(
233233
emit(it)
234234
}
235235
}
236+
237+
fun streamDecryptedMessages(): Flow<DecryptedMessage> = flow {
238+
client.subscribe(listOf(topic.description)).collect {
239+
emit(decrypt(envelope = it))
240+
}
241+
}
236242
}

library/src/main/java/org/xmtp/android/library/ConversationV2.kt

+19-9
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.xmtp.android.library.messages.getPublicKeyBundle
2121
import org.xmtp.android.library.messages.walletAddress
2222
import org.xmtp.proto.message.api.v1.MessageApiOuterClass
2323
import org.xmtp.proto.message.contents.Invitation
24-
import uniffi.xmtp_dh.org.xmtp.android.library.messages.DecryptedMessage
24+
import org.xmtp.android.library.messages.DecryptedMessage
2525
import java.util.Date
2626

2727
data class ConversationV2(
@@ -88,17 +88,21 @@ data class ConversationV2(
8888
val envelopes = runBlocking { client.apiClient.envelopes(topic, pagination) }
8989

9090
return envelopes.map { envelope ->
91-
val message = Message.parseFrom(envelope.message)
92-
MessageV2Builder.buildDecrypt(
93-
id = generateId(envelope = envelope),
94-
topic,
95-
message.v2,
96-
keyMaterial,
97-
client
98-
)
91+
decrypt(envelope)
9992
}
10093
}
10194

95+
fun decrypt(envelope: Envelope): DecryptedMessage {
96+
val message = Message.parseFrom(envelope.message)
97+
return MessageV2Builder.buildDecrypt(
98+
id = generateId(envelope = envelope),
99+
topic,
100+
message.v2,
101+
keyMaterial,
102+
client
103+
)
104+
}
105+
102106
fun streamMessages(): Flow<DecodedMessage> = flow {
103107
client.subscribe(listOf(topic)).mapNotNull { decodeEnvelopeOrNull(envelope = it) }.collect {
104108
emit(it)
@@ -219,4 +223,10 @@ data class ConversationV2(
219223
emit(it)
220224
}
221225
}
226+
227+
fun streamDecryptedMessages(): Flow<DecryptedMessage> = flow {
228+
client.subscribe(listOf(topic)).collect {
229+
emit(decrypt(envelope = it))
230+
}
231+
}
222232
}

library/src/main/java/org/xmtp/android/library/Conversations.kt

+87
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.xmtp.android.library.messages.walletAddress
3131
import org.xmtp.proto.keystore.api.v1.Keystore.TopicMap.TopicData
3232
import org.xmtp.proto.message.contents.Contact
3333
import org.xmtp.proto.message.contents.Invitation
34+
import org.xmtp.android.library.messages.DecryptedMessage
3435
import java.util.Date
3536

3637
data class Conversations(
@@ -307,6 +308,37 @@ data class Conversations(
307308
return messages
308309
}
309310

311+
fun listBatchDecryptedMessages(
312+
topics: List<Pair<String, Pagination?>>,
313+
): List<DecryptedMessage> {
314+
val requests = topics.map { (topic, page) ->
315+
makeQueryRequest(topic = topic, pagination = page)
316+
}
317+
318+
// The maximum number of requests permitted in a single batch call.
319+
val maxQueryRequestsPerBatch = 50
320+
val messages: MutableList<DecryptedMessage> = mutableListOf()
321+
val batches = requests.chunked(maxQueryRequestsPerBatch)
322+
for (batch in batches) {
323+
runBlocking {
324+
messages.addAll(
325+
client.batchQuery(batch).responsesOrBuilderList.flatMap { res ->
326+
res.envelopesList.mapNotNull { envelope ->
327+
val conversation = conversationsByTopic[envelope.contentTopic]
328+
if (conversation == null) {
329+
Log.d(TAG, "discarding message, unknown conversation $envelope")
330+
return@mapNotNull null
331+
}
332+
val msg = conversation.decrypt(envelope)
333+
msg
334+
}
335+
}
336+
)
337+
}
338+
}
339+
return messages
340+
}
341+
310342
fun sendInvitation(
311343
recipient: SignedPublicKeyBundle,
312344
invitation: InvitationV1,
@@ -423,4 +455,59 @@ data class Conversations(
423455
}
424456
}
425457
}
458+
459+
fun streamAllDecryptedMessages(): Flow<DecryptedMessage> = flow {
460+
val topics = mutableListOf(
461+
Topic.userInvite(client.address).description,
462+
Topic.userIntro(client.address).description
463+
)
464+
465+
for (conversation in list()) {
466+
topics.add(conversation.topic)
467+
}
468+
469+
val subscribeFlow = MutableStateFlow(makeSubscribeRequest(topics))
470+
471+
while (true) {
472+
try {
473+
client.subscribe2(request = subscribeFlow).collect { envelope ->
474+
when {
475+
conversationsByTopic.containsKey(envelope.contentTopic) -> {
476+
val conversation = conversationsByTopic[envelope.contentTopic]
477+
val decrypted = conversation?.decrypt(envelope)
478+
decrypted?.let { emit(it) }
479+
}
480+
481+
envelope.contentTopic.startsWith("/xmtp/0/invite-") -> {
482+
val conversation = fromInvite(envelope = envelope)
483+
conversationsByTopic[conversation.topic] = conversation
484+
topics.add(conversation.topic)
485+
subscribeFlow.value = makeSubscribeRequest(topics)
486+
}
487+
488+
envelope.contentTopic.startsWith("/xmtp/0/intro-") -> {
489+
val conversation = fromIntro(envelope = envelope)
490+
conversationsByTopic[conversation.topic] = conversation
491+
val decrypted = conversation.decrypt(envelope)
492+
emit(decrypted)
493+
topics.add(conversation.topic)
494+
subscribeFlow.value = makeSubscribeRequest(topics)
495+
}
496+
497+
else -> {}
498+
}
499+
}
500+
} catch (error: CancellationException) {
501+
break
502+
} catch (error: StatusException) {
503+
if (error.status.code == io.grpc.Status.Code.UNAVAILABLE) {
504+
continue
505+
} else {
506+
break
507+
}
508+
} catch (error: Exception) {
509+
continue
510+
}
511+
}
512+
}
426513
}

library/src/main/java/org/xmtp/android/library/messages/DecryptedMessage.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package uniffi.xmtp_dh.org.xmtp.android.library.messages
1+
package org.xmtp.android.library.messages
22

33
import org.xmtp.android.library.codecs.EncodedContent
44
import java.util.Date

library/src/main/java/org/xmtp/android/library/messages/MessageV2.kt

-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import org.xmtp.android.library.DecodedMessage
1111
import org.xmtp.android.library.KeyUtil
1212
import org.xmtp.android.library.XMTPException
1313
import org.xmtp.android.library.codecs.EncodedContent
14-
import uniffi.xmtp_dh.org.xmtp.android.library.messages.DecryptedMessage
1514
import java.math.BigInteger
1615
import java.util.Date
1716

0 commit comments

Comments
 (0)