Skip to content

Commit 78893b8

Browse files
authored
Fix stream timeouts (#228)
* add a test and don't kill the stream prematurely * get it working even up to 10 minutes
1 parent f0f702f commit 78893b8

File tree

2 files changed

+40
-7
lines changed

2 files changed

+40
-7
lines changed

library/src/androidTest/java/org/xmtp/android/library/ConversationsTest.kt

+39-5
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import kotlinx.coroutines.Dispatchers
66
import kotlinx.coroutines.launch
77
import kotlinx.coroutines.runBlocking
88
import org.junit.Assert.assertEquals
9-
import org.junit.Ignore
109
import org.junit.Test
1110
import org.junit.runner.RunWith
1211
import org.xmtp.android.library.codecs.TextCodec
@@ -79,15 +78,15 @@ class ConversationsTest {
7978
}
8079

8180
@Test
82-
@Ignore("CI Issues")
8381
fun testStreamAllMessages() {
8482
val bo = PrivateKeyBuilder()
8583
val alix = PrivateKeyBuilder()
8684
val clientOptions =
8785
ClientOptions(api = ClientOptions.Api(env = XMTPEnvironment.LOCAL, isSecure = false))
8886
val boClient = Client().create(bo, clientOptions)
8987
val alixClient = Client().create(alix, clientOptions)
90-
val boConversation = runBlocking { boClient.conversations.newConversation(alixClient.address) }
88+
val boConversation =
89+
runBlocking { boClient.conversations.newConversation(alixClient.address) }
9190

9291
// Record message stream across all conversations
9392
val allMessages = mutableListOf<DecodedMessage>()
@@ -97,7 +96,8 @@ class ConversationsTest {
9796
alixClient.conversations.streamAllMessages().collect { message ->
9897
allMessages.add(message)
9998
}
100-
} catch (e: Exception) {}
99+
} catch (e: Exception) {
100+
}
101101
}
102102
sleep(2500)
103103

@@ -109,7 +109,8 @@ class ConversationsTest {
109109

110110
val caro = PrivateKeyBuilder()
111111
val caroClient = Client().create(caro, clientOptions)
112-
val caroConversation = runBlocking { caroClient.conversations.newConversation(alixClient.address) }
112+
val caroConversation =
113+
runBlocking { caroClient.conversations.newConversation(alixClient.address) }
113114
sleep(2500)
114115

115116
for (i in 0 until 5) {
@@ -138,4 +139,37 @@ class ConversationsTest {
138139

139140
assertEquals(allMessages.size, 15)
140141
}
142+
143+
@Test
144+
fun testStreamTimeOutsAllMessages() {
145+
val bo = PrivateKeyBuilder()
146+
val alix = PrivateKeyBuilder()
147+
val clientOptions =
148+
ClientOptions(api = ClientOptions.Api(env = XMTPEnvironment.LOCAL, isSecure = false))
149+
val boClient = Client().create(bo, clientOptions)
150+
val alixClient = Client().create(alix, clientOptions)
151+
val boConversation =
152+
runBlocking { boClient.conversations.newConversation(alixClient.address) }
153+
154+
// Record message stream across all conversations
155+
val allMessages = mutableListOf<DecodedMessage>()
156+
157+
val job = CoroutineScope(Dispatchers.IO).launch {
158+
try {
159+
alixClient.conversations.streamAllMessages().collect { message ->
160+
allMessages.add(message)
161+
}
162+
} catch (e: Exception) {
163+
}
164+
}
165+
sleep(2500)
166+
167+
runBlocking { boConversation.send(text = "first message") }
168+
sleep(2000)
169+
assertEquals(allMessages.size, 1)
170+
sleep(121000)
171+
runBlocking { boConversation.send(text = "second message") }
172+
sleep(2000)
173+
assertEquals(allMessages.size, 2)
174+
}
141175
}

library/src/main/java/org/xmtp/android/library/ApiClient.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ data class GRPCApiClient(
9595
"backoffMultiplier" to 2.0,
9696
"retryableStatusCodes" to listOf(
9797
"UNAVAILABLE",
98-
"CANCELLED",
9998
)
10099
)
101100
)
@@ -107,7 +106,7 @@ data class GRPCApiClient(
107106
environment.getValue(),
108107
if (environment == XMTPEnvironment.LOCAL) 5556 else 443
109108
).apply {
110-
keepAliveTime(30L, TimeUnit.SECONDS)
109+
keepAliveTime(3, TimeUnit.MINUTES)
111110
keepAliveTimeout(20L, TimeUnit.SECONDS)
112111
keepAliveWithoutCalls(true)
113112
if (environment != XMTPEnvironment.LOCAL) {

0 commit comments

Comments
 (0)