Skip to content

Commit

Permalink
in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Dec 10, 2024
1 parent a717604 commit 35c8219
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 124 deletions.
2 changes: 2 additions & 0 deletions build.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
call nclean
call gradlew clean test
12 changes: 11 additions & 1 deletion src/main/java/io/nats/vertx/NatsStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ public interface NatsStream extends WriteStream<Message> {
*/
void publish(Message data, Handler<AsyncResult<PublishAck>> handler);

/**
* Subscribe to JetStream stream
* @param subject The subject of the stream.
* @param handler The message handler to listen to messages from the stream.
* @param autoAck Specify if message handler should auto acknowledge.
* @return future that returns status of subscription.
*/
Future<Void> subscribe(
String subject, Handler<NatsVertxMessage> handler, boolean autoAck);

/**
* Subscribe to JetStream stream
* @param subject The subject of the stream.
Expand All @@ -67,7 +77,7 @@ public interface NatsStream extends WriteStream<Message> {
* @return future that returns status of subscription.
*/
Future<Void> subscribe(
String subject, Handler<NatsVertxMessage> handler, boolean autoAck, PushSubscribeOptions so);
String subject, Handler<NatsVertxMessage> handler, boolean autoAck, PushSubscribeOptions so);

/**
* Subscribe to JetStream stream
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/io/nats/vertx/impl/NatsStreamImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.nats.client.*;
import io.nats.client.api.PublishAck;
import io.nats.client.impl.Headers;
import io.nats.client.support.Debug;
import io.nats.vertx.NatsStream;
import io.nats.vertx.NatsVertxMessage;
import io.nats.vertx.SubscriptionReadStream;
Expand Down Expand Up @@ -175,6 +176,11 @@ public Future<PublishAck> publish(String subject, Headers headers, byte[] body,
return promise.future();
}

@Override
public Future<Void> subscribe(String subject, Handler<NatsVertxMessage> handler, boolean autoAck) {
return subscribe(subject, handler, autoAck, null);
}

@Override
public Future<Void> subscribe(String subject, Handler<NatsVertxMessage> handler, boolean autoAck, PushSubscribeOptions so) {
final Promise<Void> promise = context().promise();
Expand All @@ -186,6 +192,7 @@ public Future<Void> subscribe(String subject, Handler<NatsVertxMessage> handler,
dispatcherMap.put(subject, dispatcher);
promise.complete();
} catch (Exception e) {
Debug.info("subscribe", e);
handleException(promise, e);
}
}, false);
Expand Down Expand Up @@ -216,7 +223,6 @@ public Future<SubscriptionReadStream> subscribe(final String subject, final Pull
context().executeBlocking(evt -> {
try {
final JetStreamSubscription subscription = so != null ? js.subscribe(subject, so) : js.subscribe(subject);

final SubscriptionReadStream subscriptionReadStream = new SubscriptionReadStreamImpl(context(), subscription, exceptionHandler);
subscriptionMap.put(subject, subscription);
promise.complete(subscriptionReadStream);
Expand Down
102 changes: 51 additions & 51 deletions src/test/java/io/nats/vertx/NatsKeyValueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,70 @@
import io.nats.client.impl.NatsKeyValueWatchSubscription;
import io.nats.client.support.NatsKeyValueUtil;
import io.vertx.core.Future;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static io.nats.client.api.KeyValueWatchOption.*;
import static io.nats.vertx.TestUtils2.*;
import static io.nats.vertx.TestUtils2.sleep;
import static io.nats.vertx.TestUtils2.unique;
import static org.junit.jupiter.api.Assertions.*;

public class NatsKeyValueTest {
public static final String META_KEY = "meta-test-key";
public static final String META_VALUE = "meta-test-value";

// ----------------------------------------------------------------------------------------------------
// Test Running
// ----------------------------------------------------------------------------------------------------
static TestsRunner testRunner;
static int port;

@AfterAll
public static void afterAll() throws Exception {
testRunner.close();
}

@BeforeAll
public static void beforeAll() throws Exception {
testRunner = TestsRunner.instance();
port = testRunner.natsServerRunner.getPort();
}

@BeforeEach
public void beforeEach() throws Exception {
cleanupJs();
}

private static void cleanupJs() {
try {
JetStreamManagement jsm = testRunner.nc.jetStreamManagement();
List<String> streams = jsm.getStreamNames();
for (String s : streams)
{
jsm.deleteStream(s);
}
} catch (Exception ignore) {}
}

private static NatsClient getNatsClient() {
return TestUtils2.natsClient(port);
}

// ----------------------------------------------------------------------------------------------------

@Test
public void testWorkflow() throws Exception {
Expand All @@ -39,14 +83,9 @@ public void testWorkflow() throws Exception {
String stringValue1 = "String Value 1";
String stringValue2 = "String Value 2";

// get the kv management context
Map<String, String> metadata = new HashMap<>();
metadata.put(META_KEY, META_VALUE);

// create the bucket
String desc = "desc" + unique();
KvProxy proxy = new KvProxy(b ->
b.description(desc).maxHistoryPerKey(3).metadata(metadata));
b.description(desc).maxHistoryPerKey(3));

assertInitialStatus(proxy.status, proxy.bucket, desc);

Expand Down Expand Up @@ -951,47 +990,8 @@ public void testMirrorSourceBuilderPrefixConversion() throws Exception {
}

// ----------------------------------------------------------------------------------------------------
// UTILITIES
// Kv Proxy - Used to un-async calls since the tests are based on strict ordering of things
// ----------------------------------------------------------------------------------------------------
static TestsRunner testRunner;
static int port;

@AfterAll
public static void afterAll() throws Exception {
testRunner.close();
}

@BeforeAll
public static void beforeAll() throws Exception {
testRunner = TestUtils2.startServer();
port = testRunner.natsServerRunner.getPort();
}

@BeforeEach
public void beforeEach() throws Exception {
cleanupJs();
}

@AfterEach
public void afterEach() throws Exception {
cleanupJs();
}

private static void cleanupJs() {
try {
JetStreamManagement jsm = testRunner.nc.jetStreamManagement();
List<String> streams = jsm.getStreamNames();
for (String s : streams)
{
jsm.deleteStream(s);
}
} catch (Exception ignore) {}
}

private static NatsClient getNatsClient() {
return TestUtils2.natsClient(port);
}

static class KvProxy {
final KeyValueManagement kvm;
final String bucket;
Expand Down
95 changes: 24 additions & 71 deletions src/test/java/io/nats/vertx/TestUtils2.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,31 @@
package io.nats.vertx;

import io.nats.ConsoleOutput;
import io.nats.NatsServerRunner;
import io.nats.client.*;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.NUID;
import io.nats.client.Options;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.vertx.core.Future;
import io.vertx.core.Handler;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;

public class TestUtils2 {

public static String unique() {
return new NUID().nextSequence();
}

public static String unique(String prefix) {
return prefix + unique();
}

public static String unique(int i) {
return unique() + i;
}
Expand All @@ -26,17 +34,6 @@ public static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException ignored) { /* ignored */ }
}

