Skip to content

Commit 6037cd5

Browse files
committed
clean up the subscribe2 under the hood
1 parent ea244de commit 6037cd5

File tree

4 files changed

+20
-57
lines changed

4 files changed

+20
-57
lines changed

library/src/main/java/org/xmtp/android/library/Client.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ import com.google.crypto.tink.subtle.Base64
99
import com.google.gson.GsonBuilder
1010
import kotlinx.coroutines.Dispatchers
1111
import kotlinx.coroutines.flow.Flow
12+
import kotlinx.coroutines.flow.flowOf
1213
import kotlinx.coroutines.runBlocking
1314
import kotlinx.coroutines.withContext
1415
import org.web3j.crypto.Keys
1516
import org.web3j.crypto.Keys.toChecksumAddress
17+
import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest
1618
import org.xmtp.android.library.codecs.ContentCodec
1719
import org.xmtp.android.library.codecs.TextCodec
1820
import org.xmtp.android.library.libxmtp.XMTPLogger
@@ -497,7 +499,10 @@ class Client() {
497499
return apiClient.batchQuery(requests)
498500
}
499501

500-
suspend fun subscribe(request: Flow<MessageApiOuterClass.SubscribeRequest>): Flow<Envelope> {
502+
suspend fun subscribe(topics: List<String>): Flow<Envelope> {
503+
return subscribe2(flowOf(makeSubscribeRequest(topics)))
504+
}
505+
suspend fun subscribe2(request: Flow<MessageApiOuterClass.SubscribeRequest>): Flow<Envelope> {
501506
return apiClient.subscribe(request = request)
502507
}
503508

library/src/main/java/org/xmtp/android/library/ConversationV1.kt

+4-23
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ package org.xmtp.android.library
22

33
import android.util.Log
44
import kotlinx.coroutines.flow.Flow
5-
import kotlinx.coroutines.flow.MutableStateFlow
65
import kotlinx.coroutines.flow.flow
76
import kotlinx.coroutines.runBlocking
87
import org.web3j.crypto.Hash
98
import org.xmtp.android.library.codecs.ContentCodec
109
import org.xmtp.android.library.codecs.EncodedContent
1110
import org.xmtp.android.library.codecs.compress
11+
import org.xmtp.android.library.messages.DecryptedMessage
1212
import org.xmtp.android.library.messages.Envelope
1313
import org.xmtp.android.library.messages.EnvelopeBuilder
1414
import org.xmtp.android.library.messages.Message
@@ -23,7 +23,6 @@ import org.xmtp.android.library.messages.sentAt
2323
import org.xmtp.android.library.messages.toPublicKeyBundle
2424
import org.xmtp.android.library.messages.walletAddress
2525
import org.xmtp.proto.message.api.v1.MessageApiOuterClass
26-
import org.xmtp.android.library.messages.DecryptedMessage
2726
import java.util.Date
2827

2928
data class ConversationV1(
@@ -42,13 +41,7 @@ data class ConversationV1(
4241
* @see Conversations.streamAllMessages
4342
*/
4443
fun streamMessages(): Flow<DecodedMessage> = flow {
45-
client.subscribe(
46-
MutableStateFlow(
47-
GRPCApiClient.makeSubscribeRequest(
48-
listOf(topic.description)
49-
)
50-
)
51-
).collect {
44+
client.subscribe(listOf(topic.description)).collect {
5245
emit(decode(envelope = it))
5346
}
5447
}
@@ -279,25 +272,13 @@ data class ConversationV1(
279272
get() = topic.description.replace("/xmtp/0/dm-", "/xmtp/0/dmE-")
280273

281274
fun streamEphemeral(): Flow<Envelope> = flow {
282-
client.subscribe(
283-
MutableStateFlow(
284-
GRPCApiClient.makeSubscribeRequest(
285-
listOf(ephemeralTopic)
286-
)
287-
)
288-
).collect {
275+
client.subscribe(listOf(ephemeralTopic)).collect {
289276
emit(it)
290277
}
291278
}
292279

293280
fun streamDecryptedMessages(): Flow<DecryptedMessage> = flow {
294-
client.subscribe(
295-
MutableStateFlow(
296-
GRPCApiClient.makeSubscribeRequest(
297-
listOf(topic.description)
298-
)
299-
)
300-
).collect {
281+
client.subscribe(listOf(topic.description)).collect {
301282
emit(decrypt(envelope = it))
302283
}
303284
}

library/src/main/java/org/xmtp/android/library/ConversationV2.kt

+4-23
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ package org.xmtp.android.library
22

33
import android.util.Log
44
import kotlinx.coroutines.flow.Flow
5-
import kotlinx.coroutines.flow.MutableStateFlow
65
import kotlinx.coroutines.flow.flow
76
import kotlinx.coroutines.flow.mapNotNull
87
import kotlinx.coroutines.runBlocking
98
import org.web3j.crypto.Hash
109
import org.xmtp.android.library.codecs.ContentCodec
1110
import org.xmtp.android.library.codecs.EncodedContent
1211
import org.xmtp.android.library.codecs.compress
12+
import org.xmtp.android.library.messages.DecryptedMessage
1313
import org.xmtp.android.library.messages.Envelope
1414
import org.xmtp.android.library.messages.EnvelopeBuilder
1515
import org.xmtp.android.library.messages.Message
@@ -22,7 +22,6 @@ import org.xmtp.android.library.messages.getPublicKeyBundle
2222
import org.xmtp.android.library.messages.walletAddress
2323
import org.xmtp.proto.message.api.v1.MessageApiOuterClass
2424
import org.xmtp.proto.message.contents.Invitation
25-
import org.xmtp.android.library.messages.DecryptedMessage
2625
import java.util.Date
2726

2827
data class ConversationV2(
@@ -139,13 +138,7 @@ data class ConversationV2(
139138
}
140139

141140
fun streamMessages(): Flow<DecodedMessage> = flow {
142-
client.subscribe(
143-
MutableStateFlow(
144-
GRPCApiClient.makeSubscribeRequest(
145-
listOf(topic)
146-
)
147-
)
148-
).mapNotNull { decodeEnvelopeOrNull(envelope = it) }.collect {
141+
client.subscribe(listOf(topic)).mapNotNull { decodeEnvelopeOrNull(envelope = it) }.collect {
149142
emit(it)
150143
}
151144
}
@@ -276,25 +269,13 @@ data class ConversationV2(
276269
get() = topic.replace("/xmtp/0/m", "/xmtp/0/mE")
277270

278271
fun streamEphemeral(): Flow<Envelope> = flow {
279-
client.subscribe(
280-
MutableStateFlow(
281-
GRPCApiClient.makeSubscribeRequest(
282-
listOf(ephemeralTopic)
283-
)
284-
)
285-
).collect {
272+
client.subscribe(listOf(ephemeralTopic)).collect {
286273
emit(it)
287274
}
288275
}
289276

290277
fun streamDecryptedMessages(): Flow<DecryptedMessage> = flow {
291-
client.subscribe(
292-
MutableStateFlow(
293-
GRPCApiClient.makeSubscribeRequest(
294-
listOf(topic)
295-
)
296-
)
297-
).collect {
278+
client.subscribe(listOf(topic)).collect {
298279
emit(decrypt(envelope = it))
299280
}
300281
}

library/src/main/java/org/xmtp/android/library/Conversations.kt

+6-10
Original file line numberDiff line numberDiff line change
@@ -527,14 +527,10 @@ data class Conversations(
527527
fun stream(): Flow<Conversation> = flow {
528528
val streamedConversationTopics: MutableSet<String> = mutableSetOf()
529529
client.subscribe(
530-
MutableStateFlow(
531-
makeSubscribeRequest(
532-
listOf(
533-
Topic.userIntro(client.address).description,
534-
Topic.userInvite(client.address).description
535-
)
536-
)
537-
),
530+
listOf(
531+
Topic.userIntro(client.address).description,
532+
Topic.userInvite(client.address).description
533+
)
538534
).collect { envelope ->
539535
if (envelope.contentTopic == Topic.userIntro(client.address).description) {
540536
val conversationV1 = fromIntro(envelope = envelope)
@@ -621,7 +617,7 @@ data class Conversations(
621617

622618
while (true) {
623619
try {
624-
client.subscribe(request = subscribeFlow).collect { envelope ->
620+
client.subscribe2(request = subscribeFlow).collect { envelope ->
625621
when {
626622
conversationsByTopic.containsKey(envelope.contentTopic) -> {
627623
val conversation = conversationsByTopic[envelope.contentTopic]
@@ -692,7 +688,7 @@ data class Conversations(
692688

693689
while (true) {
694690
try {
695-
client.subscribe(request = subscribeFlow).collect { envelope ->
691+
client.subscribe2(request = subscribeFlow).collect { envelope ->
696692
when {
697693
conversationsByTopic.containsKey(envelope.contentTopic) -> {
698694
val conversation = conversationsByTopic[envelope.contentTopic]

0 commit comments

Comments
 (0)