Skip to content

Commit

Permalink
Refactored OxiaClientBuilder (#143)
Browse files Browse the repository at this point in the history
* Refactored OxiaClientBuilder

* Fixed perf client

* Fixed pulsar adapters

* Fixed import
  • Loading branch information
merlimat authored May 2, 2024
1 parent 9e9585d commit 6f5f8a5
Show file tree
Hide file tree
Showing 21 changed files with 262 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.api;

import io.opentelemetry.api.OpenTelemetry;
import io.streamnative.oxia.client.api.exceptions.OxiaException;
import io.streamnative.oxia.client.internal.DefaultImplementation;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public interface OxiaClientBuilder {

static OxiaClientBuilder create(String serviceAddress) {
return DefaultImplementation.getDefaultImplementation(serviceAddress);
}

SyncOxiaClient syncClient() throws OxiaException;

CompletableFuture<AsyncOxiaClient> asyncClient();

OxiaClientBuilder requestTimeout(Duration requestTimeout);

OxiaClientBuilder batchLinger(Duration batchLinger);

OxiaClientBuilder maxRequestsPerBatch(int maxRequestsPerBatch);

OxiaClientBuilder recordCacheCapacity(int recordCacheCapacity);

OxiaClientBuilder namespace(String namespace);

OxiaClientBuilder disableRecordCache();

OxiaClientBuilder sessionTimeout(Duration sessionTimeout);

OxiaClientBuilder clientIdentifier(String clientIdentifier);

OxiaClientBuilder clientIdentifier(Supplier<String> clientIdentifier);

OxiaClientBuilder openTelemetry(OpenTelemetry openTelemetry);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.internal;

import io.streamnative.oxia.client.api.OxiaClientBuilder;
import java.lang.reflect.Constructor;

/**
* This class loads the implementation for {@link OxiaClientBuilderImpl} and allows you to decouple
* the API from the actual implementation. <b>This class is internal to the Oxia API implementation,
* and it is not part of the public API it is not meant to be used by client applications.</b>
*/
public class DefaultImplementation {
private static final Constructor<?> CONSTRUCTOR;

private static final String IMPL_CLASS_NAME = "io.streamnative.oxia.client.OxiaClientBuilderImpl";

static {
Constructor<?> impl;
try {
impl = ReflectionUtils.newClassInstance(IMPL_CLASS_NAME).getConstructor(String.class);
} catch (Throwable error) {
throw new RuntimeException("Cannot load Oxia Client Implementation: " + error, error);
}
CONSTRUCTOR = impl;
}

/**
* Access the actual implementation of the Oxia Client API.
*
* @return the loaded implementation.
*/
public static OxiaClientBuilder getDefaultImplementation(String serviceAddress) {
try {
return (OxiaClientBuilder) CONSTRUCTOR.newInstance(serviceAddress);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.internal;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import lombok.experimental.UtilityClass;

@UtilityClass
class ReflectionUtils {
interface SupplierWithException<T> {
T get() throws Exception;
}

static <T> T catchExceptions(SupplierWithException<T> s) {
try {
return s.get();
} catch (Throwable t) {
if (t instanceof InvocationTargetException) {
// exception is thrown during invocation
Throwable cause = t.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new RuntimeException(cause);
}
}
throw new RuntimeException(t);
}
}

@SuppressWarnings("unchecked")
static <T> Class<T> newClassInstance(String className) {
try {
try {
// when the API is loaded in the same classloader as the impl
return (Class<T>)
Class.forName(className, true, DefaultImplementation.class.getClassLoader());
} catch (Exception e) {
// when the API is loaded in a separate classloader as the impl
// the classloader that loaded the impl needs to be a child classloader of the classloader
// that loaded the API
return (Class<T>)
Class.forName(className, true, Thread.currentThread().getContextClassLoader());
}
} catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new RuntimeException(e);
}
}

static <T> Constructor<T> getConstructor(String className, Class<?>... argTypes) {
try {
Class<T> clazz = newClassInstance(className);
return clazz.getConstructor(argTypes);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
}

static <T> Method getStaticMethod(String className, String method, Class<?>... argTypes) {
try {
Class<T> clazz = newClassInstance(className);
return clazz.getMethod(method, argTypes);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
}
}
6 changes: 6 additions & 0 deletions client-it/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.streamnative.oxia</groupId>
<artifactId>oxia-client-api</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.streamnative.oxia</groupId>
<artifactId>oxia-testcontainers</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.streamnative.oxia.client.it;

import io.streamnative.oxia.client.OxiaClientBuilder;
import io.streamnative.oxia.client.api.OxiaClientBuilder;
import io.streamnative.oxia.client.shard.NamespaceNotFoundException;
import io.streamnative.oxia.testcontainers.OxiaContainer;
import java.util.concurrent.CompletionException;
Expand All @@ -39,7 +39,7 @@ public class OxiaClientFailFastIT {
@Test
public void testWrongNamespace() {
try {
new OxiaClientBuilder(oxia.getServiceAddress())
OxiaClientBuilder.create(oxia.getServiceAddress())
.namespace("my-ns-does-not-exist")
.asyncClient()
.join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
import io.streamnative.oxia.client.OxiaClientBuilder;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.DeleteOption;
import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.api.Notification.KeyCreated;
import io.streamnative.oxia.client.api.Notification.KeyDeleted;
import io.streamnative.oxia.client.api.Notification.KeyModified;
import io.streamnative.oxia.client.api.OxiaClientBuilder;
import io.streamnative.oxia.client.api.PutOption;
import io.streamnative.oxia.client.api.exceptions.KeyAlreadyExistsException;
import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException;
Expand Down Expand Up @@ -86,7 +86,7 @@ static void beforeAll() {
OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();

client =
new OxiaClientBuilder(oxia.getServiceAddress())
OxiaClientBuilder.create(oxia.getServiceAddress())
.openTelemetry(openTelemetry)
.asyncClient()
.join();
Expand Down Expand Up @@ -174,7 +174,7 @@ void test() throws Exception {

var identity = getClass().getSimpleName();
try (var otherClient =
new OxiaClientBuilder(oxia.getServiceAddress())
OxiaClientBuilder.create(oxia.getServiceAddress())
.clientIdentifier(identity)
.asyncClient()
.join()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.OxiaClientBuilder;
import io.streamnative.oxia.client.api.SyncOxiaClient;
import io.streamnative.oxia.client.api.exceptions.OxiaException;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand All @@ -30,7 +32,7 @@
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class OxiaClientBuilder {
public class OxiaClientBuilderImpl implements OxiaClientBuilder {

public static final Duration DefaultBatchLinger = Duration.ofMillis(5);
public static final int DefaultMaxRequestsPerBatch = 1000;
Expand All @@ -46,10 +48,14 @@ public class OxiaClientBuilder {
private int maxRequestsPerBatch = DefaultMaxRequestsPerBatch;
private int recordCacheCapacity = DefaultRecordCacheCapacity;
@NonNull private Duration sessionTimeout = DefaultSessionTimeout;
@NonNull private Supplier<String> clientIdentifier = OxiaClientBuilder::randomClientIdentifier;

@NonNull
private Supplier<String> clientIdentifier = OxiaClientBuilderImpl::randomClientIdentifier;

@NonNull private String namespace = DefaultNamespace;
@NonNull private OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();

@Override
public @NonNull OxiaClientBuilder requestTimeout(@NonNull Duration requestTimeout) {
if (requestTimeout.isNegative() || requestTimeout.equals(ZERO)) {
throw new IllegalArgumentException(
Expand All @@ -59,6 +65,7 @@ public class OxiaClientBuilder {
return this;
}

@Override
public @NonNull OxiaClientBuilder batchLinger(@NonNull Duration batchLinger) {
if (batchLinger.isNegative() || batchLinger.equals(ZERO)) {
throw new IllegalArgumentException("batchLinger must be greater than zero: " + batchLinger);
Expand All @@ -67,6 +74,7 @@ public class OxiaClientBuilder {
return this;
}

@Override
public @NonNull OxiaClientBuilder maxRequestsPerBatch(int maxRequestsPerBatch) {
if (maxRequestsPerBatch <= 0) {
throw new IllegalArgumentException(
Expand All @@ -76,6 +84,7 @@ public class OxiaClientBuilder {
return this;
}

@Override
public @NonNull OxiaClientBuilder recordCacheCapacity(int recordCacheCapacity) {
if (recordCacheCapacity <= 0) {
throw new IllegalArgumentException(
Expand All @@ -85,6 +94,7 @@ public class OxiaClientBuilder {
return this;
}

@Override
public @NonNull OxiaClientBuilder namespace(@NonNull String namespace) {
if (Strings.isNullOrEmpty(namespace)) {
throw new IllegalArgumentException("namespace must not be null or empty.");
Expand All @@ -93,11 +103,13 @@ public class OxiaClientBuilder {
return this;
}

@Override
public @NonNull OxiaClientBuilder disableRecordCache() {
recordCacheCapacity = 0;
return this;
}

@Override
public @NonNull OxiaClientBuilder sessionTimeout(@NonNull Duration sessionTimeout) {
if (sessionTimeout.isNegative() || sessionTimeout.equals(ZERO)) {
throw new IllegalArgumentException(
Expand All @@ -107,21 +119,25 @@ public class OxiaClientBuilder {
return this;
}

@Override
public @NonNull OxiaClientBuilder clientIdentifier(@NonNull String clientIdentifier) {
this.clientIdentifier = () -> clientIdentifier;
return this;
}

@Override
public @NonNull OxiaClientBuilder clientIdentifier(@NonNull Supplier<String> clientIdentifier) {
this.clientIdentifier = clientIdentifier;
return this;
}

@Override
public @NonNull OxiaClientBuilder openTelemetry(@NonNull OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
return this;
}

@Override
public @NonNull CompletableFuture<AsyncOxiaClient> asyncClient() {
var config =
new ClientConfig(
Expand All @@ -143,7 +159,8 @@ public class OxiaClientBuilder {
}
}

public @NonNull SyncOxiaClient syncClient() {
@Override
public SyncOxiaClient syncClient() throws OxiaException {
return new SyncOxiaClientImpl(asyncClient().join());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.streamnative.oxia.client;

import static io.streamnative.oxia.client.OxiaClientBuilder.DefaultNamespace;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.time.Duration.ZERO;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -121,7 +120,16 @@ void getMocked() {
void get() throws Exception {
var config =
new ClientConfig(
"localhost:8080", ZERO, ZERO, 1, 1024 * 1024, 1, ZERO, "id", null, DefaultNamespace);
"localhost:8080",
ZERO,
ZERO,
1,
1024 * 1024,
1,
ZERO,
"id",
null,
OxiaClientBuilderImpl.DefaultNamespace);
var cacheFactory = new CacheFactory(config, delegate);

var value = "value".getBytes(UTF_8);
Expand Down
Loading

0 comments on commit 6f5f8a5

Please sign in to comment.