Skip to content

Commit

Permalink
Refactored Put & Delete options (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored May 2, 2024
1 parent 6f5f8a5 commit f3f6a31
Show file tree
Hide file tree
Showing 21 changed files with 359 additions and 369 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import lombok.NonNull;
Expand All @@ -29,7 +30,7 @@ public interface AsyncOxiaClient extends AutoCloseable {
* specified, at the instant when the put is applied. The put will not be applied if the server's
* versionId of the record does not match the expectation set in the call. If you wish the put to
* succeed only if the key does not already exist on the server, then pass the {@link
* Version#KeyNotExists} value.
* PutOption#IfRecordDoesNotExist} value.
*
* @param key The key with which the value should be associated.
* @param value The value to associate with the key.
Expand All @@ -40,7 +41,20 @@ public interface AsyncOxiaClient extends AutoCloseable {
* the call.
*/
@NonNull
CompletableFuture<PutResult> put(String key, byte[] value, PutOption... options);
CompletableFuture<PutResult> put(String key, byte[] value, Set<PutOption> options);

/**
* Conditionally associates a value with a key if the server's versionId of the record is as
* specified, at the instant when the put is applied. The put will not be applied if the server's
* versionId of the record does not match the expectation set in the call.
*
* @param key The key with which the value should be associated.
* @param value The value to associate with the key.
* @return The result of the put at the specified key. Supplied via a future that returns the
* {@link PutResult}.
*/
@NonNull
CompletableFuture<PutResult> put(String key, byte[] value);

/**
* Conditionally deletes the record associated with the key if the record exists, and the server's
Expand All @@ -56,7 +70,17 @@ public interface AsyncOxiaClient extends AutoCloseable {
* versionId at the server did not that match supplied in the call.
*/
@NonNull
CompletableFuture<Boolean> delete(String key, DeleteOption... options);
CompletableFuture<Boolean> delete(String key, Set<DeleteOption> options);

/**
* Unconditionally deletes the record associated with the key if the record exists.
*
* @param key Deletes the record with the specified key.
* @return A future that completes when the delete call has returned. The future can return a flag
* that will be true if the key was actually present on the server, false otherwise.
*/
@NonNull
CompletableFuture<Boolean> delete(String key);

/**
* Deletes any records with keys within the specified range. For more information on how keys are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,50 +15,9 @@
*/
package io.streamnative.oxia.client.api;

import static io.streamnative.oxia.client.api.DeleteOption.VersionIdDeleteOption;
import static io.streamnative.oxia.client.api.DeleteOption.VersionIdDeleteOption.IfVersionIdEquals;
import static io.streamnative.oxia.client.api.DeleteOption.VersionIdDeleteOption.Unconditionally;
public sealed interface DeleteOption permits OptionVersionId {

public sealed interface DeleteOption permits VersionIdDeleteOption {

default boolean cannotCoExistWith(DeleteOption option) {
return false;
}

sealed interface VersionIdDeleteOption extends DeleteOption
permits IfVersionIdEquals, Unconditionally {

Long toVersionId();

default boolean cannotCoExistWith(DeleteOption option) {
return option instanceof VersionIdDeleteOption;
}

record IfVersionIdEquals(long versionId) implements VersionIdDeleteOption {

public IfVersionIdEquals {
if (versionId < 0) {
throw new IllegalArgumentException("versionId cannot be less than 0 - was: " + versionId);
}
}

@Override
public Long toVersionId() {
return versionId();
}
}

record Unconditionally() implements VersionIdDeleteOption {
@Override
public Long toVersionId() {
return null;
}
}
}

VersionIdDeleteOption Unconditionally = new VersionIdDeleteOption.Unconditionally();

static VersionIdDeleteOption ifVersionIdEquals(long versionId) {
return new IfVersionIdEquals(versionId);
static DeleteOption IfVersionIdEquals(long versionId) {
return new OptionVersionId.OptionVersionIdEqual(versionId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.api;

public record OptionEphemeral() implements PutOption {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.api;

public sealed interface OptionVersionId extends PutOption, DeleteOption
permits OptionVersionId.OptionRecordDoesNotExist, OptionVersionId.OptionVersionIdEqual {

long versionId();

record OptionVersionIdEqual(long versionId) implements OptionVersionId {
public OptionVersionIdEqual {
if (versionId < 0) {
throw new IllegalArgumentException("versionId cannot be less than 0 - was: " + versionId);
}
}
}

record OptionRecordDoesNotExist() implements OptionVersionId {
@Override
public long versionId() {
return Version.KeyNotExists;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,63 +15,12 @@
*/
package io.streamnative.oxia.client.api;

import static io.streamnative.oxia.client.api.PutOption.AsEphemeralRecord;
import static io.streamnative.oxia.client.api.PutOption.VersionIdPutOption;
import static io.streamnative.oxia.client.api.PutOption.VersionIdPutOption.IfRecordDoesNotExist;
import static io.streamnative.oxia.client.api.PutOption.VersionIdPutOption.IfVersionIdEquals;
import static io.streamnative.oxia.client.api.PutOption.VersionIdPutOption.Unconditionally;
public sealed interface PutOption permits OptionEphemeral, OptionVersionId {

public sealed interface PutOption permits VersionIdPutOption, AsEphemeralRecord {
PutOption IfRecordDoesNotExist = new OptionVersionId.OptionRecordDoesNotExist();
PutOption AsEphemeralRecord = new OptionEphemeral();

default boolean cannotCoExistWith(PutOption option) {
return false;
}

sealed interface VersionIdPutOption extends PutOption
permits IfVersionIdEquals, IfRecordDoesNotExist, Unconditionally {

Long toVersionId();

default boolean cannotCoExistWith(PutOption option) {
return option instanceof VersionIdPutOption;
}

record IfVersionIdEquals(long versionId) implements VersionIdPutOption {

public IfVersionIdEquals {
if (versionId < 0) {
throw new IllegalArgumentException("versionId cannot be less than 0 - was: " + versionId);
}
}

@Override
public Long toVersionId() {
return versionId();
}
}

record IfRecordDoesNotExist() implements VersionIdPutOption {
@Override
public Long toVersionId() {
return Version.KeyNotExists;
}
}

record Unconditionally() implements VersionIdPutOption {
@Override
public Long toVersionId() {
return null;
}
}
}

record AsEphemeralRecord() implements PutOption {}

VersionIdPutOption IfRecordDoesNotExist = new VersionIdPutOption.IfRecordDoesNotExist();
VersionIdPutOption Unconditionally = new VersionIdPutOption.Unconditionally();
PutOption AsEphemeralRecord = new AsEphemeralRecord();

static VersionIdPutOption ifVersionIdEquals(long versionId) {
return new IfVersionIdEquals(versionId);
static PutOption IfVersionIdEquals(long versionId) {
return new OptionVersionId.OptionVersionIdEqual(versionId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,23 @@

import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import lombok.NonNull;

/** Synchronous client for the Oxia service. */
public interface SyncOxiaClient extends AutoCloseable {

/**
* Associates a value with a key if the server's versionId of the record is as specified, at the
* instant when the put is applied.
*
* @param key The key with which the value should be associated.
* @param value The value to associate with the key.
* @return The result of the put at the specified key.
*/
PutResult put(@NonNull String key, byte @NonNull [] value);

/**
* Conditionally associates a value with a key if the server's versionId of the record is as
* specified, at the instant when the put is applied. The put will not be applied if the server's
Expand All @@ -37,9 +48,17 @@ public interface SyncOxiaClient extends AutoCloseable {
* @throws UnexpectedVersionIdException The versionId at the server did not that match supplied in
* the call.
*/
PutResult put(@NonNull String key, byte @NonNull [] value, PutOption... options)
PutResult put(@NonNull String key, byte @NonNull [] value, Set<PutOption> options)
throws UnexpectedVersionIdException;

/**
* Unconditionally deletes the record associated with the key if the record exists.
*
* @param key Deletes the record with the specified key.
* @return True if the key was actually present on the server, false otherwise.
*/
boolean delete(@NonNull String key);

/**
* Conditionally deletes the record associated with the key if the record exists, and the server's
* versionId of the record is as specified, at the instant when the delete is applied. The delete
Expand All @@ -52,7 +71,7 @@ PutResult put(@NonNull String key, byte @NonNull [] value, PutOption... options)
* @throws UnexpectedVersionIdException The versionId at the server did not that match supplied in
* the call.
*/
boolean delete(@NonNull String key, @NonNull DeleteOption... options)
boolean delete(@NonNull String key, @NonNull Set<DeleteOption> options)
throws UnexpectedVersionIdException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.streamnative.oxia.client.it;

import static io.streamnative.oxia.client.api.PutOption.IfRecordDoesNotExist;
import static io.streamnative.oxia.client.api.PutOption.ifVersionIdEquals;
import static io.streamnative.oxia.client.api.PutOption.IfVersionIdEquals;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.function.Function.identity;
Expand Down Expand Up @@ -46,6 +46,7 @@
import io.streamnative.oxia.testcontainers.OxiaContainer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -102,13 +103,14 @@ static void afterAll() throws Exception {

@Test
void test() throws Exception {
var a = client.put("a", "a".getBytes(UTF_8), IfRecordDoesNotExist);
var b = client.put("b", "b".getBytes(UTF_8), IfRecordDoesNotExist);
var a = client.put("a", "a".getBytes(UTF_8), Set.of(IfRecordDoesNotExist));
var b = client.put("b", "b".getBytes(UTF_8), Set.of(IfRecordDoesNotExist));
var c = client.put("c", "c".getBytes(UTF_8));
var d = client.put("d", "d".getBytes(UTF_8));
allOf(a, b, c, d).join();

assertThatThrownBy(() -> client.put("a", "a".getBytes(UTF_8), IfRecordDoesNotExist).join())
assertThatThrownBy(
() -> client.put("a", "a".getBytes(UTF_8), Set.of(IfRecordDoesNotExist)).join())
.hasCauseInstanceOf(KeyAlreadyExistsException.class);
// verify 'a' is present
var getResult = client.get("a").join();
Expand All @@ -122,7 +124,7 @@ void test() throws Exception {
() -> assertThat(notifications).contains(new KeyCreated("a", finalAVersion)));

// update 'a' with expected version
client.put("a", "a2".getBytes(UTF_8), ifVersionIdEquals(aVersion)).join();
client.put("a", "a2".getBytes(UTF_8), Set.of(IfVersionIdEquals(aVersion))).join();
getResult = client.get("a").join();
assertThat(getResult.getValue()).isEqualTo("a2".getBytes(UTF_8));
aVersion = getResult.getVersion().versionId();
Expand All @@ -136,21 +138,24 @@ void test() throws Exception {
// put with unexpected version
var bVersion = client.get("b").join().getVersion().versionId();
assertThatThrownBy(
() -> client.put("b", "b2".getBytes(UTF_8), ifVersionIdEquals(bVersion + 1L)).join())
() ->
client
.put("b", "b2".getBytes(UTF_8), Set.of(IfVersionIdEquals(bVersion + 1L)))
.join())
.hasCauseInstanceOf(UnexpectedVersionIdException.class);

// delete with unexpected version
var cVersion = client.get("c").join().getVersion().versionId();
assertThatThrownBy(
() -> client.delete("c", DeleteOption.ifVersionIdEquals(cVersion + 1L)).join())
() -> client.delete("c", Set.of(DeleteOption.IfVersionIdEquals(cVersion + 1L))).join())
.hasCauseInstanceOf(UnexpectedVersionIdException.class);

// list all keys
var listResult = client.list("a", "e").join();
assertThat(listResult).containsOnly("a", "b", "c", "d");

// delete 'a' with expected version
client.delete("a", DeleteOption.ifVersionIdEquals(aVersion)).join();
client.delete("a", Set.of(DeleteOption.IfVersionIdEquals(aVersion))).join();
getResult = client.get("a").join();
assertThat(getResult).isNull();

Expand Down Expand Up @@ -178,13 +183,14 @@ void test() throws Exception {
.clientIdentifier(identity)
.asyncClient()
.join()) {
otherClient.put("f", "f".getBytes(), PutOption.AsEphemeralRecord).join();
otherClient.put("f", "f".getBytes(), Set.of(PutOption.AsEphemeralRecord)).join();
getResult = client.get("f").join();
var sessionId = getResult.getVersion().sessionId().get();
assertThat(sessionId).isNotNull();
assertThat(getResult.getVersion().clientIdentifier().get()).isEqualTo(identity);

var putResult = otherClient.put("g", "g".getBytes(), PutOption.AsEphemeralRecord).join();
var putResult =
otherClient.put("g", "g".getBytes(), Set.of(PutOption.AsEphemeralRecord)).join();
assertThat(putResult.version().clientIdentifier().get()).isEqualTo(identity);
assertThat(putResult.version().sessionId().get()).isNotNull();

Expand Down
Loading

0 comments on commit f3f6a31

Please sign in to comment.