Skip to content

Commit caf88b2

Browse files
authored
List batch messages (#78)
* add api client with grpc kotlin * add list batch messages * add a test for it * fix up linter
1 parent 71c8035 commit caf88b2

File tree

3 files changed

+93
-27
lines changed

3 files changed

+93
-27
lines changed

library/src/androidTest/java/org/xmtp/android/library/ConversationTest.kt

+18
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,24 @@ class ConversationTest {
412412
assertEquals("hey alice 1", messages2[0].body)
413413
}
414414

415+
@Test
416+
fun testListBatchMessages() {
417+
val bobConversation = bobClient.conversations.newConversation(
418+
alice.walletAddress,
419+
context = InvitationV1ContextBuilder.buildFromConversation("hi")
420+
)
421+
422+
val aliceConversation = aliceClient.conversations.newConversation(
423+
bob.walletAddress,
424+
context = InvitationV1ContextBuilder.buildFromConversation("hi")
425+
)
426+
bobConversation.send(text = "hey alice 1")
427+
bobConversation.send(text = "hey alice 2")
428+
bobConversation.send(text = "hey alice 3")
429+
val messages = aliceClient.conversations.listBatchMessages(listOf(aliceConversation.topic, bobConversation.topic))
430+
assertEquals(3, messages.size)
431+
}
432+
415433
@Test
416434
fun testImportV1ConversationFromJS() {
417435
val jsExportJSONData =

library/src/main/java/org/xmtp/android/library/ApiClient.kt

+37-27
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,11 @@ interface ApiClient {
3737
suspend fun subscribe(topics: List<String>): Flow<Envelope>
3838
}
3939

40-
data class GRPCApiClient(override val environment: XMTPEnvironment, val secure: Boolean = true, val appVersion: String? = null) :
40+
data class GRPCApiClient(
41+
override val environment: XMTPEnvironment,
42+
val secure: Boolean = true,
43+
val appVersion: String? = null,
44+
) :
4145
ApiClient, Closeable {
4246
companion object {
4347
val AUTHORIZATION_HEADER_KEY: Metadata.Key<String> =
@@ -48,6 +52,37 @@ data class GRPCApiClient(override val environment: XMTPEnvironment, val secure:
4852

4953
val APP_VERSION_HEADER_KEY: Metadata.Key<String> =
5054
Metadata.Key.of("X-App-Version", Metadata.ASCII_STRING_MARSHALLER)
55+
56+
fun makeQueryRequest(
57+
topic: String,
58+
pagination: Pagination? = null,
59+
cursor: Cursor? = null,
60+
): QueryRequest =
61+
QueryRequest.newBuilder()
62+
.addContentTopics(topic).also {
63+
if (pagination != null) {
64+
it.pagingInfo = pagination.pagingInfo
65+
}
66+
if (pagination?.startTime != null) {
67+
it.endTimeNs = pagination.startTime.time * 1_000_000
68+
it.pagingInfo = it.pagingInfo.toBuilder().also { info ->
69+
info.direction =
70+
MessageApiOuterClass.SortDirection.SORT_DIRECTION_DESCENDING
71+
}.build()
72+
}
73+
if (pagination?.endTime != null) {
74+
it.startTimeNs = pagination.endTime.time * 1_000_000
75+
it.pagingInfo = it.pagingInfo.toBuilder().also { info ->
76+
info.direction =
77+
MessageApiOuterClass.SortDirection.SORT_DIRECTION_DESCENDING
78+
}.build()
79+
}
80+
if (cursor != null) {
81+
it.pagingInfo = it.pagingInfo.toBuilder().also { info ->
82+
info.cursor = cursor
83+
}.build()
84+
}
85+
}.build()
5186
}
5287

5388
private val channel: ManagedChannel =
@@ -74,32 +109,7 @@ data class GRPCApiClient(override val environment: XMTPEnvironment, val secure:
74109
pagination: Pagination?,
75110
cursor: Cursor?,
76111
): QueryResponse {
77-
val request = QueryRequest.newBuilder()
78-
.addContentTopics(topic).also {
79-
if (pagination != null) {
80-
it.pagingInfo = pagination.pagingInfo
81-
}
82-
if (pagination?.startTime != null) {
83-
it.endTimeNs = pagination.startTime.time * 1_000_000
84-
it.pagingInfo = it.pagingInfo.toBuilder().also { info ->
85-
info.direction =
86-
MessageApiOuterClass.SortDirection.SORT_DIRECTION_DESCENDING
87-
}.build()
88-
}
89-
if (pagination?.endTime != null) {
90-
it.startTimeNs = pagination.endTime.time * 1_000_000
91-
it.pagingInfo = it.pagingInfo.toBuilder().also { info ->
92-
info.direction =
93-
MessageApiOuterClass.SortDirection.SORT_DIRECTION_DESCENDING
94-
}.build()
95-
}
96-
if (cursor != null) {
97-
it.pagingInfo = it.pagingInfo.toBuilder().also { info ->
98-
info.cursor = cursor
99-
}.build()
100-
}
101-
}.build()
102-
112+
val request = makeQueryRequest(topic, pagination, cursor)
103113
val headers = Metadata()
104114

105115
authToken?.let { token ->

library/src/main/java/org/xmtp/android/library/Conversations.kt

+38
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ import android.util.Log
44
import kotlinx.coroutines.flow.Flow
55
import kotlinx.coroutines.flow.flow
66
import kotlinx.coroutines.runBlocking
7+
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
78
import org.xmtp.android.library.messages.Envelope
89
import org.xmtp.android.library.messages.EnvelopeBuilder
910
import org.xmtp.android.library.messages.InvitationV1
1011
import org.xmtp.android.library.messages.MessageV1Builder
12+
import org.xmtp.android.library.messages.Pagination
1113
import org.xmtp.android.library.messages.SealedInvitation
1214
import org.xmtp.android.library.messages.SealedInvitationBuilder
1315
import org.xmtp.android.library.messages.SignedPublicKeyBundle
@@ -220,6 +222,42 @@ data class Conversations(
220222
}
221223
}
222224

225+
fun listBatchMessages(
226+
topics: List<String>,
227+
limit: Int? = null,
228+
before: Date? = null,
229+
after: Date? = null,
230+
): List<DecodedMessage> {
231+
val pagination = Pagination(limit = limit, startTime = before, endTime = after)
232+
val requests = topics.map { topic ->
233+
makeQueryRequest(topic = topic, pagination = pagination)
234+
}
235+
236+
// The maximum number of requests permitted in a single batch call.
237+
val maxQueryRequestsPerBatch = 50
238+
val messages: MutableList<DecodedMessage> = mutableListOf()
239+
val batches = requests.chunked(maxQueryRequestsPerBatch)
240+
for (batch in batches) {
241+
runBlocking {
242+
messages.addAll(
243+
client.batchQuery(batch).responsesOrBuilderList.flatMap { res ->
244+
res.envelopesList.mapNotNull { envelope ->
245+
val conversation =
246+
conversations.firstOrNull { it.topic == envelope.contentTopic }
247+
if (conversation == null) {
248+
Log.d(TAG, "discarding message, unknown conversation $envelope")
249+
return@mapNotNull null
250+
}
251+
val msg = conversation.decode(envelope)
252+
msg
253+
}
254+
}
255+
)
256+
}
257+
}
258+
return messages
259+
}
260+
223261
fun sendInvitation(
224262
recipient: SignedPublicKeyBundle,
225263
invitation: InvitationV1,

0 commit comments

Comments
 (0)