public static boolean waitUntilStatus(Connection conn, long millis, Connection.Status waitUntilStatus) {
long times = (millis + 99) / 100;
for (long x = 0; x < times; x++) {
sleep(100);
if (conn.getStatus() == waitUntilStatus) {
return true;
}
}
return false;
}

public static NatsStream jetStream(NatsClient natsClient) {
return TestUtils.jetStream(natsClient);
}
Expand All @@ -49,7 +46,7 @@ public static NatsClient natsClient(int port, Handler<Throwable> exceptionHandle
final NatsOptions natsOptions = new NatsOptions();
natsOptions.setExceptionHandler(exceptionHandler);
natsOptions.setNatsBuilder(new Options.Builder().connectionTimeout(Duration.ofSeconds(5)));
natsOptions.getNatsBuilder().server("localhost:" + port);
natsOptions.getNatsBuilder().server("nats://localhost:" + port);

final NatsClient natsClient = NatsClient.create(natsOptions);
final Future<Void> connect = natsClient.connect();
Expand Down Expand Up @@ -77,61 +74,17 @@ public static NatsClient natsClient(int port, Handler<Throwable> exceptionHandle
return natsClient;
}

public static class TestsRunner {
NatsServerRunner natsServerRunner;
Connection nc;
int port;

public TestsRunner(NatsServerRunner natsServerRunner) {
this.natsServerRunner = natsServerRunner;
port = natsServerRunner.getPort();
}

public void close() {
closeConnection();

try {
if (natsServerRunner != null) {
natsServerRunner.close();
}
}
catch (Exception e) {
System.err.println("Closing Test Server Helper Runner: " + e);
}
}

private void closeConnection() {
try {
if (nc != null) {
nc.close();
}
}
catch (Exception e) {
System.err.println("Closing Test Server Helper Connection: " + e);
}
}
}

public static TestsRunner startServer() throws Exception {
NatsServerRunner.setDefaultOutputSupplier(ConsoleOutput::new);
NatsServerRunner.setDefaultOutputLevel(Level.WARNING);
TestsRunner tr = new TestsRunner(new NatsServerRunner(-1, false, true));

for (int i = 0; i < 5; i++) {
Options.Builder builder = new Options.Builder()
.connectionTimeout(Duration.ofSeconds(5))
.server("localhost:" + tr.port)
.errorListener(new ErrorListener() {})
.connectionListener((conn, type) -> {});

Connection conn = Nats.connect(builder.build());
if (waitUntilStatus(conn, 5000, Connection.Status.CONNECTED)) {
tr.nc = conn;
break;
}
conn.close();
public static StreamInfo createStream(JetStreamManagement jsm, String streamName, String... subjects) throws IOException, JetStreamApiException {
try {
jsm.deleteStream(streamName);
}
return tr;
catch (Exception ignore) {}

StreamConfiguration sc = StreamConfiguration.builder()
.name(streamName)
.storageType(StorageType.Memory)
.subjects(subjects)
.build();
return jsm.addStream(sc);
}

}
Loading

0 comments on commit 35c8219

Please sign in to comment.