Skip to content

Commit cbfdce1

Browse files
committed
Revert "remove all the streams work and move to another PR"
This reverts commit 76bfcc7.
1 parent 76bfcc7 commit cbfdce1

File tree

6 files changed

+140
-5
lines changed

6 files changed

+140
-5
lines changed

library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt

+37
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package org.xmtp.android.library
22

33
import androidx.test.ext.junit.runners.AndroidJUnit4
44
import androidx.test.platform.app.InstrumentationRegistry
5+
import app.cash.turbine.test
6+
import kotlinx.coroutines.ExperimentalCoroutinesApi
57
import kotlinx.coroutines.runBlocking
68
import org.junit.Assert.assertEquals
79
import org.junit.Before
@@ -16,6 +18,7 @@ import org.xmtp.android.library.messages.PrivateKey
1618
import org.xmtp.android.library.messages.PrivateKeyBuilder
1719
import org.xmtp.android.library.messages.walletAddress
1820

21+
@OptIn(ExperimentalCoroutinesApi::class)
1922
@RunWith(AndroidJUnit4::class)
2023
class GroupTest {
2124
lateinit var fakeApiClient: FakeApiClient
@@ -174,4 +177,38 @@ class GroupTest {
174177
assertEquals(ReactionAction.Added, content?.action)
175178
assertEquals(ReactionSchema.Unicode, content?.schema)
176179
}
180+
181+
@Test
182+
fun testCanStreamGroupMessages() = kotlinx.coroutines.test.runTest {
183+
val group = boClient.conversations.newGroup(listOf(alix.walletAddress.lowercase()))
184+
185+
group.streamMessages().test {
186+
group.send("hi")
187+
assertEquals("hi", awaitItem().body)
188+
awaitComplete()
189+
}
190+
}
191+
192+
@Test
193+
fun testCanStreamGroups() = kotlinx.coroutines.test.runTest {
194+
boClient.conversations.streamGroups().test {
195+
val conversation =
196+
boClient.conversations.newGroup(listOf(alix.walletAddress.lowercase()))
197+
conversation.send(content = "hi")
198+
assertEquals("hi", awaitItem().messages().first().body)
199+
awaitComplete()
200+
}
201+
}
202+
203+
@Test
204+
fun testCanStreamGroupsAndConversations() = kotlinx.coroutines.test.runTest {
205+
boClient.conversations.stream(includeGroups = true).test {
206+
val group =
207+
boClient.conversations.newGroup(listOf(alix.walletAddress.lowercase()))
208+
val conversation =
209+
boClient.conversations.newConversation(alix.walletAddress.lowercase())
210+
assertEquals("hi", awaitItem().messages().first().body)
211+
awaitComplete()
212+
}
213+
}
177214
}

library/src/main/java/org/xmtp/android/library/Conversation.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -318,15 +318,15 @@ sealed class Conversation {
318318
return when (this) {
319319
is V1 -> conversationV1.streamMessages()
320320
is V2 -> conversationV2.streamMessages()
321-
is Group -> throw XMTPException("Coming follow up PR")
321+
is Group -> group.streamMessages()
322322
}
323323
}
324324

325325
fun streamDecryptedMessages(): Flow<DecryptedMessage> {
326326
return when (this) {
327327
is V1 -> conversationV1.streamDecryptedMessages()
328328
is V2 -> conversationV2.streamDecryptedMessages()
329-
is Group -> throw XMTPException("Coming follow up PR")
329+
is Group -> group.streamDecryptedMessages()
330330
}
331331
}
332332

library/src/main/java/org/xmtp/android/library/Conversations.kt

