From 5c2df52664929ce1f6452c15f70b3c476ecd071d Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Sun, 26 Jan 2025 14:24:50 +0800 Subject: [PATCH 1/5] fix: range scan more than one onError callback --- .../oxia/client/AsyncOxiaClientImpl.java | 111 ++++++++++-------- .../oxia/client/AsyncOxiaClientImplTest.java | 107 ++++++++++++++++- 2 files changed, 170 insertions(+), 48 deletions(-) diff --git a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java index cf956cd7..36b71e01 100644 --- a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java +++ b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java @@ -54,7 +54,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -684,13 +683,13 @@ public void rangeScan( @NonNull Set options) { gaugePendingRangeScanRequests.increment(); - RangeScanConsumerWithShard timedConsumer = - new RangeScanConsumerWithShard() { + final RangeScanConsumer timedConsumer = + new RangeScanConsumer() { final long startTime = System.nanoTime(); final AtomicLong totalSize = new AtomicLong(); @Override - public void onNext(long shardId, GetResult result) { + public void onNext(GetResult result) { totalSize.addAndGet(result.getValue().length); consumer.onNext(result); } @@ -703,7 +702,7 @@ public void onError(Throwable throwable) { } @Override - public void onCompleted(long shardId) { + public void onCompleted() { gaugePendingRangeScanRequests.decrement(); counterRangeScanBytes.add(totalSize.longValue()); histogramRangeScanLatency.recordSuccess(System.nanoTime() - startTime); @@ -731,20 +730,12 @@ public void onCompleted(long shardId) { } } - interface RangeScanConsumerWithShard { - void onNext(long shardId, GetResult result); - - void onError(Throwable throwable); - - void onCompleted(long shardId); - } - private void internalShardRangeScan( long shardId, String startKeyInclusive, String endKeyExclusive, Optional secondaryIndexName, - RangeScanConsumerWithShard consumer) { + RangeScanConsumer consumer) { var leader = shardManager.leader(shardId); var stub = stubManager.getStub(leader); var requestBuilder = @@ -760,22 +751,28 @@ private void internalShardRangeScan( .rangeScan( request, new StreamObserver<>() { + // using those two fields to help debug with heap dump + private boolean completed = false; + private Throwable completedException = null; + @Override public void onNext(RangeScanResponse response) { for (int i = 0; i < response.getRecordsCount(); i++) { - consumer.onNext( - shardId, ProtoUtil.getResultFromProto("", response.getRecords(i))); + consumer.onNext(ProtoUtil.getResultFromProto("", response.getRecords(i))); } } @Override public void onError(Throwable t) { + completed = true; + completedException = t; consumer.onError(t); } @Override public void onCompleted() { - consumer.onCompleted(shardId); + completed = true; + consumer.onCompleted(); } }); } @@ -784,42 +781,62 @@ private void internalRangeScanMultiShards( String startKeyInclusive, String endKeyExclusive, Optional secondaryIndexName, - RangeScanConsumerWithShard consumer) { - Set shardIds = shardManager.allShardIds(); + RangeScanConsumer consumer) { + final Set shardIds = shardManager.allShardIds(); + final RangeScanConsumer multiShardConsumer = new ShardRangeScanConsumer(shardIds.size(), consumer); + for (long shardId : shardIds) { + internalShardRangeScan( + shardId, startKeyInclusive, endKeyExclusive, secondaryIndexName, multiShardConsumer); + } + } - RangeScanConsumerWithShard multiShardConsumer = - new RangeScanConsumerWithShard() { - private final Set pendingShards = new HashSet<>(shardIds); - private boolean failed = false; + static class ShardRangeScanConsumer implements RangeScanConsumer { + private final RangeScanConsumer delegate; - @Override - public synchronized void onNext(long shardId, GetResult result) { - if (!failed) { - consumer.onNext(shardId, result); - } - } - @Override - public synchronized void onError(Throwable throwable) { - failed = true; - consumer.onError(throwable); - } + private int pendingCompletedRequests; + private boolean completed = false; + private Throwable completedException = null; - @Override - public synchronized void onCompleted(long shardId) { - if (!failed) { - pendingShards.remove(shardId); - if (pendingShards.isEmpty()) { - consumer.onCompleted(shardId); - } - } - } - }; + ShardRangeScanConsumer(int shards, RangeScanConsumer delegate) { + this.pendingCompletedRequests = shards; + this.delegate = delegate; + } - for (long shardId : shardIds) { - internalShardRangeScan( - shardId, startKeyInclusive, endKeyExclusive, secondaryIndexName, multiShardConsumer); + @Override + public synchronized void onNext(GetResult result) { + if (completed) { + return; + } + delegate.onNext(result); } + + @Override + public synchronized void onError(Throwable throwable) { + if (completedException == null) { + completedException = throwable; + } else { + completedException.addSuppressed(throwable); + } + if (completed) { + return; + } + completed = true; + delegate.onError(throwable); + } + + @Override + public synchronized void onCompleted() { + if (completed) { + return; + } + pendingCompletedRequests -= 1; + if (pendingCompletedRequests == 0) { + completed = true; + delegate.onCompleted(); + } + } + } @Override diff --git a/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java b/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java index 0f4bd322..a177ea1c 100644 --- a/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java @@ -32,6 +32,7 @@ import io.streamnative.oxia.client.api.DeleteOption; import io.streamnative.oxia.client.api.GetResult; import io.streamnative.oxia.client.api.PutResult; +import io.streamnative.oxia.client.api.RangeScanConsumer; import io.streamnative.oxia.client.api.Version; import io.streamnative.oxia.client.batch.BatchManager; import io.streamnative.oxia.client.batch.Batcher; @@ -49,10 +50,18 @@ import io.streamnative.oxia.proto.ListResponse; import io.streamnative.oxia.proto.OxiaClientGrpc; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -566,4 +575,100 @@ void close() throws Exception { inOrder.verify(stubManager).close(); client = null; } + + + @Test + void testShardShardRangeScanConsumer() { + final int shards = 5; + final List results = new ArrayList<>(); + final AtomicInteger onErrorCount = new AtomicInteger(0); + final AtomicInteger onCompletedCount = new AtomicInteger(0); + final Supplier newShardRangeScanConsumer = () -> new AsyncOxiaClientImpl.ShardRangeScanConsumer(5, new RangeScanConsumer() { + @Override + public void onNext(GetResult result) { + results.add(result); + } + + @Override + public void onError(Throwable throwable) { + onErrorCount.incrementAndGet(); + } + + @Override + public void onCompleted() { + onCompletedCount.incrementAndGet(); + } + }); + final var tasks = new ArrayList>(); + + // (1) complete ok + final var shardRangeScanConsumer1 = newShardRangeScanConsumer.get(); + for (int i = 0; i < shards; i++) { + final int fi = i; + final ForkJoinTask task = ForkJoinPool.commonPool().submit(() -> { + shardRangeScanConsumer1.onNext(new GetResult("shard-" + fi + "-0", + new byte[10], + new Version(1, 2, 3, 4, empty(), empty()))); + shardRangeScanConsumer1.onNext(new GetResult("shard-" + fi + "-1", + new byte[10], + new Version(1, 2, 3, 4, empty(), empty()))); + shardRangeScanConsumer1.onCompleted(); + }); + tasks.add(task); + } + tasks.forEach(ForkJoinTask::join); + var keys = results.stream().map(GetResult::getKey).toList(); + for (int i = 0; i < shards; i++) { + Assertions.assertTrue(keys.contains("shard-" + i + "-0")); + Assertions.assertTrue(keys.contains("shard-" + i + "-1")); + } + Assertions.assertEquals(0, onErrorCount.get()); + Assertions.assertEquals(1, onCompletedCount.get()); + + tasks.clear(); + onErrorCount.set(0); + onCompletedCount.set(0); + results.clear(); + + + // (2) complete partial exception + final var shardRangeScanConsumer2 = newShardRangeScanConsumer.get(); + for (int i = 0; i < shards; i++) { + final int fi = i; + final ForkJoinTask task = ForkJoinPool.commonPool().submit(() -> { + if (fi %2 == 0) { + shardRangeScanConsumer2.onError(new IllegalStateException()); + return; + } + shardRangeScanConsumer2.onNext(new GetResult("shard-" + fi + "-0", + new byte[10], + new Version(1, 2, 3, 4, empty(), empty()))); + shardRangeScanConsumer2.onNext(new GetResult("shard-" + fi + "-1", + new byte[10], + new Version(1, 2, 3, 4, empty(), empty()))); + shardRangeScanConsumer2.onCompleted(); + }); + tasks.add(task); + } + tasks.forEach(ForkJoinTask::join); + + Assertions.assertEquals(1, onErrorCount.get()); + Assertions.assertEquals(0, onCompletedCount.get()); + + tasks.clear(); + onErrorCount.set(0); + onCompletedCount.set(0); + results.clear(); + + // (3) complete all exception + final var shardRangeScanConsumer3 = newShardRangeScanConsumer.get(); + for (int i = 0; i < shards; i++) { + final ForkJoinTask task = ForkJoinPool.commonPool().submit(() -> shardRangeScanConsumer3.onError(new IllegalStateException())); + tasks.add(task); + } + tasks.forEach(ForkJoinTask::join); + Assertions.assertEquals(1, onErrorCount.get()); + Assertions.assertEquals(0, onCompletedCount.get()); + Assertions.assertEquals(0, results.size()); + } } From cb76088c17ab359f6b0b694cc6c86862d31e1d73 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Sun, 26 Jan 2025 14:48:33 +0800 Subject: [PATCH 2/5] fix spotless --- .../oxia/client/AsyncOxiaClientImpl.java | 5 +- .../oxia/client/AsyncOxiaClientImplTest.java | 100 +++++++++++------- 2 files changed, 61 insertions(+), 44 deletions(-) diff --git a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java index 36b71e01..c657d35b 100644 --- a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java +++ b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java @@ -783,7 +783,8 @@ private void internalRangeScanMultiShards( Optional secondaryIndexName, RangeScanConsumer consumer) { final Set shardIds = shardManager.allShardIds(); - final RangeScanConsumer multiShardConsumer = new ShardRangeScanConsumer(shardIds.size(), consumer); + final RangeScanConsumer multiShardConsumer = + new ShardRangeScanConsumer(shardIds.size(), consumer); for (long shardId : shardIds) { internalShardRangeScan( shardId, startKeyInclusive, endKeyExclusive, secondaryIndexName, multiShardConsumer); @@ -793,7 +794,6 @@ private void internalRangeScanMultiShards( static class ShardRangeScanConsumer implements RangeScanConsumer { private final RangeScanConsumer delegate; - private int pendingCompletedRequests; private boolean completed = false; private Throwable completedException = null; @@ -836,7 +836,6 @@ public synchronized void onCompleted() { delegate.onCompleted(); } } - } @Override diff --git a/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java b/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java index a177ea1c..7d1b0220 100644 --- a/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java @@ -576,44 +576,54 @@ void close() throws Exception { client = null; } - @Test void testShardShardRangeScanConsumer() { final int shards = 5; final List results = new ArrayList<>(); final AtomicInteger onErrorCount = new AtomicInteger(0); final AtomicInteger onCompletedCount = new AtomicInteger(0); - final Supplier newShardRangeScanConsumer = () -> new AsyncOxiaClientImpl.ShardRangeScanConsumer(5, new RangeScanConsumer() { - @Override - public void onNext(GetResult result) { - results.add(result); - } - - @Override - public void onError(Throwable throwable) { - onErrorCount.incrementAndGet(); - } - - @Override - public void onCompleted() { - onCompletedCount.incrementAndGet(); - } - }); + final Supplier newShardRangeScanConsumer = + () -> + new AsyncOxiaClientImpl.ShardRangeScanConsumer( + 5, + new RangeScanConsumer() { + @Override + public void onNext(GetResult result) { + results.add(result); + } + + @Override + public void onError(Throwable throwable) { + onErrorCount.incrementAndGet(); + } + + @Override + public void onCompleted() { + onCompletedCount.incrementAndGet(); + } + }); final var tasks = new ArrayList>(); // (1) complete ok final var shardRangeScanConsumer1 = newShardRangeScanConsumer.get(); for (int i = 0; i < shards; i++) { final int fi = i; - final ForkJoinTask task = ForkJoinPool.commonPool().submit(() -> { - shardRangeScanConsumer1.onNext(new GetResult("shard-" + fi + "-0", - new byte[10], - new Version(1, 2, 3, 4, empty(), empty()))); - shardRangeScanConsumer1.onNext(new GetResult("shard-" + fi + "-1", - new byte[10], - new Version(1, 2, 3, 4, empty(), empty()))); - shardRangeScanConsumer1.onCompleted(); - }); + final ForkJoinTask task = + ForkJoinPool.commonPool() + .submit( + () -> { + shardRangeScanConsumer1.onNext( + new GetResult( + "shard-" + fi + "-0", + new byte[10], + new Version(1, 2, 3, 4, empty(), empty()))); + shardRangeScanConsumer1.onNext( + new GetResult( + "shard-" + fi + "-1", + new byte[10], + new Version(1, 2, 3, 4, empty(), empty()))); + shardRangeScanConsumer1.onCompleted(); + }); tasks.add(task); } tasks.forEach(ForkJoinTask::join); @@ -630,24 +640,30 @@ public void onCompleted() { onCompletedCount.set(0); results.clear(); - // (2) complete partial exception final var shardRangeScanConsumer2 = newShardRangeScanConsumer.get(); for (int i = 0; i < shards; i++) { final int fi = i; - final ForkJoinTask task = ForkJoinPool.commonPool().submit(() -> { - if (fi %2 == 0) { - shardRangeScanConsumer2.onError(new IllegalStateException()); - return; - } - shardRangeScanConsumer2.onNext(new GetResult("shard-" + fi + "-0", - new byte[10], - new Version(1, 2, 3, 4, empty(), empty()))); - shardRangeScanConsumer2.onNext(new GetResult("shard-" + fi + "-1", - new byte[10], - new Version(1, 2, 3, 4, empty(), empty()))); - shardRangeScanConsumer2.onCompleted(); - }); + final ForkJoinTask task = + ForkJoinPool.commonPool() + .submit( + () -> { + if (fi % 2 == 0) { + shardRangeScanConsumer2.onError(new IllegalStateException()); + return; + } + shardRangeScanConsumer2.onNext( + new GetResult( + "shard-" + fi + "-0", + new byte[10], + new Version(1, 2, 3, 4, empty(), empty()))); + shardRangeScanConsumer2.onNext( + new GetResult( + "shard-" + fi + "-1", + new byte[10], + new Version(1, 2, 3, 4, empty(), empty()))); + shardRangeScanConsumer2.onCompleted(); + }); tasks.add(task); } tasks.forEach(ForkJoinTask::join); @@ -663,7 +679,9 @@ public void onCompleted() { // (3) complete all exception final var shardRangeScanConsumer3 = newShardRangeScanConsumer.get(); for (int i = 0; i < shards; i++) { - final ForkJoinTask task = ForkJoinPool.commonPool().submit(() -> shardRangeScanConsumer3.onError(new IllegalStateException())); + final ForkJoinTask task = + ForkJoinPool.commonPool() + .submit(() -> shardRangeScanConsumer3.onError(new IllegalStateException())); tasks.add(task); } tasks.forEach(ForkJoinTask::join); From 340cc9758a718d201fb3cd7f9cf3ac82fdf676fe Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Sun, 26 Jan 2025 14:51:11 +0800 Subject: [PATCH 3/5] fix typo --- .../io/streamnative/oxia/client/AsyncOxiaClientImpl.java | 6 +++--- .../streamnative/oxia/client/AsyncOxiaClientImplTest.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java index c657d35b..9faa82bc 100644 --- a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java +++ b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java @@ -784,21 +784,21 @@ private void internalRangeScanMultiShards( RangeScanConsumer consumer) { final Set shardIds = shardManager.allShardIds(); final RangeScanConsumer multiShardConsumer = - new ShardRangeScanConsumer(shardIds.size(), consumer); + new ShardingRangeScanConsumer(shardIds.size(), consumer); for (long shardId : shardIds) { internalShardRangeScan( shardId, startKeyInclusive, endKeyExclusive, secondaryIndexName, multiShardConsumer); } } - static class ShardRangeScanConsumer implements RangeScanConsumer { + static class ShardingRangeScanConsumer implements RangeScanConsumer { private final RangeScanConsumer delegate; private int pendingCompletedRequests; private boolean completed = false; private Throwable completedException = null; - ShardRangeScanConsumer(int shards, RangeScanConsumer delegate) { + ShardingRangeScanConsumer(int shards, RangeScanConsumer delegate) { this.pendingCompletedRequests = shards; this.delegate = delegate; } diff --git a/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java b/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java index 7d1b0220..65ce1d7f 100644 --- a/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java @@ -584,7 +584,7 @@ void testShardShardRangeScanConsumer() { final AtomicInteger onCompletedCount = new AtomicInteger(0); final Supplier newShardRangeScanConsumer = () -> - new AsyncOxiaClientImpl.ShardRangeScanConsumer( + new AsyncOxiaClientImpl.ShardingRangeScanConsumer( 5, new RangeScanConsumer() { @Override From 49707d31c5acf5396c20317dc0125918ff1b9ecc Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Sun, 26 Jan 2025 14:51:57 +0800 Subject: [PATCH 4/5] fix typo --- .../io/streamnative/oxia/client/AsyncOxiaClientImpl.java | 6 +++--- .../streamnative/oxia/client/AsyncOxiaClientImplTest.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java index 9faa82bc..58a221bf 100644 --- a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java +++ b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java @@ -784,21 +784,21 @@ private void internalRangeScanMultiShards( RangeScanConsumer consumer) { final Set shardIds = shardManager.allShardIds(); final RangeScanConsumer multiShardConsumer = - new ShardingRangeScanConsumer(shardIds.size(), consumer); + new SharedRangeScanConsumer(shardIds.size(), consumer); for (long shardId : shardIds) { internalShardRangeScan( shardId, startKeyInclusive, endKeyExclusive, secondaryIndexName, multiShardConsumer); } } - static class ShardingRangeScanConsumer implements RangeScanConsumer { + static class SharedRangeScanConsumer implements RangeScanConsumer { private final RangeScanConsumer delegate; private int pendingCompletedRequests; private boolean completed = false; private Throwable completedException = null; - ShardingRangeScanConsumer(int shards, RangeScanConsumer delegate) { + SharedRangeScanConsumer(int shards, RangeScanConsumer delegate) { this.pendingCompletedRequests = shards; this.delegate = delegate; } diff --git a/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java b/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java index 65ce1d7f..a1b2db0e 100644 --- a/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java @@ -584,7 +584,7 @@ void testShardShardRangeScanConsumer() { final AtomicInteger onCompletedCount = new AtomicInteger(0); final Supplier newShardRangeScanConsumer = () -> - new AsyncOxiaClientImpl.ShardingRangeScanConsumer( + new AsyncOxiaClientImpl.SharedRangeScanConsumer( 5, new RangeScanConsumer() { @Override From 2880efeb29919b3565644b76c921269fd2f61d4b Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Sun, 26 Jan 2025 15:00:30 +0800 Subject: [PATCH 5/5] remove unused field --- .../io/streamnative/oxia/client/AsyncOxiaClientImpl.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java index 58a221bf..37792924 100644 --- a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java +++ b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java @@ -751,10 +751,6 @@ private void internalShardRangeScan( .rangeScan( request, new StreamObserver<>() { - // using those two fields to help debug with heap dump - private boolean completed = false; - private Throwable completedException = null; - @Override public void onNext(RangeScanResponse response) { for (int i = 0; i < response.getRecordsCount(); i++) { @@ -764,14 +760,11 @@ public void onNext(RangeScanResponse response) { @Override public void onError(Throwable t) { - completed = true; - completedException = t; consumer.onError(t); } @Override public void onCompleted() { - completed = true; consumer.onCompleted(); } });