Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support IncludeValue for get operation #207

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,8 @@

import lombok.NonNull;

public sealed interface GetOption permits OptionComparisonType, OptionPartitionKey {
public sealed interface GetOption
permits OptionComparisonType, OptionIncludeValue, OptionPartitionKey {

/** ComparisonEqual sets the Get() operation to compare the stored key for equality. */
GetOption ComparisonEqual = new OptionComparisonType(OptionComparisonType.ComparisonType.Equal);
Expand Down Expand Up @@ -59,4 +60,16 @@ public sealed interface GetOption permits OptionComparisonType, OptionPartitionK
static GetOption PartitionKey(@NonNull String partitionKey) {
return new OptionPartitionKey(partitionKey);
}

/**
* Creates and returns a GetOption that specifies whether to include a value. This method is used
* to configure whether a value should be included in the operation.
*
* @param includeValue A boolean flag indicating whether the value should be included. - true: The
* value will be included. - false: The value will not be included.
* @return A GetOption instance representing the include value setting.
*/
static GetOption IncludeValue(boolean includeValue) {
return new OptionIncludeValue(includeValue);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use 2 static objects here instead of allocating new object each time

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.api;

public record OptionIncludeValue(boolean includeValue) implements GetOption {}
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,16 @@
import io.streamnative.oxia.client.api.exceptions.KeyAlreadyExistsException;
import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException;
import io.streamnative.oxia.testcontainers.OxiaContainer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.output.Slf4jLogConsumer;
Expand Down Expand Up @@ -660,4 +658,57 @@ void testSecondaryIndex() throws Exception {
.toList();
assertThat(list).containsExactly("si-a", "si-c");
}

@Test
@SneakyThrows
void testGetIncludeValue() {
@Cleanup
final SyncOxiaClient client = OxiaClientBuilder.create(oxia.getServiceAddress()).syncClient();

final String key = "stream";

final List<String> keys = new ArrayList<>();
PutResult putResult =
client.put(
key,
UUID.randomUUID().toString().getBytes(),
Set.of(PutOption.PartitionKey(key), PutOption.SequenceKeysDeltas(List.of(1L))));
keys.add(putResult.key());
putResult =
client.put(
key,
UUID.randomUUID().toString().getBytes(),
Set.of(PutOption.PartitionKey(key), PutOption.SequenceKeysDeltas(List.of(1L))));
keys.add(putResult.key());

for (String subKey : keys) {
GetResult result =
client.get(subKey, Set.of(GetOption.PartitionKey(key), GetOption.IncludeValue(true)));
Assertions.assertNotNull(result.getValue());

result =
client.get(subKey, Set.of(GetOption.PartitionKey(key), GetOption.IncludeValue(false)));
Assertions.assertEquals(result.getValue().length, 0);
}

var result =
client.get(
keys.get(0),
Set.of(
GetOption.PartitionKey(key),
GetOption.IncludeValue(false),
GetOption.ComparisonHigher));
Assertions.assertEquals(result.getValue().length, 0);
Assertions.assertEquals(result.getKey(), keys.get(1));

result =
client.get(
keys.get(1),
Set.of(
GetOption.PartitionKey(key),
GetOption.IncludeValue(false),
GetOption.ComparisonLower));
Assertions.assertEquals(result.getValue().length, 0);
Assertions.assertEquals(result.getKey(), keys.get(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.streamnative.oxia.client.metrics.Unit;
import io.streamnative.oxia.client.metrics.UpDownCounter;
import io.streamnative.oxia.client.notify.NotificationManager;
import io.streamnative.oxia.client.options.GetOptions;
import io.streamnative.oxia.client.session.SessionManager;
import io.streamnative.oxia.client.shard.ShardManager;
import io.streamnative.oxia.proto.KeyComparisonType;
Expand Down Expand Up @@ -464,13 +465,14 @@ private CompletableFuture<PutResult> internalPut(

@Override
public @NonNull CompletableFuture<GetResult> get(String key, Set<GetOption> options) {
final GetOptions internalOptions = GetOptions.parseFrom(options);
long startTime = System.nanoTime();
gaugePendingGetRequests.increment();
var callback = new CompletableFuture<GetResult>();
try {
checkIfClosed();
Objects.requireNonNull(key);
internalGet(key, options, callback);
internalGet(key, internalOptions, callback);
} catch (RuntimeException e) {
callback.completeExceptionally(e);
}
Expand All @@ -490,26 +492,24 @@ private CompletableFuture<PutResult> internalPut(
});
}

private void internalGet(
String key, Set<GetOption> options, CompletableFuture<GetResult> result) {
KeyComparisonType comparisonType = OptionsUtils.getComparisonType(options);
Optional<String> partitionKey = OptionsUtils.getPartitionKey(options);
if (comparisonType == KeyComparisonType.EQUAL || partitionKey.isPresent()) {
private void internalGet(String key, GetOptions options, CompletableFuture<GetResult> result) {
if (options.comparisonType() == KeyComparisonType.EQUAL || options.partitionKey() != null) {
// Single shard get operation
long shardId = shardManager.getShardForKey(partitionKey.orElse(key));
readBatchManager.getBatcher(shardId).add(new GetOperation(result, key, comparisonType));
long shardId =
shardManager.getShardForKey(Optional.ofNullable(options.partitionKey()).orElse(key));
readBatchManager.getBatcher(shardId).add(new GetOperation(result, key, options));
} else {
internalGetFloorCeiling(key, comparisonType, result);
internalGetFloorCeiling(key, options, result);
}
}

private void internalGetFloorCeiling(
String key, KeyComparisonType comparisonType, CompletableFuture<GetResult> result) {
String key, GetOptions options, CompletableFuture<GetResult> result) {
// We need check on all the shards for a floor/ceiling query
List<CompletableFuture<GetResult>> futures = new ArrayList<>();
for (long shardId : shardManager.allShardIds()) {
CompletableFuture<GetResult> f = new CompletableFuture<>();
readBatchManager.getBatcher(shardId).add(new GetOperation(f, key, comparisonType));
readBatchManager.getBatcher(shardId).add(new GetOperation(f, key, options));
futures.add(f);
}

Expand All @@ -535,7 +535,7 @@ private void internalGetFloorCeiling(
}

GetResult gr =
switch (comparisonType) {
switch (options.comparisonType()) {
case EQUAL,
UNRECOGNIZED -> null; // This would be handled withing context of single
// shard
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,13 +31,13 @@
import io.streamnative.oxia.client.api.exceptions.KeyAlreadyExistsException;
import io.streamnative.oxia.client.api.exceptions.SessionDoesNotExistException;
import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException;
import io.streamnative.oxia.client.options.GetOptions;
import io.streamnative.oxia.proto.DeleteRangeRequest;
import io.streamnative.oxia.proto.DeleteRangeResponse;
import io.streamnative.oxia.proto.DeleteRequest;
import io.streamnative.oxia.proto.DeleteResponse;
import io.streamnative.oxia.proto.GetRequest;
import io.streamnative.oxia.proto.GetResponse;
import io.streamnative.oxia.proto.KeyComparisonType;
import io.streamnative.oxia.proto.PutRequest;
import io.streamnative.oxia.proto.PutResponse;
import io.streamnative.oxia.proto.Status;
Expand All @@ -61,13 +61,13 @@ sealed interface ReadOperation<R> extends Operation<R> permits GetOperation {
record GetOperation(
@NonNull CompletableFuture<GetResult> callback,
@NonNull String key,
KeyComparisonType comparisonType)
@NonNull GetOptions options)
implements ReadOperation<GetResult> {
GetRequest toProto() {
return GetRequest.newBuilder()
.setKey(key)
.setComparisonType(comparisonType)
.setIncludeValue(true)
.setComparisonType(options.comparisonType())
.setIncludeValue(options.includeValue())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.options;

import io.streamnative.oxia.client.api.GetOption;
import io.streamnative.oxia.client.api.OptionComparisonType;
import io.streamnative.oxia.client.api.OptionIncludeValue;
import io.streamnative.oxia.client.api.OptionPartitionKey;
import io.streamnative.oxia.proto.KeyComparisonType;
import java.util.Set;

public record GetOptions(
String partitionKey, boolean includeValue, KeyComparisonType comparisonType) {

public static GetOptions parseFrom(Set<GetOption> options) {
boolean includeValue = true;
KeyComparisonType comparisonType = KeyComparisonType.EQUAL;
String partitionKey = null;
for (GetOption option : options) {
if (option instanceof OptionIncludeValue) {
includeValue = ((OptionIncludeValue) option).includeValue();
continue;
}
if (option instanceof OptionComparisonType) {
comparisonType =
switch (((OptionComparisonType) option).comparisonType()) {
case Floor -> KeyComparisonType.FLOOR;
case Lower -> KeyComparisonType.LOWER;
case Higher -> KeyComparisonType.HIGHER;
case Ceiling -> KeyComparisonType.CEILING;
default -> KeyComparisonType.EQUAL;
};
continue;
}
if (option instanceof OptionPartitionKey) {
partitionKey = ((OptionPartitionKey) option).partitionKey();
continue;
}
}
return new GetOptions(partitionKey, includeValue, comparisonType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.streamnative.oxia.client.batch.Operation.WriteOperation.PutOperation;
import io.streamnative.oxia.client.grpc.*;
import io.streamnative.oxia.client.metrics.InstrumentProvider;
import io.streamnative.oxia.client.options.GetOptions;
import io.streamnative.oxia.client.session.Session;
import io.streamnative.oxia.client.session.SessionManager;
import io.streamnative.oxia.client.shard.NoShardAvailableException;
Expand Down Expand Up @@ -393,7 +394,8 @@ public void shardId() {
class ReadBatchTests {
ReadBatch batch;
CompletableFuture<GetResult> getCallable = new CompletableFuture<>();
GetOperation get = new GetOperation(getCallable, "", KeyComparisonType.EQUAL);
GetOperation get =
new GetOperation(getCallable, "", new GetOptions(null, true, KeyComparisonType.EQUAL));

@BeforeEach
void setup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.streamnative.oxia.client.api.GetResult;
import io.streamnative.oxia.client.api.PutResult;
import io.streamnative.oxia.client.batch.Operation.ReadOperation.GetOperation;
import io.streamnative.oxia.client.options.GetOptions;
import io.streamnative.oxia.client.util.BatchedArrayBlockingQueue;
import io.streamnative.oxia.proto.KeyComparisonType;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -87,7 +88,8 @@ void teardown() {
@Test
void createBatchAndAdd() throws Exception {
var callback = new CompletableFuture<GetResult>();
Operation<?> op = new GetOperation(callback, "key", KeyComparisonType.EQUAL);
Operation<?> op =
new GetOperation(callback, "key", new GetOptions(null, true, KeyComparisonType.EQUAL));
when(batchFactory.getBatch(shardId)).thenReturn(batch);
when(batch.size()).thenReturn(1);
when(batch.canAdd(any())).thenReturn(true);
Expand All @@ -99,7 +101,8 @@ void createBatchAndAdd() throws Exception {
@Test
void sendBatchOnFull() throws Exception {
var callback = new CompletableFuture<GetResult>();
Operation<?> op = new GetOperation(callback, "key", KeyComparisonType.EQUAL);
Operation<?> op =
new GetOperation(callback, "key", new GetOptions(null, true, KeyComparisonType.EQUAL));
when(batchFactory.getBatch(shardId)).thenReturn(batch);
when(batch.size()).thenReturn(config.maxRequestsPerBatch());
when(batch.canAdd(any())).thenReturn(true);
Expand Down Expand Up @@ -136,7 +139,8 @@ void addWhenNextDoesNotFit() {
@Test
void sendBatchOnFullThenNewBatch() throws Exception {
var callback = new CompletableFuture<GetResult>();
Operation<?> op = new GetOperation(callback, "key", KeyComparisonType.EQUAL);
Operation<?> op =
new GetOperation(callback, "key", new GetOptions(null, true, KeyComparisonType.EQUAL));
when(batchFactory.getBatch(shardId)).thenReturn(batch);
when(batch.size()).thenReturn(config.maxRequestsPerBatch(), 1);
when(batch.canAdd(any())).thenReturn(true);
Expand All @@ -154,7 +158,8 @@ void sendBatchOnFullThenNewBatch() throws Exception {
@Test
void sendBatchOnLingerExpiration() throws Exception {
var callback = new CompletableFuture<GetResult>();
Operation<?> op = new GetOperation(callback, "key", KeyComparisonType.EQUAL);
Operation<?> op =
new GetOperation(callback, "key", new GetOptions(null, true, KeyComparisonType.EQUAL));
when(batchFactory.getBatch(shardId)).thenReturn(batch);
when(batch.size()).thenReturn(1);
when(batch.canAdd(any())).thenReturn(true);
Expand All @@ -167,7 +172,8 @@ void sendBatchOnLingerExpiration() throws Exception {
@Test
void sendBatchOnLingerExpirationMulti() throws Exception {
var callback = new CompletableFuture<GetResult>();
Operation<?> op = new GetOperation(callback, "key", KeyComparisonType.EQUAL);
Operation<?> op =
new GetOperation(callback, "key", new GetOptions(null, true, KeyComparisonType.EQUAL));
when(batchFactory.getBatch(shardId)).thenReturn(batch);
when(batch.size()).thenReturn(1);
when(batch.canAdd(any())).thenReturn(true);
Expand All @@ -183,7 +189,8 @@ void sendBatchOnLingerExpirationMulti() throws Exception {
@Test
void unboundedTakeAtStart() throws Exception {
var callback = new CompletableFuture<GetResult>();
Operation<?> op = new GetOperation(callback, "key", KeyComparisonType.EQUAL);
Operation<?> op =
new GetOperation(callback, "key", new GetOptions(null, true, KeyComparisonType.EQUAL));

when(batchFactory.getBatch(shardId)).thenReturn(batch);
when(batch.size()).thenReturn(1);
Expand Down
Loading