diff --git a/client/src/main/java/io/streamnative/oxia/client/batch/ReadBatch.java b/client/src/main/java/io/streamnative/oxia/client/batch/ReadBatch.java index 01b2130c..ba5c87e4 100644 --- a/client/src/main/java/io/streamnative/oxia/client/batch/ReadBatch.java +++ b/client/src/main/java/io/streamnative/oxia/client/batch/ReadBatch.java @@ -18,22 +18,25 @@ import static java.util.stream.Collectors.toList; import com.google.common.annotations.VisibleForTesting; +import io.grpc.stub.StreamObserver; import io.streamnative.oxia.client.grpc.OxiaStub; import io.streamnative.oxia.proto.GetResponse; import io.streamnative.oxia.proto.ReadRequest; +import io.streamnative.oxia.proto.ReadResponse; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; import lombok.NonNull; -import reactor.core.publisher.Flux; -final class ReadBatch extends BatchBase implements Batch { +final class ReadBatch extends BatchBase implements Batch, StreamObserver { private final ReadBatchFactory factory; @VisibleForTesting final List gets = new ArrayList<>(); + private int responseIndex = 0; + long startSendTimeNanos; + ReadBatch(ReadBatchFactory factory, Function stubByShardId, long shardId) { super(stubByShardId, shardId); this.factory = factory; @@ -57,31 +60,33 @@ public int size() { @Override public void send() { - long startSendTimeNanos = System.nanoTime(); - - LongAdder bytes = new LongAdder(); + startSendTimeNanos = System.nanoTime(); try { - var responses = - getStub() - .reactor() - .read(toProto()) - .flatMapSequential(response -> Flux.fromIterable(response.getGetsList())) - .doOnNext(r -> bytes.add(r.getValue().size())); - Flux.fromIterable(gets).zipWith(responses, this::complete).then().block(); - factory - .getReadRequestLatencyHistogram() - .recordSuccess(System.nanoTime() - startSendTimeNanos); - } catch (Throwable batchError) { - gets.forEach(g -> g.fail(batchError)); - factory - .getReadRequestLatencyHistogram() - .recordFailure(System.nanoTime() - startSendTimeNanos); + getStub().async().read(toProto(), this); + } catch (Throwable t) { + onError(t); } } - private boolean complete(Operation.ReadOperation.GetOperation operation, GetResponse response) { - operation.complete(response); - return true; + @Override + public void onNext(ReadResponse response) { + for (int i = 0; i < response.getGetsCount(); i++) { + GetResponse gr = response.getGets(i); + gets.get(responseIndex).complete(gr); + + ++responseIndex; + } + } + + @Override + public void onError(Throwable batchError) { + gets.forEach(g -> g.fail(batchError)); + factory.getReadRequestLatencyHistogram().recordFailure(System.nanoTime() - startSendTimeNanos); + } + + @Override + public void onCompleted() { + factory.getReadRequestLatencyHistogram().recordSuccess(System.nanoTime() - startSendTimeNanos); } @NonNull diff --git a/client/src/main/java/io/streamnative/oxia/client/batch/WriteBatch.java b/client/src/main/java/io/streamnative/oxia/client/batch/WriteBatch.java index f940ce77..92f74010 100644 --- a/client/src/main/java/io/streamnative/oxia/client/batch/WriteBatch.java +++ b/client/src/main/java/io/streamnative/oxia/client/batch/WriteBatch.java @@ -20,16 +20,18 @@ import static java.util.stream.Collectors.toList; import com.google.common.annotations.VisibleForTesting; +import io.grpc.stub.StreamObserver; import io.streamnative.oxia.client.grpc.OxiaStub; import io.streamnative.oxia.client.session.SessionManager; import io.streamnative.oxia.proto.WriteRequest; +import io.streamnative.oxia.proto.WriteResponse; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.function.Function; import lombok.NonNull; -final class WriteBatch extends BatchBase implements Batch { +final class WriteBatch extends BatchBase implements Batch, StreamObserver { private final WriteBatchFactory factory; @@ -47,6 +49,7 @@ final class WriteBatch extends BatchBase implements Batch { private boolean containsEphemeral; private int byteSize; private long bytes; + private long startSendTimeNanos; WriteBatch( @NonNull WriteBatchFactory factory, @@ -101,28 +104,42 @@ public int size() { @Override public void send() { - long startSendTimeNanos = System.nanoTime(); + startSendTimeNanos = System.nanoTime(); try { - var response = getStub().reactor().write(toProto()).block(); - factory.writeRequestLatencyHistogram.recordSuccess(System.nanoTime() - startSendTimeNanos); - - for (var i = 0; i < deletes.size(); i++) { - deletes.get(i).complete(response.getDeletes(i)); - } - for (var i = 0; i < deleteRanges.size(); i++) { - deleteRanges.get(i).complete(response.getDeleteRanges(i)); - } - for (var i = 0; i < puts.size(); i++) { - puts.get(i).complete(response.getPuts(i)); - } - } catch (Throwable batchError) { - factory.writeRequestLatencyHistogram.recordFailure(System.nanoTime() - startSendTimeNanos); - deletes.forEach(d -> d.fail(batchError)); - deleteRanges.forEach(f -> f.fail(batchError)); - puts.forEach(p -> p.fail(batchError)); + getStub().async().write(toProto(), this); + } catch (Throwable t) { + onError(t); } } + @Override + public void onNext(WriteResponse response) { + factory.writeRequestLatencyHistogram.recordSuccess(System.nanoTime() - startSendTimeNanos); + + for (var i = 0; i < deletes.size(); i++) { + deletes.get(i).complete(response.getDeletes(i)); + } + for (var i = 0; i < deleteRanges.size(); i++) { + deleteRanges.get(i).complete(response.getDeleteRanges(i)); + } + for (var i = 0; i < puts.size(); i++) { + puts.get(i).complete(response.getPuts(i)); + } + } + + @Override + public void onError(Throwable batchError) { + factory.writeRequestLatencyHistogram.recordFailure(System.nanoTime() - startSendTimeNanos); + deletes.forEach(d -> d.fail(batchError)); + deleteRanges.forEach(f -> f.fail(batchError)); + puts.forEach(p -> p.fail(batchError)); + } + + @Override + public void onCompleted() { + // Write is just single-rpc + } + @NonNull WriteRequest toProto() { Optional sessionInfo; diff --git a/client/src/main/java/io/streamnative/oxia/client/grpc/OxiaStub.java b/client/src/main/java/io/streamnative/oxia/client/grpc/OxiaStub.java index 28cafbc8..40ecae38 100644 --- a/client/src/main/java/io/streamnative/oxia/client/grpc/OxiaStub.java +++ b/client/src/main/java/io/streamnative/oxia/client/grpc/OxiaStub.java @@ -31,6 +31,8 @@ public class OxiaStub implements AutoCloseable { private final @NonNull OxiaClientGrpc.OxiaClientBlockingStub blockingStub; + private final @NonNull OxiaClientGrpc.OxiaClientStub asyncStub; + public OxiaStub(String address) { this(Grpc.newChannelBuilder(address, InsecureChannelCredentials.create()).build()); } @@ -39,6 +41,7 @@ public OxiaStub(ManagedChannel channel) { this.channel = channel; this.reactorStub = ReactorOxiaClientGrpc.newReactorStub(channel); this.blockingStub = OxiaClientGrpc.newBlockingStub(channel); + this.asyncStub = OxiaClientGrpc.newStub(channel); } public ReactorOxiaClientGrpc.ReactorOxiaClientStub reactor() { @@ -49,6 +52,10 @@ public OxiaClientGrpc.OxiaClientBlockingStub blocking() { return blockingStub; } + public OxiaClientGrpc.OxiaClientStub async() { + return asyncStub; + } + @Override public void close() throws Exception { channel.shutdown(); diff --git a/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java b/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java index b5d4d24c..f5186d9a 100644 --- a/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java @@ -64,6 +64,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Function; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -277,7 +278,11 @@ public void sendFailNoClient() { batch.send(); - assertThat(putCallable).isCompletedExceptionally(); + Awaitility.await() + .untilAsserted( + () -> { + assertThat(putCallable).isCompletedExceptionally(); + }); assertThatThrownBy(putCallable::get) .satisfies( e -> { @@ -382,7 +387,11 @@ public void sendFailNoClient() { batch.add(get); batch.send(); - assertThat(getCallable).isCompletedExceptionally(); + Awaitility.await() + .untilAsserted( + () -> { + assertThat(getCallable).isCompletedExceptionally(); + }); assertThatThrownBy(getCallable::get) .satisfies( e -> { diff --git a/perf/src/main/java/io/streamnative/oxia/client/perf/PerfClient.java b/perf/src/main/java/io/streamnative/oxia/client/perf/PerfClient.java index aad19326..0193e93d 100644 --- a/perf/src/main/java/io/streamnative/oxia/client/perf/PerfClient.java +++ b/perf/src/main/java/io/streamnative/oxia/client/perf/PerfClient.java @@ -21,7 +21,6 @@ import com.beust.jcommander.ParameterException; import com.google.common.util.concurrent.RateLimiter; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; -import io.streamnative.oxia.client.OxiaClientBuilderImpl; import io.streamnative.oxia.client.api.AsyncOxiaClient; import io.streamnative.oxia.client.api.OxiaClientBuilder; import java.time.Duration;