+34-3
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package org.xmtp.android.library
33
import android.util.Log
44
import io.grpc.StatusException
55
import kotlinx.coroutines.CancellationException
6+
import kotlinx.coroutines.coroutineScope
67
import kotlinx.coroutines.flow.Flow
78
import kotlinx.coroutines.flow.MutableStateFlow
89
import kotlinx.coroutines.flow.flow
10+
import kotlinx.coroutines.launch
911
import kotlinx.coroutines.runBlocking
1012
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
1113
import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest
@@ -34,6 +36,7 @@ import org.xmtp.proto.message.contents.Contact
3436
import org.xmtp.proto.message.contents.Invitation
3537
import uniffi.xmtpv3.FfiConversations
3638
import uniffi.xmtpv3.FfiListConversationsOptions
39+
import uniffi.xmtpv3.org.xmtp.android.library.libxmtp.GroupEmitter
3740
import java.util.Date
3841
import kotlin.time.Duration.Companion.nanoseconds
3942
import kotlin.time.DurationUnit
@@ -90,8 +93,8 @@ data class Conversations(
9093
if (accountAddresses.isEmpty()) {
9194
throw XMTPException("Cannot start an empty group chat.")
9295
}
93-
if (accountAddresses.size == 1 &&
94-
accountAddresses.first().lowercase() == client.address.lowercase()
96+
if (accountAddresses.size == 1 && accountAddresses.first()
97+
.lowercase() == client.address.lowercase()
9598
) {
9699
throw XMTPException("Recipient is sender")
97100
}
@@ -473,7 +476,21 @@ data class Conversations(
473476
* of the information of those conversations according to the topics
474477
* @return Stream of data information for the conversations
475478
*/
476-
fun stream(): Flow<Conversation> = flow {
479+
fun stream(includeGroups: Boolean = false): Flow<Conversation> = flow {
480+
if (includeGroups) {
481+
val groupEmitter = GroupEmitter()
482+
483+
coroutineScope {
484+
launch {
485+
groupEmitter.groups.collect { group ->
486+
emit(Conversation.Group(Group(client, group)))
487+
}
488+
}
489+
}
490+
491+
libXMTPConversations?.stream(groupEmitter.callback)
492+
}
493+
477494
val streamedConversationTopics: MutableSet<String> = mutableSetOf()
478495
client.subscribeTopic(
479496
listOf(Topic.userIntro(client.address), Topic.userInvite(client.address)),
@@ -497,6 +514,20 @@ data class Conversations(
497514
}
498515
}
499516

517+
fun streamGroups(): Flow<Group> = flow {
518+
val groupEmitter = GroupEmitter()
519+
520+
coroutineScope {
521+
launch {
522+
groupEmitter.groups.collect { group ->
523+
emit(Group(client, group))
524+
}
525+
}
526+
}
527+
528+
libXMTPConversations?.stream(groupEmitter.callback)
529+
}
530+
500531
/**
501532
* Get the stream of all messages of the current [Client]
502533
* @return Flow object of [DecodedMessage] that represents all the messages of the

library/src/main/java/org/xmtp/android/library/Group.kt

+33
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
package org.xmtp.android.library
22

3+
import kotlinx.coroutines.coroutineScope
4+
import kotlinx.coroutines.flow.Flow
5+
import kotlinx.coroutines.flow.flow
6+
import kotlinx.coroutines.launch
37
import kotlinx.coroutines.runBlocking
48
import org.xmtp.android.library.codecs.ContentCodec
59
import org.xmtp.android.library.codecs.EncodedContent
610
import org.xmtp.android.library.codecs.compress
711
import org.xmtp.android.library.libxmtp.Message
12+
import org.xmtp.android.library.libxmtp.MessageEmitter
813
import org.xmtp.android.library.messages.DecryptedMessage
914
import org.xmtp.android.library.messages.PagingInfoSortDirection
1015
import org.xmtp.proto.message.api.v1.MessageApiOuterClass
@@ -123,6 +128,34 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
123128
)
124129
}
125130

131+
fun streamMessages(): Flow<DecodedMessage> = flow {
132+
val messageEmitter = MessageEmitter()
133+
134+
coroutineScope {
135+
launch {
136+
messageEmitter.messages.collect { message ->
137+
emit(Message(client, message).decode())
138+
}
139+
}
140+
}
141+
142+
libXMTPGroup.stream(messageEmitter.callback)
143+
}
144+
145+
fun streamDecryptedMessages(): Flow<DecryptedMessage> = flow {
146+
val messageEmitter = MessageEmitter()
147+
148+
coroutineScope {
149+
launch {
150+
messageEmitter.messages.collect { message ->
151+
emit(decrypt(Message(client, message)))
152+
}
153+
}
154+
}
155+
156+
libXMTPGroup.stream(messageEmitter.callback)
157+
}
158+
126159
fun addMembers(addresses: List<String>) {
127160
runBlocking { libXMTPGroup.addMembers(addresses) }
128161
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package uniffi.xmtpv3.org.xmtp.android.library.libxmtp
2+
3+
import kotlinx.coroutines.flow.MutableSharedFlow
4+
import kotlinx.coroutines.flow.asSharedFlow
5+
import uniffi.xmtpv3.FfiConversationCallback
6+
import uniffi.xmtpv3.FfiGroup
7+
8+
class GroupEmitter {
9+
private val _groups = MutableSharedFlow<FfiGroup>()
10+
val groups = _groups.asSharedFlow()
11+
12+
val callback: FfiConversationCallback = object : FfiConversationCallback {
13+
override fun onConversation(conversation: FfiGroup) {
14+
_groups.tryEmit(conversation)
15+
}
16+
}
17+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package org.xmtp.android.library.libxmtp
2+
3+
import kotlinx.coroutines.flow.MutableSharedFlow
4+
import kotlinx.coroutines.flow.asSharedFlow
5+
import uniffi.xmtpv3.FfiMessage
6+
import uniffi.xmtpv3.FfiMessageCallback
7+
8+
class MessageEmitter {
9+
private val _messages = MutableSharedFlow<FfiMessage>()
10+
val messages = _messages.asSharedFlow()
11+
12+
val callback: FfiMessageCallback = object : FfiMessageCallback {
13+
override fun onMessage(message: FfiMessage) {
14+
_messages.tryEmit(message)
15+
}
16+
}
17+
}

0 commit comments

Comments
 (0)