Skip to content

Commit 36665ae

Browse files
authoredApr 16, 2025
chore: improve test coverage for connection shutdown (#320)
1 parent 5f09768 commit 36665ae

File tree

2 files changed

+64
-0
lines changed

2 files changed

+64
-0
lines changed
 

‎build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ tasks.register("singleNodeTests", Test) {
131131
include("**/StreamsTests.class")
132132
include("**/PersistentSubscriptionsTests.class")
133133
include("**/TelemetryTests.class")
134+
include("**/ConnectionShutdownTests.class")
134135
}
135136
}
136137

@@ -153,6 +154,7 @@ tasks.register("clusterTests", Test) {
153154
useJUnitPlatform {
154155
include("**/StreamsTests.class")
155156
include("**/PersistentSubscriptionsTests.class")
157+
include("**/ConnectionShutdownTests.class")
156158
}
157159
}
158160

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package io.kurrent.dbclient.connection;
2+
3+
import io.kurrent.dbclient.*;
4+
import org.junit.jupiter.api.Assertions;
5+
import org.junit.jupiter.api.Test;
6+
import org.junit.jupiter.api.Timeout;
7+
8+
import java.util.concurrent.CountDownLatch;
9+
import java.util.concurrent.TimeUnit;
10+
import java.util.concurrent.atomic.AtomicBoolean;
11+
import java.util.concurrent.atomic.AtomicInteger;
12+
import java.util.concurrent.atomic.AtomicReference;
13+
14+
public class ConnectionShutdownTests {
15+
@Test
16+
@Timeout(value = 1, unit = TimeUnit.MINUTES)
17+
public void testDatabaseCleanupWithActiveSubscription() throws Throwable {
18+
Database testDatabase = DatabaseFactory.spawn();
19+
KurrentDBClient client = testDatabase.defaultClient();
20+
21+
final AtomicInteger count = new AtomicInteger(0);
22+
final AtomicInteger retryCount = new AtomicInteger(-1);
23+
final AtomicBoolean cancellationReceived = new AtomicBoolean(false);
24+
final CountDownLatch cancellationLatch = new CountDownLatch(1);
25+
final AtomicReference<Throwable> reconnectError = new AtomicReference<>();
26+
27+
SubscriptionListener listener = new SubscriptionListener() {
28+
@Override
29+
public void onEvent(Subscription subscription, ResolvedEvent event) {
30+
count.incrementAndGet();
31+
}
32+
33+
@Override
34+
public void onCancelled(Subscription subscription, Throwable throwable) {
35+
cancellationReceived.set(true);
36+
37+
retryCount.incrementAndGet();
38+
39+
try {
40+
client.subscribeToAll(this).get(10, TimeUnit.SECONDS);
41+
} catch (Throwable ex) {
42+
reconnectError.set(ex);
43+
} finally {
44+
cancellationLatch.countDown();
45+
}
46+
}
47+
};
48+
49+
client.subscribeToAll(listener).get();
50+
51+
testDatabase.dispose();
52+
53+
boolean callbackReceived = cancellationLatch.await(30, TimeUnit.SECONDS);
54+
Assertions.assertTrue(callbackReceived);
55+
Assertions.assertTrue(cancellationReceived.get());
56+
Assertions.assertTrue(count.get() > 0);
57+
Assertions.assertEquals(2, retryCount.get());
58+
59+
Throwable ex = reconnectError.get();
60+
Assertions.assertInstanceOf(ConnectionShutdownException.class, ex.getCause());
61+
}
62+
}

0 commit comments

Comments
 (0)