diff --git a/build.gradle b/build.gradle index 1a6ac04d..1beb8d7d 100644 --- a/build.gradle +++ b/build.gradle @@ -131,6 +131,7 @@ tasks.register("singleNodeTests", Test) { include("**/StreamsTests.class") include("**/PersistentSubscriptionsTests.class") include("**/TelemetryTests.class") + include("**/ConnectionShutdownTests.class") } } @@ -153,6 +154,7 @@ tasks.register("clusterTests", Test) { useJUnitPlatform { include("**/StreamsTests.class") include("**/PersistentSubscriptionsTests.class") + include("**/ConnectionShutdownTests.class") } } diff --git a/src/test/java/io/kurrent/dbclient/connection/ConnectionShutdownTests.java b/src/test/java/io/kurrent/dbclient/connection/ConnectionShutdownTests.java new file mode 100644 index 00000000..0afe060e --- /dev/null +++ b/src/test/java/io/kurrent/dbclient/connection/ConnectionShutdownTests.java @@ -0,0 +1,62 @@ +package io.kurrent.dbclient.connection; + +import io.kurrent.dbclient.*; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class ConnectionShutdownTests { + @Test + @Timeout(value = 1, unit = TimeUnit.MINUTES) + public void testDatabaseCleanupWithActiveSubscription() throws Throwable { + Database testDatabase = DatabaseFactory.spawn(); + KurrentDBClient client = testDatabase.defaultClient(); + + final AtomicInteger count = new AtomicInteger(0); + final AtomicInteger retryCount = new AtomicInteger(-1); + final AtomicBoolean cancellationReceived = new AtomicBoolean(false); + final CountDownLatch cancellationLatch = new CountDownLatch(1); + final AtomicReference<Throwable> reconnectError = new AtomicReference<>(); + + SubscriptionListener listener = new SubscriptionListener() { + @Override + public void onEvent(Subscription subscription, ResolvedEvent event) { + count.incrementAndGet(); + } + + @Override + public void onCancelled(Subscription subscription, Throwable throwable) { + cancellationReceived.set(true); + + retryCount.incrementAndGet(); + + try { + client.subscribeToAll(this).get(10, TimeUnit.SECONDS); + } catch (Throwable ex) { + reconnectError.set(ex); + } finally { + cancellationLatch.countDown(); + } + } + }; + + client.subscribeToAll(listener).get(); + + testDatabase.dispose(); + + boolean callbackReceived = cancellationLatch.await(30, TimeUnit.SECONDS); + Assertions.assertTrue(callbackReceived); + Assertions.assertTrue(cancellationReceived.get()); + Assertions.assertTrue(count.get() > 0); + Assertions.assertEquals(2, retryCount.get()); + + Throwable ex = reconnectError.get(); + Assertions.assertInstanceOf(ConnectionShutdownException.class, ex.getCause()); + } +}