Skip to content

Commit b6f090b

Browse files
authored
fix broadcast group stream bug (#1613)
1 parent 458bdaf commit b6f090b

File tree

1 file changed

+15
-10
lines changed

1 file changed

+15
-10
lines changed

xmtp_mls/src/subscriptions/stream_conversations.rs

+15-10
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,23 @@ impl Stream for BroadcastGroupStream {
6161

6262
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
6363
use std::task::Poll::*;
64-
let this = self.project();
65-
if let Some(event) = ready!(this.inner.poll_next(cx)) {
66-
if let Some(group) =
67-
xmtp_common::optify!(event, "Missed messages due to event queue lag")
68-
.and_then(LocalEvents::group_filter)
69-
{
70-
Ready(Some(Ok(WelcomeOrGroup::Group(group))))
64+
let mut this = self.project();
65+
// loop until the inner stream returns:
66+
// - Ready with a group
67+
// - Ready(None) - stream ended
68+
// ignore None values, since it is not a group, but may indicate more values in the stream
69+
// itself
70+
loop {
71+
if let Some(event) = ready!(this.inner.as_mut().poll_next(cx)) {
72+
if let Some(group) =
73+
xmtp_common::optify!(event, "Missed messages due to event queue lag")
74+
.and_then(LocalEvents::group_filter)
75+
{
76+
return Ready(Some(Ok(WelcomeOrGroup::Group(group))));
77+
}
7178
} else {
72-
Pending
79+
return Ready(None);
7380
}
74-
} else {
75-
Ready(None)
7681
}
7782
}
7883
}

0 commit comments

Comments
 (0)