|
1 | 1 | package org.xmtp.android.library
|
2 | 2 |
|
3 |
| -import kotlinx.coroutines.coroutineScope |
| 3 | +import android.util.Log |
| 4 | +import kotlinx.coroutines.channels.awaitClose |
4 | 5 | import kotlinx.coroutines.flow.Flow
|
5 |
| -import kotlinx.coroutines.flow.flow |
6 |
| -import kotlinx.coroutines.launch |
| 6 | +import kotlinx.coroutines.flow.callbackFlow |
7 | 7 | import kotlinx.coroutines.runBlocking
|
8 | 8 | import org.xmtp.android.library.codecs.ContentCodec
|
9 | 9 | import org.xmtp.android.library.codecs.EncodedContent
|
10 | 10 | import org.xmtp.android.library.codecs.compress
|
11 | 11 | import org.xmtp.android.library.libxmtp.Message
|
12 |
| -import org.xmtp.android.library.libxmtp.MessageEmitter |
13 | 12 | import org.xmtp.android.library.messages.DecryptedMessage
|
14 | 13 | import org.xmtp.android.library.messages.PagingInfoSortDirection
|
15 | 14 | import org.xmtp.proto.message.api.v1.MessageApiOuterClass
|
16 | 15 | import uniffi.xmtpv3.FfiGroup
|
17 | 16 | import uniffi.xmtpv3.FfiListMessagesOptions
|
| 17 | +import uniffi.xmtpv3.FfiMessage |
| 18 | +import uniffi.xmtpv3.FfiMessageCallback |
18 | 19 | import java.util.Date
|
19 | 20 | import kotlin.time.Duration.Companion.nanoseconds
|
20 | 21 | import kotlin.time.DurationUnit
|
@@ -142,31 +143,33 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
|
142 | 143 | }
|
143 | 144 | }
|
144 | 145 |
|
145 |
| - fun streamMessages(): Flow<DecodedMessage> = flow { |
146 |
| - val messageEmitter = MessageEmitter() |
147 |
| - |
148 |
| - coroutineScope { |
149 |
| - launch { |
150 |
| - messageEmitter.messages.collect { message -> |
151 |
| - emit(Message(client, message).decode()) |
152 |
| - } |
| 146 | + fun streamMessages(): Flow<DecodedMessage> = callbackFlow { |
| 147 | + val messageCallback = object : FfiMessageCallback { |
| 148 | + override fun onMessage(message: FfiMessage) { |
| 149 | + Log.e( |
| 150 | + "LOPI", |
| 151 | + "INFO Callback - Message callback with ID: " + message.id.toHex() + ", members: " + message.addrFrom |
| 152 | + ) |
| 153 | + trySend(Message(client, message).decode()) |
153 | 154 | }
|
154 | 155 | }
|
155 | 156 |
|
156 |
| - libXMTPGroup.stream(messageEmitter.callback) |
| 157 | + val stream = libXMTPGroup.stream(messageCallback) |
| 158 | + awaitClose { stream.end() } |
157 | 159 | }
|
158 | 160 |
|
159 |
| - fun streamDecryptedMessages(): Flow<DecryptedMessage> = flow { |
160 |
| - val messageEmitter = MessageEmitter() |
161 |
| - |
162 |
| - coroutineScope { |
163 |
| - launch { |
164 |
| - messageEmitter.messages.collect { message -> |
165 |
| - emit(decrypt(Message(client, message))) |
166 |
| - } |
| 161 | + fun streamDecryptedMessages(): Flow<DecryptedMessage> = callbackFlow { |
| 162 | + val messageCallback = object : FfiMessageCallback { |
| 163 | + override fun onMessage(message: FfiMessage) { |
| 164 | + Log.e( |
| 165 | + "LOPI", |
| 166 | + "INFO Callback - Message callback with ID: " + message.id.toHex() + ", members: " + message.addrFrom |
| 167 | + ) |
| 168 | + trySend(decrypt(Message(client, message))) |
167 | 169 | }
|
168 | 170 | }
|
169 | 171 |
|
170 |
| - libXMTPGroup.stream(messageEmitter.callback) |
| 172 | + val stream = libXMTPGroup.stream(messageCallback) |
| 173 | + awaitClose { stream.end() } |
171 | 174 | }
|
172 | 175 | }
|
0 commit comments