From f3f6a316a62dc7c1effcdef9279e93f8cad71b1b Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 2 May 2024 11:35:23 -0700 Subject: [PATCH] Refactored Put & Delete options (#144) --- .../oxia/client/api/AsyncOxiaClient.java | 30 ++++- .../oxia/client/api/DeleteOption.java | 47 +------- .../oxia/client/api/OptionEphemeral.java | 18 +++ .../oxia/client/api/OptionVersionId.java | 37 ++++++ .../oxia/client/api/PutOption.java | 61 +--------- .../oxia/client/api/SyncOxiaClient.java | 23 +++- .../oxia/client/it/OxiaClientIT.java | 26 ++-- .../oxia/client/AsyncOxiaClientImpl.java | 28 +++-- .../oxia/client/CachingAsyncOxiaClient.java | 16 ++- .../oxia/client/DeleteOptionsUtil.java | 45 +++---- .../oxia/client/PutOptionsUtil.java | 69 ++++++----- .../oxia/client/SyncOxiaClientImpl.java | 17 ++- .../oxia/client/batch/Operation.java | 22 ++-- .../oxia/client/AsyncOxiaClientImplTest.java | 12 +- .../client/CachingAsyncOxiaClientTest.java | 9 +- .../oxia/client/api/DeleteOptionTest.java | 66 ++++------ .../oxia/client/api/PutOptionTest.java | 113 ++++++------------ .../oxia/client/batch/BatchTest.java | 7 +- .../oxia/client/batch/BatcherTest.java | 4 +- .../oxia/client/batch/OperationTest.java | 31 ++--- .../OxiaMetadataStore.java | 47 ++++---- 21 files changed, 359 insertions(+), 369 deletions(-) create mode 100644 client-api/src/main/java/io/streamnative/oxia/client/api/OptionEphemeral.java create mode 100644 client-api/src/main/java/io/streamnative/oxia/client/api/OptionVersionId.java diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/AsyncOxiaClient.java b/client-api/src/main/java/io/streamnative/oxia/client/api/AsyncOxiaClient.java index ecd1455e..5b608aee 100644 --- a/client-api/src/main/java/io/streamnative/oxia/client/api/AsyncOxiaClient.java +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/AsyncOxiaClient.java @@ -17,6 +17,7 @@ import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import lombok.NonNull; @@ -29,7 +30,7 @@ public interface AsyncOxiaClient extends AutoCloseable { * specified, at the instant when the put is applied. The put will not be applied if the server's * versionId of the record does not match the expectation set in the call. If you wish the put to * succeed only if the key does not already exist on the server, then pass the {@link - * Version#KeyNotExists} value. + * PutOption#IfRecordDoesNotExist} value. * * @param key The key with which the value should be associated. * @param value The value to associate with the key. @@ -40,7 +41,20 @@ public interface AsyncOxiaClient extends AutoCloseable { * the call. */ @NonNull - CompletableFuture put(String key, byte[] value, PutOption... options); + CompletableFuture put(String key, byte[] value, Set options); + + /** + * Conditionally associates a value with a key if the server's versionId of the record is as + * specified, at the instant when the put is applied. The put will not be applied if the server's + * versionId of the record does not match the expectation set in the call. + * + * @param key The key with which the value should be associated. + * @param value The value to associate with the key. + * @return The result of the put at the specified key. Supplied via a future that returns the + * {@link PutResult}. + */ + @NonNull + CompletableFuture put(String key, byte[] value); /** * Conditionally deletes the record associated with the key if the record exists, and the server's @@ -56,7 +70,17 @@ public interface AsyncOxiaClient extends AutoCloseable { * versionId at the server did not that match supplied in the call. */ @NonNull - CompletableFuture delete(String key, DeleteOption... options); + CompletableFuture delete(String key, Set options); + + /** + * Unconditionally deletes the record associated with the key if the record exists. + * + * @param key Deletes the record with the specified key. + * @return A future that completes when the delete call has returned. The future can return a flag + * that will be true if the key was actually present on the server, false otherwise. + */ + @NonNull + CompletableFuture delete(String key); /** * Deletes any records with keys within the specified range. For more information on how keys are diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/DeleteOption.java b/client-api/src/main/java/io/streamnative/oxia/client/api/DeleteOption.java index 25611a96..2da3108f 100644 --- a/client-api/src/main/java/io/streamnative/oxia/client/api/DeleteOption.java +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/DeleteOption.java @@ -15,50 +15,9 @@ */ package io.streamnative.oxia.client.api; -import static io.streamnative.oxia.client.api.DeleteOption.VersionIdDeleteOption; -import static io.streamnative.oxia.client.api.DeleteOption.VersionIdDeleteOption.IfVersionIdEquals; -import static io.streamnative.oxia.client.api.DeleteOption.VersionIdDeleteOption.Unconditionally; +public sealed interface DeleteOption permits OptionVersionId { -public sealed interface DeleteOption permits VersionIdDeleteOption { - - default boolean cannotCoExistWith(DeleteOption option) { - return false; - } - - sealed interface VersionIdDeleteOption extends DeleteOption - permits IfVersionIdEquals, Unconditionally { - - Long toVersionId(); - - default boolean cannotCoExistWith(DeleteOption option) { - return option instanceof VersionIdDeleteOption; - } - - record IfVersionIdEquals(long versionId) implements VersionIdDeleteOption { - - public IfVersionIdEquals { - if (versionId < 0) { - throw new IllegalArgumentException("versionId cannot be less than 0 - was: " + versionId); - } - } - - @Override - public Long toVersionId() { - return versionId(); - } - } - - record Unconditionally() implements VersionIdDeleteOption { - @Override - public Long toVersionId() { - return null; - } - } - } - - VersionIdDeleteOption Unconditionally = new VersionIdDeleteOption.Unconditionally(); - - static VersionIdDeleteOption ifVersionIdEquals(long versionId) { - return new IfVersionIdEquals(versionId); + static DeleteOption IfVersionIdEquals(long versionId) { + return new OptionVersionId.OptionVersionIdEqual(versionId); } } diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/OptionEphemeral.java b/client-api/src/main/java/io/streamnative/oxia/client/api/OptionEphemeral.java new file mode 100644 index 00000000..e0b52eb7 --- /dev/null +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/OptionEphemeral.java @@ -0,0 +1,18 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.api; + +public record OptionEphemeral() implements PutOption {} diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/OptionVersionId.java b/client-api/src/main/java/io/streamnative/oxia/client/api/OptionVersionId.java new file mode 100644 index 00000000..5f54118d --- /dev/null +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/OptionVersionId.java @@ -0,0 +1,37 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.api; + +public sealed interface OptionVersionId extends PutOption, DeleteOption + permits OptionVersionId.OptionRecordDoesNotExist, OptionVersionId.OptionVersionIdEqual { + + long versionId(); + + record OptionVersionIdEqual(long versionId) implements OptionVersionId { + public OptionVersionIdEqual { + if (versionId < 0) { + throw new IllegalArgumentException("versionId cannot be less than 0 - was: " + versionId); + } + } + } + + record OptionRecordDoesNotExist() implements OptionVersionId { + @Override + public long versionId() { + return Version.KeyNotExists; + } + } +} diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/PutOption.java b/client-api/src/main/java/io/streamnative/oxia/client/api/PutOption.java index ffea7f0f..0ea7a867 100644 --- a/client-api/src/main/java/io/streamnative/oxia/client/api/PutOption.java +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/PutOption.java @@ -15,63 +15,12 @@ */ package io.streamnative.oxia.client.api; -import static io.streamnative.oxia.client.api.PutOption.AsEphemeralRecord; -import static io.streamnative.oxia.client.api.PutOption.VersionIdPutOption; -import static io.streamnative.oxia.client.api.PutOption.VersionIdPutOption.IfRecordDoesNotExist; -import static io.streamnative.oxia.client.api.PutOption.VersionIdPutOption.IfVersionIdEquals; -import static io.streamnative.oxia.client.api.PutOption.VersionIdPutOption.Unconditionally; +public sealed interface PutOption permits OptionEphemeral, OptionVersionId { -public sealed interface PutOption permits VersionIdPutOption, AsEphemeralRecord { + PutOption IfRecordDoesNotExist = new OptionVersionId.OptionRecordDoesNotExist(); + PutOption AsEphemeralRecord = new OptionEphemeral(); - default boolean cannotCoExistWith(PutOption option) { - return false; - } - - sealed interface VersionIdPutOption extends PutOption - permits IfVersionIdEquals, IfRecordDoesNotExist, Unconditionally { - - Long toVersionId(); - - default boolean cannotCoExistWith(PutOption option) { - return option instanceof VersionIdPutOption; - } - - record IfVersionIdEquals(long versionId) implements VersionIdPutOption { - - public IfVersionIdEquals { - if (versionId < 0) { - throw new IllegalArgumentException("versionId cannot be less than 0 - was: " + versionId); - } - } - - @Override - public Long toVersionId() { - return versionId(); - } - } - - record IfRecordDoesNotExist() implements VersionIdPutOption { - @Override - public Long toVersionId() { - return Version.KeyNotExists; - } - } - - record Unconditionally() implements VersionIdPutOption { - @Override - public Long toVersionId() { - return null; - } - } - } - - record AsEphemeralRecord() implements PutOption {} - - VersionIdPutOption IfRecordDoesNotExist = new VersionIdPutOption.IfRecordDoesNotExist(); - VersionIdPutOption Unconditionally = new VersionIdPutOption.Unconditionally(); - PutOption AsEphemeralRecord = new AsEphemeralRecord(); - - static VersionIdPutOption ifVersionIdEquals(long versionId) { - return new IfVersionIdEquals(versionId); + static PutOption IfVersionIdEquals(long versionId) { + return new OptionVersionId.OptionVersionIdEqual(versionId); } } diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/SyncOxiaClient.java b/client-api/src/main/java/io/streamnative/oxia/client/api/SyncOxiaClient.java index 1da4c0ae..d89df28d 100644 --- a/client-api/src/main/java/io/streamnative/oxia/client/api/SyncOxiaClient.java +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/SyncOxiaClient.java @@ -17,12 +17,23 @@ import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException; import java.util.List; +import java.util.Set; import java.util.function.Consumer; import lombok.NonNull; /** Synchronous client for the Oxia service. */ public interface SyncOxiaClient extends AutoCloseable { + /** + * Associates a value with a key if the server's versionId of the record is as specified, at the + * instant when the put is applied. + * + * @param key The key with which the value should be associated. + * @param value The value to associate with the key. + * @return The result of the put at the specified key. + */ + PutResult put(@NonNull String key, byte @NonNull [] value); + /** * Conditionally associates a value with a key if the server's versionId of the record is as * specified, at the instant when the put is applied. The put will not be applied if the server's @@ -37,9 +48,17 @@ public interface SyncOxiaClient extends AutoCloseable { * @throws UnexpectedVersionIdException The versionId at the server did not that match supplied in * the call. */ - PutResult put(@NonNull String key, byte @NonNull [] value, PutOption... options) + PutResult put(@NonNull String key, byte @NonNull [] value, Set options) throws UnexpectedVersionIdException; + /** + * Unconditionally deletes the record associated with the key if the record exists. + * + * @param key Deletes the record with the specified key. + * @return True if the key was actually present on the server, false otherwise. + */ + boolean delete(@NonNull String key); + /** * Conditionally deletes the record associated with the key if the record exists, and the server's * versionId of the record is as specified, at the instant when the delete is applied. The delete @@ -52,7 +71,7 @@ PutResult put(@NonNull String key, byte @NonNull [] value, PutOption... options) * @throws UnexpectedVersionIdException The versionId at the server did not that match supplied in * the call. */ - boolean delete(@NonNull String key, @NonNull DeleteOption... options) + boolean delete(@NonNull String key, @NonNull Set options) throws UnexpectedVersionIdException; /** diff --git a/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientIT.java b/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientIT.java index bb5372c3..0b839e21 100644 --- a/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientIT.java +++ b/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientIT.java @@ -16,7 +16,7 @@ package io.streamnative.oxia.client.it; import static io.streamnative.oxia.client.api.PutOption.IfRecordDoesNotExist; -import static io.streamnative.oxia.client.api.PutOption.ifVersionIdEquals; +import static io.streamnative.oxia.client.api.PutOption.IfVersionIdEquals; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.function.Function.identity; @@ -46,6 +46,7 @@ import io.streamnative.oxia.testcontainers.OxiaContainer; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterAll; @@ -102,13 +103,14 @@ static void afterAll() throws Exception { @Test void test() throws Exception { - var a = client.put("a", "a".getBytes(UTF_8), IfRecordDoesNotExist); - var b = client.put("b", "b".getBytes(UTF_8), IfRecordDoesNotExist); + var a = client.put("a", "a".getBytes(UTF_8), Set.of(IfRecordDoesNotExist)); + var b = client.put("b", "b".getBytes(UTF_8), Set.of(IfRecordDoesNotExist)); var c = client.put("c", "c".getBytes(UTF_8)); var d = client.put("d", "d".getBytes(UTF_8)); allOf(a, b, c, d).join(); - assertThatThrownBy(() -> client.put("a", "a".getBytes(UTF_8), IfRecordDoesNotExist).join()) + assertThatThrownBy( + () -> client.put("a", "a".getBytes(UTF_8), Set.of(IfRecordDoesNotExist)).join()) .hasCauseInstanceOf(KeyAlreadyExistsException.class); // verify 'a' is present var getResult = client.get("a").join(); @@ -122,7 +124,7 @@ void test() throws Exception { () -> assertThat(notifications).contains(new KeyCreated("a", finalAVersion))); // update 'a' with expected version - client.put("a", "a2".getBytes(UTF_8), ifVersionIdEquals(aVersion)).join(); + client.put("a", "a2".getBytes(UTF_8), Set.of(IfVersionIdEquals(aVersion))).join(); getResult = client.get("a").join(); assertThat(getResult.getValue()).isEqualTo("a2".getBytes(UTF_8)); aVersion = getResult.getVersion().versionId(); @@ -136,13 +138,16 @@ void test() throws Exception { // put with unexpected version var bVersion = client.get("b").join().getVersion().versionId(); assertThatThrownBy( - () -> client.put("b", "b2".getBytes(UTF_8), ifVersionIdEquals(bVersion + 1L)).join()) + () -> + client + .put("b", "b2".getBytes(UTF_8), Set.of(IfVersionIdEquals(bVersion + 1L))) + .join()) .hasCauseInstanceOf(UnexpectedVersionIdException.class); // delete with unexpected version var cVersion = client.get("c").join().getVersion().versionId(); assertThatThrownBy( - () -> client.delete("c", DeleteOption.ifVersionIdEquals(cVersion + 1L)).join()) + () -> client.delete("c", Set.of(DeleteOption.IfVersionIdEquals(cVersion + 1L))).join()) .hasCauseInstanceOf(UnexpectedVersionIdException.class); // list all keys @@ -150,7 +155,7 @@ void test() throws Exception { assertThat(listResult).containsOnly("a", "b", "c", "d"); // delete 'a' with expected version - client.delete("a", DeleteOption.ifVersionIdEquals(aVersion)).join(); + client.delete("a", Set.of(DeleteOption.IfVersionIdEquals(aVersion))).join(); getResult = client.get("a").join(); assertThat(getResult).isNull(); @@ -178,13 +183,14 @@ void test() throws Exception { .clientIdentifier(identity) .asyncClient() .join()) { - otherClient.put("f", "f".getBytes(), PutOption.AsEphemeralRecord).join(); + otherClient.put("f", "f".getBytes(), Set.of(PutOption.AsEphemeralRecord)).join(); getResult = client.get("f").join(); var sessionId = getResult.getVersion().sessionId().get(); assertThat(sessionId).isNotNull(); assertThat(getResult.getVersion().clientIdentifier().get()).isEqualTo(identity); - var putResult = otherClient.put("g", "g".getBytes(), PutOption.AsEphemeralRecord).join(); + var putResult = + otherClient.put("g", "g".getBytes(), Set.of(PutOption.AsEphemeralRecord)).join(); assertThat(putResult.version().clientIdentifier().get()).isEqualTo(identity); assertThat(putResult.version().sessionId().get()).isNotNull(); diff --git a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java index 9fd4327f..cff59bcc 100644 --- a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java +++ b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java @@ -41,8 +41,11 @@ import io.streamnative.oxia.client.shard.ShardManager; import io.streamnative.oxia.proto.ListRequest; import io.streamnative.oxia.proto.ListResponse; +import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.OptionalLong; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Function; @@ -217,7 +220,13 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { } @Override - public @NonNull CompletableFuture put(String key, byte[] value, PutOption... options) { + public @NonNull CompletableFuture put(String key, byte[] value) { + return put(key, value, Collections.emptySet()); + } + + @Override + public @NonNull CompletableFuture put( + String key, byte[] value, Set options) { long startTime = System.nanoTime(); var callback = new CompletableFuture(); @@ -229,12 +238,10 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { gaugePendingPutRequests.increment(); gaugePendingPutBytes.add(value.length); - var validatedOptions = PutOptionsUtil.validate(options); var shardId = shardManager.get(key); - var versionId = PutOptionsUtil.toVersionId(validatedOptions); + var versionId = PutOptionsUtil.getVersionId(options); var op = - new PutOperation( - callback, key, value, versionId, PutOptionsUtil.toEphemeral(validatedOptions)); + new PutOperation(callback, key, value, versionId, PutOptionsUtil.isEphemeral(options)); writeBatchManager.getBatcher(shardId).add(op); } catch (RuntimeException e) { callback.completeExceptionally(e); @@ -254,7 +261,12 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { } @Override - public @NonNull CompletableFuture delete(String key, DeleteOption... options) { + public @NonNull CompletableFuture delete(String key) { + return delete(key, Collections.emptySet()); + } + + @Override + public @NonNull CompletableFuture delete(String key, Set options) { long startTime = System.nanoTime(); gaugePendingDeleteRequests.increment(); @@ -263,9 +275,9 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { try { checkIfClosed(); Objects.requireNonNull(key); - var validatedOptions = DeleteOptionsUtil.validate(options); + + OptionalLong versionId = DeleteOptionsUtil.getVersionId(options); var shardId = shardManager.get(key); - var versionId = DeleteOptionsUtil.toVersionId(validatedOptions); writeBatchManager.getBatcher(shardId).add(new DeleteOperation(callback, key, versionId)); } catch (RuntimeException e) { callback.completeExceptionally(e); diff --git a/client/src/main/java/io/streamnative/oxia/client/CachingAsyncOxiaClient.java b/client/src/main/java/io/streamnative/oxia/client/CachingAsyncOxiaClient.java index 8c371c37..583dee63 100644 --- a/client/src/main/java/io/streamnative/oxia/client/CachingAsyncOxiaClient.java +++ b/client/src/main/java/io/streamnative/oxia/client/CachingAsyncOxiaClient.java @@ -25,7 +25,9 @@ import io.streamnative.oxia.client.api.Notification; import io.streamnative.oxia.client.api.PutOption; import io.streamnative.oxia.client.api.PutResult; +import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Supplier; @@ -48,16 +50,26 @@ class CachingAsyncOxiaClient implements AsyncOxiaClient { delegate.notifications(n -> recordCache.synchronous().invalidate(n.key())); } + @Override + public @NonNull CompletableFuture put(@NonNull String key, byte @NonNull [] value) { + return put(key, value, Collections.emptySet()); + } + @Override public @NonNull CompletableFuture put( - @NonNull String key, byte @NonNull [] value, @NonNull PutOption... options) { + @NonNull String key, byte @NonNull [] value, @NonNull Set options) { recordCache.synchronous().invalidate(key); return delegate.put(key, value, options); } + @Override + public @NonNull CompletableFuture delete(@NonNull String key) { + return delete(key, Collections.emptySet()); + } + @Override public @NonNull CompletableFuture delete( - @NonNull String key, @NonNull DeleteOption... options) { + @NonNull String key, @NonNull Set options) { recordCache.synchronous().invalidate(key); return delegate.delete(key, options); } diff --git a/client/src/main/java/io/streamnative/oxia/client/DeleteOptionsUtil.java b/client/src/main/java/io/streamnative/oxia/client/DeleteOptionsUtil.java index c598ccb4..f582f227 100644 --- a/client/src/main/java/io/streamnative/oxia/client/DeleteOptionsUtil.java +++ b/client/src/main/java/io/streamnative/oxia/client/DeleteOptionsUtil.java @@ -17,45 +17,28 @@ package io.streamnative.oxia.client; import io.streamnative.oxia.client.api.DeleteOption; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Optional; +import io.streamnative.oxia.client.api.OptionVersionId; +import java.util.OptionalLong; import java.util.Set; import lombok.experimental.UtilityClass; @UtilityClass public class DeleteOptionsUtil { - private static final Set DefaultDeleteOptions = - Collections.singleton(DeleteOption.Unconditionally); - - public static Set validate(DeleteOption... args) { - if (args == null || args.length == 0) { - return DefaultDeleteOptions; + public static OptionalLong getVersionId(Set opts) { + if (opts == null || opts.isEmpty()) { + return OptionalLong.empty(); } - Arrays.stream(args) - .forEach( - a -> { - if (Arrays.stream(args) - .filter(c -> !c.equals(a)) - .anyMatch(c -> a.cannotCoExistWith(c))) { - throw new IllegalArgumentException( - "Incompatible " - + DeleteOption.class.getSimpleName() - + "s: " - + Arrays.toString(args)); - } - }); - return new HashSet<>(Arrays.asList(args)); - } + if (opts.size() > 1) { + throw new IllegalArgumentException("Conflicting delete options"); + } - public static Optional toVersionId(Collection options) { - return options.stream() - .filter(o -> o instanceof DeleteOption.VersionIdDeleteOption) - .findAny() - .map(o -> ((DeleteOption.VersionIdDeleteOption) o).toVersionId()); + DeleteOption delOpt = opts.iterator().next(); + if (delOpt instanceof OptionVersionId o) { + return OptionalLong.of(o.versionId()); + } else { + return OptionalLong.empty(); + } } } diff --git a/client/src/main/java/io/streamnative/oxia/client/PutOptionsUtil.java b/client/src/main/java/io/streamnative/oxia/client/PutOptionsUtil.java index cbf9a8ea..4d6a3ff8 100644 --- a/client/src/main/java/io/streamnative/oxia/client/PutOptionsUtil.java +++ b/client/src/main/java/io/streamnative/oxia/client/PutOptionsUtil.java @@ -16,49 +16,54 @@ package io.streamnative.oxia.client; +import io.streamnative.oxia.client.api.OptionEphemeral; +import io.streamnative.oxia.client.api.OptionVersionId; import io.streamnative.oxia.client.api.PutOption; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Optional; +import io.streamnative.oxia.client.api.Version; +import java.util.OptionalLong; import java.util.Set; import lombok.experimental.UtilityClass; @UtilityClass public class PutOptionsUtil { - private static final Set DefaultPutOptions = - Collections.singleton(PutOption.Unconditionally); + public static OptionalLong getVersionId(Set options) { + if (options == null || options.isEmpty()) { + return OptionalLong.empty(); + } - public static Optional toVersionId(Collection options) { - return options.stream() - .filter(o -> o instanceof PutOption.VersionIdPutOption) - .findAny() - .map(o -> ((PutOption.VersionIdPutOption) o).toVersionId()); - } + boolean alreadyHasVersionId = false; + long versionId = Version.KeyNotExists; + for (PutOption o : options) { + if (o instanceof OptionVersionId e) { + if (alreadyHasVersionId) { + throw new IllegalArgumentException( + "Incompatible " + PutOption.class.getSimpleName() + "s: " + options); + } + + versionId = e.versionId(); + alreadyHasVersionId = true; + } + } - public static boolean toEphemeral(Collection options) { - return options.stream().anyMatch(o -> o instanceof PutOption.AsEphemeralRecord); + if (alreadyHasVersionId) { + return OptionalLong.of(versionId); + } else { + return OptionalLong.empty(); + } } - public static Set validate(PutOption... args) { - if (args == null || args.length == 0) { - return DefaultPutOptions; + public static boolean isEphemeral(Set options) { + if (options.isEmpty()) { + return false; + } + + for (PutOption option : options) { + if (option instanceof OptionEphemeral) { + return true; + } } - Arrays.stream(args) - .forEach( - a -> { - if (Arrays.stream(args) - .filter(c -> !c.equals(a)) - .anyMatch(c -> a.cannotCoExistWith(c))) { - throw new IllegalArgumentException( - "Incompatible " - + PutOption.class.getSimpleName() - + "s: " - + Arrays.toString(args)); - } - }); - return new HashSet<>(Arrays.asList(args)); + + return false; } } diff --git a/client/src/main/java/io/streamnative/oxia/client/SyncOxiaClientImpl.java b/client/src/main/java/io/streamnative/oxia/client/SyncOxiaClientImpl.java index 0faf85be..a8732b28 100644 --- a/client/src/main/java/io/streamnative/oxia/client/SyncOxiaClientImpl.java +++ b/client/src/main/java/io/streamnative/oxia/client/SyncOxiaClientImpl.java @@ -23,7 +23,9 @@ import io.streamnative.oxia.client.api.PutResult; import io.streamnative.oxia.client.api.SyncOxiaClient; import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException; +import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import lombok.AccessLevel; @@ -35,10 +37,15 @@ class SyncOxiaClientImpl implements SyncOxiaClient { private final AsyncOxiaClient asyncClient; + @Override + public PutResult put(@NonNull String key, byte @NonNull [] value) { + return put(key, value, Collections.emptySet()); + } + @SneakyThrows @Override public @NonNull PutResult put( - @NonNull String key, byte @NonNull [] value, @NonNull PutOption... options) { + @NonNull String key, byte @NonNull [] value, @NonNull Set options) { try { return asyncClient.put(key, value, options).get(); } catch (InterruptedException e) { @@ -51,7 +58,13 @@ class SyncOxiaClientImpl implements SyncOxiaClient { @SneakyThrows @Override - public boolean delete(@NonNull String key, @NonNull DeleteOption... options) + public boolean delete(@NonNull String key) { + return delete(key, Collections.emptySet()); + } + + @SneakyThrows + @Override + public boolean delete(@NonNull String key, @NonNull Set options) throws UnexpectedVersionIdException { try { return asyncClient.delete(key, options).get(); diff --git a/client/src/main/java/io/streamnative/oxia/client/batch/Operation.java b/client/src/main/java/io/streamnative/oxia/client/batch/Operation.java index 9e169551..965a9c26 100644 --- a/client/src/main/java/io/streamnative/oxia/client/batch/Operation.java +++ b/client/src/main/java/io/streamnative/oxia/client/batch/Operation.java @@ -42,6 +42,7 @@ import java.util.Arrays; import java.util.Objects; import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import lombok.NonNull; @@ -76,14 +77,15 @@ record PutOperation( @NonNull CompletableFuture callback, @NonNull String key, byte @NonNull [] value, - @NonNull Optional expectedVersionId, + @NonNull OptionalLong expectedVersionId, boolean ephemeral) implements WriteOperation { public PutOperation { - if (expectedVersionId.isPresent() && expectedVersionId.get() < KeyNotExists) { + if (expectedVersionId.isPresent() && expectedVersionId.getAsLong() < KeyNotExists) { throw new IllegalArgumentException( - "expectedVersionId must be >= -1 (KeyNotExists), was: " + expectedVersionId.get()); + "expectedVersionId must be >= -1 (KeyNotExists), was: " + + expectedVersionId.getAsLong()); } } @@ -106,10 +108,10 @@ void complete(@NonNull PutResponse response) { switch (response.getStatus()) { case SESSION_DOES_NOT_EXIST -> fail(new SessionDoesNotExistException()); case UNEXPECTED_VERSION_ID -> { - if (expectedVersionId.get() == KeyNotExists) { + if (expectedVersionId.getAsLong() == KeyNotExists) { fail(new KeyAlreadyExistsException(key)); } else { - fail(new UnexpectedVersionIdException(key, expectedVersionId.get())); + fail(new UnexpectedVersionIdException(key, expectedVersionId.getAsLong())); } } case OK -> callback.complete(ProtoUtil.getPutResultFromProto(response)); @@ -144,13 +146,13 @@ record SessionInfo(long sessionId, @NonNull String clientIdentifier) {} record DeleteOperation( @NonNull CompletableFuture callback, @NonNull String key, - @NonNull Optional expectedVersionId) + @NonNull OptionalLong expectedVersionId) implements WriteOperation { public DeleteOperation { - if (expectedVersionId.isPresent() && expectedVersionId.get() < 0) { + if (expectedVersionId.isPresent() && expectedVersionId.getAsLong() < 0) { throw new IllegalArgumentException( - "expectedVersionId must be >= 0, was: " + expectedVersionId.get()); + "expectedVersionId must be >= 0, was: " + expectedVersionId.getAsLong()); } } @@ -163,7 +165,7 @@ DeleteRequest toProto() { void complete(@NonNull DeleteResponse response) { switch (response.getStatus()) { case UNEXPECTED_VERSION_ID -> fail( - new UnexpectedVersionIdException(key, expectedVersionId.get())); + new UnexpectedVersionIdException(key, expectedVersionId.getAsLong())); case KEY_NOT_FOUND -> callback.complete(false); case OK -> callback.complete(true); default -> fail(new IllegalStateException("GRPC.Status: " + response.getStatus().name())); @@ -171,7 +173,7 @@ void complete(@NonNull DeleteResponse response) { } public DeleteOperation(@NonNull CompletableFuture callback, @NonNull String key) { - this(callback, key, Optional.empty()); + this(callback, key, OptionalLong.empty()); } } diff --git a/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java b/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java index 01064c45..12373256 100644 --- a/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java @@ -15,7 +15,7 @@ */ package io.streamnative.oxia.client; -import static io.streamnative.oxia.client.api.PutOption.ifVersionIdEquals; +import static io.streamnative.oxia.client.api.PutOption.IfVersionIdEquals; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Optional.empty; import static org.assertj.core.api.Assertions.assertThat; @@ -47,6 +47,7 @@ import io.streamnative.oxia.proto.ReactorOxiaClientGrpc.ReactorOxiaClientStub; import java.time.Duration; import java.util.List; +import java.util.Set; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -146,7 +147,7 @@ void putExpectedVersion() { when(shardManager.get(key)).thenReturn(shardId); when(writeBatchManager.getBatcher(shardId)).thenReturn(batcher); doNothing().when(batcher).add(opCaptor.capture()); - var result = client.put(key, value, ifVersionIdEquals(expectedVersionId)); + var result = client.put(key, value, Set.of(IfVersionIdEquals(expectedVersionId))); assertThat(result).isNotCompleted(); assertThat(opCaptor.getValue()) .satisfies( @@ -161,7 +162,7 @@ void putExpectedVersion() { void putInvalidOptions() { var key = "key"; var value = "hello".getBytes(UTF_8); - var result = client.put(key, value, ifVersionIdEquals(1L), ifVersionIdEquals(2L)); + var result = client.put(key, value, Set.of(IfVersionIdEquals(1L), IfVersionIdEquals(2L))); assertThat(result).isCompletedExceptionally(); } @@ -218,7 +219,7 @@ void deleteExpectedVersion() { when(shardManager.get(key)).thenReturn(shardId); when(writeBatchManager.getBatcher(shardId)).thenReturn(batcher); doNothing().when(batcher).add(opCaptor.capture()); - var result = client.delete(key, DeleteOption.ifVersionIdEquals(expectedVersionId)); + var result = client.delete(key, Set.of(DeleteOption.IfVersionIdEquals(expectedVersionId))); assertThat(result).isNotCompleted(); assertThat(opCaptor.getValue()) .satisfies( @@ -232,7 +233,8 @@ void deleteExpectedVersion() { void deleteInvalidOptions() { var key = "key"; var result = - client.delete(key, DeleteOption.ifVersionIdEquals(1L), DeleteOption.ifVersionIdEquals(2L)); + client.delete( + key, Set.of(DeleteOption.IfVersionIdEquals(1L), DeleteOption.IfVersionIdEquals(2L))); assertThat(result).isCompletedExceptionally(); } diff --git a/client/src/test/java/io/streamnative/oxia/client/CachingAsyncOxiaClientTest.java b/client/src/test/java/io/streamnative/oxia/client/CachingAsyncOxiaClientTest.java index 49ce0652..275abfd0 100644 --- a/client/src/test/java/io/streamnative/oxia/client/CachingAsyncOxiaClientTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/CachingAsyncOxiaClientTest.java @@ -34,6 +34,7 @@ import io.streamnative.oxia.client.api.Notification.KeyModified; import io.streamnative.oxia.client.api.PutResult; import io.streamnative.oxia.client.api.Version; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -72,22 +73,22 @@ void put() { var value = "value".getBytes(UTF_8); var version = new Version(1L, 2L, 3L, 4L, Optional.empty(), Optional.empty()); var result = CompletableFuture.completedFuture(new PutResult(version)); - when(delegate.put("a", value)).thenReturn(result); + when(delegate.put("a", value, Collections.emptySet())).thenReturn(result); assertThat(client.put("a", value)).isSameAs(result); var inOrder = inOrder(delegate, syncCache); inOrder.verify(syncCache).invalidate("a"); - inOrder.verify(delegate).put("a", value); + inOrder.verify(delegate).put("a", value, Collections.emptySet()); } @Test void delete() { when(cache.synchronous()).thenReturn(syncCache); var result = CompletableFuture.completedFuture(true); - when(delegate.delete("a")).thenReturn(result); + when(delegate.delete("a", Collections.emptySet())).thenReturn(result); assertThat(client.delete("a")).isSameAs(result); var inOrder = inOrder(delegate, syncCache); inOrder.verify(syncCache).invalidate("a"); - inOrder.verify(delegate).delete("a"); + inOrder.verify(delegate).delete("a", Collections.emptySet()); } @Test diff --git a/client/src/test/java/io/streamnative/oxia/client/api/DeleteOptionTest.java b/client/src/test/java/io/streamnative/oxia/client/api/DeleteOptionTest.java index 8d31810d..348f27a2 100644 --- a/client/src/test/java/io/streamnative/oxia/client/api/DeleteOptionTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/api/DeleteOptionTest.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.streamnative.oxia.client.DeleteOptionsUtil; +import java.util.Collections; import java.util.Set; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; @@ -32,66 +33,39 @@ class DeleteOptionTest { class IfVersionIdEqualsTests { @Test void ifVersionIdEquals() { - assertThat(DeleteOption.ifVersionIdEquals(1L)) + assertThat(DeleteOption.IfVersionIdEquals(1L)) .satisfies( o -> { - assertThat(o) - .isInstanceOf(DeleteOption.VersionIdDeleteOption.IfVersionIdEquals.class); - assertThat(o.toVersionId()).isEqualTo(1L); - assertThat(o.cannotCoExistWith(DeleteOption.Unconditionally)).isTrue(); + assertThat(o).isInstanceOf(OptionVersionId.OptionVersionIdEqual.class); + + if (o instanceof OptionVersionId.OptionVersionIdEqual e) { + assertThat(e.versionId()).isEqualTo(1L); + } }); } @Test void versionIdLessThanZero() { - assertThatNoException().isThrownBy(() -> DeleteOption.ifVersionIdEquals(0L)); - assertThatThrownBy(() -> DeleteOption.ifVersionIdEquals(-1L)) + assertThatNoException().isThrownBy(() -> DeleteOption.IfVersionIdEquals(0L)); + assertThatThrownBy(() -> DeleteOption.IfVersionIdEquals(-1L)) .isInstanceOf(IllegalArgumentException.class); } } - @Nested - @DisplayName("Unconditionally tests") - class UnconditionallyTests { - @Test - void unconditionally() { - assertThat(DeleteOption.Unconditionally) - .satisfies( - o -> { - assertThat(o) - .isInstanceOf(DeleteOption.VersionIdDeleteOption.Unconditionally.class); - assertThat(o.toVersionId()).isNull(); - assertThat(o.cannotCoExistWith(DeleteOption.ifVersionIdEquals(1L))).isTrue(); - }); - } - } - - @Test - void validate() { - assertThat( - DeleteOptionsUtil.validate( - DeleteOption.ifVersionIdEquals(1L), DeleteOption.ifVersionIdEquals(1L))) - .containsOnly(DeleteOption.ifVersionIdEquals(1L)); - } - @Test - void validateEmpty() { - assertThat(DeleteOptionsUtil.validate()).containsOnly(DeleteOption.Unconditionally); - } - - @Test - void validateFail() { + void toVersionId() { + assertThat(DeleteOptionsUtil.getVersionId(Collections.emptySet())).isEmpty(); + assertThat(DeleteOptionsUtil.getVersionId(Set.of(DeleteOption.IfVersionIdEquals(1L)))) + .hasValue(1L); assertThatThrownBy( () -> - DeleteOptionsUtil.validate( - DeleteOption.Unconditionally, DeleteOption.ifVersionIdEquals(1L))) + DeleteOptionsUtil.getVersionId( + Set.of(DeleteOption.IfVersionIdEquals(1L), DeleteOption.IfVersionIdEquals(1L)))) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy( + () -> + DeleteOptionsUtil.getVersionId( + Set.of(DeleteOption.IfVersionIdEquals(1L), DeleteOption.IfVersionIdEquals(2L)))) .isInstanceOf(IllegalArgumentException.class); - } - - @Test - void toVersionId() { - assertThat(DeleteOptionsUtil.toVersionId(Set.of(DeleteOption.Unconditionally))).isEmpty(); - assertThat(DeleteOptionsUtil.toVersionId(Set.of(DeleteOption.ifVersionIdEquals(1L)))) - .hasValue(1L); } } diff --git a/client/src/test/java/io/streamnative/oxia/client/api/PutOptionTest.java b/client/src/test/java/io/streamnative/oxia/client/api/PutOptionTest.java index 8c0ffe71..1a3c9e07 100644 --- a/client/src/test/java/io/streamnative/oxia/client/api/PutOptionTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/api/PutOptionTest.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.streamnative.oxia.client.PutOptionsUtil; +import java.util.Collections; import java.util.Set; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; @@ -32,21 +33,21 @@ class PutOptionTest { class IfVersionIdEqualsTests { @Test void ifVersionIdEquals() { - assertThat(PutOption.ifVersionIdEquals(1L)) + assertThat(PutOption.IfVersionIdEquals(1L)) .satisfies( o -> { - assertThat(o).isInstanceOf(PutOption.VersionIdPutOption.IfVersionIdEquals.class); - assertThat(o.toVersionId()).isEqualTo(1L); - assertThat(o.cannotCoExistWith(PutOption.Unconditionally)).isTrue(); - assertThat(o.cannotCoExistWith(PutOption.IfRecordDoesNotExist)).isTrue(); - assertThat(o.cannotCoExistWith(PutOption.AsEphemeralRecord)).isFalse(); + assertThat(o).isInstanceOf(OptionVersionId.OptionVersionIdEqual.class); + + if (o instanceof OptionVersionId e) { + assertThat(e.versionId()).isEqualTo(1); + } }); } @Test void versionIdLessThanZero() { - assertThatNoException().isThrownBy(() -> PutOption.ifVersionIdEquals(0L)); - assertThatThrownBy(() -> PutOption.ifVersionIdEquals(-1L)) + assertThatNoException().isThrownBy(() -> PutOption.IfVersionIdEquals(0L)); + assertThatThrownBy(() -> PutOption.IfVersionIdEquals(-1L)) .isInstanceOf(IllegalArgumentException.class); } } @@ -59,92 +60,54 @@ void ifRecordDoesNotExist() { assertThat(PutOption.IfRecordDoesNotExist) .satisfies( o -> { - assertThat(o).isInstanceOf(PutOption.VersionIdPutOption.IfRecordDoesNotExist.class); - assertThat(o.toVersionId()).isEqualTo(Version.KeyNotExists); - assertThat(o.cannotCoExistWith(PutOption.Unconditionally)).isTrue(); - assertThat(o.cannotCoExistWith(PutOption.ifVersionIdEquals(1L))).isTrue(); - assertThat(o.cannotCoExistWith(PutOption.AsEphemeralRecord)).isFalse(); - }); - } - } + assertThat(o).isInstanceOf(OptionVersionId.OptionRecordDoesNotExist.class); - @Nested - @DisplayName("Unconditionally tests") - class UnconditionallyTests { - @Test - void unconditionally() { - assertThat(PutOption.Unconditionally) - .satisfies( - o -> { - assertThat(o).isInstanceOf(PutOption.VersionIdPutOption.Unconditionally.class); - assertThat(o.toVersionId()).isNull(); - assertThat(o.cannotCoExistWith(PutOption.IfRecordDoesNotExist)).isTrue(); - assertThat(o.cannotCoExistWith(PutOption.ifVersionIdEquals(1L))).isTrue(); - assertThat(o.cannotCoExistWith(PutOption.AsEphemeralRecord)).isFalse(); - }); - } - } - - @Nested - @DisplayName("AsEphemeralRecord tests") - class AsEphemeralRecordTests { - @Test - void asEphemeral() { - assertThat(PutOption.AsEphemeralRecord) - .satisfies( - o -> { - assertThat(o).isInstanceOf(PutOption.AsEphemeralRecord.class); - assertThat(o.cannotCoExistWith(PutOption.IfRecordDoesNotExist)).isFalse(); - assertThat(o.cannotCoExistWith(PutOption.Unconditionally)).isFalse(); - assertThat(o.cannotCoExistWith(PutOption.ifVersionIdEquals(1L))).isFalse(); - assertThat(o.cannotCoExistWith(PutOption.AsEphemeralRecord)).isFalse(); + if (o instanceof OptionVersionId e) { + assertThat(e.versionId()).isEqualTo(Version.KeyNotExists); + } }); } } @Test - void validate() { - assertThat( - PutOptionsUtil.validate( - PutOption.AsEphemeralRecord, - PutOption.AsEphemeralRecord, - PutOption.ifVersionIdEquals(1L), - PutOption.ifVersionIdEquals(1L))) - .containsOnly(PutOption.AsEphemeralRecord, PutOption.ifVersionIdEquals(1L)); - } + void validateFail() { + assertThatThrownBy( + () -> + PutOptionsUtil.getVersionId( + Set.of(PutOption.IfRecordDoesNotExist, PutOption.IfVersionIdEquals(1L)))) + .isInstanceOf(IllegalArgumentException.class); - @Test - void validateEmpty() { - assertThat(PutOptionsUtil.validate()).containsOnly(PutOption.Unconditionally); - } + assertThatThrownBy( + () -> + PutOptionsUtil.getVersionId( + Set.of(PutOption.IfVersionIdEquals(1L), PutOption.IfVersionIdEquals(1L)))) + .isInstanceOf(IllegalArgumentException.class); - @Test - void validateFail() { assertThatThrownBy( () -> - PutOptionsUtil.validate( - PutOption.Unconditionally, - PutOption.IfRecordDoesNotExist, - PutOption.ifVersionIdEquals(1L))) + PutOptionsUtil.getVersionId( + Set.of(PutOption.IfVersionIdEquals(1L), PutOption.IfVersionIdEquals(2L)))) .isInstanceOf(IllegalArgumentException.class); } @Test - void toVersionId() { - assertThat(PutOptionsUtil.toVersionId(Set.of(PutOption.AsEphemeralRecord))).isEmpty(); - assertThat(PutOptionsUtil.toVersionId(Set.of(PutOption.Unconditionally))).isEmpty(); - assertThat(PutOptionsUtil.toVersionId(Set.of(PutOption.IfRecordDoesNotExist))) + void getVersionId() { + assertThat(PutOptionsUtil.getVersionId(Set.of(PutOption.AsEphemeralRecord))).isEmpty(); + assertThat(PutOptionsUtil.getVersionId(Collections.emptySet())).isEmpty(); + assertThat(PutOptionsUtil.getVersionId(Set.of(PutOption.IfRecordDoesNotExist))) .hasValue(Version.KeyNotExists); - assertThat(PutOptionsUtil.toVersionId(Set.of(PutOption.ifVersionIdEquals(1L)))).hasValue(1L); + assertThat(PutOptionsUtil.getVersionId(Set.of(PutOption.IfVersionIdEquals(1L)))).hasValue(1L); assertThat( - PutOptionsUtil.toVersionId( - Set.of(PutOption.AsEphemeralRecord, PutOption.ifVersionIdEquals(1L)))) + PutOptionsUtil.getVersionId( + Set.of(PutOption.AsEphemeralRecord, PutOption.IfVersionIdEquals(1L)))) .hasValue(1L); } @Test - void toEphemeral() { - assertThat(PutOptionsUtil.toEphemeral(Set.of(PutOption.AsEphemeralRecord))).isTrue(); - assertThat(PutOptionsUtil.toEphemeral(Set.of())).isFalse(); + void isEphemeral() { + assertThat(PutOptionsUtil.isEphemeral(Set.of(PutOption.AsEphemeralRecord))).isTrue(); + assertThat(PutOptionsUtil.isEphemeral(Set.of(PutOption.IfRecordDoesNotExist))).isFalse(); + assertThat(PutOptionsUtil.isEphemeral(Set.of(PutOption.IfVersionIdEquals(5)))).isFalse(); + assertThat(PutOptionsUtil.isEphemeral(Collections.emptySet())).isFalse(); } } diff --git a/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java b/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java index d9d2296a..b5d4d24c 100644 --- a/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java @@ -60,6 +60,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Function; @@ -145,10 +146,10 @@ class WriteBatchTests { CompletableFuture deleteCallable = new CompletableFuture<>(); CompletableFuture deleteRangeCallable = new CompletableFuture<>(); - PutOperation put = new PutOperation(putCallable, "", new byte[0], Optional.of(1L), false); + PutOperation put = new PutOperation(putCallable, "", new byte[0], OptionalLong.of(1), false); PutOperation putEphemeral = - new PutOperation(putEphemeralCallable, "", new byte[0], Optional.of(1L), true); - DeleteOperation delete = new DeleteOperation(deleteCallable, "", Optional.of(1L)); + new PutOperation(putEphemeralCallable, "", new byte[0], OptionalLong.of(1), true); + DeleteOperation delete = new DeleteOperation(deleteCallable, "", OptionalLong.of(1)); DeleteRangeOperation deleteRange = new DeleteRangeOperation(deleteRangeCallable, "a", "b"); String clientIdentifier = "client-id"; diff --git a/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java b/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java index 45669769..930633f9 100644 --- a/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java @@ -35,7 +35,7 @@ import io.streamnative.oxia.client.batch.Operation.ReadOperation.GetOperation; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; @@ -107,7 +107,7 @@ void addWhenNextDoesNotFit() { var callback = new CompletableFuture(); Operation op = new Operation.WriteOperation.PutOperation( - callback, "key", "value".getBytes(StandardCharsets.UTF_8), Optional.empty(), false); + callback, "key", "value".getBytes(StandardCharsets.UTF_8), OptionalLong.empty(), false); when(batchFactory.getBatch(shardId)).thenReturn(batch); when(batch.size()).thenReturn(config.maxRequestsPerBatch(), 1); when(batch.canAdd(any())).thenReturn(false); diff --git a/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java b/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java index 8fc90bb7..4569a85e 100644 --- a/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/batch/OperationTest.java @@ -43,6 +43,7 @@ import io.streamnative.oxia.proto.Version; import java.util.Objects; import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.junit.jupiter.api.DisplayName; @@ -145,7 +146,7 @@ void completeOther() { class PutOperationTests { CompletableFuture callback = new CompletableFuture<>(); byte[] payload = "hello".getBytes(UTF_8); - PutOperation op = new PutOperation(callback, "key", payload, Optional.of(10L), false); + PutOperation op = new PutOperation(callback, "key", payload, OptionalLong.of(10), false); long sessionId = 0L; String clientId = "client-id"; SessionInfo sessionInfo = new SessionInfo(sessionId, clientId); @@ -154,16 +155,18 @@ class PutOperationTests { void constructInvalidExpectedVersionId() { assertThatNoException() .isThrownBy( - () -> new PutOperation(callback, "key", payload, Optional.of(KeyNotExists), false)); + () -> + new PutOperation(callback, "key", payload, OptionalLong.of(KeyNotExists), false)); assertThatNoException() - .isThrownBy(() -> new PutOperation(callback, "key", payload, Optional.of(0L), false)); - assertThatThrownBy(() -> new PutOperation(callback, "key", payload, Optional.of(-2L), false)) + .isThrownBy(() -> new PutOperation(callback, "key", payload, OptionalLong.of(0L), false)); + assertThatThrownBy( + () -> new PutOperation(callback, "key", payload, OptionalLong.of(-2L), false)) .isInstanceOf(IllegalArgumentException.class); } @Test void toProtoNoExpectedVersion() { - var op = new PutOperation(callback, "key", payload, Optional.empty(), false); + var op = new PutOperation(callback, "key", payload, OptionalLong.empty(), false); var request = op.toProto(Optional.empty()); assertThat(request) .satisfies( @@ -178,7 +181,7 @@ void toProtoNoExpectedVersion() { @Test void toProtoExpectedVersion() { - var op = new PutOperation(callback, "key", payload, Optional.of(1L), false); + var op = new PutOperation(callback, "key", payload, OptionalLong.of(1L), false); var request = op.toProto(Optional.empty()); assertThat(request) .satisfies( @@ -193,7 +196,7 @@ void toProtoExpectedVersion() { @Test void toProtoNoExistingVersion() { - var op = new PutOperation(callback, "key", payload, Optional.of(KeyNotExists), false); + var op = new PutOperation(callback, "key", payload, OptionalLong.of(KeyNotExists), false); var request = op.toProto(Optional.empty()); assertThat(request) .satisfies( @@ -208,7 +211,7 @@ void toProtoNoExistingVersion() { @Test void toProtoEphemeral() { - var op = new PutOperation(callback, "key", payload, Optional.empty(), true); + var op = new PutOperation(callback, "key", payload, OptionalLong.empty(), true); var request = op.toProto(Optional.of(sessionInfo)); assertThat(request) .satisfies( @@ -238,7 +241,7 @@ void completeUnexpectedVersion() { @Test void completeKeyAlreadyExists() { - var op = new PutOperation(callback, "key", payload, Optional.of(KeyNotExists), false); + var op = new PutOperation(callback, "key", payload, OptionalLong.of(KeyNotExists), false); var response = PutResponse.newBuilder().setStatus(UNEXPECTED_VERSION_ID).build(); op.complete(response); assertThat(callback).isCompletedExceptionally(); @@ -254,7 +257,7 @@ void completeKeyAlreadyExists() { @Test void completeSessionDoesNotExist() { - var op = new PutOperation(callback, "key", payload, Optional.empty(), true); + var op = new PutOperation(callback, "key", payload, OptionalLong.empty(), true); var response = PutResponse.newBuilder().setStatus(SESSION_DOES_NOT_EXIST).build(); op.complete(response); assertThat(callback).isCompletedExceptionally(); @@ -332,15 +335,15 @@ void completeOther() { @DisplayName("Tests of delete operation") class DeleteOperationTests { CompletableFuture callback = new CompletableFuture<>(); - DeleteOperation op = new DeleteOperation(callback, "key", Optional.of(10L)); + DeleteOperation op = new DeleteOperation(callback, "key", OptionalLong.of(10L)); @Test void constructInvalidExpectedVersionId() { assertThatNoException() - .isThrownBy(() -> new DeleteOperation(callback, "key", Optional.of(0L))); - assertThatThrownBy(() -> new DeleteOperation(callback, "key", Optional.of(KeyNotExists))) + .isThrownBy(() -> new DeleteOperation(callback, "key", OptionalLong.of(0L))); + assertThatThrownBy(() -> new DeleteOperation(callback, "key", OptionalLong.of(KeyNotExists))) .isInstanceOf(IllegalArgumentException.class); - assertThatThrownBy(() -> new DeleteOperation(callback, "key", Optional.of(-2L))) + assertThatThrownBy(() -> new DeleteOperation(callback, "key", OptionalLong.of(-2L))) .isInstanceOf(IllegalArgumentException.class); } diff --git a/pulsar-metadatastore-oxia/src/main/java/io/streamnative/pulsarmetadatastoreoxia/OxiaMetadataStore.java b/pulsar-metadatastore-oxia/src/main/java/io/streamnative/pulsarmetadatastoreoxia/OxiaMetadataStore.java index fd4d6cbb..bef598a6 100644 --- a/pulsar-metadatastore-oxia/src/main/java/io/streamnative/pulsarmetadatastoreoxia/OxiaMetadataStore.java +++ b/pulsar-metadatastore-oxia/src/main/java/io/streamnative/pulsarmetadatastoreoxia/OxiaMetadataStore.java @@ -25,10 +25,13 @@ import io.streamnative.oxia.client.api.exceptions.KeyAlreadyExistsException; import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException; import java.time.Duration; +import java.util.Collections; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -154,12 +157,13 @@ protected CompletableFuture storeDelete(String path, Optional expect return CompletableFuture.failedFuture( new MetadataStoreException("Key '" + path + "' has children")); } else { - var delOption = + Set deleteOptions = expectedVersion - .map(DeleteOption::ifVersionIdEquals) - .orElse(DeleteOption.Unconditionally); - CompletableFuture result = client.delete(path, delOption); - return result + .map(v -> Collections.singleton(DeleteOption.IfVersionIdEquals(v))) + .orElse(Collections.emptySet()); + + return client + .delete(path, deleteOptions) .thenCompose( exists -> { if (!exists) { @@ -202,20 +206,20 @@ protected CompletableFuture storePut( } else { actualPath = CompletableFuture.completedFuture(path); } - var versionCondition = - expectedVersion - .map( - ver -> { - if (ver == -1) { - return PutOption.IfRecordDoesNotExist; - } - return PutOption.ifVersionIdEquals(ver); - }) - .orElse(PutOption.Unconditionally); - var putOptions = - options.contains(CreateOption.Ephemeral) - ? new PutOption[] {PutOption.AsEphemeralRecord, versionCondition} - : new PutOption[] {versionCondition}; + Set putOptions = new HashSet<>(); + expectedVersion + .map( + ver -> { + if (ver == -1) { + return PutOption.IfRecordDoesNotExist; + } + return PutOption.IfVersionIdEquals(ver); + }) + .ifPresent(putOptions::add); + + if (options.contains(CreateOption.Ephemeral)) { + putOptions.add(PutOption.AsEphemeralRecord); + } return actualPath .thenCompose( aPath -> @@ -247,7 +251,10 @@ private CompletableFuture createParents(String path) { return CompletableFuture.completedFuture(null); } else { return client - .put(parent, new byte[] {}, PutOption.IfRecordDoesNotExist) + .put( + parent, + new byte[] {}, + Collections.singleton(PutOption.IfRecordDoesNotExist)) .thenCompose(__ -> createParents(parent)); } })