From 8cbd39f0846c565efcaaeb3cf04b029aead107ed Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Wed, 5 Mar 2025 20:00:53 +0800 Subject: [PATCH 1/3] feat: support IncludeValue for get operation See https://github.com/streamnative/oxia/pull/636 --- .../oxia/client/api/GetOption.java | 15 +++++- .../oxia/client/api/OptionIncludeValue.java | 5 ++ .../oxia/client/it/OxiaClientIT.java | 54 +++++++++++++++++-- .../oxia/client/AsyncOxiaClientImpl.java | 22 ++++---- .../oxia/client/batch/Operation.java | 8 +-- .../oxia/client/options/GetOptions.java | 42 +++++++++++++++ .../oxia/client/batch/BatchTest.java | 3 +- .../oxia/client/batch/BatcherTest.java | 13 ++--- .../oxia/client/batch/OperationTest.java | 3 +- 9 files changed, 136 insertions(+), 29 deletions(-) create mode 100644 client-api/src/main/java/io/streamnative/oxia/client/api/OptionIncludeValue.java create mode 100644 client/src/main/java/io/streamnative/oxia/client/options/GetOptions.java diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/GetOption.java b/client-api/src/main/java/io/streamnative/oxia/client/api/GetOption.java index 8dbad02d..de194680 100644 --- a/client-api/src/main/java/io/streamnative/oxia/client/api/GetOption.java +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/GetOption.java @@ -17,7 +17,7 @@ import lombok.NonNull; -public sealed interface GetOption permits OptionComparisonType, OptionPartitionKey { +public sealed interface GetOption permits OptionComparisonType, OptionIncludeValue, OptionPartitionKey { /** ComparisonEqual sets the Get() operation to compare the stored key for equality. */ GetOption ComparisonEqual = new OptionComparisonType(OptionComparisonType.ComparisonType.Equal); @@ -59,4 +59,17 @@ public sealed interface GetOption permits OptionComparisonType, OptionPartitionK static GetOption PartitionKey(@NonNull String partitionKey) { return new OptionPartitionKey(partitionKey); } + + /** + * Creates and returns a GetOption that specifies whether to include a value. + * This method is used to configure whether a value should be included in the operation. + * + * @param includeValue A boolean flag indicating whether the value should be included. + * - true: The value will be included. + * - false: The value will not be included. + * @return A GetOption instance representing the include value setting. + */ + static GetOption IncludeValue(boolean includeValue) { + return new OptionIncludeValue(includeValue); + } } diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/OptionIncludeValue.java b/client-api/src/main/java/io/streamnative/oxia/client/api/OptionIncludeValue.java new file mode 100644 index 00000000..c2bf6bb1 --- /dev/null +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/OptionIncludeValue.java @@ -0,0 +1,5 @@ +package io.streamnative.oxia.client.api; + +public record OptionIncludeValue(boolean includeValue) implements GetOption{ + +} diff --git a/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientIT.java b/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientIT.java index 6ac1ce54..2be164ea 100644 --- a/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientIT.java +++ b/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientIT.java @@ -49,20 +49,20 @@ import io.streamnative.oxia.client.api.RangeScanOption; import io.streamnative.oxia.client.api.SyncOxiaClient; import io.streamnative.oxia.client.api.exceptions.KeyAlreadyExistsException; +import io.streamnative.oxia.client.api.exceptions.OxiaException; import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException; import io.streamnative.oxia.testcontainers.OxiaContainer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Queue; -import java.util.Set; + +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import lombok.Cleanup; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.testcontainers.containers.output.Slf4jLogConsumer; @@ -660,4 +660,48 @@ void testSecondaryIndex() throws Exception { .toList(); assertThat(list).containsExactly("si-a", "si-c"); } + + + @Test + @SneakyThrows + void testGetIncludeValue() { + @Cleanup + final SyncOxiaClient client = OxiaClientBuilder.create(oxia.getServiceAddress()).syncClient(); + + final String key = "stream"; + + final List keys = new ArrayList<>(); + PutResult putResult = client.put(key, UUID.randomUUID().toString().getBytes(), + Set.of( + PutOption.PartitionKey(key), + PutOption.SequenceKeysDeltas(List.of(1L)) + ) + ); + keys.add(putResult.key()); + putResult = client.put(key, UUID.randomUUID().toString().getBytes(), + Set.of( + PutOption.PartitionKey(key), + PutOption.SequenceKeysDeltas(List.of(1L)) + ) + ); + keys.add(putResult.key()); + + + for (String subKey : keys) { + GetResult result = client.get(subKey, Set.of(GetOption.PartitionKey(key), GetOption.IncludeValue(true))); + Assertions.assertNotNull(result.getValue()); + + result = client.get(subKey, Set.of(GetOption.PartitionKey(key), GetOption.IncludeValue(false))); + Assertions.assertEquals(result.getValue().length, 0); + } + + var result = client.get(keys.get(0), Set.of(GetOption.PartitionKey(key), GetOption.IncludeValue(false), GetOption.ComparisonHigher)); + Assertions.assertEquals(result.getValue().length, 0); + Assertions.assertEquals(result.getKey(), keys.get(1)); + + + result = client.get(keys.get(1), Set.of(GetOption.PartitionKey(key), GetOption.IncludeValue(false), GetOption.ComparisonLower)); + Assertions.assertEquals(result.getValue().length, 0); + Assertions.assertEquals(result.getKey(), keys.get(0)); + } } diff --git a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java index 37792924..968cf3a2 100644 --- a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java +++ b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java @@ -44,6 +44,7 @@ import io.streamnative.oxia.client.metrics.Unit; import io.streamnative.oxia.client.metrics.UpDownCounter; import io.streamnative.oxia.client.notify.NotificationManager; +import io.streamnative.oxia.client.options.GetOptions; import io.streamnative.oxia.client.session.SessionManager; import io.streamnative.oxia.client.shard.ShardManager; import io.streamnative.oxia.proto.KeyComparisonType; @@ -464,13 +465,14 @@ private CompletableFuture internalPut( @Override public @NonNull CompletableFuture get(String key, Set options) { + final GetOptions internalOptions = GetOptions.parseFrom(options); long startTime = System.nanoTime(); gaugePendingGetRequests.increment(); var callback = new CompletableFuture(); try { checkIfClosed(); Objects.requireNonNull(key); - internalGet(key, options, callback); + internalGet(key, internalOptions, callback); } catch (RuntimeException e) { callback.completeExceptionally(e); } @@ -491,25 +493,23 @@ private CompletableFuture internalPut( } private void internalGet( - String key, Set options, CompletableFuture result) { - KeyComparisonType comparisonType = OptionsUtils.getComparisonType(options); - Optional partitionKey = OptionsUtils.getPartitionKey(options); - if (comparisonType == KeyComparisonType.EQUAL || partitionKey.isPresent()) { + String key, GetOptions options, CompletableFuture result) { + if (options.comparisonType() == KeyComparisonType.EQUAL || options.partitionKey() != null) { // Single shard get operation - long shardId = shardManager.getShardForKey(partitionKey.orElse(key)); - readBatchManager.getBatcher(shardId).add(new GetOperation(result, key, comparisonType)); + long shardId = shardManager.getShardForKey(Optional.ofNullable(options.partitionKey()).orElse(key)); + readBatchManager.getBatcher(shardId).add(new GetOperation(result, key, options)); } else { - internalGetFloorCeiling(key, comparisonType, result); + internalGetFloorCeiling(key, options, result); } } private void internalGetFloorCeiling( - String key, KeyComparisonType comparisonType, CompletableFuture result) { + String key, GetOptions options, CompletableFuture result) { // We need check on all the shards for a floor/ceiling query List> futures = new ArrayList<>(); for (long shardId : shardManager.allShardIds()) { CompletableFuture f = new CompletableFuture<>(); - readBatchManager.getBatcher(shardId).add(new GetOperation(f, key, comparisonType)); + readBatchManager.getBatcher(shardId).add(new GetOperation(f, key, options)); futures.add(f); } @@ -535,7 +535,7 @@ private void internalGetFloorCeiling( } GetResult gr = - switch (comparisonType) { + switch (options.comparisonType()) { case EQUAL, UNRECOGNIZED -> null; // This would be handled withing context of single // shard diff --git a/client/src/main/java/io/streamnative/oxia/client/batch/Operation.java b/client/src/main/java/io/streamnative/oxia/client/batch/Operation.java index fdb5d1a6..f113ee23 100644 --- a/client/src/main/java/io/streamnative/oxia/client/batch/Operation.java +++ b/client/src/main/java/io/streamnative/oxia/client/batch/Operation.java @@ -31,13 +31,13 @@ import io.streamnative.oxia.client.api.exceptions.KeyAlreadyExistsException; import io.streamnative.oxia.client.api.exceptions.SessionDoesNotExistException; import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException; +import io.streamnative.oxia.client.options.GetOptions; import io.streamnative.oxia.proto.DeleteRangeRequest; import io.streamnative.oxia.proto.DeleteRangeResponse; import io.streamnative.oxia.proto.DeleteRequest; import io.streamnative.oxia.proto.DeleteResponse; import io.streamnative.oxia.proto.GetRequest; import io.streamnative.oxia.proto.GetResponse; -import io.streamnative.oxia.proto.KeyComparisonType; import io.streamnative.oxia.proto.PutRequest; import io.streamnative.oxia.proto.PutResponse; import io.streamnative.oxia.proto.Status; @@ -61,13 +61,13 @@ sealed interface ReadOperation extends Operation permits GetOperation { record GetOperation( @NonNull CompletableFuture callback, @NonNull String key, - KeyComparisonType comparisonType) + @NonNull GetOptions options) implements ReadOperation { GetRequest toProto() { return GetRequest.newBuilder() .setKey(key) - .setComparisonType(comparisonType) - .setIncludeValue(true) + .setComparisonType(options.comparisonType()) + .setIncludeValue(options.includeValue()) .build(); } diff --git a/client/src/main/java/io/streamnative/oxia/client/options/GetOptions.java b/client/src/main/java/io/streamnative/oxia/client/options/GetOptions.java new file mode 100644 index 00000000..c24fe647 --- /dev/null +++ b/client/src/main/java/io/streamnative/oxia/client/options/GetOptions.java @@ -0,0 +1,42 @@ +package io.streamnative.oxia.client.options; + +import io.streamnative.oxia.client.api.GetOption; +import io.streamnative.oxia.client.api.OptionComparisonType; +import io.streamnative.oxia.client.api.OptionIncludeValue; +import io.streamnative.oxia.client.api.OptionPartitionKey; +import io.streamnative.oxia.proto.KeyComparisonType; + +import java.util.Set; + + +public record GetOptions(String partitionKey, boolean includeValue, + KeyComparisonType comparisonType) { + + + public static GetOptions parseFrom(Set options) { + boolean includeValue = true; + KeyComparisonType comparisonType = KeyComparisonType.EQUAL; + String partitionKey = null; + for (GetOption option : options) { + if (option instanceof OptionIncludeValue) { + includeValue = ((OptionIncludeValue) option).includeValue(); + continue; + } + if (option instanceof OptionComparisonType) { + comparisonType = switch (((OptionComparisonType) option).comparisonType()) { + case Floor -> KeyComparisonType.FLOOR; + case Lower -> KeyComparisonType.LOWER; + case Higher -> KeyComparisonType.HIGHER; + case Ceiling -> KeyComparisonType.CEILING; + default -> KeyComparisonType.EQUAL; + }; + continue; + } + if (option instanceof OptionPartitionKey) { + partitionKey = ((OptionPartitionKey) option).partitionKey(); + continue; + } + } + return new GetOptions(partitionKey, includeValue, comparisonType); + } +} diff --git a/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java b/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java index 2dcc1c03..65589812 100644 --- a/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java @@ -47,6 +47,7 @@ import io.streamnative.oxia.client.batch.Operation.WriteOperation.PutOperation; import io.streamnative.oxia.client.grpc.*; import io.streamnative.oxia.client.metrics.InstrumentProvider; +import io.streamnative.oxia.client.options.GetOptions; import io.streamnative.oxia.client.session.Session; import io.streamnative.oxia.client.session.SessionManager; import io.streamnative.oxia.client.shard.NoShardAvailableException; @@ -393,7 +394,7 @@ public void shardId() { class ReadBatchTests { ReadBatch batch; CompletableFuture getCallable = new CompletableFuture<>(); - GetOperation get = new GetOperation(getCallable, "", KeyComparisonType.EQUAL); + GetOperation get = new GetOperation(getCallable, "", new GetOptions(null, true, KeyComparisonType.EQUAL)); @BeforeEach void setup() { diff --git a/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java b/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java index 629e6eda..22bf0ae8 100644 --- a/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java @@ -30,6 +30,7 @@ import io.streamnative.oxia.client.api.GetResult; import io.streamnative.oxia.client.api.PutResult; import io.streamnative.oxia.client.batch.Operation.ReadOperation.GetOperation; +import io.streamnative.oxia.client.options.GetOptions; import io.streamnative.oxia.client.util.BatchedArrayBlockingQueue; import io.streamnative.oxia.proto.KeyComparisonType; import java.nio.charset.StandardCharsets; @@ -87,7 +88,7 @@ void teardown() { @Test void createBatchAndAdd() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key", KeyComparisonType.EQUAL); + Operation op = new GetOperation(callback, "key", new GetOptions(null, true, KeyComparisonType.EQUAL)); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(1); when(batch.canAdd(any())).thenReturn(true); @@ -99,7 +100,7 @@ void createBatchAndAdd() throws Exception { @Test void sendBatchOnFull() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key", KeyComparisonType.EQUAL); + Operation op = new GetOperation(callback, "key", new GetOptions(null, true,KeyComparisonType.EQUAL)); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(config.maxRequestsPerBatch()); when(batch.canAdd(any())).thenReturn(true); @@ -136,7 +137,7 @@ void addWhenNextDoesNotFit() { @Test void sendBatchOnFullThenNewBatch() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key", KeyComparisonType.EQUAL); + Operation op = new GetOperation(callback, "key", new GetOptions(null, true,KeyComparisonType.EQUAL)); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(config.maxRequestsPerBatch(), 1); when(batch.canAdd(any())).thenReturn(true); @@ -154,7 +155,7 @@ void sendBatchOnFullThenNewBatch() throws Exception { @Test void sendBatchOnLingerExpiration() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key", KeyComparisonType.EQUAL); + Operation op = new GetOperation(callback, "key", new GetOptions(null, true,KeyComparisonType.EQUAL)); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(1); when(batch.canAdd(any())).thenReturn(true); @@ -167,7 +168,7 @@ void sendBatchOnLingerExpiration() throws Exception { @Test void sendBatchOnLingerExpirationMulti() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key", KeyComparisonType.EQUAL); + Operation op = new GetOperation(callback, "key", new GetOptions(null, true,KeyComparisonType.EQUAL)); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(1); when(batch.canAdd(any())).thenReturn(true); @@ -183,7 +184,7 @@ void sendBatchOnLingerExpirationMulti() throws Exception { @Test void unboundedTakeAtStart() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key", KeyComparisonType.EQUAL); + Operation op = new GetOperation(callback, "key", new GetOptions(null, true,KeyComparisonType.EQUAL)); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(1); diff --git a/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java b/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java index 6c6cd249..574b9853 100644 --- a/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java @@ -35,6 +35,7 @@ import io.streamnative.oxia.client.batch.Operation.WriteOperation.DeleteOperation; import io.streamnative.oxia.client.batch.Operation.WriteOperation.DeleteRangeOperation; import io.streamnative.oxia.client.batch.Operation.WriteOperation.PutOperation; +import io.streamnative.oxia.client.options.GetOptions; import io.streamnative.oxia.proto.DeleteRangeResponse; import io.streamnative.oxia.proto.DeleteResponse; import io.streamnative.oxia.proto.GetResponse; @@ -61,7 +62,7 @@ class OperationTest { class GetOperationTests { CompletableFuture callback = new CompletableFuture<>(); - GetOperation op = new GetOperation(callback, "key", KeyComparisonType.EQUAL); + GetOperation op = new GetOperation(callback, "key", new GetOptions(null, true, KeyComparisonType.EQUAL)); @Test void toProto() { From e1f7e69026b045f14b395252aa0064533004f61d Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Wed, 5 Mar 2025 20:01:39 +0800 Subject: [PATCH 2/3] spotless --- .../oxia/client/api/GetOption.java | 12 ++--- .../oxia/client/api/OptionIncludeValue.java | 4 +- .../oxia/client/it/OxiaClientIT.java | 49 +++++++++++-------- .../oxia/client/AsyncOxiaClientImpl.java | 6 +-- .../oxia/client/options/GetOptions.java | 22 ++++----- .../oxia/client/batch/BatchTest.java | 3 +- .../oxia/client/batch/BatcherTest.java | 18 ++++--- .../oxia/client/batch/OperationTest.java | 3 +- 8 files changed, 64 insertions(+), 53 deletions(-) diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/GetOption.java b/client-api/src/main/java/io/streamnative/oxia/client/api/GetOption.java index de194680..d87bba6d 100644 --- a/client-api/src/main/java/io/streamnative/oxia/client/api/GetOption.java +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/GetOption.java @@ -17,7 +17,8 @@ import lombok.NonNull; -public sealed interface GetOption permits OptionComparisonType, OptionIncludeValue, OptionPartitionKey { +public sealed interface GetOption + permits OptionComparisonType, OptionIncludeValue, OptionPartitionKey { /** ComparisonEqual sets the Get() operation to compare the stored key for equality. */ GetOption ComparisonEqual = new OptionComparisonType(OptionComparisonType.ComparisonType.Equal); @@ -61,12 +62,11 @@ static GetOption PartitionKey(@NonNull String partitionKey) { } /** - * Creates and returns a GetOption that specifies whether to include a value. - * This method is used to configure whether a value should be included in the operation. + * Creates and returns a GetOption that specifies whether to include a value. This method is used + * to configure whether a value should be included in the operation. * - * @param includeValue A boolean flag indicating whether the value should be included. - * - true: The value will be included. - * - false: The value will not be included. + * @param includeValue A boolean flag indicating whether the value should be included. - true: The + * value will be included. - false: The value will not be included. * @return A GetOption instance representing the include value setting. */ static GetOption IncludeValue(boolean includeValue) { diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/OptionIncludeValue.java b/client-api/src/main/java/io/streamnative/oxia/client/api/OptionIncludeValue.java index c2bf6bb1..141337b8 100644 --- a/client-api/src/main/java/io/streamnative/oxia/client/api/OptionIncludeValue.java +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/OptionIncludeValue.java @@ -1,5 +1,3 @@ package io.streamnative.oxia.client.api; -public record OptionIncludeValue(boolean includeValue) implements GetOption{ - -} +public record OptionIncludeValue(boolean includeValue) implements GetOption {} diff --git a/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientIT.java b/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientIT.java index 2be164ea..8e371377 100644 --- a/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientIT.java +++ b/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientIT.java @@ -49,10 +49,8 @@ import io.streamnative.oxia.client.api.RangeScanOption; import io.streamnative.oxia.client.api.SyncOxiaClient; import io.streamnative.oxia.client.api.exceptions.KeyAlreadyExistsException; -import io.streamnative.oxia.client.api.exceptions.OxiaException; import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException; import io.streamnative.oxia.testcontainers.OxiaContainer; - import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; @@ -661,7 +659,6 @@ void testSecondaryIndex() throws Exception { assertThat(list).containsExactly("si-a", "si-c"); } - @Test @SneakyThrows void testGetIncludeValue() { @@ -671,36 +668,46 @@ void testGetIncludeValue() { final String key = "stream"; final List keys = new ArrayList<>(); - PutResult putResult = client.put(key, UUID.randomUUID().toString().getBytes(), - Set.of( - PutOption.PartitionKey(key), - PutOption.SequenceKeysDeltas(List.of(1L)) - ) - ); + PutResult putResult = + client.put( + key, + UUID.randomUUID().toString().getBytes(), + Set.of(PutOption.PartitionKey(key), PutOption.SequenceKeysDeltas(List.of(1L)))); keys.add(putResult.key()); - putResult = client.put(key, UUID.randomUUID().toString().getBytes(), - Set.of( - PutOption.PartitionKey(key), - PutOption.SequenceKeysDeltas(List.of(1L)) - ) - ); + putResult = + client.put( + key, + UUID.randomUUID().toString().getBytes(), + Set.of(PutOption.PartitionKey(key), PutOption.SequenceKeysDeltas(List.of(1L)))); keys.add(putResult.key()); - for (String subKey : keys) { - GetResult result = client.get(subKey, Set.of(GetOption.PartitionKey(key), GetOption.IncludeValue(true))); + GetResult result = + client.get(subKey, Set.of(GetOption.PartitionKey(key), GetOption.IncludeValue(true))); Assertions.assertNotNull(result.getValue()); - result = client.get(subKey, Set.of(GetOption.PartitionKey(key), GetOption.IncludeValue(false))); + result = + client.get(subKey, Set.of(GetOption.PartitionKey(key), GetOption.IncludeValue(false))); Assertions.assertEquals(result.getValue().length, 0); } - var result = client.get(keys.get(0), Set.of(GetOption.PartitionKey(key), GetOption.IncludeValue(false), GetOption.ComparisonHigher)); + var result = + client.get( + keys.get(0), + Set.of( + GetOption.PartitionKey(key), + GetOption.IncludeValue(false), + GetOption.ComparisonHigher)); Assertions.assertEquals(result.getValue().length, 0); Assertions.assertEquals(result.getKey(), keys.get(1)); - - result = client.get(keys.get(1), Set.of(GetOption.PartitionKey(key), GetOption.IncludeValue(false), GetOption.ComparisonLower)); + result = + client.get( + keys.get(1), + Set.of( + GetOption.PartitionKey(key), + GetOption.IncludeValue(false), + GetOption.ComparisonLower)); Assertions.assertEquals(result.getValue().length, 0); Assertions.assertEquals(result.getKey(), keys.get(0)); } diff --git a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java index 968cf3a2..605a2379 100644 --- a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java +++ b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java @@ -492,11 +492,11 @@ private CompletableFuture internalPut( }); } - private void internalGet( - String key, GetOptions options, CompletableFuture result) { + private void internalGet(String key, GetOptions options, CompletableFuture result) { if (options.comparisonType() == KeyComparisonType.EQUAL || options.partitionKey() != null) { // Single shard get operation - long shardId = shardManager.getShardForKey(Optional.ofNullable(options.partitionKey()).orElse(key)); + long shardId = + shardManager.getShardForKey(Optional.ofNullable(options.partitionKey()).orElse(key)); readBatchManager.getBatcher(shardId).add(new GetOperation(result, key, options)); } else { internalGetFloorCeiling(key, options, result); diff --git a/client/src/main/java/io/streamnative/oxia/client/options/GetOptions.java b/client/src/main/java/io/streamnative/oxia/client/options/GetOptions.java index c24fe647..ed8edd98 100644 --- a/client/src/main/java/io/streamnative/oxia/client/options/GetOptions.java +++ b/client/src/main/java/io/streamnative/oxia/client/options/GetOptions.java @@ -5,13 +5,10 @@ import io.streamnative.oxia.client.api.OptionIncludeValue; import io.streamnative.oxia.client.api.OptionPartitionKey; import io.streamnative.oxia.proto.KeyComparisonType; - import java.util.Set; - -public record GetOptions(String partitionKey, boolean includeValue, - KeyComparisonType comparisonType) { - +public record GetOptions( + String partitionKey, boolean includeValue, KeyComparisonType comparisonType) { public static GetOptions parseFrom(Set options) { boolean includeValue = true; @@ -23,13 +20,14 @@ public static GetOptions parseFrom(Set options) { continue; } if (option instanceof OptionComparisonType) { - comparisonType = switch (((OptionComparisonType) option).comparisonType()) { - case Floor -> KeyComparisonType.FLOOR; - case Lower -> KeyComparisonType.LOWER; - case Higher -> KeyComparisonType.HIGHER; - case Ceiling -> KeyComparisonType.CEILING; - default -> KeyComparisonType.EQUAL; - }; + comparisonType = + switch (((OptionComparisonType) option).comparisonType()) { + case Floor -> KeyComparisonType.FLOOR; + case Lower -> KeyComparisonType.LOWER; + case Higher -> KeyComparisonType.HIGHER; + case Ceiling -> KeyComparisonType.CEILING; + default -> KeyComparisonType.EQUAL; + }; continue; } if (option instanceof OptionPartitionKey) { diff --git a/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java b/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java index 65589812..1bbf7fdc 100644 --- a/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java @@ -394,7 +394,8 @@ public void shardId() { class ReadBatchTests { ReadBatch batch; CompletableFuture getCallable = new CompletableFuture<>(); - GetOperation get = new GetOperation(getCallable, "", new GetOptions(null, true, KeyComparisonType.EQUAL)); + GetOperation get = + new GetOperation(getCallable, "", new GetOptions(null, true, KeyComparisonType.EQUAL)); @BeforeEach void setup() { diff --git a/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java b/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java index 22bf0ae8..6e2bd7bf 100644 --- a/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java @@ -88,7 +88,8 @@ void teardown() { @Test void createBatchAndAdd() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key", new GetOptions(null, true, KeyComparisonType.EQUAL)); + Operation op = + new GetOperation(callback, "key", new GetOptions(null, true, KeyComparisonType.EQUAL)); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(1); when(batch.canAdd(any())).thenReturn(true); @@ -100,7 +101,8 @@ void createBatchAndAdd() throws Exception { @Test void sendBatchOnFull() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key", new GetOptions(null, true,KeyComparisonType.EQUAL)); + Operation op = + new GetOperation(callback, "key", new GetOptions(null, true, KeyComparisonType.EQUAL)); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(config.maxRequestsPerBatch()); when(batch.canAdd(any())).thenReturn(true); @@ -137,7 +139,8 @@ void addWhenNextDoesNotFit() { @Test void sendBatchOnFullThenNewBatch() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key", new GetOptions(null, true,KeyComparisonType.EQUAL)); + Operation op = + new GetOperation(callback, "key", new GetOptions(null, true, KeyComparisonType.EQUAL)); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(config.maxRequestsPerBatch(), 1); when(batch.canAdd(any())).thenReturn(true); @@ -155,7 +158,8 @@ void sendBatchOnFullThenNewBatch() throws Exception { @Test void sendBatchOnLingerExpiration() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key", new GetOptions(null, true,KeyComparisonType.EQUAL)); + Operation op = + new GetOperation(callback, "key", new GetOptions(null, true, KeyComparisonType.EQUAL)); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(1); when(batch.canAdd(any())).thenReturn(true); @@ -168,7 +172,8 @@ void sendBatchOnLingerExpiration() throws Exception { @Test void sendBatchOnLingerExpirationMulti() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key", new GetOptions(null, true,KeyComparisonType.EQUAL)); + Operation op = + new GetOperation(callback, "key", new GetOptions(null, true, KeyComparisonType.EQUAL)); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(1); when(batch.canAdd(any())).thenReturn(true); @@ -184,7 +189,8 @@ void sendBatchOnLingerExpirationMulti() throws Exception { @Test void unboundedTakeAtStart() throws Exception { var callback = new CompletableFuture(); - Operation op = new GetOperation(callback, "key", new GetOptions(null, true,KeyComparisonType.EQUAL)); + Operation op = + new GetOperation(callback, "key", new GetOptions(null, true, KeyComparisonType.EQUAL)); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(1); diff --git a/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java b/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java index 574b9853..b9dadfd3 100644 --- a/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java @@ -62,7 +62,8 @@ class OperationTest { class GetOperationTests { CompletableFuture callback = new CompletableFuture<>(); - GetOperation op = new GetOperation(callback, "key", new GetOptions(null, true, KeyComparisonType.EQUAL)); + GetOperation op = + new GetOperation(callback, "key", new GetOptions(null, true, KeyComparisonType.EQUAL)); @Test void toProto() { From a3c83d00149065bf5f081f1ca51438d0082890d3 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Wed, 5 Mar 2025 20:01:50 +0800 Subject: [PATCH 3/3] format license --- .../streamnative/oxia/client/api/GetOption.java | 2 +- .../oxia/client/api/OptionIncludeValue.java | 15 +++++++++++++++ .../streamnative/oxia/client/batch/Operation.java | 2 +- .../oxia/client/options/GetOptions.java | 15 +++++++++++++++ .../oxia/client/batch/OperationTest.java | 2 +- 5 files changed, 33 insertions(+), 3 deletions(-) diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/GetOption.java b/client-api/src/main/java/io/streamnative/oxia/client/api/GetOption.java index d87bba6d..9e67afae 100644 --- a/client-api/src/main/java/io/streamnative/oxia/client/api/GetOption.java +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/GetOption.java @@ -1,5 +1,5 @@ /* - * Copyright © 2022-2024 StreamNative Inc. + * Copyright © 2022-2025 StreamNative Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/OptionIncludeValue.java b/client-api/src/main/java/io/streamnative/oxia/client/api/OptionIncludeValue.java index 141337b8..6c4d3f3a 100644 --- a/client-api/src/main/java/io/streamnative/oxia/client/api/OptionIncludeValue.java +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/OptionIncludeValue.java @@ -1,3 +1,18 @@ +/* + * Copyright © 2022-2025 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.streamnative.oxia.client.api; public record OptionIncludeValue(boolean includeValue) implements GetOption {} diff --git a/client/src/main/java/io/streamnative/oxia/client/batch/Operation.java b/client/src/main/java/io/streamnative/oxia/client/batch/Operation.java index f113ee23..58bfb434 100644 --- a/client/src/main/java/io/streamnative/oxia/client/batch/Operation.java +++ b/client/src/main/java/io/streamnative/oxia/client/batch/Operation.java @@ -1,5 +1,5 @@ /* - * Copyright © 2022-2024 StreamNative Inc. + * Copyright © 2022-2025 StreamNative Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/client/src/main/java/io/streamnative/oxia/client/options/GetOptions.java b/client/src/main/java/io/streamnative/oxia/client/options/GetOptions.java index ed8edd98..c769b1dd 100644 --- a/client/src/main/java/io/streamnative/oxia/client/options/GetOptions.java +++ b/client/src/main/java/io/streamnative/oxia/client/options/GetOptions.java @@ -1,3 +1,18 @@ +/* + * Copyright © 2022-2025 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.streamnative.oxia.client.options; import io.streamnative.oxia.client.api.GetOption; diff --git a/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java b/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java index b9dadfd3..8c3c17e4 100644 --- a/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java @@ -1,5 +1,5 @@ /* - * Copyright © 2022-2024 StreamNative Inc. + * Copyright © 2022-2025 StreamNative Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.