diff --git a/build.bat b/build.bat new file mode 100644 index 0000000..7eb4fd2 --- /dev/null +++ b/build.bat @@ -0,0 +1,2 @@ +call nclean +call gradlew clean test diff --git a/src/main/java/io/nats/vertx/NatsStream.java b/src/main/java/io/nats/vertx/NatsStream.java index 9e309a3..b0b27c3 100644 --- a/src/main/java/io/nats/vertx/NatsStream.java +++ b/src/main/java/io/nats/vertx/NatsStream.java @@ -58,6 +58,16 @@ public interface NatsStream extends WriteStream { */ void publish(Message data, Handler> handler); + /** + * Subscribe to JetStream stream + * @param subject The subject of the stream. + * @param handler The message handler to listen to messages from the stream. + * @param autoAck Specify if message handler should auto acknowledge. + * @return future that returns status of subscription. + */ + Future subscribe( + String subject, Handler handler, boolean autoAck); + /** * Subscribe to JetStream stream * @param subject The subject of the stream. @@ -67,7 +77,7 @@ public interface NatsStream extends WriteStream { * @return future that returns status of subscription. */ Future subscribe( - String subject, Handler handler, boolean autoAck, PushSubscribeOptions so); + String subject, Handler handler, boolean autoAck, PushSubscribeOptions so); /** * Subscribe to JetStream stream diff --git a/src/main/java/io/nats/vertx/impl/NatsStreamImpl.java b/src/main/java/io/nats/vertx/impl/NatsStreamImpl.java index b423ab2..7c8f496 100644 --- a/src/main/java/io/nats/vertx/impl/NatsStreamImpl.java +++ b/src/main/java/io/nats/vertx/impl/NatsStreamImpl.java @@ -3,6 +3,7 @@ import io.nats.client.*; import io.nats.client.api.PublishAck; import io.nats.client.impl.Headers; +import io.nats.client.support.Debug; import io.nats.vertx.NatsStream; import io.nats.vertx.NatsVertxMessage; import io.nats.vertx.SubscriptionReadStream; @@ -175,6 +176,11 @@ public Future publish(String subject, Headers headers, byte[] body, return promise.future(); } + @Override + public Future subscribe(String subject, Handler handler, boolean autoAck) { + return subscribe(subject, handler, autoAck, null); + } + @Override public Future subscribe(String subject, Handler handler, boolean autoAck, PushSubscribeOptions so) { final Promise promise = context().promise(); @@ -186,6 +192,7 @@ public Future subscribe(String subject, Handler handler, dispatcherMap.put(subject, dispatcher); promise.complete(); } catch (Exception e) { + Debug.info("subscribe", e); handleException(promise, e); } }, false); @@ -216,7 +223,6 @@ public Future subscribe(final String subject, final Pull context().executeBlocking(evt -> { try { final JetStreamSubscription subscription = so != null ? js.subscribe(subject, so) : js.subscribe(subject); - final SubscriptionReadStream subscriptionReadStream = new SubscriptionReadStreamImpl(context(), subscription, exceptionHandler); subscriptionMap.put(subject, subscription); promise.complete(subscriptionReadStream); diff --git a/src/test/java/io/nats/vertx/NatsKeyValueTest.java b/src/test/java/io/nats/vertx/NatsKeyValueTest.java index d36dfca..1bab067 100644 --- a/src/test/java/io/nats/vertx/NatsKeyValueTest.java +++ b/src/test/java/io/nats/vertx/NatsKeyValueTest.java @@ -5,26 +5,70 @@ import io.nats.client.impl.NatsKeyValueWatchSubscription; import io.nats.client.support.NatsKeyValueUtil; import io.vertx.core.Future; -import org.junit.jupiter.api.*; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.ZoneId; import java.time.ZonedDateTime; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import static io.nats.client.api.KeyValueWatchOption.*; -import static io.nats.vertx.TestUtils2.*; +import static io.nats.vertx.TestUtils2.sleep; +import static io.nats.vertx.TestUtils2.unique; import static org.junit.jupiter.api.Assertions.*; public class NatsKeyValueTest { - public static final String META_KEY = "meta-test-key"; - public static final String META_VALUE = "meta-test-value"; + + // ---------------------------------------------------------------------------------------------------- + // Test Running + // ---------------------------------------------------------------------------------------------------- + static TestsRunner testRunner; + static int port; + + @AfterAll + public static void afterAll() throws Exception { + testRunner.close(); + } + + @BeforeAll + public static void beforeAll() throws Exception { + testRunner = TestsRunner.instance(); + port = testRunner.natsServerRunner.getPort(); + } + + @BeforeEach + public void beforeEach() throws Exception { + cleanupJs(); + } + + private static void cleanupJs() { + try { + JetStreamManagement jsm = testRunner.nc.jetStreamManagement(); + List streams = jsm.getStreamNames(); + for (String s : streams) + { + jsm.deleteStream(s); + } + } catch (Exception ignore) {} + } + + private static NatsClient getNatsClient() { + return TestUtils2.natsClient(port); + } + + // ---------------------------------------------------------------------------------------------------- @Test public void testWorkflow() throws Exception { @@ -39,14 +83,9 @@ public void testWorkflow() throws Exception { String stringValue1 = "String Value 1"; String stringValue2 = "String Value 2"; - // get the kv management context - Map metadata = new HashMap<>(); - metadata.put(META_KEY, META_VALUE); - - // create the bucket String desc = "desc" + unique(); KvProxy proxy = new KvProxy(b -> - b.description(desc).maxHistoryPerKey(3).metadata(metadata)); + b.description(desc).maxHistoryPerKey(3)); assertInitialStatus(proxy.status, proxy.bucket, desc); @@ -951,47 +990,8 @@ public void testMirrorSourceBuilderPrefixConversion() throws Exception { } // ---------------------------------------------------------------------------------------------------- - // UTILITIES + // Kv Proxy - Used to un-async calls since the tests are based on strict ordering of things // ---------------------------------------------------------------------------------------------------- - static TestsRunner testRunner; - static int port; - - @AfterAll - public static void afterAll() throws Exception { - testRunner.close(); - } - - @BeforeAll - public static void beforeAll() throws Exception { - testRunner = TestUtils2.startServer(); - port = testRunner.natsServerRunner.getPort(); - } - - @BeforeEach - public void beforeEach() throws Exception { - cleanupJs(); - } - - @AfterEach - public void afterEach() throws Exception { - cleanupJs(); - } - - private static void cleanupJs() { - try { - JetStreamManagement jsm = testRunner.nc.jetStreamManagement(); - List streams = jsm.getStreamNames(); - for (String s : streams) - { - jsm.deleteStream(s); - } - } catch (Exception ignore) {} - } - - private static NatsClient getNatsClient() { - return TestUtils2.natsClient(port); - } - static class KvProxy { final KeyValueManagement kvm; final String bucket; diff --git a/src/test/java/io/nats/vertx/TestUtils2.java b/src/test/java/io/nats/vertx/TestUtils2.java index dd11f07..f9f49db 100644 --- a/src/test/java/io/nats/vertx/TestUtils2.java +++ b/src/test/java/io/nats/vertx/TestUtils2.java @@ -1,16 +1,20 @@ package io.nats.vertx; -import io.nats.ConsoleOutput; -import io.nats.NatsServerRunner; -import io.nats.client.*; +import io.nats.client.JetStreamApiException; +import io.nats.client.JetStreamManagement; +import io.nats.client.NUID; +import io.nats.client.Options; +import io.nats.client.api.StorageType; +import io.nats.client.api.StreamConfiguration; +import io.nats.client.api.StreamInfo; import io.vertx.core.Future; import io.vertx.core.Handler; +import java.io.IOException; import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Level; public class TestUtils2 { @@ -18,6 +22,10 @@ public static String unique() { return new NUID().nextSequence(); } + public static String unique(String prefix) { + return prefix + unique(); + } + public static String unique(int i) { return unique() + i; } @@ -26,17 +34,6 @@ public static void sleep(long ms) { try { Thread.sleep(ms); } catch (InterruptedException ignored) { /* ignored */ } } - public static boolean waitUntilStatus(Connection conn, long millis, Connection.Status waitUntilStatus) { - long times = (millis + 99) / 100; - for (long x = 0; x < times; x++) { - sleep(100); - if (conn.getStatus() == waitUntilStatus) { - return true; - } - } - return false; - } - public static NatsStream jetStream(NatsClient natsClient) { return TestUtils.jetStream(natsClient); } @@ -49,7 +46,7 @@ public static NatsClient natsClient(int port, Handler exceptionHandle final NatsOptions natsOptions = new NatsOptions(); natsOptions.setExceptionHandler(exceptionHandler); natsOptions.setNatsBuilder(new Options.Builder().connectionTimeout(Duration.ofSeconds(5))); - natsOptions.getNatsBuilder().server("localhost:" + port); + natsOptions.getNatsBuilder().server("nats://localhost:" + port); final NatsClient natsClient = NatsClient.create(natsOptions); final Future connect = natsClient.connect(); @@ -77,61 +74,17 @@ public static NatsClient natsClient(int port, Handler exceptionHandle return natsClient; } - public static class TestsRunner { - NatsServerRunner natsServerRunner; - Connection nc; - int port; - - public TestsRunner(NatsServerRunner natsServerRunner) { - this.natsServerRunner = natsServerRunner; - port = natsServerRunner.getPort(); - } - - public void close() { - closeConnection(); - - try { - if (natsServerRunner != null) { - natsServerRunner.close(); - } - } - catch (Exception e) { - System.err.println("Closing Test Server Helper Runner: " + e); - } - } - - private void closeConnection() { - try { - if (nc != null) { - nc.close(); - } - } - catch (Exception e) { - System.err.println("Closing Test Server Helper Connection: " + e); - } - } - } - - public static TestsRunner startServer() throws Exception { - NatsServerRunner.setDefaultOutputSupplier(ConsoleOutput::new); - NatsServerRunner.setDefaultOutputLevel(Level.WARNING); - TestsRunner tr = new TestsRunner(new NatsServerRunner(-1, false, true)); - - for (int i = 0; i < 5; i++) { - Options.Builder builder = new Options.Builder() - .connectionTimeout(Duration.ofSeconds(5)) - .server("localhost:" + tr.port) - .errorListener(new ErrorListener() {}) - .connectionListener((conn, type) -> {}); - - Connection conn = Nats.connect(builder.build()); - if (waitUntilStatus(conn, 5000, Connection.Status.CONNECTED)) { - tr.nc = conn; - break; - } - conn.close(); + public static StreamInfo createStream(JetStreamManagement jsm, String streamName, String... subjects) throws IOException, JetStreamApiException { + try { + jsm.deleteStream(streamName); } - return tr; + catch (Exception ignore) {} + + StreamConfiguration sc = StreamConfiguration.builder() + .name(streamName) + .storageType(StorageType.Memory) + .subjects(subjects) + .build(); + return jsm.addStream(sc); } - } diff --git a/src/test/java/io/nats/vertx/TestsRunner.java b/src/test/java/io/nats/vertx/TestsRunner.java new file mode 100644 index 0000000..6009f7b --- /dev/null +++ b/src/test/java/io/nats/vertx/TestsRunner.java @@ -0,0 +1,78 @@ +package io.nats.vertx; + +import io.nats.ConsoleOutput; +import io.nats.NatsServerRunner; +import io.nats.client.*; + +import java.time.Duration; +import java.util.logging.Level; + +import static io.nats.vertx.TestUtils2.sleep; + +public class TestsRunner { + NatsServerRunner natsServerRunner; + int port; + Connection nc; + JetStreamManagement jsm; + JetStream js; + + public static TestsRunner instance() throws Exception { + NatsServerRunner.setDefaultOutputSupplier(ConsoleOutput::new); + NatsServerRunner.setDefaultOutputLevel(Level.WARNING); + TestsRunner tr = new TestsRunner(NatsServerRunner.builder().jetstream().build()); + + for (int i = 0; i < 5; i++) { + Options.Builder builder = new Options.Builder() + .connectionTimeout(Duration.ofSeconds(5)) + .server("nats://localhost:" + tr.port) + .errorListener(new ErrorListener() {}) + .connectionListener((conn, type) -> {}); + + Connection conn = Nats.connect(builder.build()); + if (waitUntilStatus(conn, 5000, Connection.Status.CONNECTED)) { + tr.nc = conn; + tr.jsm = conn.jetStreamManagement(); + tr.js = conn.jetStream(); + break; + } + conn.close(); + } + return tr; + } + + public static boolean waitUntilStatus(Connection conn, long millis, Connection.Status waitUntilStatus) { + long times = (millis + 99) / 100; + for (long x = 0; x < times; x++) { + sleep(100); + if (conn.getStatus() == waitUntilStatus) { + return true; + } + } + return false; + } + + private TestsRunner(NatsServerRunner natsServerRunner) { + this.natsServerRunner = natsServerRunner; + port = natsServerRunner.getPort(); + } + + public void close() { + try { + if (nc != null) { + nc.close(); + } + } + catch (Exception e) { + System.err.println("Closing Test Server Helper Connection: " + e); + } + + try { + if (natsServerRunner != null) { + natsServerRunner.close(); + } + } + catch (Exception e) { + System.err.println("Closing Test Server Helper Runner: " + e); + } + } +}