Skip to content

Commit

Permalink
Key Value Implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Dec 10, 2024
1 parent 39db172 commit ed10ce9
Show file tree
Hide file tree
Showing 20 changed files with 2,254 additions and 100 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ and any optional configuration options. Here is an example:
```java
// Set options
final NatsOptions natsOptions = new NatsOptions();
natsOptions.getNatsBuilder().servers(new String[]{"localhost:" + port});
natsOptions.getNatsBuilder().server("localhost:" + port);

// Create client
final NatsClient natsClient = NatsClient.create(natsOptions);
Expand Down
7 changes: 5 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,24 @@ java {

repositories {
mavenCentral()
maven {
url "https://oss.sonatype.org/content/repositories/releases"
}
maven {
url "https://oss.sonatype.org/content/repositories/snapshots"
}
}

dependencies {

implementation 'io.nats:jnats:2.17.2'
implementation 'io.nats:jnats:2.20.5'
implementation("com.fasterxml.jackson.core:jackson-core:2.14.2")
implementation("io.netty:netty-handler:4.1.97.Final")
implementation(platform("io.vertx:vertx-stack-depchain:4.5.1"))
implementation("io.vertx:vertx-core")
implementation("io.netty:netty-resolver-dns-native-macos:4.1.80.Final:osx-x86_64")
testImplementation("io.vertx:vertx-junit5")
testImplementation 'io.nats:jnats-server-runner:1.2.5'
testImplementation 'io.nats:jnats-server-runner:2.0.0'
}

test {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/impl/VertxDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class VertxDispatcher extends NatsDispatcher {

VertxDispatcher(NatsConnection conn, MessageHandler handler, ContextInternal context) {
super(conn, handler);
vertxIncoming = new VertxDispatcherMessageQueue(this, context);
vertxIncoming = new VertxDispatcherMessageQueue(this, context, conn);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ public class VertxDispatcherMessageQueue extends MessageQueue {
private final VertxDispatcher dispatcher;
private final ContextInternal context;

VertxDispatcherMessageQueue(VertxDispatcher dispatcher, ContextInternal context) {
super(true);
VertxDispatcherMessageQueue(VertxDispatcher dispatcher, ContextInternal context, NatsConnection conn) {
super(true, conn.getOptions().getRequestCleanupInterval());
this.dispatcher = dispatcher;
this.context = context;
}
Expand Down Expand Up @@ -65,11 +65,6 @@ void poisonTheQueue() {
throw new IllegalStateException("poisonTheQueue not used.");
}

@Override
boolean offer(NatsMessage msg) {
throw new IllegalStateException("offer not used.");
}

@Override
NatsMessage poll(Duration timeout) throws InterruptedException {
return super.poll(timeout);
Expand Down
28 changes: 18 additions & 10 deletions src/main/java/io/nats/vertx/NatsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import io.nats.client.*;
import io.nats.client.impl.Headers;
import io.nats.vertx.impl.NatsClientImpl;
import io.vertx.core.*;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.streams.WriteStream;

import java.time.Duration;
Expand Down Expand Up @@ -42,6 +44,21 @@ static NatsClient create(final NatsOptions natsOptions) {
*/
Future<NatsStream> jetStream(JetStreamOptions options);

/**
* Get interface to Key Value.
* @param bucketName the bucket name
* @return Key Value instance
*/
Future<NatsKeyValue> keyValue(String bucketName);

/**
* Get interface to Key Value.
* @param bucketName the bucket name
* @param options KeyValue options.
* @return Key Value instance
*/
Future<NatsKeyValue> keyValue(String bucketName, KeyValueOptions options);

/**
* Drain handler.
* @param handler the handler
Expand Down Expand Up @@ -137,9 +154,6 @@ static NatsClient create(final NatsOptions natsOptions) {
*/
Future<Message> request(String subject, byte[] message);




/**
* Send a request. The returned future will be completed when the
* response comes back.
Expand All @@ -151,7 +165,6 @@ static NatsClient create(final NatsOptions natsOptions) {
*/
Future<Message> request(String subject, Headers headers, byte[] body);


/**
* Send a request. The returned future will be completed when the
* response comes back.
Expand All @@ -164,7 +177,6 @@ static NatsClient create(final NatsOptions natsOptions) {
*/
Future<Message> requestWithTimeout(String subject, Headers headers, byte[] body, Duration timeout);


/**
*
* Send request.
Expand Down Expand Up @@ -199,7 +211,6 @@ static NatsClient create(final NatsOptions natsOptions) {
*/
Future<Message> request(String subject, byte[] message, Duration timeout);


/**
* Send a message to the specified subject. The message body <strong>will
* not</strong> be copied. The expected usage with string content is something
Expand Down Expand Up @@ -259,7 +270,6 @@ static NatsClient create(final NatsOptions natsOptions) {
*/
Future<Void> subscribe(String subject, Handler<Message> handler);


/**
*
* Subscribe to subject.
Expand Down Expand Up @@ -290,6 +300,4 @@ static NatsClient create(final NatsOptions natsOptions) {
default Future<Void> end() {
return close();
}


}
Loading

0 comments on commit ed10ce9

Please sign in to comment.