diff --git a/README.md b/README.md index ae3d0b0..657a16e 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ and any optional configuration options. Here is an example: ```java // Set options final NatsOptions natsOptions = new NatsOptions(); - natsOptions.getNatsBuilder().servers(new String[]{"localhost:" + port}); + natsOptions.getNatsBuilder().server("localhost:" + port); // Create client final NatsClient natsClient = NatsClient.create(natsOptions); diff --git a/build.gradle b/build.gradle index 29aa2e7..2946dc4 100644 --- a/build.gradle +++ b/build.gradle @@ -26,6 +26,9 @@ java { repositories { mavenCentral() + maven { + url "https://oss.sonatype.org/content/repositories/releases" + } maven { url "https://oss.sonatype.org/content/repositories/snapshots" } @@ -33,14 +36,14 @@ repositories { dependencies { - implementation 'io.nats:jnats:2.17.2' + implementation 'io.nats:jnats:2.20.5' implementation("com.fasterxml.jackson.core:jackson-core:2.14.2") implementation("io.netty:netty-handler:4.1.97.Final") implementation(platform("io.vertx:vertx-stack-depchain:4.5.1")) implementation("io.vertx:vertx-core") implementation("io.netty:netty-resolver-dns-native-macos:4.1.80.Final:osx-x86_64") testImplementation("io.vertx:vertx-junit5") - testImplementation 'io.nats:jnats-server-runner:1.2.5' + testImplementation 'io.nats:jnats-server-runner:2.0.0' } test { diff --git a/src/main/java/io/nats/client/impl/VertxDispatcher.java b/src/main/java/io/nats/client/impl/VertxDispatcher.java index ab95e3a..b98a1aa 100644 --- a/src/main/java/io/nats/client/impl/VertxDispatcher.java +++ b/src/main/java/io/nats/client/impl/VertxDispatcher.java @@ -8,7 +8,7 @@ public class VertxDispatcher extends NatsDispatcher { VertxDispatcher(NatsConnection conn, MessageHandler handler, ContextInternal context) { super(conn, handler); - vertxIncoming = new VertxDispatcherMessageQueue(this, context); + vertxIncoming = new VertxDispatcherMessageQueue(this, context, conn); } @Override diff --git a/src/main/java/io/nats/client/impl/VertxDispatcherMessageQueue.java b/src/main/java/io/nats/client/impl/VertxDispatcherMessageQueue.java index 749bd0d..ea45d9e 100644 --- a/src/main/java/io/nats/client/impl/VertxDispatcherMessageQueue.java +++ b/src/main/java/io/nats/client/impl/VertxDispatcherMessageQueue.java @@ -10,8 +10,8 @@ public class VertxDispatcherMessageQueue extends MessageQueue { private final VertxDispatcher dispatcher; private final ContextInternal context; - VertxDispatcherMessageQueue(VertxDispatcher dispatcher, ContextInternal context) { - super(true); + VertxDispatcherMessageQueue(VertxDispatcher dispatcher, ContextInternal context, NatsConnection conn) { + super(true, conn.getOptions().getRequestCleanupInterval()); this.dispatcher = dispatcher; this.context = context; } @@ -65,11 +65,6 @@ void poisonTheQueue() { throw new IllegalStateException("poisonTheQueue not used."); } - @Override - boolean offer(NatsMessage msg) { - throw new IllegalStateException("offer not used."); - } - @Override NatsMessage poll(Duration timeout) throws InterruptedException { return super.poll(timeout); diff --git a/src/main/java/io/nats/vertx/NatsClient.java b/src/main/java/io/nats/vertx/NatsClient.java index a089039..be30342 100644 --- a/src/main/java/io/nats/vertx/NatsClient.java +++ b/src/main/java/io/nats/vertx/NatsClient.java @@ -4,7 +4,9 @@ import io.nats.client.*; import io.nats.client.impl.Headers; import io.nats.vertx.impl.NatsClientImpl; -import io.vertx.core.*; +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; import io.vertx.core.streams.WriteStream; import java.time.Duration; @@ -42,6 +44,21 @@ static NatsClient create(final NatsOptions natsOptions) { */ Future jetStream(JetStreamOptions options); + /** + * Get interface to Key Value. + * @param bucketName the bucket name + * @return Key Value instance + */ + Future keyValue(String bucketName); + + /** + * Get interface to Key Value. + * @param bucketName the bucket name + * @param options KeyValue options. + * @return Key Value instance + */ + Future keyValue(String bucketName, KeyValueOptions options); + /** * Drain handler. * @param handler the handler @@ -137,9 +154,6 @@ static NatsClient create(final NatsOptions natsOptions) { */ Future request(String subject, byte[] message); - - - /** * Send a request. The returned future will be completed when the * response comes back. @@ -151,7 +165,6 @@ static NatsClient create(final NatsOptions natsOptions) { */ Future request(String subject, Headers headers, byte[] body); - /** * Send a request. The returned future will be completed when the * response comes back. @@ -164,7 +177,6 @@ static NatsClient create(final NatsOptions natsOptions) { */ Future requestWithTimeout(String subject, Headers headers, byte[] body, Duration timeout); - /** * * Send request. @@ -199,7 +211,6 @@ static NatsClient create(final NatsOptions natsOptions) { */ Future request(String subject, byte[] message, Duration timeout); - /** * Send a message to the specified subject. The message body will * not be copied. The expected usage with string content is something @@ -259,7 +270,6 @@ static NatsClient create(final NatsOptions natsOptions) { */ Future subscribe(String subject, Handler handler); - /** * * Subscribe to subject. @@ -290,6 +300,4 @@ static NatsClient create(final NatsOptions natsOptions) { default Future end() { return close(); } - - } diff --git a/src/main/java/io/nats/vertx/NatsKeyValue.java b/src/main/java/io/nats/vertx/NatsKeyValue.java new file mode 100644 index 0000000..56f3c43 --- /dev/null +++ b/src/main/java/io/nats/vertx/NatsKeyValue.java @@ -0,0 +1,314 @@ +package io.nats.vertx; + +import io.nats.client.JetStreamApiException; +import io.nats.client.api.*; +import io.nats.client.impl.NatsKeyValueWatchSubscription; +import io.vertx.core.Future; +import io.vertx.core.streams.StreamBase; + +import java.io.IOException; +import java.util.List; + +/** + * Provides a Vert.x WriteStream interface with Futures and Promises. + */ +public interface NatsKeyValue extends StreamBase { + + /** + * Get the name of the bucket. + * @return the name + */ + String getBucketName(); + + /** + * Get the entry for a key + * when the key exists and is live (not deleted and not purged) + * @param key the key + * @return the KvEntry object or null if not found. + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws IllegalArgumentException the server is not JetStream enabled + */ + Future get(String key); + + /** + * Get the specific revision of an entry for a key + * when the key exists and is live (not deleted and not purged) + * @param key the key + * @param revision the revision + * @return the KvEntry object or null if not found. + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws IllegalArgumentException the server is not JetStream enabled + */ + Future get(String key, long revision); + + /** + * Put a byte[] as the value for a key + * @param key the key + * @param value the bytes of the value + * @return the revision number for the key + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws IllegalArgumentException the server is not JetStream enabled + */ + Future put(String key, byte[] value); + + /** + * Put a string as the value for a key + * @param key the key + * @param value the UTF-8 string + * @return the revision number for the key + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws IllegalArgumentException the server is not JetStream enabled + */ + Future put(String key, String value); + + /** + * Put a long as the value for a key + * @param key the key + * @param value the number + * @return the revision number for the key + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws IllegalArgumentException the server is not JetStream enabled + */ + Future put(String key, Number value); + + /** + * Put as the value for a key iff the key does not exist (there is no history) + * or is deleted (history shows the key is deleted) + * @param key the key + * @param value the bytes of the value + * @return the revision number for the key + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws IllegalArgumentException the server is not JetStream enabled + */ + Future create(String key, byte[] value); + + /** + * Put as the value for a key iff the key exists and its last revision matches the expected + * @param key the key + * @param value the bytes of the value + * @param expectedRevision the expected last revision + * @return the revision number for the key + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws IllegalArgumentException the server is not JetStream enabled + */ + Future update(String key, byte[] value, long expectedRevision); + + /** + * Put a string as the value for a key iff the key exists and its last revision matches the expected + * @param key the key + * @param value the UTF-8 string + * @param expectedRevision the expected last revision + * @return the revision number for the key + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws IllegalArgumentException the server is not JetStream enabled + */ + Future update(String key, String value, long expectedRevision); + + /** + * Soft deletes the key by placing a delete marker. + * @param key the key + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + Future delete(String key); + + /** + * Soft deletes the key by placing a delete marker iff the key exists and its last revision matches the expected + * @param key the key + * @param expectedRevision the expected last revision + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + Future delete(String key, long expectedRevision); + + /** + * Purge all values/history from the specific key + * @param key the key + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + Future purge(String key); + + /** + * Purge all values/history from the specific key iff the key exists and its last revision matches the expected + * @param key the key + * @param expectedRevision the expected last revision + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + Future purge(String key, long expectedRevision); + + /** + * Watch updates for a specific key. + * @param key the key. + * @param watcher the watcher the implementation to receive changes + * @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last options wins. + * @return The KeyValueWatchSubscription + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws InterruptedException if the thread is interrupted + */ + NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions); + + /** + * Watch updates for a specific key, starting at a specific revision. + * @param key the key. + * @param watcher the watcher the implementation to receive changes + * @param fromRevision the revision to start from + * @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last options wins. + * @return The KeyValueWatchSubscription + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws InterruptedException if the thread is interrupted + */ + NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions); + + /** + * Watch updates for specific keys. + * @param keys the keys + * @param watcher the watcher the implementation to receive changes + * @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last options wins. + * @return The KeyValueWatchSubscription + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws InterruptedException if the thread is interrupted + */ + NatsKeyValueWatchSubscription watch(List keys, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions); + + /** + * Watch updates for specific keys, starting at a specific revision. + * @param keys the keys + * @param watcher the watcher the implementation to receive changes + * @param fromRevision the revision to start from + * @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last options wins. + * @return The KeyValueWatchSubscription + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws InterruptedException if the thread is interrupted + */ + NatsKeyValueWatchSubscription watch(List keys, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions); + + /** + * Watch updates for all keys. + * @param watcher the watcher the implementation to receive changes + * @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last options wins. + * @return The KeyValueWatchSubscription + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws InterruptedException if the thread is interrupted + */ + NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, KeyValueWatchOption... watchOptions); + + /** + * Watch updates for all keys starting from a specific revision + * @param watcher the watcher the implementation to receive changes + * @param fromRevision the revision to start from + * @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last options wins. + * @return The KeyValueWatchSubscription + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws InterruptedException if the thread is interrupted + */ + NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions); + + /** + * Get a list of the keys in a bucket. + * @return List of keys + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws InterruptedException if the thread is interrupted + */ + Future> keys(); + + /** + * Get a list of the keys in a bucket filtered by a + * subject-like string, for instance "key" or "key.foo.*" or "key.>" + * @param filter the subject like key filter + * @return List of keys + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws InterruptedException if the thread is interrupted + */ + Future> keys(String filter); + + /** + * Get a list of the keys in a bucket filtered by + * subject-like strings, for instance "aaa.*", "bbb.*;" + * @param filters the subject like key filters + * @return List of keys + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws InterruptedException if the thread is interrupted + */ + Future> keys(List filters); + + /** + * Get the history (list of KeyValueEntry) for a key + * @param key the key + * @return List of KvEntry + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws InterruptedException if the thread is interrupted + */ + Future> history(String key); + + /** + * Remove history from all keys that currently are deleted or purged + * with using a default KeyValuePurgeOptions + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws InterruptedException if the thread is interrupted + */ + Future purgeDeletes(); + + /** + * Remove history from all keys that currently are deleted or purged, considering options. + * @param options the purge options + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws InterruptedException if the thread is interrupted + */ + Future purgeDeletes(KeyValuePurgeOptions options); + + /** + * Get the KeyValueStatus object + * @return the status object + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + * @throws InterruptedException if the thread is interrupted + */ + Future getStatus(); +} diff --git a/src/main/java/io/nats/vertx/NatsStream.java b/src/main/java/io/nats/vertx/NatsStream.java index abf4ee2..9e309a3 100644 --- a/src/main/java/io/nats/vertx/NatsStream.java +++ b/src/main/java/io/nats/vertx/NatsStream.java @@ -172,7 +172,6 @@ Future subscribe( */ Future publish(String subject, Headers headers, byte[] body, PublishOptions options); - /** * Retrieve a message from the subscription. * @param subject subject The subject for the subscription. @@ -223,5 +222,4 @@ default Future> fetch(final String subject, final int bat default Future> iterate(final String subject, final int batchSize, final Duration maxWait) { return iterate(subject, batchSize, maxWait.toMillis()); } - } diff --git a/src/main/java/io/nats/vertx/impl/NatsClientImpl.java b/src/main/java/io/nats/vertx/impl/NatsClientImpl.java index 325bb11..13011bc 100644 --- a/src/main/java/io/nats/vertx/impl/NatsClientImpl.java +++ b/src/main/java/io/nats/vertx/impl/NatsClientImpl.java @@ -4,6 +4,7 @@ import io.nats.client.impl.Headers; import io.nats.client.impl.VertxDispatcherFactory; import io.nats.vertx.NatsClient; +import io.nats.vertx.NatsKeyValue; import io.nats.vertx.NatsOptions; import io.nats.vertx.NatsStream; import io.vertx.core.*; @@ -137,33 +138,40 @@ private void runFlush() { */ @Override public Future jetStream() { + return jetStream(null); + } + + /** + * Return new JetStream stream instance. + * @param jso JetStream options. + * @return JetStream. + */ + @Override + public Future jetStream(final JetStreamOptions jso) { final Promise promise = context().promise(); + context().executeBlocking(event -> { try { - - final JetStream jetStream = connection.get().jetStream(); - promise.complete(new NatsStreamImpl(jetStream, this.connection.get(), vertx, exceptionHandler.get())); + promise.complete(new NatsStreamImpl(connection.get(), vertx, exceptionHandler.get(), jso)); } catch (Exception e) { handleException(promise, e); } - }, false); return promise.future(); } - /** - * Return new JetStream stream instance. - * @param options JetStream options. - * @return JetStream. - */ @Override - public Future jetStream(final JetStreamOptions options) { - final Promise promise = context().promise(); + public Future keyValue(String bucketName) { + return keyValue(bucketName, null); + } + @Override + public Future keyValue(String bucketName, KeyValueOptions kvo) { + final Promise promise = context().promise(); context().executeBlocking(event -> { try { - final JetStream jetStream = connection.get().jetStream(options); - promise.complete(new NatsStreamImpl(jetStream, this.connection.get(), vertx, exceptionHandler.get())); + promise.complete( + new NatsKeyValueImpl(connection.get(), vertx, exceptionHandler.get(), bucketName, kvo)); } catch (Exception e) { handleException(promise, e); } @@ -274,7 +282,6 @@ public Future publish(String subject, String replyTo, String message) { return this.publish(subject, replyTo, message.getBytes(StandardCharsets.UTF_8)); } - @Override public Future publish(String subject, String replyTo, byte[] message) { final Promise promise = context().promise(); diff --git a/src/main/java/io/nats/vertx/impl/NatsImpl.java b/src/main/java/io/nats/vertx/impl/NatsImpl.java new file mode 100644 index 0000000..08f4802 --- /dev/null +++ b/src/main/java/io/nats/vertx/impl/NatsImpl.java @@ -0,0 +1,121 @@ +package io.nats.vertx.impl; + +import io.nats.client.*; +import io.nats.client.api.AckPolicy; +import io.nats.client.api.ConsumerConfiguration; +import io.nats.client.api.DeliverPolicy; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +/** + * NATS base implementation + */ +public class NatsImpl { + + protected final Vertx vertx; + + protected final ConcurrentHashMap dispatcherMap = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap subscriptionMap = new ConcurrentHashMap<>(); + + protected final Connection conn; + protected final Duration timeout; + protected final JetStreamOptions jso; + protected final JetStreamManagement jsm; + protected final JetStream js; + + protected final AtomicReference> exceptionHandler = new AtomicReference<>(); + + protected NatsImpl(final Connection conn, + final Vertx vertx, + final Handler exceptionHandler, + final JetStreamOptions jso) + { + this.conn = conn; + this.timeout = jso == null || jso.getRequestTimeout() == null ? conn.getOptions().getConnectionTimeout() : jso.getRequestTimeout(); + this.jso = JetStreamOptions.builder(jso).requestTimeout(this.timeout).build(); + + try { + this.jsm = conn.jetStreamManagement(this.jso); + this.js = jsm.jetStream(); + } + catch (IOException e) { + if (exceptionHandler != null) { + exceptionHandler.handle(e); + } + throw new RuntimeException(e); + } + this.vertx = vertx; + this.exceptionHandler.set(exceptionHandler); + + } + + protected ContextInternal context() { + return (ContextInternal) vertx.getOrCreateContext(); + } + + protected void endImpl(Handler> handler) { + //No Op + final Promise promise = context().promise(); + handler.handle(promise.future()); + } + + protected void handleException(Promise promise, Exception e) { + promise.fail(e); + exceptionHandler.get().handle(e); + } + + protected void visitSubject(String streamName, String subject, DeliverPolicy deliverPolicy, boolean headersOnly, boolean ordered, MessageHandler handler) throws IOException, JetStreamApiException { + visitSubject(streamName, Collections.singletonList(subject), deliverPolicy, headersOnly, ordered, handler); + } + + protected void visitSubject(String streamName, List subjects, DeliverPolicy deliverPolicy, boolean headersOnly, boolean ordered, MessageHandler handler) throws IOException, JetStreamApiException { + ConsumerConfiguration.Builder ccb = ConsumerConfiguration.builder() + .ackPolicy(AckPolicy.None) + .deliverPolicy(deliverPolicy) + .headersOnly(headersOnly) + .filterSubjects(subjects); + + PushSubscribeOptions pso = PushSubscribeOptions.builder() + .stream(streamName) + .ordered(ordered) + .configuration(ccb.build()) + .build(); + + JetStreamSubscription sub = js.subscribe(null, pso); + try { + boolean lastWasNull = false; + long pending = sub.getConsumerInfo().getCalculatedPending(); + while (pending > 0) { // no need to loop if nothing pending + Message m = sub.nextMessage(timeout); + if (m == null) { + if (lastWasNull) { + return; // two timeouts in a row is enough + } + lastWasNull = true; + } + else { + handler.onMessage(m); + if (--pending == 0) { + return; + } + lastWasNull = false; + } + } + } catch (Exception e) { + exceptionHandler.get().handle(e); + } + finally { + sub.unsubscribe(); + } + } +} diff --git a/src/main/java/io/nats/vertx/impl/NatsKeyValueImpl.java b/src/main/java/io/nats/vertx/impl/NatsKeyValueImpl.java new file mode 100644 index 0000000..4569498 --- /dev/null +++ b/src/main/java/io/nats/vertx/impl/NatsKeyValueImpl.java @@ -0,0 +1,441 @@ +package io.nats.vertx.impl; + +import io.nats.client.*; +import io.nats.client.api.*; +import io.nats.client.impl.Headers; +import io.nats.client.impl.NatsKeyValueWatchSubscription; +import io.nats.client.impl.NatsMessage; +import io.nats.client.support.DateTimeUtils; +import io.nats.client.support.Debug; +import io.nats.client.support.Validator; +import io.nats.vertx.NatsKeyValue; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static io.nats.client.support.NatsConstants.GREATER_THAN; +import static io.nats.client.support.NatsJetStreamConstants.*; +import static io.nats.client.support.NatsKeyValueUtil.*; +import static io.nats.client.support.Validator.validateNonWildcardKvKeyRequired; + + +/** + * NATS Client implementation. + */ +public class NatsKeyValueImpl extends NatsImpl implements NatsKeyValue { + + // JNats KeyValue parallel variables + private final String bucketName; + private String streamName; + private final String streamSubject; + private final String readPrefix; + private final String writePrefix; + + /** + * Create instance + * + * @param conn Nats connection + * @param vertx vertx + * @param exceptionHandler + * @param bucketName + * @param kvo keyValueOptions also contains jetStreamOptions use to make jetstream/management implementations + */ + public NatsKeyValueImpl(final Connection conn, + final Vertx vertx, + final Handler exceptionHandler, + final String bucketName, + final KeyValueOptions kvo) + { + super(conn, vertx, exceptionHandler, kvo == null ? null : kvo.getJetStreamOptions()); + + this.bucketName = Validator.validateBucketName(bucketName, true); + streamName = toStreamName(bucketName); + streamSubject = toStreamSubject(bucketName); + readPrefix = toKeyPrefix(bucketName); + + if (kvo == null || kvo.getJetStreamOptions().isDefaultPrefix()) { + writePrefix = readPrefix; + } + else { + writePrefix = kvo.getJetStreamOptions().getPrefix() + readPrefix; + } + } + + @Override + public NatsKeyValueImpl exceptionHandler(Handler handler) { + exceptionHandler.set(handler); + return this; + } + + String readSubject(String key) { + return readPrefix + key; + } + + String writeSubject(String key) { + return writePrefix + key; + } + + /** + * {@inheritDoc} + */ + @Override + public String getBucketName() { + return bucketName; + } + + /** + * {@inheritDoc} + */ + @Override + public Future get(String key) { + return _getFuture(key, null, true); // null indicates get last, not get revision + } + + /** + * {@inheritDoc} + */ + @Override + public Future get(String key, long revision) { + return _getFuture(key, revision, true); + } + + @Override + public Future put(String key, byte[] value) { + return publishReturnSequence(key, value, null); + } + + @Override + public Future put(String key, String value) { + return publishReturnSequence(key, value.getBytes(StandardCharsets.UTF_8), null); + } + + @Override + public Future put(String key, Number value) { + return publishReturnSequence(key, value.toString().getBytes(StandardCharsets.US_ASCII), null); + } + + @Override + public Future create(String key, byte[] value) { + final Promise promise = context().promise(); + context().executeBlocking(event -> { + try { + try { + Headers h = new Headers().add(EXPECTED_LAST_SUB_SEQ_HDR, Long.toString(0)); + PublishAck pa = _publish(key, value, h); + promise.complete(pa.getSeqno()); + } + catch (JetStreamApiException e) { + if (e.getApiErrorCode() == JS_WRONG_LAST_SEQUENCE) { + // must check if the last message for this subject is a delete or purge + KeyValueEntry kve = _getLastEntry(key, false); + if (kve != null && kve.getOperation() != KeyValueOperation.PUT) { + Headers h = new Headers().add(EXPECTED_LAST_SUB_SEQ_HDR, Long.toString(kve.getRevision())); + PublishAck pa = _publish(key, value, h); + promise.complete(pa.getSeqno()); + return; + } + } + promise.fail(e); + } + } catch (Exception e) { + Debug.info("CR EX", e); + handleException(promise, e); + } + }, false); + return promise.future(); + } + + @Override + public Future update(String key, byte[] value, long expectedRevision) { + Headers h = new Headers().add(EXPECTED_LAST_SUB_SEQ_HDR, Long.toString(expectedRevision)); + return publishReturnSequence(key, value, h); + } + + @Override + public Future update(String key, String value, long expectedRevision) { + return update(key, value.getBytes(StandardCharsets.UTF_8), expectedRevision); + } + + @Override + public Future delete(String key) { + return publishReturnVoid(key, null, getDeleteHeaders()); + } + + @Override + public Future delete(String key, long expectedRevision) { + Headers h = getDeleteHeaders().put(EXPECTED_LAST_SUB_SEQ_HDR, Long.toString(expectedRevision)); + return publishReturnVoid(key, null, h); + } + + @Override + public Future purge(String key) { + return publishReturnVoid(key, null, getPurgeHeaders()); + } + + @Override + public Future purge(String key, long expectedRevision) { + Headers h = getPurgeHeaders().put(EXPECTED_LAST_SUB_SEQ_HDR, Long.toString(expectedRevision)); + return publishReturnVoid(key, null, h); + } + + @Override + public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) { + return null; + } + + @Override + public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) { + return null; + } + + @Override + public NatsKeyValueWatchSubscription watch(List keys, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) { + return null; + } + + @Override + public NatsKeyValueWatchSubscription watch(List keys, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) { + return null; + } + + @Override + public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) { + return null; + } + + @Override + public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) { + return null; + } + + public Future> keys() { + return _keys(Collections.singletonList(readSubject(GREATER_THAN))); + } + + @Override + public Future> keys(String filter) { + return _keys(Collections.singletonList(readSubject(filter))); + } + + @Override + public Future> keys(List filters) { + List readSubjectFilters = new ArrayList<>(filters.size()); + for (String f : filters) { + readSubjectFilters.add(readSubject(f)); + } + return _keys(readSubjectFilters); + } + + private Future> _keys(List readSubjectFilters) { + final Promise> promise = context().promise(); + context().executeBlocking(event -> { + try { + List list = new ArrayList<>(); + visitSubject(streamName, readSubjectFilters, DeliverPolicy.LastPerSubject, true, false, m -> { + KeyValueOperation op = getOperation(m.getHeaders()); + if (op == KeyValueOperation.PUT) { + list.add(new BucketAndKey(m).key); + } + }); + promise.complete(list); + } catch (Exception e) { + handleException(promise, e); + } + }, false); + return promise.future(); + } + + @Override + public Future> history(String key) { + final Promise> promise = context().promise(); + context().executeBlocking(event -> { + try { + validateNonWildcardKvKeyRequired(key); + List list = new ArrayList<>(); + visitSubject(streamName, readSubject(key), DeliverPolicy.All, false, true, m -> list.add(new KeyValueEntry(m))); + promise.complete(list); + } catch (Exception e) { + handleException(promise, e); + } + }, false); + return promise.future(); + } + + @Override + public Future purgeDeletes() { + return purgeDeletes(null); + } + + @Override + public Future purgeDeletes(KeyValuePurgeOptions options) { + final Promise promise = context().promise(); + context().executeBlocking(event -> { + try { + long dmThresh = options == null + ? KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS + : options.getDeleteMarkersThresholdMillis(); + + ZonedDateTime limit; + if (dmThresh < 0) { + limit = DateTimeUtils.fromNow(600000); // long enough in the future to clear all + } + else if (dmThresh == 0) { + limit = DateTimeUtils.fromNow(KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS); + } + else { + limit = DateTimeUtils.fromNow(-dmThresh); + } + + List keep0List = new ArrayList<>(); + List keep1List = new ArrayList<>(); + visitSubject(streamName, streamSubject, DeliverPolicy.LastPerSubject, true, false, m -> { + KeyValueEntry kve = new KeyValueEntry(m); + if (kve.getOperation() != KeyValueOperation.PUT) { + if (kve.getCreated().isAfter(limit)) { + keep1List.add(new BucketAndKey(m).key); + } + else { + keep0List.add(new BucketAndKey(m).key); + } + } + }); + + for (String key : keep0List) { + jsm.purgeStream(streamName, PurgeOptions.subject(readSubject(key))); + } + + for (String key : keep1List) { + PurgeOptions po = PurgeOptions.builder() + .subject(readSubject(key)) + .keep(1) + .build(); + jsm.purgeStream(streamName, po); + } + promise.complete(); + } catch (Exception e) { + handleException(promise, e); + } + }, false); + return promise.future(); + } + + @Override + public Future getStatus() { + final Promise promise = context().promise(); + context().executeBlocking(event -> { + try { + promise.complete(new KeyValueStatus(jsm.getStreamInfo(streamName))); + } catch (Exception e) { + handleException(promise, e); + } + }, false); + return promise.future(); + } + + private PublishAck _publish(String key, byte[] d, Headers h) throws IOException, JetStreamApiException { + validateNonWildcardKvKeyRequired(key); + Message m = NatsMessage.builder().subject(writeSubject(key)).data(d).headers(h).build(); + return js.publish(m); + } + + private Future publishReturnSequence(String key, byte[] data, Headers h) { + final Promise promise = context().promise(); + context().executeBlocking(event -> { + try { + PublishAck pa = _publish(key, data, h); + promise.complete(pa.getSeqno()); + } catch (Exception e) { + handleException(promise, e); + } + }, false); + return promise.future(); + } + + private Future publishReturnVoid(String key, byte[] data, Headers h) { + final Promise promise = context().promise(); + context().executeBlocking(event -> { + try { + _publish(key, data, h); + promise.complete(); + } catch (Exception e) { + handleException(promise, e); + } + }, false); + return promise.future(); + } + + Future _getFuture(String key, Long revision, boolean existingOnly) { + final Promise promise = context().promise(); + context().executeBlocking(event -> { + try { + validateNonWildcardKvKeyRequired(key); + KeyValueEntry kve = revision == null + ? _getLastEntry(key, existingOnly) + : _getRevisionEntry(key, revision, existingOnly); + promise.complete(kve); + } catch (Exception e) { + handleException(promise, e); + } + }, false); + return promise.future(); + } + + private KeyValueEntry resolveExistingOnly(KeyValueEntry kve, boolean existingOnly) { + return existingOnly && kve.getOperation() != KeyValueOperation.PUT ? null : kve; + } + + private KeyValueEntry _getLastEntry(String key, boolean existingOnly) throws IOException, JetStreamApiException { + MessageInfo mi = _getLastMi(readSubject(key)); + KeyValueEntry kve = mi == null ? null : new KeyValueEntry(mi); + if (kve != null) { + kve = resolveExistingOnly(kve, existingOnly); + } + return kve; + } + + private KeyValueEntry _getRevisionEntry(String key, long revision, boolean existingOnly) throws IOException, JetStreamApiException { + MessageInfo mi = _getRevisionMi(revision); + KeyValueEntry kve = mi == null ? null : new KeyValueEntry(mi); + if (kve != null) { + if (key.equals(kve.getKey())) { + kve = resolveExistingOnly(kve, existingOnly); + } + else { + kve = null; + } + } + return kve; + } + + protected MessageInfo _getLastMi(String subject) throws IOException, JetStreamApiException { + try { + return jsm.getLastMessage(streamName, subject); + } + catch (JetStreamApiException jsae) { + if (jsae.getApiErrorCode() == JS_NO_MESSAGE_FOUND_ERR) { + return null; + } + throw jsae; + } + } + + protected MessageInfo _getRevisionMi(long seq) throws IOException, JetStreamApiException { + try { + return jsm.getMessage(streamName, seq); + } + catch (JetStreamApiException jsae) { + if (jsae.getApiErrorCode() == JS_NO_MESSAGE_FOUND_ERR) { + return null; + } + throw jsae; + } + } +} diff --git a/src/main/java/io/nats/vertx/impl/NatsStreamImpl.java b/src/main/java/io/nats/vertx/impl/NatsStreamImpl.java index 7e2baab..b423ab2 100644 --- a/src/main/java/io/nats/vertx/impl/NatsStreamImpl.java +++ b/src/main/java/io/nats/vertx/impl/NatsStreamImpl.java @@ -7,47 +7,26 @@ import io.nats.vertx.NatsVertxMessage; import io.nats.vertx.SubscriptionReadStream; import io.vertx.core.*; -import io.vertx.core.impl.ContextInternal; import io.vertx.core.streams.WriteStream; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; /** * NATS stream implementation. */ -public class NatsStreamImpl implements NatsStream { +public class NatsStreamImpl extends NatsImpl implements NatsStream { - private final JetStream jetStream; - private final Vertx vertx; - - private final ConcurrentHashMap dispatcherMap = new ConcurrentHashMap<>(); - private final ConcurrentHashMap subscriptionMap = new ConcurrentHashMap<>(); - - - private final Connection connection; - - private final AtomicReference> exceptionHandler = new AtomicReference<>(); /** * Create instance - * @param jetStream jetStream * @param connection Nats connection + * @param jso jetStreamOptions * @param vertx vertx */ - public NatsStreamImpl(final JetStream jetStream, final Connection connection, final Vertx vertx, - final Handler exceptionHandler) { - - this.connection = connection; - this.jetStream = jetStream; - this.vertx = vertx; - this.exceptionHandler.set(exceptionHandler); - } - - private ContextInternal context() { - return (ContextInternal) vertx.getOrCreateContext(); + public NatsStreamImpl(final Connection connection, final Vertx vertx, + final Handler exceptionHandler, final JetStreamOptions jso) { + super(connection, vertx, exceptionHandler, jso); } @Override @@ -73,7 +52,7 @@ public void write(Message data, Handler> handler) { private void doPublish(Message data, Promise promise) { try { - PublishAck publish = jetStream.publish(data); + PublishAck publish = js.publish(data); if (publish.isDuplicate()) { promise.fail("Duplicate message " + publish); } else if (publish.hasError()) { @@ -88,9 +67,7 @@ private void doPublish(Message data, Promise promise) { @Override public void end(Handler> handler) { - //No Op - final Promise promise = context().promise(); - handler.handle(promise.future()); + endImpl(handler); } @Override @@ -108,13 +85,12 @@ public NatsStream drainHandler(Handler handler) { return this; } - @Override public Future publish(final Message data) { final Promise promise = context().promise(); context().executeBlocking(event -> { try { - PublishAck ack = jetStream.publish(data); + PublishAck ack = js.publish(data); promise.complete(ack); } catch (Exception e) { handleException(promise, e); @@ -131,10 +107,9 @@ public Future publish(String subject, String message) { @Override public Future publish(final String subject, final byte[] message) { final Promise promise = context().promise(); - context().executeBlocking(event -> { try { - final PublishAck ack = jetStream.publish(subject, message); + final PublishAck ack = js.publish(subject, message); promise.complete(ack); } catch (Exception e) { handleException(promise, e); @@ -149,7 +124,7 @@ public void publish(Message data, Handler> handler) { final Promise promise = context().promise(); context().executeBlocking(event -> { try { - PublishAck ack = jetStream.publish(data); + PublishAck ack = js.publish(data); promise.complete(ack); } catch (Exception e) { handleException(promise, e); @@ -163,7 +138,7 @@ public Future publish(Message data, PublishOptions options) { final Promise promise = context().promise(); context().executeBlocking(event -> { try { - final PublishAck ack = jetStream.publish(data, options); + final PublishAck ack = js.publish(data, options); promise.complete(ack); } catch (Exception e) { handleException(promise, e); @@ -177,7 +152,7 @@ public Future publish(String subject, Headers headers, byte[] body) final Promise promise = context().promise(); context().executeBlocking(event -> { try { - final PublishAck ack = jetStream.publish(subject, headers, body); + final PublishAck ack = js.publish(subject, headers, body); promise.complete(ack); } catch (Exception e) { handleException(promise, e); @@ -191,7 +166,7 @@ public Future publish(String subject, Headers headers, byte[] body, final Promise promise = context().promise(); context().executeBlocking(event -> { try { - final PublishAck ack = jetStream.publish(subject, headers, body, options); + final PublishAck ack = js.publish(subject, headers, body, options); promise.complete(ack); } catch (Exception e) { handleException(promise, e); @@ -204,10 +179,10 @@ public Future publish(String subject, Headers headers, byte[] body, public Future subscribe(String subject, Handler handler, boolean autoAck, PushSubscribeOptions so) { final Promise promise = context().promise(); final Handler handlerWrapper = event -> handler.handle(new NatsVertxMessageImpl(event, context())); - final Dispatcher dispatcher = connection.createDispatcher(); + final Dispatcher dispatcher = conn.createDispatcher(); context().executeBlocking(event -> { try { - jetStream.subscribe(subject, dispatcher, handlerWrapper::handle, autoAck, so); + js.subscribe(subject, dispatcher, handlerWrapper::handle, autoAck, so); dispatcherMap.put(subject, dispatcher); promise.complete(); } catch (Exception e) { @@ -221,10 +196,10 @@ public Future subscribe(String subject, Handler handler, public Future subscribe(String subject, String queue, final Handler handler, boolean autoAck, PushSubscribeOptions so) { final Promise promise = context().promise(); final Handler handlerWrapper = event -> handler.handle(new NatsVertxMessageImpl(event, context())); - final Dispatcher dispatcher = connection.createDispatcher(); + final Dispatcher dispatcher = conn.createDispatcher(); context().executeBlocking(event -> { try { - jetStream.subscribe(subject, queue, dispatcher, handlerWrapper::handle, autoAck, so); + js.subscribe(subject, queue, dispatcher, handlerWrapper::handle, autoAck, so); dispatcherMap.put(subject, dispatcher); promise.complete(); } catch (Exception e) { @@ -240,7 +215,7 @@ public Future subscribe(final String subject, final Pull final Promise promise = context().promise(); context().executeBlocking(evt -> { try { - final JetStreamSubscription subscription = so != null ? jetStream.subscribe(subject, so) : jetStream.subscribe(subject); + final JetStreamSubscription subscription = so != null ? js.subscribe(subject, so) : js.subscribe(subject); final SubscriptionReadStream subscriptionReadStream = new SubscriptionReadStreamImpl(context(), subscription, exceptionHandler); subscriptionMap.put(subject, subscription); @@ -321,7 +296,7 @@ public Future unsubscribe(final String subject) { } } else { dispatcherMap.remove(subject); - connection.closeDispatcher(dispatcher); + conn.closeDispatcher(dispatcher); promise.complete(); } } catch (Exception e) { @@ -330,10 +305,4 @@ public Future unsubscribe(final String subject) { }, false); return promise.future(); } - - - private void handleException(Promise promise, Exception e) { - promise.fail(e); - exceptionHandler.get().handle(e); - } } diff --git a/src/test/java/io/nats/vertx/LoadTestMain.java b/src/test/java/io/nats/vertx/LoadTestMain.java index 03a06b6..9ff45ff 100644 --- a/src/test/java/io/nats/vertx/LoadTestMain.java +++ b/src/test/java/io/nats/vertx/LoadTestMain.java @@ -112,7 +112,7 @@ public static void setup(int port) throws Exception { Options.Builder builder = new Options.Builder().connectionTimeout(Duration.ofSeconds(5)) - .servers(new String[]{"localhost:" + port}); + .server("localhost:" + port); Connection nc = Nats.connect(builder.build()); JetStreamManagement jsm = nc.jetStreamManagement(); StreamInfo streamInfo = null; @@ -166,7 +166,7 @@ static NatsClient getNatsClient(int port) throws InterruptedException { final NatsOptions natsOptions = new NatsOptions(); natsOptions.setVertx(VERTX); natsOptions.setNatsBuilder(new Options.Builder()); - natsOptions.getNatsBuilder().servers(new String[]{"localhost:" + port}).connectionListener(new ConnectionListener() { + natsOptions.getNatsBuilder().server("localhost:" + port).connectionListener(new ConnectionListener() { @Override public void connectionEvent(Connection conn, Events type) { System.out.println("Connection EVENT " + type); diff --git a/src/test/java/io/nats/vertx/LoadTestOriginalMain.java b/src/test/java/io/nats/vertx/LoadTestOriginalMain.java index e33f6fa..1d22def 100644 --- a/src/test/java/io/nats/vertx/LoadTestOriginalMain.java +++ b/src/test/java/io/nats/vertx/LoadTestOriginalMain.java @@ -85,7 +85,7 @@ public static void setup(int port) throws Exception { Options.Builder builder = new Options.Builder().connectionTimeout(Duration.ofSeconds(5)) - .servers(new String[]{"localhost:" + port}); + .server("localhost:" + port); Connection nc = Nats.connect(builder.build()); JetStreamManagement jsm = nc.jetStreamManagement(); StreamInfo streamInfo = null; @@ -114,7 +114,7 @@ static Connection getNatsClient(int port) throws Exception { final Options.Builder natsOptions = new Options.Builder().connectionTimeout(Duration.ofSeconds(5)); final CountDownLatch latch = new CountDownLatch(1); - natsOptions.servers(new String[]{"localhost:" + port}).connectionListener(new ConnectionListener() { + natsOptions.server("localhost:" + port).connectionListener(new ConnectionListener() { @Override public void connectionEvent(Connection conn, Events type) { System.out.println("Connection EVENT " + type); diff --git a/src/test/java/io/nats/vertx/NatsClientTest.java b/src/test/java/io/nats/vertx/NatsClientTest.java index 9e5023b..1eda70e 100644 --- a/src/test/java/io/nats/vertx/NatsClientTest.java +++ b/src/test/java/io/nats/vertx/NatsClientTest.java @@ -1,5 +1,6 @@ package io.nats.vertx; +import io.nats.NatsServerRunner; import io.nats.client.*; import io.nats.client.api.StorageType; import io.nats.client.api.StreamConfiguration; @@ -11,7 +12,6 @@ import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.junit5.VertxExtension; -import nats.io.NatsServerRunner; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/src/test/java/io/nats/vertx/NatsKeyValueTest.java b/src/test/java/io/nats/vertx/NatsKeyValueTest.java new file mode 100644 index 0000000..d36dfca --- /dev/null +++ b/src/test/java/io/nats/vertx/NatsKeyValueTest.java @@ -0,0 +1,1161 @@ +package io.nats.vertx; + +import io.nats.client.*; +import io.nats.client.api.*; +import io.nats.client.impl.NatsKeyValueWatchSubscription; +import io.nats.client.support.NatsKeyValueUtil; +import io.vertx.core.Future; +import org.junit.jupiter.api.*; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import static io.nats.client.api.KeyValueWatchOption.*; +import static io.nats.vertx.TestUtils2.*; +import static org.junit.jupiter.api.Assertions.*; + +public class NatsKeyValueTest { + public static final String META_KEY = "meta-test-key"; + public static final String META_VALUE = "meta-test-value"; + + @Test + public void testWorkflow() throws Exception { + long now = ZonedDateTime.now().toEpochSecond(); + + String byteKey = "key.byte" + unique(); + String stringKey = "key.string" + unique(); + String longKey = "key.long" + unique(); + String notFoundKey = "notFound" + unique(); + String byteValue1 = "Byte Value 1"; + String byteValue2 = "Byte Value 2"; + String stringValue1 = "String Value 1"; + String stringValue2 = "String Value 2"; + + // get the kv management context + Map metadata = new HashMap<>(); + metadata.put(META_KEY, META_VALUE); + + // create the bucket + String desc = "desc" + unique(); + KvProxy proxy = new KvProxy(b -> + b.description(desc).maxHistoryPerKey(3).metadata(metadata)); + + assertInitialStatus(proxy.status, proxy.bucket, desc); + + // get the kv context for the specific bucket + + assertEquals(proxy.bucket, proxy.kv.getBucketName()); + KeyValueStatus status = proxy.getStatus(); + assertInitialStatus(status, proxy.bucket, desc); + + // Put some keys. Each key is put in a subject in the bucket (stream) + // The put returns the sequence number in the bucket (stream) + assertEquals(1, proxy.put(byteKey, byteValue1.getBytes())); + assertEquals(2, proxy.put(stringKey, stringValue1)); + assertEquals(3, proxy.put(longKey, 1)); + + // retrieve the values. all types are stored as bytes + // so you can always get the bytes directly + assertEquals(byteValue1, new String(proxy.get(byteKey).getValue())); + assertEquals(stringValue1, new String(proxy.get(stringKey).getValue())); + assertEquals("1", new String(proxy.get(longKey).getValue())); + + // if you know the value is not binary and can safely be read + // as a UTF-8 string, the getStringValue method is ok to use + assertEquals(byteValue1, proxy.get(byteKey).getValueAsString()); + assertEquals(stringValue1, proxy.get(stringKey).getValueAsString()); + assertEquals("1", proxy.get(longKey).getValueAsString()); + + // if you know the value is a long, you can use + // the getLongValue method + // if it's not a number a NumberFormatException is thrown + assertEquals(1, proxy.get(longKey).getValueAsLong()); + assertThrows(NumberFormatException.class, () -> proxy.get(stringKey).getValueAsLong()); + + // going to manually track history for verification later + List byteHistory = new ArrayList<>(); + List stringHistory = new ArrayList<>(); + List longHistory = new ArrayList<>(); + + // entry gives detail about the latest entry of the key + byteHistory.add( + assertEntry(proxy.bucket, byteKey, KeyValueOperation.PUT, 1, byteValue1, now, proxy.get(byteKey))); + + stringHistory.add( + assertEntry(proxy.bucket, stringKey, KeyValueOperation.PUT, 2, stringValue1, now, proxy.get(stringKey))); + + longHistory.add( + assertEntry(proxy.bucket, longKey, KeyValueOperation.PUT, 3, "1", now, proxy.get(longKey))); + + // history gives detail about the key + assertHistory(byteHistory, proxy.history(byteKey)); + assertHistory(stringHistory, proxy.history(stringKey)); + assertHistory(longHistory, proxy.history(longKey)); + + // let's check the bucket info + status = proxy.kvm.getStatus(proxy.bucket); + assertState(status, 3, 3); + status = proxy.kvm.getBucketInfo(proxy.bucket); // coverage for deprecated + assertState(status, 3, 3); + + // delete a key. Its entry will still exist, but its value is null + proxy.delete(byteKey); + assertNull(proxy.get(byteKey)); + byteHistory.add(KeyValueOperation.DELETE); + assertHistory(byteHistory, proxy.history(byteKey)); + + // hashCode coverage + assertEquals(byteHistory.get(0).hashCode(), byteHistory.get(0).hashCode()); + assertNotEquals(byteHistory.get(0).hashCode(), byteHistory.get(1).hashCode()); + + // let's check the bucket info + status = proxy.kvm.getStatus(proxy.bucket); + assertState(status, 4, 4); + + // if the key has been deleted no etnry is returned + assertNull(proxy.get(byteKey)); + + // if the key does not exist (no history) there is no entry + assertNull(proxy.get(notFoundKey)); + + // Update values. You can even update a deleted key + assertEquals(5, proxy.put(byteKey, byteValue2.getBytes())); + assertEquals(6, proxy.put(stringKey, stringValue2)); + assertEquals(7, proxy.put(longKey, 2)); + + // values after updates + assertEquals(byteValue2, new String(proxy.get(byteKey).getValue())); + assertEquals(stringValue2, proxy.get(stringKey).getValueAsString()); + assertEquals(2, proxy.get(longKey).getValueAsLong()); + + // entry and history after update + byteHistory.add( + assertEntry(proxy.bucket, byteKey, KeyValueOperation.PUT, 5, byteValue2, now, proxy.get(byteKey))); + assertHistory(byteHistory, proxy.history(byteKey)); + + stringHistory.add( + assertEntry(proxy.bucket, stringKey, KeyValueOperation.PUT, 6, stringValue2, now, proxy.get(stringKey))); + assertHistory(stringHistory, proxy.history(stringKey)); + + longHistory.add( + assertEntry(proxy.bucket, longKey, KeyValueOperation.PUT, 7, "2", now, proxy.get(longKey))); + assertHistory(longHistory, proxy.history(longKey)); + + // let's check the bucket info + status = proxy.kvm.getStatus(proxy.bucket); + assertState(status, 7, 7); + + // make sure it only keeps the correct amount of history + assertEquals(8, proxy.put(longKey, 3)); + assertEquals(3, proxy.get(longKey).getValueAsLong()); + + longHistory.add( + assertEntry(proxy.bucket, longKey, KeyValueOperation.PUT, 8, "3", now, proxy.get(longKey))); + assertHistory(longHistory, proxy.history(longKey)); + + status = proxy.kvm.getStatus(proxy.bucket); + assertState(status, 8, 8); + + // this would be the 4th entry for the longKey + // sp the total records will stay the same + assertEquals(9, proxy.put(longKey, 4)); + assertEquals(4, proxy.get(longKey).getValueAsLong()); + + // history only retains 3 records + longHistory.remove(0); + longHistory.add( + assertEntry(proxy.bucket, longKey, KeyValueOperation.PUT, 9, "4", now, proxy.get(longKey))); + assertHistory(longHistory, proxy.history(longKey)); + + // record count does not increase + status = proxy.kvm.getStatus(proxy.bucket); + assertState(status, 8, 9); + + assertKeys(proxy.keys(), byteKey, stringKey, longKey); + assertKeys(proxy.keys("key.>"), byteKey, stringKey, longKey); + assertKeys(proxy.keys(byteKey), byteKey); + assertKeys(proxy.keys(Arrays.asList(longKey, stringKey)), longKey, stringKey); + + // purge + proxy.purge(longKey); + longHistory.clear(); + assertNull(proxy.get(longKey)); + longHistory.add(KeyValueOperation.PURGE); + assertHistory(longHistory, proxy.history(longKey)); + + status = proxy.kvm.getStatus(proxy.bucket); + assertState(status, 6, 10); + + // only 2 keys now + assertKeys(proxy.keys(), byteKey, stringKey); + + proxy.purge(byteKey); + byteHistory.clear(); + assertNull(proxy.get(byteKey)); + byteHistory.add(KeyValueOperation.PURGE); + assertHistory(byteHistory, proxy.history(byteKey)); + + status = proxy.kvm.getStatus(proxy.bucket); + assertState(status, 4, 11); + + // only 1 key now + assertKeys(proxy.keys(), stringKey); + + proxy.purge(stringKey); + stringHistory.clear(); + assertNull(proxy.get(stringKey)); + stringHistory.add(KeyValueOperation.PURGE); + assertHistory(stringHistory, proxy.history(stringKey)); + + status = proxy.kvm.getStatus(proxy.bucket); + assertState(status, 3, 12); + + // no more keys left + assertKeys(proxy.keys()); + + // clear things + KeyValuePurgeOptions kvpo = KeyValuePurgeOptions.builder().deleteMarkersNoThreshold().build(); + proxy.purgeDeletes(kvpo); + status = proxy.kvm.getStatus(proxy.bucket); + assertState(status, 0, 12); + + longHistory.clear(); + assertHistory(longHistory, proxy.history(longKey)); + + stringHistory.clear(); + assertHistory(stringHistory, proxy.history(stringKey)); + + // put some more + assertEquals(13, proxy.put(longKey, 110)); + longHistory.add( + assertEntry(proxy.bucket, longKey, KeyValueOperation.PUT, 13, "110", now, proxy.get(longKey))); + + assertEquals(14, proxy.put(longKey, 111)); + longHistory.add( + assertEntry(proxy.bucket, longKey, KeyValueOperation.PUT, 14, "111", now, proxy.get(longKey))); + + assertEquals(15, proxy.put(longKey, 112)); + longHistory.add( + assertEntry(proxy.bucket, longKey, KeyValueOperation.PUT, 15, "112", now, proxy.get(longKey))); + + assertEquals(16, proxy.put(stringKey, stringValue1)); + stringHistory.add( + assertEntry(proxy.bucket, stringKey, KeyValueOperation.PUT, 16, stringValue1, now, proxy.get(stringKey))); + + assertEquals(17, proxy.put(stringKey, stringValue2)); + stringHistory.add( + assertEntry(proxy.bucket, stringKey, KeyValueOperation.PUT, 17, stringValue2, now, proxy.get(stringKey))); + + assertHistory(longHistory, proxy.history(longKey)); + assertHistory(stringHistory, proxy.history(stringKey)); + + status = proxy.kvm.getStatus(proxy.bucket); + assertState(status, 5, 17); + + // delete the bucket + proxy.kvm.delete(proxy.bucket); + assertThrows(JetStreamApiException.class, () -> proxy.kvm.delete(proxy.bucket)); + assertThrows(JetStreamApiException.class, () -> proxy.kvm.getStatus(proxy.bucket)); + + assertEquals(0, proxy.kvm.getBucketNames().size()); + } + + private static void assertState(KeyValueStatus status, int entryCount, int lastSeq) { + assertEquals(entryCount, status.getEntryCount()); + assertEquals(lastSeq, status.getBackingStreamInfo().getStreamState().getLastSequence()); + assertEquals(status.getByteCount(), status.getBackingStreamInfo().getStreamState().getByteCount()); + } + + private void assertInitialStatus(KeyValueStatus status, String bucket, String desc) { + KeyValueConfiguration kvc = status.getConfiguration(); + assertEquals(bucket, status.getBucketName()); + assertEquals(bucket, kvc.getBucketName()); + assertEquals(desc, status.getDescription()); + assertEquals(desc, kvc.getDescription()); + assertEquals(NatsKeyValueUtil.toStreamName(bucket), kvc.getBackingConfig().getName()); + assertEquals(3, status.getMaxHistoryPerKey()); + assertEquals(3, kvc.getMaxHistoryPerKey()); + assertEquals(-1, status.getMaxBucketSize()); + assertEquals(-1, kvc.getMaxBucketSize()); + assertEquals(-1, status.getMaxValueSize()); // COVERAGE for deprecated + assertEquals(-1, kvc.getMaxValueSize()); + assertEquals(-1, status.getMaximumValueSize()); + assertEquals(-1, kvc.getMaximumValueSize()); + assertEquals(Duration.ZERO, status.getTtl()); + assertEquals(Duration.ZERO, kvc.getTtl()); + assertEquals(StorageType.Memory, status.getStorageType()); + assertEquals(StorageType.Memory, kvc.getStorageType()); + assertNull(status.getPlacement()); + assertNull(status.getRepublish()); + assertEquals(1, status.getReplicas()); + assertEquals(1, kvc.getReplicas()); + assertEquals(0, status.getEntryCount()); + assertEquals("JetStream", status.getBackingStore()); + assertNotNull(status.getConfiguration()); // coverage + assertNotNull(status.getConfiguration().toString()); // coverage + assertNotNull(status.toString()); // coverage + assertTrue(status.toString().contains(bucket)); + assertTrue(status.toString().contains(desc)); + } + + @Test + public void testGetRevision() throws Exception { + KvProxy proxy = new KvProxy(b -> b.maxHistoryPerKey(2)); + + String key = unique(); + + long seq1 = proxy.put(key, 1); + long seq2 = proxy.put(key, 2); + long seq3 = proxy.put(key, 3); + + KeyValueEntry kve = proxy.get(key); + assertNotNull(kve); + assertEquals(3, kve.getValueAsLong()); + + kve = proxy.get(key, seq3); + assertNotNull(kve); + assertEquals(3, kve.getValueAsLong()); + + kve = proxy.get(key, seq2); + assertNotNull(kve); + assertEquals(2, kve.getValueAsLong()); + + kve = proxy.get(key, seq1); + assertNull(kve); + + kve = proxy.get("notkey", seq3); + assertNull(kve); + } + + @Test + public void testKeys() throws Exception { + KvProxy proxy = new KvProxy(); + + for (int x = 1; x <= 10; x++) { + proxy.put("k" + x, x); + } + + List keys = proxy.keys(); + assertEquals(10, keys.size()); + for (int x = 1; x <= 10; x++) { + assertTrue(keys.contains("k" + x)); + } + } + + @Test + public void testMaxHistoryPerKey() throws Exception { + KvProxy proxy1 = new KvProxy(); + + String key = unique(); + proxy1.put(key, 1); + proxy1.put(key, 2); + + List history = proxy1.history(key); + assertEquals(1, history.size()); + assertEquals(2, history.get(0).getValueAsLong()); + + KvProxy proxy2 = new KvProxy(b -> b.maxHistoryPerKey(2)); + + key = unique(); + proxy2.put(key, 1); + proxy2.put(key, 2); + proxy2.put(key, 3); + + history = proxy2.history(key); + assertEquals(2, history.size()); + assertEquals(2, history.get(0).getValueAsLong()); + assertEquals(3, history.get(1).getValueAsLong()); + } + + @Test + public void testCreateUpdate() throws Exception { + KvProxy proxy = new KvProxy(); + + assertEquals(proxy.bucket, proxy.status.getBucketName()); + assertNull(proxy.status.getDescription()); + assertEquals(1, proxy.status.getMaxHistoryPerKey()); + assertEquals(-1, proxy.status.getMaxBucketSize()); + assertEquals(-1, proxy.status.getMaximumValueSize()); + assertEquals(Duration.ZERO, proxy.status.getTtl()); + assertEquals(StorageType.Memory, proxy.status.getStorageType()); + assertEquals(1, proxy.status.getReplicas()); + assertEquals(0, proxy.status.getEntryCount()); + assertEquals("JetStream", proxy.status.getBackingStore()); + + String key = unique(); + proxy.put(key, 1); + proxy.put(key, 2); + + List history = proxy.history(key); + assertEquals(1, history.size()); + assertEquals(2, history.get(0).getValueAsLong()); + + boolean compression = true; + String desc = unique(); + KeyValueConfiguration kvc = KeyValueConfiguration.builder(proxy.status.getConfiguration()) + .description(desc) + .maxHistoryPerKey(3) + .maxBucketSize(10_000) + .maximumValueSize(100) + .ttl(Duration.ofHours(1)) + .compression(compression) + .build(); + + KeyValueStatus kvs = proxy.kvm.update(kvc); + + assertEquals(proxy.bucket, kvs.getBucketName()); + assertEquals(desc, kvs.getDescription()); + assertEquals(3, kvs.getMaxHistoryPerKey()); + assertEquals(10_000, kvs.getMaxBucketSize()); + assertEquals(100, kvs.getMaximumValueSize()); + assertEquals(Duration.ofHours(1), kvs.getTtl()); + assertEquals(StorageType.Memory, kvs.getStorageType()); + assertEquals(1, kvs.getReplicas()); + assertEquals(1, kvs.getEntryCount()); + assertEquals("JetStream", kvs.getBackingStore()); + assertEquals(compression, kvs.isCompressed()); + + history = proxy.history(key); + assertEquals(1, history.size()); + assertEquals(2, history.get(0).getValueAsLong()); + + KeyValueConfiguration kvcStor = KeyValueConfiguration.builder(kvs.getConfiguration()) + .storageType(StorageType.File) + .build(); + assertThrows(JetStreamApiException.class, () -> proxy.kvm.update(kvcStor)); + } + + @Test + public void testHistoryDeletePurge() throws Exception { + KvProxy proxy = new KvProxy(b -> b.maxHistoryPerKey(64)); + + String key = unique(); + proxy.put(key, "a"); + proxy.put(key, "b"); + proxy.put(key, "c"); + List list = proxy.history(key); + assertEquals(3, list.size()); + + proxy.delete(key); + list = proxy.history(key); + assertEquals(4, list.size()); + + proxy.purge(key); + list = proxy.history(key); + assertEquals(1, list.size()); + } + + @Test + public void testAtomicDeleteAtomicPurge() throws Exception { + KvProxy proxy = new KvProxy(b -> b.maxHistoryPerKey(64)); + + String key = unique(); + proxy.put(key, "a"); + proxy.put(key, "b"); + proxy.put(key, "c"); + assertEquals(3, proxy.get(key).getRevision()); + + // Delete wrong revision rejected + proxy.delete(key, 1); + proxy.assertThrew(JetStreamApiException.class); + + // Correct revision writes tombstone and bumps revision + proxy.delete(key, 3); + + assertHistory(Arrays.asList( + proxy.get(key, 1L), + proxy.get(key, 2L), + proxy.get(key, 3L), + KeyValueOperation.DELETE), + proxy.history(key)); + + // Wrong revision rejected again + proxy.delete(key, 3); + proxy.assertThrew(JetStreamApiException.class); + + // Delete is idempotent: two consecutive tombstones + proxy.delete(key, 4); + + assertHistory(Arrays.asList( + proxy.get(key, 1L), + proxy.get(key, 2L), + proxy.get(key, 3L), + KeyValueOperation.DELETE, + KeyValueOperation.DELETE), + proxy.history(key)); + + // Purge wrong revision rejected + proxy.purge(key, 1); + proxy.assertThrew(JetStreamApiException.class); + + // Correct revision writes roll-up purge tombstone + proxy.purge(key, 5); + + assertHistory(Arrays.asList(KeyValueOperation.PURGE), proxy.history(key)); + } + + @Test + public void testPurgeDeletes() throws Exception { + KvProxy proxy = new KvProxy(b -> b.maxHistoryPerKey(64)); + + String key1 = unique(); + String key2 = unique(); + String key3 = unique(); + String key4 = unique(); + proxy.put(key1, "a"); + proxy.delete(key1); + proxy.put(key2, "b"); + proxy.put(key3, "c"); + proxy.put(key4, "d"); + proxy.purge(key4); + + JetStream js = testRunner.nc.jetStream(); + + assertPurgeDeleteEntries(js, proxy.bucket, new String[]{"a", null, "b", "c", null}); + + // default purge deletes uses the default threshold + // so no markers will be deleted + proxy.purgeDeletes(); + assertPurgeDeleteEntries(js, proxy.bucket, new String[]{null, "b", "c", null}); + + // deleteMarkersThreshold of 0 the default threshold + // so no markers will be deleted + proxy.purgeDeletes(KeyValuePurgeOptions.builder().deleteMarkersThreshold(0).build()); + assertPurgeDeleteEntries(js, proxy.bucket, new String[]{null, "b", "c", null}); + + // no threshold causes all to be removed + proxy.purgeDeletes(KeyValuePurgeOptions.builder().deleteMarkersNoThreshold().build()); + assertPurgeDeleteEntries(js, proxy.bucket, new String[]{"b", "c"}); + } + + private void assertPurgeDeleteEntries(JetStream js, String bucket, String[] expected) throws IOException, JetStreamApiException, InterruptedException { + JetStreamSubscription sub = js.subscribe(NatsKeyValueUtil.toStreamSubject(bucket)); + + for (String s : expected) { + Message m = sub.nextMessage(1000); + KeyValueEntry kve = new KeyValueEntry(m); + if (s == null) { + assertNotEquals(KeyValueOperation.PUT, kve.getOperation()); + assertEquals(0, kve.getDataLen()); + } + else { + assertEquals(KeyValueOperation.PUT, kve.getOperation()); + assertEquals(s, kve.getValueAsString()); + } + } + + sub.unsubscribe(); + } + + @Test + public void testCreateAndUpdate() throws Exception { + KvProxy proxy = new KvProxy(b -> b.maxHistoryPerKey(64)); + + String key = unique(); + // 1. allowed to create something that does not exist + long rev1 = proxy.create(key, "a".getBytes()); + + // 2. allowed to update with proper revision + proxy.update(key, "ab".getBytes(), rev1); + + // 3. not allowed to update with wrong revision + proxy.update(key, "zzz".getBytes(), rev1); + proxy.assertThrew(JetStreamApiException.class); + + // 4. not allowed to create a key that exists + proxy.create(key, "zzz".getBytes()); + proxy.assertThrew(JetStreamApiException.class); + + // 5. not allowed to update a key that does not exist + proxy.update(key, "zzz".getBytes(), 1); + proxy.assertThrew(JetStreamApiException.class); + + // 6. allowed to create a key that is deleted + proxy.delete(key); + proxy.create(key, "abc".getBytes()); + + // 7. allowed to update a key that is deleted, as long as you have it's revision + proxy.delete(key); + testRunner.nc.flush(Duration.ofSeconds(1)); + + sleep(200); // a little pause to make sure things get flushed + List hist = proxy.history(key); + proxy.update(key, "abcd".getBytes(), hist.get(hist.size() - 1).getRevision()); + + // 8. allowed to create a key that is purged + proxy.purge(key); + proxy.create(key, "abcde".getBytes()); + + // 9. allowed to update a key that is deleted, as long as you have its revision + proxy.purge(key); + + sleep(200); // a little pause to make sure things get flushed + hist = proxy.history(key); + proxy.update(key, "abcdef".getBytes(), hist.get(hist.size() - 1).getRevision()); + } + + private void assertKeys(List apiKeys, String... manualKeys) { + assertEquals(manualKeys.length, apiKeys.size()); + for (String k : manualKeys) { + assertTrue(apiKeys.contains(k)); + } + } + + private void assertHistory(List manualHistory, List apiHistory) { + assertEquals(apiHistory.size(), manualHistory.size()); + for (int x = 0; x < apiHistory.size(); x++) { + Object o = manualHistory.get(x); + if (o instanceof KeyValueOperation) { + assertEquals((KeyValueOperation)o, apiHistory.get(x).getOperation()); + } + else { + assertKvEquals((KeyValueEntry)o, apiHistory.get(x)); + } + } + } + + @SuppressWarnings("SameParameterValue") + private KeyValueEntry assertEntry(String bucket, String key, KeyValueOperation op, long seq, String value, long now, KeyValueEntry entry) { + assertEquals(bucket, entry.getBucket()); + assertEquals(key, entry.getKey()); + assertEquals(op, entry.getOperation()); + assertEquals(seq, entry.getRevision()); + assertEquals(0, entry.getDelta()); + if (op == KeyValueOperation.PUT) { + assertEquals(value, new String(entry.getValue())); + } + else { + assertNull(entry.getValue()); + } + assertTrue(now <= entry.getCreated().toEpochSecond()); + + // coverage + assertNotNull(entry.toString()); + return entry; + } + + private void assertKvEquals(KeyValueEntry kv1, KeyValueEntry kv2) { + assertEquals(kv1.getOperation(), kv2.getOperation()); + assertEquals(kv1.getRevision(), kv2.getRevision()); + assertEquals(kv1.getKey(), kv2.getKey()); + assertEquals(kv1.getKey(), kv2.getKey()); + assertArrayEquals(kv1.getValue(), kv2.getValue()); + long es1 = kv1.getCreated().toEpochSecond(); + long es2 = kv2.getCreated().toEpochSecond(); + assertEquals(es1, es2); + } + + static class TestKeyValueWatcher implements KeyValueWatcher { + public String name; + public List entries = new ArrayList<>(); + public KeyValueWatchOption[] watchOptions; + public boolean beforeWatcher; + public boolean metaOnly; + public int endOfDataReceived; + public boolean endBeforeEntries; + + public TestKeyValueWatcher(String name, boolean beforeWatcher, KeyValueWatchOption... watchOptions) { + this.name = name; + this.beforeWatcher = beforeWatcher; + this.watchOptions = watchOptions; + for (KeyValueWatchOption wo : watchOptions) { + if (wo == META_ONLY) { + metaOnly = true; + break; + } + } + } + + @Override + public String toString() { + return "TestKeyValueWatcher{" + + "name='" + name + '\'' + + ", beforeWatcher=" + beforeWatcher + + ", metaOnly=" + metaOnly + + ", watchOptions=" + Arrays.toString(watchOptions) + + '}'; + } + + @Override + public void watch(KeyValueEntry kve) { + entries.add(kve); + } + + @Override + public void endOfData() { + if (++endOfDataReceived == 1 && entries.isEmpty()) { + endBeforeEntries = true; + } + } + } + + static String TEST_WATCH_KEY_NULL = "key.nl"; + static String TEST_WATCH_KEY_1 = "key.1"; + static String TEST_WATCH_KEY_2 = "key.2"; + + interface TestWatchSubSupplier { + NatsKeyValueWatchSubscription get(KeyValue kv) throws Exception; + } + + @Test + public void testWatch() throws Exception { + Object[] key1AllExpecteds = new Object[]{ + "a", "aa", KeyValueOperation.DELETE, "aaa", KeyValueOperation.DELETE, KeyValueOperation.PURGE + }; + + Object[] key1FromRevisionExpecteds = new Object[]{ + "aa", KeyValueOperation.DELETE, "aaa" + }; + + Object[] noExpecteds = new Object[0]; + Object[] purgeOnlyExpecteds = new Object[]{KeyValueOperation.PURGE}; + + Object[] key2AllExpecteds = new Object[]{ + "z", "zz", KeyValueOperation.DELETE, "zzz" + }; + + Object[] key2AfterExpecteds = new Object[]{"zzz"}; + + Object[] allExpecteds = new Object[]{ + "a", "aa", "z", "zz", + KeyValueOperation.DELETE, KeyValueOperation.DELETE, + "aaa", "zzz", + KeyValueOperation.DELETE, KeyValueOperation.PURGE, + null + }; + + Object[] allPutsExpecteds = new Object[]{ + "a", "aa", "z", "zz", "aaa", "zzz", null + }; + + Object[] allFromRevisionExpecteds = new Object[]{ + "aa", "z", "zz", + KeyValueOperation.DELETE, KeyValueOperation.DELETE, + "aaa", "zzz", + }; + + TestKeyValueWatcher key1FullWatcher = new TestKeyValueWatcher("key1FullWatcher", true); + TestKeyValueWatcher key1MetaWatcher = new TestKeyValueWatcher("key1MetaWatcher", true, META_ONLY); + TestKeyValueWatcher key1StartNewWatcher = new TestKeyValueWatcher("key1StartNewWatcher", true, META_ONLY, UPDATES_ONLY); + TestKeyValueWatcher key1StartAllWatcher = new TestKeyValueWatcher("key1StartAllWatcher", true, META_ONLY); + TestKeyValueWatcher key2FullWatcher = new TestKeyValueWatcher("key2FullWatcher", true); + TestKeyValueWatcher key2MetaWatcher = new TestKeyValueWatcher("key2MetaWatcher", true, META_ONLY); + TestKeyValueWatcher allAllFullWatcher = new TestKeyValueWatcher("allAllFullWatcher", true); + TestKeyValueWatcher allAllMetaWatcher = new TestKeyValueWatcher("allAllMetaWatcher", true, META_ONLY); + TestKeyValueWatcher allIgDelFullWatcher = new TestKeyValueWatcher("allIgDelFullWatcher", true, IGNORE_DELETE); + TestKeyValueWatcher allIgDelMetaWatcher = new TestKeyValueWatcher("allIgDelMetaWatcher", true, META_ONLY, IGNORE_DELETE); + TestKeyValueWatcher starFullWatcher = new TestKeyValueWatcher("starFullWatcher", true); + TestKeyValueWatcher starMetaWatcher = new TestKeyValueWatcher("starMetaWatcher", true, META_ONLY); + TestKeyValueWatcher gtFullWatcher = new TestKeyValueWatcher("gtFullWatcher", true); + TestKeyValueWatcher gtMetaWatcher = new TestKeyValueWatcher("gtMetaWatcher", true, META_ONLY); + TestKeyValueWatcher multipleFullWatcher = new TestKeyValueWatcher("multipleFullWatcher", true); + TestKeyValueWatcher multipleMetaWatcher = new TestKeyValueWatcher("multipleMetaWatcher", true, META_ONLY); + TestKeyValueWatcher key1AfterWatcher = new TestKeyValueWatcher("key1AfterWatcher", false, META_ONLY); + TestKeyValueWatcher key1AfterIgDelWatcher = new TestKeyValueWatcher("key1AfterIgDelWatcher", false, META_ONLY, IGNORE_DELETE); + TestKeyValueWatcher key1AfterStartNewWatcher = new TestKeyValueWatcher("key1AfterStartNewWatcher", false, META_ONLY, UPDATES_ONLY); + TestKeyValueWatcher key1AfterStartFirstWatcher = new TestKeyValueWatcher("key1AfterStartFirstWatcher", false, META_ONLY, INCLUDE_HISTORY); + TestKeyValueWatcher key2AfterWatcher = new TestKeyValueWatcher("key2AfterWatcher", false, META_ONLY); + TestKeyValueWatcher key2AfterStartNewWatcher = new TestKeyValueWatcher("key2AfterStartNewWatcher", false, META_ONLY, UPDATES_ONLY); + TestKeyValueWatcher key2AfterStartFirstWatcher = new TestKeyValueWatcher("key2AfterStartFirstWatcher", false, META_ONLY, INCLUDE_HISTORY); + TestKeyValueWatcher key1FromRevisionAfterWatcher = new TestKeyValueWatcher("key1FromRevisionAfterWatcher", false); + TestKeyValueWatcher allFromRevisionAfterWatcher = new TestKeyValueWatcher("allFromRevisionAfterWatcher", false); + + List allKeys = Arrays.asList(TEST_WATCH_KEY_1, TEST_WATCH_KEY_2, TEST_WATCH_KEY_NULL); + + _testWatch(key1FullWatcher, key1AllExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1FullWatcher, key1FullWatcher.watchOptions)); + _testWatch(key1MetaWatcher, key1AllExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1MetaWatcher, key1MetaWatcher.watchOptions)); + _testWatch(key1StartNewWatcher, key1AllExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1StartNewWatcher, key1StartNewWatcher.watchOptions)); + _testWatch(key1StartAllWatcher, key1AllExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1StartAllWatcher, key1StartAllWatcher.watchOptions)); + _testWatch(key2FullWatcher, key2AllExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_2, key2FullWatcher, key2FullWatcher.watchOptions)); + _testWatch(key2MetaWatcher, key2AllExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_2, key2MetaWatcher, key2MetaWatcher.watchOptions)); + _testWatch(allAllFullWatcher, allExpecteds, -1, kv -> kv.watchAll(allAllFullWatcher, allAllFullWatcher.watchOptions)); + _testWatch(allAllMetaWatcher, allExpecteds, -1, kv -> kv.watchAll(allAllMetaWatcher, allAllMetaWatcher.watchOptions)); + _testWatch(allIgDelFullWatcher, allPutsExpecteds, -1, kv -> kv.watchAll(allIgDelFullWatcher, allIgDelFullWatcher.watchOptions)); + _testWatch(allIgDelMetaWatcher, allPutsExpecteds, -1, kv -> kv.watchAll(allIgDelMetaWatcher, allIgDelMetaWatcher.watchOptions)); + _testWatch(starFullWatcher, allExpecteds, -1, kv -> kv.watch("key.*", starFullWatcher, starFullWatcher.watchOptions)); + _testWatch(starMetaWatcher, allExpecteds, -1, kv -> kv.watch("key.*", starMetaWatcher, starMetaWatcher.watchOptions)); + _testWatch(gtFullWatcher, allExpecteds, -1, kv -> kv.watch("key.>", gtFullWatcher, gtFullWatcher.watchOptions)); + _testWatch(gtMetaWatcher, allExpecteds, -1, kv -> kv.watch("key.>", gtMetaWatcher, gtMetaWatcher.watchOptions)); + _testWatch(key1AfterWatcher, purgeOnlyExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1AfterWatcher, key1AfterWatcher.watchOptions)); + _testWatch(key1AfterIgDelWatcher, noExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1AfterIgDelWatcher, key1AfterIgDelWatcher.watchOptions)); + _testWatch(key1AfterStartNewWatcher, noExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1AfterStartNewWatcher, key1AfterStartNewWatcher.watchOptions)); + _testWatch(key1AfterStartFirstWatcher, purgeOnlyExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1AfterStartFirstWatcher, key1AfterStartFirstWatcher.watchOptions)); + _testWatch(key2AfterWatcher, key2AfterExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_2, key2AfterWatcher, key2AfterWatcher.watchOptions)); + _testWatch(key2AfterStartNewWatcher, noExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_2, key2AfterStartNewWatcher, key2AfterStartNewWatcher.watchOptions)); + _testWatch(key2AfterStartFirstWatcher, key2AllExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_2, key2AfterStartFirstWatcher, key2AfterStartFirstWatcher.watchOptions)); + _testWatch(key1FromRevisionAfterWatcher, key1FromRevisionExpecteds, 2, kv -> kv.watch(TEST_WATCH_KEY_1, key1FromRevisionAfterWatcher, 2, key1FromRevisionAfterWatcher.watchOptions)); + _testWatch(allFromRevisionAfterWatcher, allFromRevisionExpecteds, 2, kv -> kv.watchAll(allFromRevisionAfterWatcher, 2, allFromRevisionAfterWatcher.watchOptions)); + + _testWatch(multipleFullWatcher, allExpecteds, -1, kv -> kv.watch(allKeys, multipleFullWatcher, multipleFullWatcher.watchOptions)); + _testWatch(multipleMetaWatcher, allExpecteds, -1, kv -> kv.watch(allKeys, multipleMetaWatcher, multipleMetaWatcher.watchOptions)); + } + + private void _testWatch(TestKeyValueWatcher watcher, Object[] expectedKves, long fromRevision, TestWatchSubSupplier supplier) throws Exception { + KeyValueManagement kvm = testRunner.nc.keyValueManagement(); + + String bucket = unique() + watcher.name + "Bucket"; + kvm.create(KeyValueConfiguration.builder() + .name(bucket) + .maxHistoryPerKey(10) + .storageType(StorageType.Memory) + .build()); + + KeyValue kv = testRunner.nc.keyValue(bucket); + + NatsKeyValueWatchSubscription sub = null; + + if (watcher.beforeWatcher) { + sub = supplier.get(kv); + } + + if (fromRevision == -1) { + kv.put(TEST_WATCH_KEY_1, "a"); + kv.put(TEST_WATCH_KEY_1, "aa"); + kv.put(TEST_WATCH_KEY_2, "z"); + kv.put(TEST_WATCH_KEY_2, "zz"); + kv.delete(TEST_WATCH_KEY_1); + kv.delete(TEST_WATCH_KEY_2); + kv.put(TEST_WATCH_KEY_1, "aaa"); + kv.put(TEST_WATCH_KEY_2, "zzz"); + kv.delete(TEST_WATCH_KEY_1); + kv.purge(TEST_WATCH_KEY_1); + kv.put(TEST_WATCH_KEY_NULL, (byte[]) null); + } + else { + kv.put(TEST_WATCH_KEY_1, "a"); + kv.put(TEST_WATCH_KEY_1, "aa"); + kv.put(TEST_WATCH_KEY_2, "z"); + kv.put(TEST_WATCH_KEY_2, "zz"); + kv.delete(TEST_WATCH_KEY_1); + kv.delete(TEST_WATCH_KEY_2); + kv.put(TEST_WATCH_KEY_1, "aaa"); + kv.put(TEST_WATCH_KEY_2, "zzz"); + } + + if (!watcher.beforeWatcher) { + sub = supplier.get(kv); + } + + sleep(1500); // give time for the watches to get messages + + validateWatcher(expectedKves, watcher); + //noinspection ConstantConditions + sub.unsubscribe(); + kvm.delete(bucket); + } + + private void validateWatcher(Object[] expectedKves, TestKeyValueWatcher watcher) { + assertEquals(expectedKves.length, watcher.entries.size()); + assertEquals(1, watcher.endOfDataReceived); + + if (expectedKves.length > 0) { + assertEquals(watcher.beforeWatcher, watcher.endBeforeEntries); + } + + int aix = 0; + ZonedDateTime lastCreated = ZonedDateTime.of(2000, 4, 1, 0, 0, 0, 0, ZoneId.systemDefault()); + long lastRevision = -1; + + for (KeyValueEntry kve : watcher.entries) { + assertTrue(kve.getCreated().isAfter(lastCreated) || kve.getCreated().isEqual(lastCreated)); + lastCreated = kve.getCreated(); + + assertTrue(lastRevision < kve.getRevision()); + lastRevision = kve.getRevision(); + + Object expected = expectedKves[aix++]; + if (expected == null) { + assertSame(KeyValueOperation.PUT, kve.getOperation()); + assertTrue(kve.getValue() == null || kve.getValue().length == 0); + assertEquals(0, kve.getDataLen()); + } + else if (expected instanceof String) { + assertSame(KeyValueOperation.PUT, kve.getOperation()); + String s = (String) expected; + if (watcher.metaOnly) { + assertTrue(kve.getValue() == null || kve.getValue().length == 0); + assertEquals(s.length(), kve.getDataLen()); + } + else { + assertNotNull(kve.getValue()); + assertEquals(s.length(), kve.getDataLen()); + assertEquals(s, kve.getValueAsString()); + } + } + else { + assertTrue(kve.getValue() == null || kve.getValue().length == 0); + assertEquals(0, kve.getDataLen()); + assertSame(expected, kve.getOperation()); + } + } + } + + @Test + public void testMirrorSourceBuilderPrefixConversion() throws Exception { + String bucket = unique(); + String name = unique(); + String kvName = "KV_" + name; + KeyValueConfiguration kvc = KeyValueConfiguration.builder() + .name(bucket) + .mirror(Mirror.builder().name(name).build()) + .build(); + assertEquals(kvName, kvc.getBackingConfig().getMirror().getName()); + + kvc = KeyValueConfiguration.builder() + .name(bucket) + .mirror(Mirror.builder().name(kvName).build()) + .build(); + assertEquals(kvName, kvc.getBackingConfig().getMirror().getName()); + + Source s1 = Source.builder().name("s1").build(); + Source s2 = Source.builder().name("s2").build(); + Source s3 = Source.builder().name("s3").build(); + Source s4 = Source.builder().name("s4").build(); + Source s5 = Source.builder().name("KV_s5").build(); + Source s6 = Source.builder().name("KV_s6").build(); + + kvc = KeyValueConfiguration.builder() + .name(bucket) + .sources(s3, s4) + .sources(Arrays.asList(s1, s2)) + .addSources(s1, s2) + .addSources(Arrays.asList(s1, s2, null)) + .addSources(s3, s4) + .addSource(null) + .addSource(s5) + .addSource(s5) + .addSources(s6) + .addSources((Source[])null) + .addSources((Collection)null) + .build(); + + assertEquals(6, kvc.getBackingConfig().getSources().size()); + List names = new ArrayList<>(); + for (Source source : kvc.getBackingConfig().getSources()) { + names.add(source.getName()); + } + assertTrue(names.contains("KV_s1")); + assertTrue(names.contains("KV_s2")); + assertTrue(names.contains("KV_s3")); + assertTrue(names.contains("KV_s4")); + assertTrue(names.contains("KV_s5")); + assertTrue(names.contains("KV_s6")); + } + + // ---------------------------------------------------------------------------------------------------- + // UTILITIES + // ---------------------------------------------------------------------------------------------------- + static TestsRunner testRunner; + static int port; + + @AfterAll + public static void afterAll() throws Exception { + testRunner.close(); + } + + @BeforeAll + public static void beforeAll() throws Exception { + testRunner = TestUtils2.startServer(); + port = testRunner.natsServerRunner.getPort(); + } + + @BeforeEach + public void beforeEach() throws Exception { + cleanupJs(); + } + + @AfterEach + public void afterEach() throws Exception { + cleanupJs(); + } + + private static void cleanupJs() { + try { + JetStreamManagement jsm = testRunner.nc.jetStreamManagement(); + List streams = jsm.getStreamNames(); + for (String s : streams) + { + jsm.deleteStream(s); + } + } catch (Exception ignore) {} + } + + private static NatsClient getNatsClient() { + return TestUtils2.natsClient(port); + } + + static class KvProxy { + final KeyValueManagement kvm; + final String bucket; + final KeyValueStatus status; + final NatsClient natsClient; + final NatsKeyValue kv; + final AtomicReference executionError = new AtomicReference<>(); + + public KvProxy() throws IOException, JetStreamApiException, InterruptedException { + this(unique(), b -> {}); + } + + public KvProxy(String bucketName) throws IOException, JetStreamApiException, InterruptedException { + this(bucketName, b -> {}); + } + + public KvProxy(java.util.function.Consumer customizer) throws IOException, JetStreamApiException, InterruptedException { + this(unique(), customizer); + } + + public KvProxy(String bucket, + java.util.function.Consumer customizer) + throws IOException, JetStreamApiException, InterruptedException + { + this.bucket = bucket; + KeyValueConfiguration.Builder builder = + KeyValueConfiguration.builder(this.bucket) + .storageType(StorageType.Memory); + + customizer.accept(builder); + + kvm = testRunner.nc.keyValueManagement(); + status = kvm.create(builder.build()); + + natsClient = getNatsClient(); + kv = keyValue(natsClient, this.bucket); + } + + public void assertThrew(Class clazz) { + assertNotNull(executionError.get()); + assertEquals(clazz, executionError.get().getClass()); + } + + private T execute(Supplier> supplier) throws InterruptedException { + executionError.set(null); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference tRef = new AtomicReference<>(); + final Future f = supplier.get(); + f.onSuccess(event -> { + tRef.set(event); + latch.countDown(); + }).onFailure(event -> { + executionError.set(event); + latch.countDown(); + }); + if (!latch.await(1, TimeUnit.SECONDS)) { + executionError.set(new IOException("Execution Timeout")); + } + return tRef.get(); + } + + KeyValueEntry get(String key) throws InterruptedException { + return execute(() -> kv.get(key)); + } + + KeyValueEntry get(String key, long revision) throws InterruptedException { + return execute(() -> kv.get(key, revision)); + } + + Long put(String key, byte[] value) throws InterruptedException { + return execute(() -> kv.put(key, value)); + } + + Long put(String key, String value) throws InterruptedException { + return execute(() -> kv.put(key, value.getBytes(StandardCharsets.UTF_8))); + } + + Long put(String key, Number value) throws InterruptedException { + return execute(() -> kv.put(key, value.toString().getBytes(StandardCharsets.US_ASCII))); + } + + Long create(String key, byte[] value) throws InterruptedException { + return execute(() -> kv.create(key, value)); + } + + Long update(String key, byte[] value, long expectedRevision) throws InterruptedException { + return execute(() -> kv.update(key, value, expectedRevision)); + } + + NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws InterruptedException { + throw new RuntimeException("Not Implemented"); + } + + + NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws InterruptedException { + throw new RuntimeException("Not Implemented"); + } + + NatsKeyValueWatchSubscription watch(List keys, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws InterruptedException { + throw new RuntimeException("Not Implemented"); + } + + NatsKeyValueWatchSubscription watch(List keys, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws InterruptedException { + throw new RuntimeException("Not Implemented"); + } + + NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws InterruptedException { + throw new RuntimeException("Not Implemented"); + } + + NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws InterruptedException { + throw new RuntimeException("Not Implemented"); + } + + Long update(String key, String value, long expectedRevision) throws InterruptedException { + return execute(() -> kv.update(key, value, expectedRevision)); + } + + void delete(String key) throws InterruptedException { + execute(() -> kv.delete(key)); + } + + void delete(String key, long expectedRevision) throws InterruptedException { + execute(() -> kv.delete(key, expectedRevision)); + } + + void purge(String key) throws InterruptedException { + execute(() -> kv.purge(key)); + } + + void purge(String key, long expectedRevision) throws InterruptedException { + execute(() -> kv.purge(key, expectedRevision)); + } + + NatsKeyValue keyValue(NatsClient natsClient, String bucketName) throws InterruptedException { + return execute(() -> natsClient.keyValue(bucketName)); + } + + List keys() throws InterruptedException { + return execute(kv::keys); + } + + List keys(String filter) throws InterruptedException { + return execute(() -> kv.keys(filter)); + } + + List keys(List filters) throws InterruptedException { + return execute(() -> kv.keys(filters)); + } + + List history(String key) throws InterruptedException { + return execute(() -> kv.history(key)); + } + + void purgeDeletes() throws InterruptedException { + execute(kv::purgeDeletes); + } + + void purgeDeletes(KeyValuePurgeOptions options) throws InterruptedException { + execute(() -> kv.purgeDeletes(options)); + } + + KeyValueStatus getStatus() throws InterruptedException { + return execute(kv::getStatus); + } + } +} diff --git a/src/test/java/io/nats/vertx/NatsSimplePerfTest.java b/src/test/java/io/nats/vertx/NatsSimplePerfTest.java index d9e42b9..0ed72d3 100644 --- a/src/test/java/io/nats/vertx/NatsSimplePerfTest.java +++ b/src/test/java/io/nats/vertx/NatsSimplePerfTest.java @@ -1,12 +1,12 @@ package io.nats.vertx; +import io.nats.NatsServerRunner; import io.nats.client.*; import io.nats.client.api.StorageType; import io.nats.client.api.StreamConfiguration; import io.nats.client.api.StreamInfo; import io.vertx.core.Future; import io.vertx.core.Vertx; -import nats.io.NatsServerRunner; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -48,7 +48,7 @@ public void setup() throws Exception { Options.Builder builder = new Options.Builder().connectionTimeout(Duration.ofSeconds(5)) - .servers(new String[]{"localhost:" + port}); + .server("localhost:" + port); nc = Nats.connect(builder.build()); JetStreamManagement jsm = nc.jetStreamManagement(); StreamInfo streamInfo = null; @@ -202,7 +202,7 @@ private NatsClient getNatsClient() throws InterruptedException { final NatsOptions natsOptions = new NatsOptions(); natsOptions.setVertx(Vertx.vertx()); natsOptions.setNatsBuilder(new Options.Builder()); - natsOptions.getNatsBuilder().servers(new String[]{"localhost:" + port}).connectionListener(new ConnectionListener() { + natsOptions.getNatsBuilder().server("localhost:" + port).connectionListener(new ConnectionListener() { @Override public void connectionEvent(Connection conn, Events type) { System.out.println("Connection EVENT " + type); diff --git a/src/test/java/io/nats/vertx/NatsStreamTest.java b/src/test/java/io/nats/vertx/NatsStreamTest.java index 207e9c2..fb171eb 100644 --- a/src/test/java/io/nats/vertx/NatsStreamTest.java +++ b/src/test/java/io/nats/vertx/NatsStreamTest.java @@ -1,5 +1,6 @@ package io.nats.vertx; +import io.nats.NatsServerRunner; import io.nats.client.*; import io.nats.client.api.StorageType; import io.nats.client.api.StreamConfiguration; @@ -8,7 +9,6 @@ import io.nats.client.impl.NatsMessage; import io.vertx.core.Future; import io.vertx.core.Vertx; -import nats.io.NatsServerRunner; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/src/test/java/io/nats/vertx/TestUtils.java b/src/test/java/io/nats/vertx/TestUtils.java index e161c83..da3b23d 100644 --- a/src/test/java/io/nats/vertx/TestUtils.java +++ b/src/test/java/io/nats/vertx/TestUtils.java @@ -1,11 +1,11 @@ package io.nats.vertx; +import io.nats.NatsServerRunner; import io.nats.client.*; import io.nats.client.support.Status; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; -import nats.io.NatsServerRunner; import java.time.Duration; import java.util.concurrent.CountDownLatch; @@ -100,7 +100,7 @@ public static NatsClient natsClient(int port, Vertx vertx, Handler ex public static NatsServerRunner startServer() throws Exception { NatsServerRunner.setDefaultOutputLevel(Level.WARNING); - NatsServerRunner natsServerRunner = new NatsServerRunner(0, false, true); + NatsServerRunner natsServerRunner = new NatsServerRunner(-1, false, true); int port = natsServerRunner.getPort(); for (int i = 0; i < 100; i++) { diff --git a/src/test/java/io/nats/vertx/TestUtils2.java b/src/test/java/io/nats/vertx/TestUtils2.java new file mode 100644 index 0000000..dd11f07 --- /dev/null +++ b/src/test/java/io/nats/vertx/TestUtils2.java @@ -0,0 +1,137 @@ +package io.nats.vertx; + +import io.nats.ConsoleOutput; +import io.nats.NatsServerRunner; +import io.nats.client.*; +import io.vertx.core.Future; +import io.vertx.core.Handler; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; + +public class TestUtils2 { + + public static String unique() { + return new NUID().nextSequence(); + } + + public static String unique(int i) { + return unique() + i; + } + + public static void sleep(long ms) { + try { Thread.sleep(ms); } catch (InterruptedException ignored) { /* ignored */ } + } + + public static boolean waitUntilStatus(Connection conn, long millis, Connection.Status waitUntilStatus) { + long times = (millis + 99) / 100; + for (long x = 0; x < times; x++) { + sleep(100); + if (conn.getStatus() == waitUntilStatus) { + return true; + } + } + return false; + } + + public static NatsStream jetStream(NatsClient natsClient) { + return TestUtils.jetStream(natsClient); + } + + public static NatsClient natsClient(int port) { + return natsClient(port, e-> {}); + } + + public static NatsClient natsClient(int port, Handler exceptionHandler) { + final NatsOptions natsOptions = new NatsOptions(); + natsOptions.setExceptionHandler(exceptionHandler); + natsOptions.setNatsBuilder(new Options.Builder().connectionTimeout(Duration.ofSeconds(5))); + natsOptions.getNatsBuilder().server("localhost:" + port); + + final NatsClient natsClient = NatsClient.create(natsOptions); + final Future connect = natsClient.connect(); + + natsClient.setWriteQueueMaxSize(100); + natsClient.writeQueueFull(); + natsClient.drainHandler(event -> {}); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference error = new AtomicReference<>(); + connect.onSuccess(event -> { + latch.countDown(); + }).onFailure(event -> { + error.set(event); + latch.countDown(); + }); + try { + latch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (error.get() != null) { + throw new IllegalStateException(error.get()); + } + return natsClient; + } + + public static class TestsRunner { + NatsServerRunner natsServerRunner; + Connection nc; + int port; + + public TestsRunner(NatsServerRunner natsServerRunner) { + this.natsServerRunner = natsServerRunner; + port = natsServerRunner.getPort(); + } + + public void close() { + closeConnection(); + + try { + if (natsServerRunner != null) { + natsServerRunner.close(); + } + } + catch (Exception e) { + System.err.println("Closing Test Server Helper Runner: " + e); + } + } + + private void closeConnection() { + try { + if (nc != null) { + nc.close(); + } + } + catch (Exception e) { + System.err.println("Closing Test Server Helper Connection: " + e); + } + } + } + + public static TestsRunner startServer() throws Exception { + NatsServerRunner.setDefaultOutputSupplier(ConsoleOutput::new); + NatsServerRunner.setDefaultOutputLevel(Level.WARNING); + TestsRunner tr = new TestsRunner(new NatsServerRunner(-1, false, true)); + + for (int i = 0; i < 5; i++) { + Options.Builder builder = new Options.Builder() + .connectionTimeout(Duration.ofSeconds(5)) + .server("localhost:" + tr.port) + .errorListener(new ErrorListener() {}) + .connectionListener((conn, type) -> {}); + + Connection conn = Nats.connect(builder.build()); + if (waitUntilStatus(conn, 5000, Connection.Status.CONNECTED)) { + tr.nc = conn; + break; + } + conn.close(); + } + return tr; + } + +} diff --git a/src/test/java/io/nats/vertx/impl/SubscriptionReadStreamImplTest.java b/src/test/java/io/nats/vertx/impl/SubscriptionReadStreamImplTest.java index 0ed37e2..837e7d5 100644 --- a/src/test/java/io/nats/vertx/impl/SubscriptionReadStreamImplTest.java +++ b/src/test/java/io/nats/vertx/impl/SubscriptionReadStreamImplTest.java @@ -1,6 +1,7 @@ package io.nats.vertx.impl; +import io.nats.NatsServerRunner; import io.nats.client.*; import io.nats.client.api.StorageType; import io.nats.client.api.StreamConfiguration; @@ -10,7 +11,6 @@ import io.vertx.core.Vertx; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; -import nats.io.NatsServerRunner; import org.junit.jupiter.api.*; import org.junit.jupiter.api.extension.ExtendWith; @@ -126,7 +126,7 @@ public void setupEach() throws Exception { subjectIndex++; Options.Builder builder = new Options.Builder().connectionTimeout(Duration.ofSeconds(5)) - .servers(new String[]{"localhost:" + port}); + .server("localhost:" + port); nc = Nats.connect(builder.build()); JetStreamManagement jsm = nc.jetStreamManagement(); StreamInfo streamInfo = null;