@@ -354,29 +354,44 @@ data class Conversations(
354
354
}
355
355
356
356
fun streamAllMessages (): Flow <DecodedMessage > = flow {
357
- val topics : MutableList < String > =
358
- mutableListOf (
357
+ while ( true ) {
358
+ val topics = mutableListOf (
359
359
Topic .userInvite(client.address).description,
360
360
Topic .userIntro(client.address).description
361
361
)
362
- for (conversation in list()) {
363
- topics.add(conversation.topic)
364
- }
365
- client.subscribe(topics).collect { envelope ->
366
- var conversation = conversationsByTopic[envelope.contentTopic]
367
- var decoded: DecodedMessage ? = null
368
- if (conversation != null ) {
369
- decoded = conversation.decodeOrNull(envelope)
370
- } else if (envelope.contentTopic.startsWith(" /xmtp/0/invite-" )) {
371
- conversation = fromInvite(envelope = envelope)
372
- conversationsByTopic[conversation.topic] = conversation
373
- } else if (envelope.contentTopic.startsWith(" /xmtp/0/intro-" )) {
374
- conversation = fromIntro(envelope = envelope)
375
- conversationsByTopic[conversation.topic] = conversation
376
- decoded = conversation.decodeOrNull(envelope)
362
+
363
+ for (conversation in list()) {
364
+ topics.add(conversation.topic)
377
365
}
378
- if (decoded != null ) {
379
- emit(decoded)
366
+
367
+ try {
368
+ client.subscribe(topics = topics).collect { envelope ->
369
+ when {
370
+ conversationsByTopic.containsKey(envelope.contentTopic) -> {
371
+ val conversation = conversationsByTopic[envelope.contentTopic]
372
+ val decoded = conversation?.decode(envelope)
373
+ decoded?.let { emit(it) }
374
+ }
375
+
376
+ envelope.contentTopic.startsWith(" /xmtp/0/invite-" ) -> {
377
+ val conversation = fromInvite(envelope)
378
+ conversationsByTopic[conversation.topic] = conversation
379
+ // Break so we can resubscribe with the new conversation
380
+ return @collect
381
+ }
382
+
383
+ envelope.contentTopic.startsWith(" /xmtp/0/intro-" ) -> {
384
+ val conversation = fromIntro(envelope)
385
+ conversationsByTopic[conversation.topic] = conversation
386
+ val decoded = conversation.decode(envelope)
387
+ emit(decoded)
388
+ // Break so we can resubscribe with the new conversation
389
+ return @collect
390
+ }
391
+ }
392
+ }
393
+ } catch (error: Exception ) {
394
+ throw error
380
395
}
381
396
}
382
397
}
0 commit comments