Skip to content

Commit

Permalink
Allow to have more than a single outstanding batch (#145)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored May 2, 2024
1 parent f3f6a31 commit 246a919
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,25 @@
import static java.util.stream.Collectors.toList;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.stub.StreamObserver;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.proto.GetResponse;
import io.streamnative.oxia.proto.ReadRequest;
import io.streamnative.oxia.proto.ReadResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import lombok.NonNull;
import reactor.core.publisher.Flux;

final class ReadBatch extends BatchBase implements Batch {
final class ReadBatch extends BatchBase implements Batch, StreamObserver<ReadResponse> {

private final ReadBatchFactory factory;

@VisibleForTesting final List<Operation.ReadOperation.GetOperation> gets = new ArrayList<>();

private int responseIndex = 0;
long startSendTimeNanos;

ReadBatch(ReadBatchFactory factory, Function<Long, OxiaStub> stubByShardId, long shardId) {
super(stubByShardId, shardId);
this.factory = factory;
Expand All @@ -57,31 +60,33 @@ public int size() {

@Override
public void send() {
long startSendTimeNanos = System.nanoTime();

LongAdder bytes = new LongAdder();
startSendTimeNanos = System.nanoTime();
try {
var responses =
getStub()
.reactor()
.read(toProto())
.flatMapSequential(response -> Flux.fromIterable(response.getGetsList()))
.doOnNext(r -> bytes.add(r.getValue().size()));
Flux.fromIterable(gets).zipWith(responses, this::complete).then().block();
factory
.getReadRequestLatencyHistogram()
.recordSuccess(System.nanoTime() - startSendTimeNanos);
} catch (Throwable batchError) {
gets.forEach(g -> g.fail(batchError));
factory
.getReadRequestLatencyHistogram()
.recordFailure(System.nanoTime() - startSendTimeNanos);
getStub().async().read(toProto(), this);
} catch (Throwable t) {
onError(t);
}
}

private boolean complete(Operation.ReadOperation.GetOperation operation, GetResponse response) {
operation.complete(response);
return true;
@Override
public void onNext(ReadResponse response) {
for (int i = 0; i < response.getGetsCount(); i++) {
GetResponse gr = response.getGets(i);
gets.get(responseIndex).complete(gr);

++responseIndex;
}
}

@Override
public void onError(Throwable batchError) {
gets.forEach(g -> g.fail(batchError));
factory.getReadRequestLatencyHistogram().recordFailure(System.nanoTime() - startSendTimeNanos);
}

@Override
public void onCompleted() {
factory.getReadRequestLatencyHistogram().recordSuccess(System.nanoTime() - startSendTimeNanos);
}

@NonNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@
import static java.util.stream.Collectors.toList;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.stub.StreamObserver;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.session.SessionManager;
import io.streamnative.oxia.proto.WriteRequest;
import io.streamnative.oxia.proto.WriteResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import lombok.NonNull;

final class WriteBatch extends BatchBase implements Batch {
final class WriteBatch extends BatchBase implements Batch, StreamObserver<WriteResponse> {

private final WriteBatchFactory factory;

Expand All @@ -47,6 +49,7 @@ final class WriteBatch extends BatchBase implements Batch {
private boolean containsEphemeral;
private int byteSize;
private long bytes;
private long startSendTimeNanos;

WriteBatch(
@NonNull WriteBatchFactory factory,
Expand Down Expand Up @@ -101,28 +104,42 @@ public int size() {

@Override
public void send() {
long startSendTimeNanos = System.nanoTime();
startSendTimeNanos = System.nanoTime();
try {
var response = getStub().reactor().write(toProto()).block();
factory.writeRequestLatencyHistogram.recordSuccess(System.nanoTime() - startSendTimeNanos);

for (var i = 0; i < deletes.size(); i++) {
deletes.get(i).complete(response.getDeletes(i));
}
for (var i = 0; i < deleteRanges.size(); i++) {
deleteRanges.get(i).complete(response.getDeleteRanges(i));
}
for (var i = 0; i < puts.size(); i++) {
puts.get(i).complete(response.getPuts(i));
}
} catch (Throwable batchError) {
factory.writeRequestLatencyHistogram.recordFailure(System.nanoTime() - startSendTimeNanos);
deletes.forEach(d -> d.fail(batchError));
deleteRanges.forEach(f -> f.fail(batchError));
puts.forEach(p -> p.fail(batchError));
getStub().async().write(toProto(), this);
} catch (Throwable t) {
onError(t);
}
}

@Override
public void onNext(WriteResponse response) {
factory.writeRequestLatencyHistogram.recordSuccess(System.nanoTime() - startSendTimeNanos);

for (var i = 0; i < deletes.size(); i++) {
deletes.get(i).complete(response.getDeletes(i));
}
for (var i = 0; i < deleteRanges.size(); i++) {
deleteRanges.get(i).complete(response.getDeleteRanges(i));
}
for (var i = 0; i < puts.size(); i++) {
puts.get(i).complete(response.getPuts(i));
}
}

@Override
public void onError(Throwable batchError) {
factory.writeRequestLatencyHistogram.recordFailure(System.nanoTime() - startSendTimeNanos);
deletes.forEach(d -> d.fail(batchError));
deleteRanges.forEach(f -> f.fail(batchError));
puts.forEach(p -> p.fail(batchError));
}

@Override
public void onCompleted() {
// Write is just single-rpc
}

@NonNull
WriteRequest toProto() {
Optional<Operation.WriteOperation.PutOperation.SessionInfo> sessionInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class OxiaStub implements AutoCloseable {

private final @NonNull OxiaClientGrpc.OxiaClientBlockingStub blockingStub;

private final @NonNull OxiaClientGrpc.OxiaClientStub asyncStub;

public OxiaStub(String address) {
this(Grpc.newChannelBuilder(address, InsecureChannelCredentials.create()).build());
}
Expand All @@ -39,6 +41,7 @@ public OxiaStub(ManagedChannel channel) {
this.channel = channel;
this.reactorStub = ReactorOxiaClientGrpc.newReactorStub(channel);
this.blockingStub = OxiaClientGrpc.newBlockingStub(channel);
this.asyncStub = OxiaClientGrpc.newStub(channel);
}

public ReactorOxiaClientGrpc.ReactorOxiaClientStub reactor() {
Expand All @@ -49,6 +52,10 @@ public OxiaClientGrpc.OxiaClientBlockingStub blocking() {
return blockingStub;
}

public OxiaClientGrpc.OxiaClientStub async() {
return asyncStub;
}

@Override
public void close() throws Exception {
channel.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
Expand Down Expand Up @@ -277,7 +278,11 @@ public void sendFailNoClient() {

batch.send();

assertThat(putCallable).isCompletedExceptionally();
Awaitility.await()
.untilAsserted(
() -> {
assertThat(putCallable).isCompletedExceptionally();
});
assertThatThrownBy(putCallable::get)
.satisfies(
e -> {
Expand Down Expand Up @@ -382,7 +387,11 @@ public void sendFailNoClient() {
batch.add(get);
batch.send();

assertThat(getCallable).isCompletedExceptionally();
Awaitility.await()
.untilAsserted(
() -> {
assertThat(getCallable).isCompletedExceptionally();
});
assertThatThrownBy(getCallable::get)
.satisfies(
e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.beust.jcommander.ParameterException;
import com.google.common.util.concurrent.RateLimiter;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.streamnative.oxia.client.OxiaClientBuilderImpl;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.OxiaClientBuilder;
import java.time.Duration;
Expand Down

0 comments on commit 246a919

Please sign in to comment.