Skip to content

Commit 4854818

Browse files
committed
clean up query threading specifically around listing conversations
1 parent 49e94db commit 4854818

File tree

6 files changed

+96
-118
lines changed

6 files changed

+96
-118
lines changed

library/src/main/java/org/xmtp/android/library/Client.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ class Client() {
448448
return null
449449
}
450450

451-
fun publishUserContact(legacy: Boolean = false) {
451+
suspend fun publishUserContact(legacy: Boolean = false) {
452452
val envelopes: MutableList<MessageApiOuterClass.Envelope> = mutableListOf()
453453
if (legacy) {
454454
val contactBundle = ContactBundle.newBuilder().also {
@@ -484,7 +484,7 @@ class Client() {
484484
message = contactBundle.toByteString()
485485
}.build()
486486
envelopes.add(envelope)
487-
runBlocking { publish(envelopes = envelopes) }
487+
publish(envelopes = envelopes)
488488
}
489489

490490
fun getUserContact(peerAddress: String): ContactBundle? {
@@ -525,7 +525,7 @@ class Client() {
525525
return apiClient.publish(envelopes = envelopes)
526526
}
527527

528-
fun ensureUserContactPublished() {
528+
suspend fun ensureUserContactPublished() {
529529
val contact = getUserContact(peerAddress = address)
530530
if (contact != null && keys.getPublicKeyBundle() == contact.v2.keyBundle) {
531531
return

library/src/main/java/org/xmtp/android/library/Contacts.kt

+6-6
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ class ConsentList(val client: Client) {
101101
return consentList
102102
}
103103

104-
fun publish(entry: ConsentListEntry) {
104+
suspend fun publish(entry: ConsentListEntry) {
105105
val payload =
106106
PrivatePreferencesAction.newBuilder().also {
107107
when (entry.entryType) {
@@ -152,7 +152,7 @@ class ConsentList(val client: Client) {
152152
ByteArray(message.size) { message[it] },
153153
)
154154

155-
runBlocking { client.publish(listOf(envelope)) }
155+
client.publish(listOf(envelope))
156156
}
157157

158158
fun allow(address: String): ConsentListEntry {
@@ -210,25 +210,25 @@ data class Contacts(
210210
return consentList
211211
}
212212

213-
fun allow(addresses: List<String>) {
213+
suspend fun allow(addresses: List<String>) {
214214
for (address in addresses) {
215215
ConsentList(client).publish(consentList.allow(address))
216216
}
217217
}
218218

219-
fun deny(addresses: List<String>) {
219+
suspend fun deny(addresses: List<String>) {
220220
for (address in addresses) {
221221
ConsentList(client).publish(consentList.deny(address))
222222
}
223223
}
224224

225-
fun allowGroup(groupIds: List<ByteArray>) {
225+
suspend fun allowGroup(groupIds: List<ByteArray>) {
226226
for (id in groupIds) {
227227
ConsentList(client).publish(consentList.allowGroup(id))
228228
}
229229
}
230230

231-
fun denyGroup(groupIds: List<ByteArray>) {
231+
suspend fun denyGroup(groupIds: List<ByteArray>) {
232232
for (id in groupIds) {
233233
ConsentList(client).publish(consentList.denyGroup(id))
234234
}

library/src/main/java/org/xmtp/android/library/ConversationV1.kt

+4-8
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package org.xmtp.android.library
33
import android.util.Log
44
import kotlinx.coroutines.flow.Flow
55
import kotlinx.coroutines.flow.flow
6-
import kotlinx.coroutines.runBlocking
76
import org.web3j.crypto.Hash
87
import org.xmtp.android.library.codecs.ContentCodec
98
import org.xmtp.android.library.codecs.EncodedContent
@@ -60,17 +59,15 @@ data class ConversationV1(
6059
* If [direction] is specified then that will control the sort order of te messages.
6160
* @see Conversation.messages
6261
*/
63-
fun messages(
62+
suspend fun messages(
6463
limit: Int? = null,
6564
before: Date? = null,
6665
after: Date? = null,
6766
direction: PagingInfoSortDirection = MessageApiOuterClass.SortDirection.SORT_DIRECTION_DESCENDING,
6867
): List<DecodedMessage> {
6968
val pagination =
7069
Pagination(limit = limit, before = before, after = after, direction = direction)
71-
val result = runBlocking {
72-
client.apiClient.envelopes(topic = topic.description, pagination = pagination)
73-
}
70+
val result = client.apiClient.envelopes(topic = topic.description, pagination = pagination)
7471

7572
return result.mapNotNull { envelope ->
7673
decodeOrNull(envelope = envelope)
@@ -90,7 +87,7 @@ data class ConversationV1(
9087
* If [limit] is specified then results are pulled in pages of that size.
9188
* If [direction] is specified then that will control the sort order of te messages.
9289
*/
93-
fun decryptedMessages(
90+
suspend fun decryptedMessages(
9491
limit: Int? = null,
9592
before: Date? = null,
9693
after: Date? = null,
@@ -99,12 +96,11 @@ data class ConversationV1(
9996
val pagination =
10097
Pagination(limit = limit, before = before, after = after, direction = direction)
10198

102-
val envelopes = runBlocking {
99+
val envelopes =
103100
client.apiClient.envelopes(
104101
topic = Topic.directMessageV1(client.address, peerAddress).description,
105102
pagination = pagination,
106103
)
107-
}
108104

109105
return envelopes.map { decrypt(it) }
110106
}

library/src/main/java/org/xmtp/android/library/ConversationV2.kt

+4-6
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import android.util.Log
44
import kotlinx.coroutines.flow.Flow
55
import kotlinx.coroutines.flow.flow
66
import kotlinx.coroutines.flow.mapNotNull
7-
import kotlinx.coroutines.runBlocking
87
import org.web3j.crypto.Hash
98
import org.xmtp.android.library.codecs.ContentCodec
109
import org.xmtp.android.library.codecs.EncodedContent
@@ -73,20 +72,19 @@ data class ConversationV2(
7372
* If [direction] is specified then that will control the sort order of te messages.
7473
* @see Conversation.messages
7574
*/
76-
fun messages(
75+
suspend fun messages(
7776
limit: Int? = null,
7877
before: Date? = null,
7978
after: Date? = null,
8079
direction: PagingInfoSortDirection = MessageApiOuterClass.SortDirection.SORT_DIRECTION_DESCENDING,
8180
): List<DecodedMessage> {
8281
val pagination =
8382
Pagination(limit = limit, before = before, after = after, direction = direction)
84-
val result = runBlocking {
83+
val result =
8584
client.apiClient.envelopes(
8685
topic = topic,
8786
pagination = pagination,
8887
)
89-
}
9088

9189
return result.mapNotNull { envelope ->
9290
decodeEnvelopeOrNull(envelope)
@@ -106,15 +104,15 @@ data class ConversationV2(
106104
* If [limit] is specified then results are pulled in pages of that size.
107105
* If [direction] is specified then that will control the sort order of te messages.
108106
*/
109-
fun decryptedMessages(
107+
suspend fun decryptedMessages(
110108
limit: Int? = null,
111109
before: Date? = null,
112110
after: Date? = null,
113111
direction: PagingInfoSortDirection = MessageApiOuterClass.SortDirection.SORT_DIRECTION_DESCENDING,
114112
): List<DecryptedMessage> {
115113
val pagination =
116114
Pagination(limit = limit, before = before, after = after, direction = direction)
117-
val envelopes = runBlocking { client.apiClient.envelopes(topic, pagination) }
115+
val envelopes = client.apiClient.envelopes(topic, pagination)
118116

119117
return envelopes.map { envelope ->
120118
decrypt(envelope)

library/src/main/java/org/xmtp/android/library/Conversations.kt

+54-63
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import kotlinx.coroutines.flow.MutableStateFlow
1010
import kotlinx.coroutines.flow.callbackFlow
1111
import kotlinx.coroutines.flow.flow
1212
import kotlinx.coroutines.flow.merge
13-
import kotlinx.coroutines.runBlocking
1413
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
1514
import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest
1615
import org.xmtp.android.library.libxmtp.Message
@@ -127,17 +126,19 @@ data class Conversations(
127126
libXMTPConversations?.sync()
128127
}
129128

130-
fun listGroups(after: Date? = null, before: Date? = null, limit: Int? = null): List<Group> {
131-
return runBlocking {
132-
libXMTPConversations?.list(
133-
opts = FfiListConversationsOptions(
134-
after?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS),
135-
before?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS),
136-
limit?.toLong()
137-
)
138-
)?.map {
139-
Group(client, it)
140-
}
129+
suspend fun listGroups(
130+
after: Date? = null,
131+
before: Date? = null,
132+
limit: Int? = null,
133+
): List<Group> {
134+
return libXMTPConversations?.list(
135+
opts = FfiListConversationsOptions(
136+
after?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS),
137+
before?.time?.nanoseconds?.toLong(DurationUnit.NANOSECONDS),
138+
limit?.toLong()
139+
)
140+
)?.map {
141+
Group(client, it)
141142
} ?: emptyList()
142143
}
143144

@@ -234,7 +235,7 @@ data class Conversations(
234235
* Get the list of conversations that current user has
235236
* @return The list of [Conversation] that the current [Client] has.
236237
*/
237-
fun list(includeGroups: Boolean = false): List<Conversation> {
238+
suspend fun list(includeGroups: Boolean = false): List<Conversation> {
238239
val newConversations = mutableListOf<Conversation>()
239240
val mostRecent = conversationsByTopic.values.maxOfOrNull { it.createdAt }
240241
val pagination = Pagination(after = mostRecent)
@@ -264,10 +265,8 @@ data class Conversations(
264265
}.map { Pair(it.topic, it) }
265266

266267
if (includeGroups) {
267-
val groups = runBlocking {
268-
syncGroups()
269-
listGroups()
270-
}
268+
syncGroups()
269+
val groups = listGroups()
271270
conversationsByTopic += groups.map { Pair(it.id.toString(), Conversation.Group(it)) }
272271
}
273272
return conversationsByTopic.values.sortedByDescending { it.createdAt }
@@ -338,14 +337,11 @@ data class Conversations(
338337
return hmacKeysResponse.build()
339338
}
340339

341-
private fun listIntroductionPeers(pagination: Pagination? = null): Map<String, Date> {
342-
val envelopes =
343-
runBlocking {
344-
client.apiClient.queryTopic(
345-
topic = Topic.userIntro(client.address),
346-
pagination = pagination,
347-
).envelopesList
348-
}
340+
private suspend fun listIntroductionPeers(pagination: Pagination? = null): Map<String, Date> {
341+
val envelopes = client.apiClient.queryTopic(
342+
topic = Topic.userIntro(client.address),
343+
pagination = pagination,
344+
).envelopesList
349345
val messages = envelopes.mapNotNull { envelope ->
350346
try {
351347
val message = MessageV1Builder.buildFromBytes(envelope.message.toByteArray())
@@ -381,10 +377,9 @@ data class Conversations(
381377
* @param pagination Information of the topics, ranges (dates), etc.
382378
* @return List of [SealedInvitation] that are inside of the range specified by [pagination]
383379
*/
384-
private fun listInvitations(pagination: Pagination? = null): List<SealedInvitation> {
385-
val envelopes = runBlocking {
380+
private suspend fun listInvitations(pagination: Pagination? = null): List<SealedInvitation> {
381+
val envelopes =
386382
client.apiClient.envelopes(Topic.userInvite(client.address).description, pagination)
387-
}
388383
return envelopes.map { envelope ->
389384
SealedInvitation.parseFrom(envelope.message)
390385
}
@@ -404,7 +399,7 @@ data class Conversations(
404399
* This pulls messages from multiple conversations in a single call.
405400
* @see Conversation.messages
406401
*/
407-
fun listBatchMessages(
402+
suspend fun listBatchMessages(
408403
topics: List<Pair<String, Pagination?>>,
409404
): List<DecodedMessage> {
410405
val requests = topics.map { (topic, page) ->
@@ -416,21 +411,19 @@ data class Conversations(
416411
val messages: MutableList<DecodedMessage> = mutableListOf()
417412
val batches = requests.chunked(maxQueryRequestsPerBatch)
418413
for (batch in batches) {
419-
runBlocking {
420-
messages.addAll(
421-
client.batchQuery(batch).responsesOrBuilderList.flatMap { res ->
422-
res.envelopesList.mapNotNull { envelope ->
423-
val conversation = conversationsByTopic[envelope.contentTopic]
424-
if (conversation == null) {
425-
Log.d(TAG, "discarding message, unknown conversation $envelope")
426-
return@mapNotNull null
427-
}
428-
val msg = conversation.decodeOrNull(envelope)
429-
msg
414+
messages.addAll(
415+
client.batchQuery(batch).responsesOrBuilderList.flatMap { res ->
416+
res.envelopesList.mapNotNull { envelope ->
417+
val conversation = conversationsByTopic[envelope.contentTopic]
418+
if (conversation == null) {
419+
Log.d(TAG, "discarding message, unknown conversation $envelope")
420+
return@mapNotNull null
430421
}
431-
},
432-
)
433-
}
422+
val msg = conversation.decodeOrNull(envelope)
423+
msg
424+
}
425+
},
426+
)
434427
}
435428
return messages
436429
}
@@ -440,7 +433,7 @@ data class Conversations(
440433
* This pulls messages from multiple conversations in a single call.
441434
* @see listBatchMessages
442435
*/
443-
fun listBatchDecryptedMessages(
436+
suspend fun listBatchDecryptedMessages(
444437
topics: List<Pair<String, Pagination?>>,
445438
): List<DecryptedMessage> {
446439
val requests = topics.map { (topic, page) ->
@@ -452,26 +445,24 @@ data class Conversations(
452445
val messages: MutableList<DecryptedMessage> = mutableListOf()
453446
val batches = requests.chunked(maxQueryRequestsPerBatch)
454447
for (batch in batches) {
455-
runBlocking {
456-
messages.addAll(
457-
client.batchQuery(batch).responsesOrBuilderList.flatMap { res ->
458-
res.envelopesList.mapNotNull { envelope ->
459-
val conversation = conversationsByTopic[envelope.contentTopic]
460-
if (conversation == null) {
461-
Log.d(TAG, "discarding message, unknown conversation $envelope")
462-
return@mapNotNull null
463-
}
464-
try {
465-
val msg = conversation.decrypt(envelope)
466-
msg
467-
} catch (e: Exception) {
468-
Log.e(TAG, "Error decrypting message: $envelope", e)
469-
null
470-
}
448+
messages.addAll(
449+
client.batchQuery(batch).responsesOrBuilderList.flatMap { res ->
450+
res.envelopesList.mapNotNull { envelope ->
451+
val conversation = conversationsByTopic[envelope.contentTopic]
452+
if (conversation == null) {
453+
Log.d(TAG, "discarding message, unknown conversation $envelope")
454+
return@mapNotNull null
471455
}
472-
},
473-
)
474-
}
456+
try {
457+
val msg = conversation.decrypt(envelope)
458+
msg
459+
} catch (e: Exception) {
460+
Log.e(TAG, "Error decrypting message: $envelope", e)
461+
null
462+
}
463+
}
464+
},
465+
)
475466
}
476467
return messages
477468
}

0 commit comments

Comments
 (0)