Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP-63 Add caching #109

Merged
merged 3 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]

### Added

- Added support for caching of lookup joins.

### Fixed

- Fixed issue in the logging code of the `JavaNetHttpPollingClient` which prevents showing the status code and response body when the log level is configured at DEBUG (or lower) level.
Expand Down
62 changes: 43 additions & 19 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ under the License.

<!-- IMPORTANT: If you update Flink, remember to update link to its docs in maven-javadoc-plugin <links>
section, omitting the patch part (so for 1.15.0 use 1.15). -->

<flink.version>1.16.3</flink.version>

<target.java.version>11</target.java.version>
Expand Down Expand Up @@ -296,7 +297,6 @@ under the License.
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
<configuration>

</configuration>
</plugin>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

Expand All @@ -19,7 +17,7 @@

@Slf4j
@RequiredArgsConstructor
public class AsyncHttpTableLookupFunction extends AsyncTableFunction<RowData> {
public class AsyncHttpTableLookupFunction extends AsyncLookupFunction {

private static final String PULLING_THREAD_POOL_SIZE = "8";

Expand Down Expand Up @@ -73,29 +71,27 @@ public void open(FunctionContext context) throws Exception {
);
}

public void eval(CompletableFuture<Collection<RowData>> resultFuture, Object... keys) {

CompletableFuture<Optional<RowData>> future = new CompletableFuture<>();
future.completeAsync(() -> decorate.lookupByKeys(keys), pullingThreadPool);
@Override
public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
CompletableFuture<Collection<RowData>> future = new CompletableFuture<>();
future.completeAsync(() -> decorate.lookup(keyRow), pullingThreadPool);

// We don't want to use ForkJoinPool at all. We are using a different thread pool
// for publishing here intentionally to avoid thread starvation.
CompletableFuture<Collection<RowData>> resultFuture = new CompletableFuture<>();
future.whenCompleteAsync(
(optionalResult, throwable) -> {
(result, throwable) -> {
if (throwable != null) {
log.error("Exception while processing Http Async request", throwable);
resultFuture.completeExceptionally(
new RuntimeException("Exception while processing Http Async request",
throwable));
} else {
if (optionalResult.isPresent()) {
resultFuture.complete(Collections.singleton(optionalResult.get()));
} else {
resultFuture.complete(Collections.emptyList());
}
resultFuture.complete(result);
}
},
publishingThreadPool);
return resultFuture;
}

public LookupRow getLookupRow() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.DeserializationSchema;
Expand All @@ -10,15 +11,19 @@
import org.apache.flink.table.api.DataTypes.Field;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider ;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -46,17 +51,20 @@ public class HttpLookupTableSource
private final DynamicTableFactory.Context dynamicTableFactoryContext;

private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
@Nullable
private final LookupCache cache;

public HttpLookupTableSource(
DataType physicalRowDataType,
HttpLookupConfig lookupConfig,
DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
DynamicTableFactory.Context dynamicTablecontext) {

DynamicTableFactory.Context dynamicTablecontext,
@Nullable LookupCache cache) {
this.physicalRowDataType = physicalRowDataType;
this.lookupConfig = lookupConfig;
this.decodingFormat = decodingFormat;
this.dynamicTableFactoryContext = dynamicTablecontext;
this.cache = cache;
}

@Override
Expand All @@ -66,6 +74,7 @@ public void applyProjection(int[][] projectedFields, DataType producedDataType)

@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
log.debug("getLookupRuntimeProvider Entry");

LookupRow lookupRow = extractLookupRow(lookupContext.getKeys());

Expand Down Expand Up @@ -94,21 +103,38 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex
PollingClientFactory<RowData> pollingClientFactory =
createPollingClientFactory(lookupQueryCreator, lookupConfig);

HttpTableLookupFunction dataLookupFunction =
new HttpTableLookupFunction(
pollingClientFactory,
responseSchemaDecoder,
lookupRow,
lookupConfig
);
return getLookupRuntimeProvider(lookupRow, responseSchemaDecoder, pollingClientFactory);
}

protected LookupRuntimeProvider getLookupRuntimeProvider(LookupRow lookupRow,
DeserializationSchema<RowData> responseSchemaDecoder,
PollingClientFactory<RowData> pollingClientFactory) {

HttpTableLookupFunction dataLookupFunction =
new HttpTableLookupFunction(
pollingClientFactory,
responseSchemaDecoder,
lookupRow,
lookupConfig
);
if (lookupConfig.isUseAsync()) {
log.info("Using Async version of HttpLookupTable.");
return AsyncTableFunctionProvider.of(
new AsyncHttpTableLookupFunction(dataLookupFunction));
AsyncLookupFunction asyncLookupFunction =
new AsyncHttpTableLookupFunction(dataLookupFunction);
if (cache != null) {
log.info("Using async version of HttpLookupTable with cache.");
return PartialCachingAsyncLookupProvider.of(asyncLookupFunction, cache);
} else {
log.info("Using async version of HttpLookupTable without cache.");
return AsyncLookupFunctionProvider.of(asyncLookupFunction);
}
} else {
log.info("Using blocking version of HttpLookupTable.");
return TableFunctionProvider.of(dataLookupFunction);
if (cache != null) {
log.info("Using blocking version of HttpLookupTable with cache.");
return PartialCachingLookupProvider.of(dataLookupFunction, cache);
} else {
log.info("Using blocking version of HttpLookupTable without cache.");
return LookupFunctionProvider.of(dataLookupFunction);
}
}
}

Expand All @@ -118,7 +144,8 @@ public DynamicTableSource copy() {
physicalRowDataType,
lookupConfig,
decodingFormat,
dynamicTableFactoryContext
dynamicTableFactoryContext,
cache
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
Expand All @@ -15,6 +16,9 @@
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
Expand Down Expand Up @@ -48,7 +52,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, dynamicTableContext);

ReadableConfig readableConfig = helper.getOptions();
ReadableConfig readable = helper.getOptions();
helper.validateExcept(
// properties coming from org.apache.flink.table.api.config.ExecutionConfigOptions
"table.",
Expand All @@ -62,7 +66,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
FactoryUtil.FORMAT
);

HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readableConfig);
HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readable);

