From 6f5f8a5679453a0c0002c09fb5bb63659647943c Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 1 May 2024 23:11:16 -0700 Subject: [PATCH] Refactored OxiaClientBuilder (#143) * Refactored OxiaClientBuilder * Fixed perf client * Fixed pulsar adapters * Fixed import --- .../oxia/client/api/OxiaClientBuilder.java | 54 ++++++++++++ .../internal/DefaultImplementation.java | 53 ++++++++++++ .../oxia/client/internal/ReflectionUtils.java | 82 +++++++++++++++++++ client-it/pom.xml | 6 ++ .../oxia/client/it/OxiaClientFailFastIT.java | 4 +- .../oxia/client/it/OxiaClientIT.java | 6 +- ...uilder.java => OxiaClientBuilderImpl.java} | 23 +++++- .../client/CachingAsyncOxiaClientTest.java | 12 ++- .../oxia/client/OxiaClientBuilderTest.java | 3 +- .../oxia/client/batch/BatchTest.java | 5 +- .../oxia/client/batch/BatcherTest.java | 4 +- .../oxia/client/session/SessionTest.java | 2 +- .../oxia/client/shard/ModelFactory.java | 2 +- .../client/shard/ShardManagerGrpcTest.java | 2 +- .../oxia/client/shard/ShardManagerTest.java | 2 +- .../client/shard/StaticShardStrategy.java | 7 +- perf/pom.xml | 5 ++ .../oxia/client/perf/PerfArguments.java | 8 +- .../oxia/client/perf/PerfClient.java | 5 +- pom.xml | 6 +- .../OxiaMetadataStore.java | 4 +- 21 files changed, 262 insertions(+), 33 deletions(-) create mode 100644 client-api/src/main/java/io/streamnative/oxia/client/api/OxiaClientBuilder.java create mode 100644 client-api/src/main/java/io/streamnative/oxia/client/internal/DefaultImplementation.java create mode 100644 client-api/src/main/java/io/streamnative/oxia/client/internal/ReflectionUtils.java rename client/src/main/java/io/streamnative/oxia/client/{OxiaClientBuilder.java => OxiaClientBuilderImpl.java} (91%) diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/OxiaClientBuilder.java b/client-api/src/main/java/io/streamnative/oxia/client/api/OxiaClientBuilder.java new file mode 100644 index 00000000..97cbc4c4 --- /dev/null +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/OxiaClientBuilder.java @@ -0,0 +1,54 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.api; + +import io.opentelemetry.api.OpenTelemetry; +import io.streamnative.oxia.client.api.exceptions.OxiaException; +import io.streamnative.oxia.client.internal.DefaultImplementation; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +public interface OxiaClientBuilder { + + static OxiaClientBuilder create(String serviceAddress) { + return DefaultImplementation.getDefaultImplementation(serviceAddress); + } + + SyncOxiaClient syncClient() throws OxiaException; + + CompletableFuture asyncClient(); + + OxiaClientBuilder requestTimeout(Duration requestTimeout); + + OxiaClientBuilder batchLinger(Duration batchLinger); + + OxiaClientBuilder maxRequestsPerBatch(int maxRequestsPerBatch); + + OxiaClientBuilder recordCacheCapacity(int recordCacheCapacity); + + OxiaClientBuilder namespace(String namespace); + + OxiaClientBuilder disableRecordCache(); + + OxiaClientBuilder sessionTimeout(Duration sessionTimeout); + + OxiaClientBuilder clientIdentifier(String clientIdentifier); + + OxiaClientBuilder clientIdentifier(Supplier clientIdentifier); + + OxiaClientBuilder openTelemetry(OpenTelemetry openTelemetry); +} diff --git a/client-api/src/main/java/io/streamnative/oxia/client/internal/DefaultImplementation.java b/client-api/src/main/java/io/streamnative/oxia/client/internal/DefaultImplementation.java new file mode 100644 index 00000000..dac7d1ac --- /dev/null +++ b/client-api/src/main/java/io/streamnative/oxia/client/internal/DefaultImplementation.java @@ -0,0 +1,53 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.internal; + +import io.streamnative.oxia.client.api.OxiaClientBuilder; +import java.lang.reflect.Constructor; + +/** + * This class loads the implementation for {@link OxiaClientBuilderImpl} and allows you to decouple + * the API from the actual implementation. This class is internal to the Oxia API implementation, + * and it is not part of the public API it is not meant to be used by client applications. + */ +public class DefaultImplementation { + private static final Constructor CONSTRUCTOR; + + private static final String IMPL_CLASS_NAME = "io.streamnative.oxia.client.OxiaClientBuilderImpl"; + + static { + Constructor impl; + try { + impl = ReflectionUtils.newClassInstance(IMPL_CLASS_NAME).getConstructor(String.class); + } catch (Throwable error) { + throw new RuntimeException("Cannot load Oxia Client Implementation: " + error, error); + } + CONSTRUCTOR = impl; + } + + /** + * Access the actual implementation of the Oxia Client API. + * + * @return the loaded implementation. + */ + public static OxiaClientBuilder getDefaultImplementation(String serviceAddress) { + try { + return (OxiaClientBuilder) CONSTRUCTOR.newInstance(serviceAddress); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/client-api/src/main/java/io/streamnative/oxia/client/internal/ReflectionUtils.java b/client-api/src/main/java/io/streamnative/oxia/client/internal/ReflectionUtils.java new file mode 100644 index 00000000..fd25d4a9 --- /dev/null +++ b/client-api/src/main/java/io/streamnative/oxia/client/internal/ReflectionUtils.java @@ -0,0 +1,82 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.internal; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import lombok.experimental.UtilityClass; + +@UtilityClass +class ReflectionUtils { + interface SupplierWithException { + T get() throws Exception; + } + + static T catchExceptions(SupplierWithException s) { + try { + return s.get(); + } catch (Throwable t) { + if (t instanceof InvocationTargetException) { + // exception is thrown during invocation + Throwable cause = t.getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else { + throw new RuntimeException(cause); + } + } + throw new RuntimeException(t); + } + } + + @SuppressWarnings("unchecked") + static Class newClassInstance(String className) { + try { + try { + // when the API is loaded in the same classloader as the impl + return (Class) + Class.forName(className, true, DefaultImplementation.class.getClassLoader()); + } catch (Exception e) { + // when the API is loaded in a separate classloader as the impl + // the classloader that loaded the impl needs to be a child classloader of the classloader + // that loaded the API + return (Class) + Class.forName(className, true, Thread.currentThread().getContextClassLoader()); + } + } catch (ClassNotFoundException | NoClassDefFoundError e) { + throw new RuntimeException(e); + } + } + + static Constructor getConstructor(String className, Class... argTypes) { + try { + Class clazz = newClassInstance(className); + return clazz.getConstructor(argTypes); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + } + + static Method getStaticMethod(String className, String method, Class... argTypes) { + try { + Class clazz = newClassInstance(className); + return clazz.getMethod(method, argTypes); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + } +} diff --git a/client-it/pom.xml b/client-it/pom.xml index b906090c..5629e43e 100644 --- a/client-it/pom.xml +++ b/client-it/pom.xml @@ -60,6 +60,12 @@ ${project.version} test + + io.streamnative.oxia + oxia-client-api + ${project.version} + test + io.streamnative.oxia oxia-testcontainers diff --git a/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientFailFastIT.java b/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientFailFastIT.java index d7bb54ae..3b97c82b 100644 --- a/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientFailFastIT.java +++ b/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientFailFastIT.java @@ -15,7 +15,7 @@ */ package io.streamnative.oxia.client.it; -import io.streamnative.oxia.client.OxiaClientBuilder; +import io.streamnative.oxia.client.api.OxiaClientBuilder; import io.streamnative.oxia.client.shard.NamespaceNotFoundException; import io.streamnative.oxia.testcontainers.OxiaContainer; import java.util.concurrent.CompletionException; @@ -39,7 +39,7 @@ public class OxiaClientFailFastIT { @Test public void testWrongNamespace() { try { - new OxiaClientBuilder(oxia.getServiceAddress()) + OxiaClientBuilder.create(oxia.getServiceAddress()) .namespace("my-ns-does-not-exist") .asyncClient() .join(); diff --git a/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientIT.java b/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientIT.java index fa5facf8..bb5372c3 100644 --- a/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientIT.java +++ b/client-it/src/test/java/io/streamnative/oxia/client/it/OxiaClientIT.java @@ -33,13 +33,13 @@ import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; -import io.streamnative.oxia.client.OxiaClientBuilder; import io.streamnative.oxia.client.api.AsyncOxiaClient; import io.streamnative.oxia.client.api.DeleteOption; import io.streamnative.oxia.client.api.Notification; import io.streamnative.oxia.client.api.Notification.KeyCreated; import io.streamnative.oxia.client.api.Notification.KeyDeleted; import io.streamnative.oxia.client.api.Notification.KeyModified; +import io.streamnative.oxia.client.api.OxiaClientBuilder; import io.streamnative.oxia.client.api.PutOption; import io.streamnative.oxia.client.api.exceptions.KeyAlreadyExistsException; import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException; @@ -86,7 +86,7 @@ static void beforeAll() { OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build(); client = - new OxiaClientBuilder(oxia.getServiceAddress()) + OxiaClientBuilder.create(oxia.getServiceAddress()) .openTelemetry(openTelemetry) .asyncClient() .join(); @@ -174,7 +174,7 @@ void test() throws Exception { var identity = getClass().getSimpleName(); try (var otherClient = - new OxiaClientBuilder(oxia.getServiceAddress()) + OxiaClientBuilder.create(oxia.getServiceAddress()) .clientIdentifier(identity) .asyncClient() .join()) { diff --git a/client/src/main/java/io/streamnative/oxia/client/OxiaClientBuilder.java b/client/src/main/java/io/streamnative/oxia/client/OxiaClientBuilderImpl.java similarity index 91% rename from client/src/main/java/io/streamnative/oxia/client/OxiaClientBuilder.java rename to client/src/main/java/io/streamnative/oxia/client/OxiaClientBuilderImpl.java index f6def6ef..fa136807 100644 --- a/client/src/main/java/io/streamnative/oxia/client/OxiaClientBuilder.java +++ b/client/src/main/java/io/streamnative/oxia/client/OxiaClientBuilderImpl.java @@ -21,7 +21,9 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; import io.streamnative.oxia.client.api.AsyncOxiaClient; +import io.streamnative.oxia.client.api.OxiaClientBuilder; import io.streamnative.oxia.client.api.SyncOxiaClient; +import io.streamnative.oxia.client.api.exceptions.OxiaException; import java.time.Duration; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -30,7 +32,7 @@ import lombok.RequiredArgsConstructor; @RequiredArgsConstructor -public class OxiaClientBuilder { +public class OxiaClientBuilderImpl implements OxiaClientBuilder { public static final Duration DefaultBatchLinger = Duration.ofMillis(5); public static final int DefaultMaxRequestsPerBatch = 1000; @@ -46,10 +48,14 @@ public class OxiaClientBuilder { private int maxRequestsPerBatch = DefaultMaxRequestsPerBatch; private int recordCacheCapacity = DefaultRecordCacheCapacity; @NonNull private Duration sessionTimeout = DefaultSessionTimeout; - @NonNull private Supplier clientIdentifier = OxiaClientBuilder::randomClientIdentifier; + + @NonNull + private Supplier clientIdentifier = OxiaClientBuilderImpl::randomClientIdentifier; + @NonNull private String namespace = DefaultNamespace; @NonNull private OpenTelemetry openTelemetry = GlobalOpenTelemetry.get(); + @Override public @NonNull OxiaClientBuilder requestTimeout(@NonNull Duration requestTimeout) { if (requestTimeout.isNegative() || requestTimeout.equals(ZERO)) { throw new IllegalArgumentException( @@ -59,6 +65,7 @@ public class OxiaClientBuilder { return this; } + @Override public @NonNull OxiaClientBuilder batchLinger(@NonNull Duration batchLinger) { if (batchLinger.isNegative() || batchLinger.equals(ZERO)) { throw new IllegalArgumentException("batchLinger must be greater than zero: " + batchLinger); @@ -67,6 +74,7 @@ public class OxiaClientBuilder { return this; } + @Override public @NonNull OxiaClientBuilder maxRequestsPerBatch(int maxRequestsPerBatch) { if (maxRequestsPerBatch <= 0) { throw new IllegalArgumentException( @@ -76,6 +84,7 @@ public class OxiaClientBuilder { return this; } + @Override public @NonNull OxiaClientBuilder recordCacheCapacity(int recordCacheCapacity) { if (recordCacheCapacity <= 0) { throw new IllegalArgumentException( @@ -85,6 +94,7 @@ public class OxiaClientBuilder { return this; } + @Override public @NonNull OxiaClientBuilder namespace(@NonNull String namespace) { if (Strings.isNullOrEmpty(namespace)) { throw new IllegalArgumentException("namespace must not be null or empty."); @@ -93,11 +103,13 @@ public class OxiaClientBuilder { return this; } + @Override public @NonNull OxiaClientBuilder disableRecordCache() { recordCacheCapacity = 0; return this; } + @Override public @NonNull OxiaClientBuilder sessionTimeout(@NonNull Duration sessionTimeout) { if (sessionTimeout.isNegative() || sessionTimeout.equals(ZERO)) { throw new IllegalArgumentException( @@ -107,21 +119,25 @@ public class OxiaClientBuilder { return this; } + @Override public @NonNull OxiaClientBuilder clientIdentifier(@NonNull String clientIdentifier) { this.clientIdentifier = () -> clientIdentifier; return this; } + @Override public @NonNull OxiaClientBuilder clientIdentifier(@NonNull Supplier clientIdentifier) { this.clientIdentifier = clientIdentifier; return this; } + @Override public @NonNull OxiaClientBuilder openTelemetry(@NonNull OpenTelemetry openTelemetry) { this.openTelemetry = openTelemetry; return this; } + @Override public @NonNull CompletableFuture asyncClient() { var config = new ClientConfig( @@ -143,7 +159,8 @@ public class OxiaClientBuilder { } } - public @NonNull SyncOxiaClient syncClient() { + @Override + public SyncOxiaClient syncClient() throws OxiaException { return new SyncOxiaClientImpl(asyncClient().join()); } diff --git a/client/src/test/java/io/streamnative/oxia/client/CachingAsyncOxiaClientTest.java b/client/src/test/java/io/streamnative/oxia/client/CachingAsyncOxiaClientTest.java index c56b07f0..49ce0652 100644 --- a/client/src/test/java/io/streamnative/oxia/client/CachingAsyncOxiaClientTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/CachingAsyncOxiaClientTest.java @@ -15,7 +15,6 @@ */ package io.streamnative.oxia.client; -import static io.streamnative.oxia.client.OxiaClientBuilder.DefaultNamespace; import static java.nio.charset.StandardCharsets.UTF_8; import static java.time.Duration.ZERO; import static org.assertj.core.api.Assertions.assertThat; @@ -121,7 +120,16 @@ void getMocked() { void get() throws Exception { var config = new ClientConfig( - "localhost:8080", ZERO, ZERO, 1, 1024 * 1024, 1, ZERO, "id", null, DefaultNamespace); + "localhost:8080", + ZERO, + ZERO, + 1, + 1024 * 1024, + 1, + ZERO, + "id", + null, + OxiaClientBuilderImpl.DefaultNamespace); var cacheFactory = new CacheFactory(config, delegate); var value = "value".getBytes(UTF_8); diff --git a/client/src/test/java/io/streamnative/oxia/client/OxiaClientBuilderTest.java b/client/src/test/java/io/streamnative/oxia/client/OxiaClientBuilderTest.java index 1fe4f25e..6009a631 100644 --- a/client/src/test/java/io/streamnative/oxia/client/OxiaClientBuilderTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/OxiaClientBuilderTest.java @@ -19,12 +19,13 @@ import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import io.streamnative.oxia.client.api.OxiaClientBuilder; import java.time.Duration; import org.junit.jupiter.api.Test; class OxiaClientBuilderTest { - OxiaClientBuilder builder = new OxiaClientBuilder("address:1234"); + OxiaClientBuilder builder = OxiaClientBuilder.create("address:1234"); @Test void requestTimeout() { diff --git a/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java b/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java index 269975ba..d9d2296a 100644 --- a/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/batch/BatchTest.java @@ -15,7 +15,7 @@ */ package io.streamnative.oxia.client.batch; -import static io.streamnative.oxia.client.OxiaClientBuilder.DefaultNamespace; +import static io.streamnative.oxia.client.OxiaClientBuilderImpl.DefaultNamespace; import static io.streamnative.oxia.proto.OxiaClientGrpc.OxiaClientImplBase; import static io.streamnative.oxia.proto.Status.KEY_NOT_FOUND; import static io.streamnative.oxia.proto.Status.OK; @@ -33,6 +33,7 @@ import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; import io.streamnative.oxia.client.ClientConfig; +import io.streamnative.oxia.client.OxiaClientBuilderImpl; import io.streamnative.oxia.client.api.GetResult; import io.streamnative.oxia.client.api.PutResult; import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException; @@ -90,7 +91,7 @@ class BatchTest { Duration.ofMillis(1000), "client_id", null, - DefaultNamespace); + OxiaClientBuilderImpl.DefaultNamespace); private final OxiaClientImplBase serviceImpl = mock( diff --git a/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java b/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java index 6504129a..45669769 100644 --- a/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/batch/BatcherTest.java @@ -15,7 +15,6 @@ */ package io.streamnative.oxia.client.batch; -import static io.streamnative.oxia.client.OxiaClientBuilder.DefaultNamespace; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; @@ -30,6 +29,7 @@ import static org.mockito.Mockito.when; import io.streamnative.oxia.client.ClientConfig; +import io.streamnative.oxia.client.OxiaClientBuilderImpl; import io.streamnative.oxia.client.api.GetResult; import io.streamnative.oxia.client.api.PutResult; import io.streamnative.oxia.client.batch.Operation.ReadOperation.GetOperation; @@ -63,7 +63,7 @@ class BatcherTest { Duration.ofMillis(1000), "client_id", null, - DefaultNamespace); + OxiaClientBuilderImpl.DefaultNamespace); BlockingQueue> queue; Batcher batcher; diff --git a/client/src/test/java/io/streamnative/oxia/client/session/SessionTest.java b/client/src/test/java/io/streamnative/oxia/client/session/SessionTest.java index ab16174a..6a2dea88 100644 --- a/client/src/test/java/io/streamnative/oxia/client/session/SessionTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/session/SessionTest.java @@ -15,7 +15,7 @@ */ package io.streamnative.oxia.client.session; -import static io.streamnative.oxia.client.OxiaClientBuilder.DefaultNamespace; +import static io.streamnative.oxia.client.OxiaClientBuilderImpl.DefaultNamespace; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.mock; diff --git a/client/src/test/java/io/streamnative/oxia/client/shard/ModelFactory.java b/client/src/test/java/io/streamnative/oxia/client/shard/ModelFactory.java index e5b732ea..2f4ffa72 100644 --- a/client/src/test/java/io/streamnative/oxia/client/shard/ModelFactory.java +++ b/client/src/test/java/io/streamnative/oxia/client/shard/ModelFactory.java @@ -15,7 +15,7 @@ */ package io.streamnative.oxia.client.shard; -import static io.streamnative.oxia.client.OxiaClientBuilder.DefaultNamespace; +import static io.streamnative.oxia.client.OxiaClientBuilderImpl.DefaultNamespace; import io.streamnative.oxia.proto.Int32HashRange; import io.streamnative.oxia.proto.NamespaceShardsAssignment; diff --git a/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerGrpcTest.java b/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerGrpcTest.java index f5d60ffb..0bbecc03 100644 --- a/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerGrpcTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerGrpcTest.java @@ -15,7 +15,7 @@ */ package io.streamnative.oxia.client.shard; -import static io.streamnative.oxia.client.OxiaClientBuilder.DefaultNamespace; +import static io.streamnative.oxia.client.OxiaClientBuilderImpl.DefaultNamespace; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; diff --git a/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerTest.java b/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerTest.java index d3b436c0..eb9b01c5 100644 --- a/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/shard/ShardManagerTest.java @@ -15,7 +15,7 @@ */ package io.streamnative.oxia.client.shard; -import static io.streamnative.oxia.client.OxiaClientBuilder.DefaultNamespace; +import static io.streamnative.oxia.client.OxiaClientBuilderImpl.DefaultNamespace; import static io.streamnative.oxia.client.shard.HashRangeShardStrategy.Xxh332HashRangeShardStrategy; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; diff --git a/client/src/test/java/io/streamnative/oxia/client/shard/StaticShardStrategy.java b/client/src/test/java/io/streamnative/oxia/client/shard/StaticShardStrategy.java index 929bd38d..c43d4ebc 100644 --- a/client/src/test/java/io/streamnative/oxia/client/shard/StaticShardStrategy.java +++ b/client/src/test/java/io/streamnative/oxia/client/shard/StaticShardStrategy.java @@ -15,9 +15,9 @@ */ package io.streamnative.oxia.client.shard; -import static io.streamnative.oxia.client.OxiaClientBuilder.DefaultNamespace; import static lombok.AccessLevel.PACKAGE; +import io.streamnative.oxia.client.OxiaClientBuilderImpl; import io.streamnative.oxia.proto.ShardAssignment; import io.streamnative.oxia.proto.ShardAssignments; import java.util.HashMap; @@ -46,9 +46,10 @@ class StaticShardStrategy implements ShardStrategy { public @NonNull StaticShardStrategy assign( @NonNull String key, @NonNull ShardAssignments response) { - var nsShardsAssignment = response.getNamespacesMap().get(DefaultNamespace); + var nsShardsAssignment = + response.getNamespacesMap().get(OxiaClientBuilderImpl.DefaultNamespace); if (nsShardsAssignment == null) { - throw new NamespaceNotFoundException(DefaultNamespace); + throw new NamespaceNotFoundException(OxiaClientBuilderImpl.DefaultNamespace); } if (nsShardsAssignment.getAssignmentsCount() != 1) { throw new IllegalArgumentException(); diff --git a/perf/pom.xml b/perf/pom.xml index fe335ec7..7679cafa 100644 --- a/perf/pom.xml +++ b/perf/pom.xml @@ -36,6 +36,11 @@ + + ${project.groupId} + oxia-client-api + ${project.version} + com.beust jcommander diff --git a/perf/src/main/java/io/streamnative/oxia/client/perf/PerfArguments.java b/perf/src/main/java/io/streamnative/oxia/client/perf/PerfArguments.java index 4626182a..73d91dea 100644 --- a/perf/src/main/java/io/streamnative/oxia/client/perf/PerfArguments.java +++ b/perf/src/main/java/io/streamnative/oxia/client/perf/PerfArguments.java @@ -17,7 +17,7 @@ import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; -import io.streamnative.oxia.client.OxiaClientBuilder; +import io.streamnative.oxia.client.OxiaClientBuilderImpl; @Parameters(commandDescription = "Test Oxia Java client performance.") public class PerfArguments { @@ -61,15 +61,15 @@ public class PerfArguments { @Parameter( names = {"--batch-linger-ms"}, description = "Batch linger time") - long batchLingerMs = OxiaClientBuilder.DefaultBatchLinger.toMillis(); + long batchLingerMs = OxiaClientBuilderImpl.DefaultBatchLinger.toMillis(); @Parameter( names = {"--max-requests-per-batch"}, description = "Maximum requests per batch") - int maxRequestsPerBatch = OxiaClientBuilder.DefaultMaxRequestsPerBatch; + int maxRequestsPerBatch = OxiaClientBuilderImpl.DefaultMaxRequestsPerBatch; @Parameter( names = {"--request-timeout-ms"}, description = "Requests timeout") - long requestTimeoutMs = OxiaClientBuilder.DefaultRequestTimeout.toMillis(); + long requestTimeoutMs = OxiaClientBuilderImpl.DefaultRequestTimeout.toMillis(); } diff --git a/perf/src/main/java/io/streamnative/oxia/client/perf/PerfClient.java b/perf/src/main/java/io/streamnative/oxia/client/perf/PerfClient.java index a5d292be..aad19326 100644 --- a/perf/src/main/java/io/streamnative/oxia/client/perf/PerfClient.java +++ b/perf/src/main/java/io/streamnative/oxia/client/perf/PerfClient.java @@ -21,8 +21,9 @@ import com.beust.jcommander.ParameterException; import com.google.common.util.concurrent.RateLimiter; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; -import io.streamnative.oxia.client.OxiaClientBuilder; +import io.streamnative.oxia.client.OxiaClientBuilderImpl; import io.streamnative.oxia.client.api.AsyncOxiaClient; +import io.streamnative.oxia.client.api.OxiaClientBuilder; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -68,7 +69,7 @@ public static void main(String[] args) throws Exception { AutoConfiguredOpenTelemetrySdk sdk = AutoConfiguredOpenTelemetrySdk.builder() .build(); - AsyncOxiaClient client = new OxiaClientBuilder(arguments.serviceAddr) + AsyncOxiaClient client = OxiaClientBuilder.create(arguments.serviceAddr) .batchLinger(Duration.ofMillis(arguments.batchLingerMs)) .maxRequestsPerBatch(arguments.maxRequestsPerBatch) .requestTimeout(Duration.ofMillis(arguments.requestTimeoutMs)) diff --git a/pom.xml b/pom.xml index a66bad0c..15963356 100644 --- a/pom.xml +++ b/pom.xml @@ -95,17 +95,17 @@ 3.0.0 1.18.30 1.7.32 - 5.9.2 + 5.11.0-M1 5.11.0 1.34.1 - 1.17.6 + 1.19.7 32.1.3-jre 0.8.12 4.1 3.10.1 3.1.0 - 3.0.0-M7 + 3.2.5 4.8.1.0 2.39.0 diff --git a/pulsar-metadatastore-oxia/src/main/java/io/streamnative/pulsarmetadatastoreoxia/OxiaMetadataStore.java b/pulsar-metadatastore-oxia/src/main/java/io/streamnative/pulsarmetadatastoreoxia/OxiaMetadataStore.java index 77b19acc..fd4d6cbb 100644 --- a/pulsar-metadatastore-oxia/src/main/java/io/streamnative/pulsarmetadatastoreoxia/OxiaMetadataStore.java +++ b/pulsar-metadatastore-oxia/src/main/java/io/streamnative/pulsarmetadatastoreoxia/OxiaMetadataStore.java @@ -15,10 +15,10 @@ */ package io.streamnative.pulsarmetadatastoreoxia; -import io.streamnative.oxia.client.OxiaClientBuilder; import io.streamnative.oxia.client.api.AsyncOxiaClient; import io.streamnative.oxia.client.api.DeleteOption; import io.streamnative.oxia.client.api.Notification; +import io.streamnative.oxia.client.api.OxiaClientBuilder; import io.streamnative.oxia.client.api.PutOption; import io.streamnative.oxia.client.api.PutResult; import io.streamnative.oxia.client.api.Version; @@ -66,7 +66,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore { this.synchronizer = Optional.ofNullable(metadataStoreConfig.getSynchronizer()); identity = UUID.randomUUID().toString(); client = - new OxiaClientBuilder(serviceAddress) + OxiaClientBuilder.create(serviceAddress) .clientIdentifier(identity) .namespace(namespace) .sessionTimeout(Duration.ofMillis(metadataStoreConfig.getSessionTimeoutMillis()))