ResolvedSchema resolvedSchema = dynamicTableContext.getCatalogTable().getResolvedSchema();

Expand All @@ -73,7 +77,8 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
physicalRowDataType,
lookupConfig,
decodingFormat,
dynamicTableContext
dynamicTableContext,
getLookupCache(readable)
);
}

Expand All @@ -89,7 +94,18 @@ public Set<ConfigOption<?>> requiredOptions() {

@Override
public Set<ConfigOption<?>> optionalOptions() {
return Set.of(URL_ARGS, ASYNC_POLLING, LOOKUP_METHOD, REQUEST_CALLBACK_IDENTIFIER);

return Set.of(
URL_ARGS,
ASYNC_POLLING,
LOOKUP_METHOD,
REQUEST_CALLBACK_IDENTIFIER,
LookupOptions.CACHE_TYPE,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
LookupOptions.PARTIAL_CACHE_MAX_ROWS,
LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY,
LookupOptions.MAX_RETRIES);
}

private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig readableConfig) {
Expand All @@ -115,6 +131,18 @@ private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig re
.build();
}

@Nullable
private LookupCache getLookupCache(ReadableConfig tableOptions) {
LookupCache cache = null;
// Do not support legacy cache options
if (tableOptions
.get(LookupOptions.CACHE_TYPE)
.equals(LookupOptions.LookupCacheType.PARTIAL)) {
cache = DefaultLookupCache.fromConfig(tableOptions);
}
return cache;
}

// TODO verify this since we are on 1.15 now.
// Backport from Flink 1.15-Master
private DataType toRowDataType(List<Column> columns, Predicate<Column> columnPredicate) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -8,17 +10,16 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.LookupFunction;

import com.getindata.connectors.http.internal.PollingClient;
import com.getindata.connectors.http.internal.PollingClientFactory;
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;

@Slf4j
public class HttpTableLookupFunction extends TableFunction<RowData> {
public class HttpTableLookupFunction extends LookupFunction {

private final PollingClientFactory<RowData> pollingClientFactory;

Expand Down Expand Up @@ -50,32 +51,22 @@ public HttpTableLookupFunction(

@Override
public void open(FunctionContext context) throws Exception {
super.open(context);

this.responseSchemaDecoder.open(
SerializationSchemaUtils
.createDeserializationInitContext(HttpTableLookupFunction.class));

this.localHttpCallCounter = new AtomicInteger(0);
this.client = pollingClientFactory
.createPollClient(options, responseSchemaDecoder);

context
.getMetricGroup()
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
}
.createPollClient(options, responseSchemaDecoder);

/**
* This is a lookup method which is called by Flink framework in a runtime.
*/
public void eval(Object... keys) {
lookupByKeys(keys)
.ifPresent(this::collect);
context.getMetricGroup()
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
}

public Optional<RowData> lookupByKeys(Object[] keys) {
RowData rowData = GenericRowData.of(keys);
@Override
public Collection<RowData> lookup(RowData keyRow) {
localHttpCallCounter.incrementAndGet();
return client.pull(rowData);
Optional<RowData> result = client.pull(keyRow);
return result.map(Collections::singletonList).orElse(Collections.emptyList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public JavaNetHttpPollingClient(
@Override
public Optional<RowData> pull(RowData lookupRow) {
try {
log.debug("Optional<RowData> pull with Rowdata={}.", lookupRow);
return queryAndProcess(lookupRow);
} catch (Exception e) {
log.error("Exception during HTTP request.", e);
Expand Down
Loading
Loading