diff --git a/CHANGELOG.md b/CHANGELOG.md
index 21b8211d..e38f2349 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,10 @@
 
 ## [Unreleased]
 
+### Added
+
+- Added support for caching of lookup joins.
+
 ### Fixed
 
 - Fixed issue in the logging code of the `JavaNetHttpPollingClient` which prevents showing the status code and response body when the log level is configured at DEBUG (or lower) level.
@@ -16,8 +20,6 @@
   [Slf4JHttpLookupPostRequestCallback](https://github.com/getindata/flink-http-connector/blob/main/src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java)
   is used instead.
 
-- Added support for caching of synchronous lookup joins.
-
 ## [0.13.0] - 2024-04-03
 
 ### Added
diff --git a/README.md b/README.md
index 016de876..3b3ce57a 100644
--- a/README.md
+++ b/README.md
@@ -424,12 +424,12 @@ is set to `'true'`, it will be used as header value as is, without any extra mod
 | url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ |
 | asyncPolling | optional | true/false - determines whether Async Polling should be used. Mechanism is based on Flink's Async I/O. |
 | lookup-method | optional | GET/POST/PUT (and any other) - determines what REST method should be used for lookup REST query. If not specified, `GET` method will be used. |
-| lookup.cache | optional | Enum possible values: `NONE`, `PARTIAL`. The cache strategy for the lookup table. Currently supports `NONE` (no caching) and `PARTIAL` (caching entries on lookup operation in external API). |
-| lookup.partial-cache.max-rows | optional | The max number of rows of lookup cache, over this value, the oldest rows will be expired. `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. |
-| lookup.partial-cache.expire-after-write | optional | The max time to live for each rows in lookup cache after writing into the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. |
-| lookup.partial-cache.expire-after-access | optional | The max time to live for each rows in lookup cache after accessing the entry in the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. |
-| lookup.partial-cache.cache-missing-key | optional | This is a boolean that defaults to true. Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. |
-| lookup.max-retries | optional | The max retry times if the lookup failed; default is 3. See the following Lookup Cache section for more details. |
+| lookup.cache | optional | Enum possible values: `NONE`, `PARTIAL`. The cache strategy for the lookup table. Currently supports `NONE` (no caching) and `PARTIAL` (caching entries returned from lookups against the external API). |
+| lookup.partial-cache.max-rows | optional | The max number of rows in the lookup cache; over this value, the oldest rows will be expired. `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. |
+| lookup.partial-cache.expire-after-write | optional | The max time to live for each row in the lookup cache after writing into the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. |
+| lookup.partial-cache.expire-after-access | optional | The max time to live for each row in the lookup cache after accessing the entry in the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. |
+| lookup.partial-cache.cache-missing-key | optional | A boolean that defaults to true; determines whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. |
+| lookup.max-retries | optional | The max retry times if the lookup fails; default is 3. See the following Lookup Cache section for more details. |
 | gid.connector.http.lookup.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Source, separated with comma. |
 | gid.connector.http.lookup.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated with comma. |
 | gid.connector.http.security.cert.server | optional | Path to trusted HTTP server certificate that should be add to connectors key store. More than one path can be specified using `,` as path delimiter. |
@@ -469,18 +469,20 @@ is set to `'true'`, it will be used as header value as is, without any extra mod
 
 ## Lookup Cache
 
-The HTTP Client connector can be used in temporal join as a lookup source (also known as a dimension table).
+The HTTP Client connector can be used in a lookup join as a lookup source (also known as a dimension table).
 
-By default, the lookup cache is not enabled. You can enable it by setting `lookup.cache` to `PARTIAL`. Caching is only enabled if `asyncPolling` = false.
-The scope of the cache is per job, so long running jobs can benefit from this caching.
+By default, the lookup cache is not enabled. You can enable it by setting `lookup.cache` to `PARTIAL`.
+The scope of the cache is per job, so long-running jobs can benefit from this caching.
 
-The lookup cache is used to improve the performance of temporal joins. By default, the lookup cache is not enabled, so all the API requests are sent on the network. When the lookup cache is enabled, Flink looks in the cache first, and only sends requests
-on the network when there is no cached value, then the cache is updated with the returned rows. The oldest rows in this cache are expired when the cache hits the max cached rows `lookup.partial-cache.max-rows` or when the row exceeds the max time to live specified by `lookup.partial-cache.expire-after-write` or `lookup.partial-cache.expire-after-access`.
-The cached rows might not be the latest, but users can tune expiration options to a smaller value to have fresher data, but this may increase the number of API requests sent. So this is a balance between throughput and correctness.
-A good use case for enabling this cache, is when the API responses are very slowly changing; for example master or reference data.
-There are many cases when caching is not appropriate, for example calling an API to get the latest stock price.
+The lookup cache is used to improve the performance of lookup joins. By default, the lookup cache is not enabled,
+so all the API requests are sent on the network. When the lookup cache is enabled, Flink looks in the cache first,
+and only sends requests on the network when there is no cached value; the cache is then updated with the returned rows.
+The oldest rows in this cache are expired when the cache hits the max cached rows `lookup.partial-cache.max-rows`
+or when a row exceeds the max time to live specified by `lookup.partial-cache.expire-after-write`
+or `lookup.partial-cache.expire-after-access`.
 
-By default, flink caches the empty query result for a Primary key. You can toggle this behaviour by setting `lookup.partial-cache.cache-missing-key` to false.
+By default, Flink caches the empty query result for the primary key. You can toggle this behaviour by setting
+`lookup.partial-cache.cache-missing-key` to false.
 
 ## Build and deployment
 
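To make the new README options concrete, here is a minimal, self-contained sketch (in the same style as the connector's integration tests) of registering a `rest-lookup` table with the partial cache enabled. The endpoint, schema, and cache sizing below are illustrative assumptions, not values taken from this change:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Minimal sketch: a rest-lookup table with PARTIAL caching enabled.
// The URL, columns and cache settings are illustrative only.
public class LookupCacheExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
            "CREATE TABLE Customers ("
                + "  id STRING,"
                + "  msg STRING"
                + ") WITH ("
                + "  'connector' = 'rest-lookup',"
                + "  'format' = 'json',"
                + "  'url' = 'http://localhost:8080/client',"   // assumed endpoint
                + "  'lookup-method' = 'GET',"
                + "  'lookup.cache' = 'PARTIAL',"                // enable the partial cache
                + "  'lookup.partial-cache.max-rows' = '100',"   // evict oldest rows beyond 100
                + "  'lookup.partial-cache.expire-after-write' = '10 min'"
                + ")");
    }
}
```

The freshness/throughput trade-off described above applies directly: shorter expirations mean fresher rows but more HTTP requests.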
@@ -568,7 +570,6 @@ The mapping from Http Json Response to SQL table schema is done via Flink's Json
 ## TODO
 
 ### HTTP TableLookup Source
-- Implement caches for async.
 - Think about Retry Policy for Http Request
 - Check other `//TODO`'s.
 
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/AsyncHttpTableLookupFunction.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/AsyncHttpTableLookupFunction.java
index 0dccc7d5..720186ad 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/AsyncHttpTableLookupFunction.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/AsyncHttpTableLookupFunction.java
@@ -1,8 +1,6 @@
 package com.getindata.connectors.http.internal.table.lookup;
 
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -10,7 +8,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.AsyncLookupFunction;
 import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
@@ -19,7 +17,7 @@
 
 @Slf4j
 @RequiredArgsConstructor
-public class AsyncHttpTableLookupFunction extends AsyncTableFunction<RowData> {
+public class AsyncHttpTableLookupFunction extends AsyncLookupFunction {
 
     private static final String PULLING_THREAD_POOL_SIZE = "8";
 
@@ -73,29 +71,27 @@ public void open(FunctionContext context) throws Exception {
         );
     }
 
-    public void eval(CompletableFuture<Collection<RowData>> resultFuture, Object... keys) {
-
-        CompletableFuture<Optional<RowData>> future = new CompletableFuture<>();
-        future.completeAsync(() -> decorate.lookupByKeys(keys), pullingThreadPool);
+    @Override
+    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
+        CompletableFuture<Collection<RowData>> future = new CompletableFuture<>();
+        future.completeAsync(() -> decorate.lookup(keyRow), pullingThreadPool);
 
         // We don't want to use ForkJoinPool at all. We are using a different thread pool
         // for publishing here intentionally to avoid thread starvation.
+        CompletableFuture<Collection<RowData>> resultFuture = new CompletableFuture<>();
         future.whenCompleteAsync(
-            (optionalResult, throwable) -> {
+            (result, throwable) -> {
                 if (throwable != null) {
                     log.error("Exception while processing Http Async request", throwable);
                     resultFuture.completeExceptionally(
                         new RuntimeException("Exception while processing Http Async request",
                             throwable));
                 } else {
-                    if (optionalResult.isPresent()) {
-                        resultFuture.complete(Collections.singleton(optionalResult.get()));
-                    } else {
-                        resultFuture.complete(Collections.emptyList());
-                    }
+                    resultFuture.complete(result);
                 }
             },
             publishingThreadPool);
+        return resultFuture;
     }
 
     public LookupRow getLookupRow() {
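The hand-off above between `pullingThreadPool` and `publishingThreadPool` is the crux of the async path. The following standalone sketch shows the same pattern with plain JDK types, assuming illustrative pool sizes (the connector's pulling pool defaults to 8 threads; the publishing pool size here is made up):

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of the two-pool hand-off: blocking work runs on one executor and
// completion callbacks on another, so ForkJoinPool.commonPool() is never used
// and the polling pool cannot be starved by downstream callbacks.
public class TwoPoolHandOff {

    private static final ExecutorService PULLING = Executors.newFixedThreadPool(8);
    private static final ExecutorService PUBLISHING = Executors.newFixedThreadPool(4);

    static CompletableFuture<Collection<String>> asyncLookup(String key) {
        CompletableFuture<Collection<String>> future = new CompletableFuture<>();
        // The blocking call (the HTTP pull in the connector) runs on PULLING.
        future.completeAsync(() -> List.of("row-for-" + key), PULLING);

        CompletableFuture<Collection<String>> resultFuture = new CompletableFuture<>();
        // Results and errors are published on PUBLISHING, never on the calling pool.
        future.whenCompleteAsync((result, throwable) -> {
            if (throwable != null) {
                resultFuture.completeExceptionally(throwable);
            } else {
                resultFuture.complete(result);
            }
        }, PUBLISHING);
        return resultFuture;
    }

    public static void main(String[] args) {
        System.out.println(asyncLookup("42").join()); // prints [row-for-42]
        PULLING.shutdown();
        PUBLISHING.shutdown();
    }
}
```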
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/CachingHttpTableLookupFunction.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/CachingHttpTableLookupFunction.java
deleted file mode 100644
index 1f1462be..00000000
--- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/CachingHttpTableLookupFunction.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package com.getindata.connectors.http.internal.table.lookup;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import lombok.AccessLevel;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.LookupFunction;
-
-import com.getindata.connectors.http.internal.PollingClient;
-import com.getindata.connectors.http.internal.PollingClientFactory;
-import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;
-
-@Slf4j
-public class CachingHttpTableLookupFunction extends LookupFunction {
-    private final PollingClientFactory<RowData> pollingClientFactory;
-
-    private final DeserializationSchema<RowData> responseSchemaDecoder;
-
-    @VisibleForTesting
-    @Getter(AccessLevel.PACKAGE)
-    private final LookupRow lookupRow;
-
-    @VisibleForTesting
-    @Getter(AccessLevel.PACKAGE)
-    private final HttpLookupConfig options;
-
-    private transient AtomicInteger localHttpCallCounter;
-
-    private transient PollingClient<RowData> client;
-
-    private LookupCache cache;
-
-    public CachingHttpTableLookupFunction(
-            PollingClientFactory<RowData> pollingClientFactory,
-            DeserializationSchema<RowData> responseSchemaDecoder,
-            LookupRow lookupRow,
-            HttpLookupConfig options,
-            LookupCache cache) {
-
-        this.pollingClientFactory = pollingClientFactory;
-        this.responseSchemaDecoder = responseSchemaDecoder;
-        this.lookupRow = lookupRow;
-        this.options = options;
-        this.cache = cache;
-    }
-
-    @Override
-    public void open(FunctionContext context) throws Exception {
-        super.open(context);
-
-        this.responseSchemaDecoder.open(
-            SerializationSchemaUtils
-                .createDeserializationInitContext(CachingHttpTableLookupFunction.class));
-
-        this.localHttpCallCounter = new AtomicInteger(0);
-        this.client = pollingClientFactory
-            .createPollClient(options, responseSchemaDecoder);
-
-        context
-            .getMetricGroup()
-            .gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
-    }
-
-    /**
-     * This is a lookup method which is called by Flink framework at runtime.
-     */
-    @Override
-    public Collection<RowData> lookup(RowData keyRow) throws IOException {
-        log.debug("lookup=" + lookupRow);
-        localHttpCallCounter.incrementAndGet();
-        Optional<RowData> rowData = client.pull(keyRow);
-        List<RowData> result = new ArrayList<>();
-        rowData.ifPresent(row -> { result.add(row); });
-        log.debug("lookup result=" + result);
-        return result;
-    }
-}
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java
index 67bcc1aa..c077f21c 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java
@@ -2,6 +2,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import javax.annotation.Nullable;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -10,17 +11,19 @@
 import org.apache.flink.table.api.DataTypes.Field;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.DecodingFormat;
-import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
 import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
+import org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
 import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
 import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.functions.AsyncLookupFunction;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -48,6 +51,7 @@ public class HttpLookupTableSource
 
     private final DynamicTableFactory.Context dynamicTableFactoryContext;
 
     private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
 
+    @Nullable
     private final LookupCache cache;
 
     public HttpLookupTableSource(
@@ -55,8 +59,7 @@ public HttpLookupTableSource(
             HttpLookupConfig lookupConfig,
             DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
             DynamicTableFactory.Context dynamicTablecontext,
-            LookupCache cache) {
-
+            @Nullable LookupCache cache) {
         this.physicalRowDataType = physicalRowDataType;
         this.lookupConfig = lookupConfig;
         this.decodingFormat = decodingFormat;
@@ -100,39 +103,36 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex
 
         PollingClientFactory<RowData> pollingClientFactory =
             createPollingClientFactory(lookupQueryCreator, lookupConfig);
 
-        // In line with the JDBC implementation and current requirements, we are only
-        // supporting Partial Caching for synchronous operations.
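For orientation, when `lookup.cache` is `PARTIAL` the `cache` argument that reaches the provider selection above is built by Flink's table factory machinery from the README options. The sketch below shows that pairing with illustrative values; it is not code from this PR, and the real cache is constructed by the factory from the table's `WITH` options:

```java
import java.time.Duration;

import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.functions.LookupFunction;

// Sketch: pairing a LookupFunction with a partial cache the way the
// provider selection above does. Values mirror the README options and
// are illustrative assumptions.
public class ProviderSelectionSketch {

    static PartialCachingLookupProvider withPartialCache(LookupFunction lookupFunction) {
        LookupCache cache = DefaultLookupCache.newBuilder()
            .maximumSize(100)                          // lookup.partial-cache.max-rows
            .expireAfterWrite(Duration.ofMinutes(10))  // lookup.partial-cache.expire-after-write
            .cacheMissingKey(true)                     // lookup.partial-cache.cache-missing-key
            .build();
        // Flink's runtime consults the cache first and only invokes the
        // function (and updates the cache) on a miss.
        return PartialCachingLookupProvider.of(lookupFunction, cache);
    }
}
```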
         return getLookupRuntimeProvider(lookupRow, responseSchemaDecoder, pollingClientFactory);
     }
 
     protected LookupRuntimeProvider getLookupRuntimeProvider(LookupRow lookupRow,
             DeserializationSchema<RowData> responseSchemaDecoder,
             PollingClientFactory<RowData> pollingClientFactory) {
+
+        HttpTableLookupFunction dataLookupFunction =
+            new HttpTableLookupFunction(
+                pollingClientFactory,
+                responseSchemaDecoder,
+                lookupRow,
+                lookupConfig
+            );
         if (lookupConfig.isUseAsync()) {
-            HttpTableLookupFunction dataLookupFunction =
-                new HttpTableLookupFunction(
-                    pollingClientFactory,
-                    responseSchemaDecoder,
-                    lookupRow,
-                    lookupConfig
-                );
-            log.info("Using Async version of HttpLookupTable.");
-            return AsyncTableFunctionProvider.of(
-                new AsyncHttpTableLookupFunction(dataLookupFunction));
+            AsyncLookupFunction asyncLookupFunction =
+                new AsyncHttpTableLookupFunction(dataLookupFunction);
+            if (cache != null) {
+                log.info("Using async version of HttpLookupTable with cache.");
+                return PartialCachingAsyncLookupProvider.of(asyncLookupFunction, cache);
+            } else {
+                log.info("Using async version of HttpLookupTable without cache.");
+                return AsyncLookupFunctionProvider.of(asyncLookupFunction);
+            }
         } else {
-            CachingHttpTableLookupFunction dataLookupFunction =
-                new CachingHttpTableLookupFunction(
-                    pollingClientFactory,
-                    responseSchemaDecoder,
-                    lookupRow,
-                    lookupConfig,
-                    cache
-                );
             if (cache != null) {
-                log.debug("PartialCachingLookupProvider; cache = " + cache);
+                log.info("Using blocking version of HttpLookupTable with cache.");
                 return PartialCachingLookupProvider.of(dataLookupFunction, cache);
             } else {
-                log.debug("Using LookupFunctionProvider.");
+                log.info("Using blocking version of HttpLookupTable without cache.");
                 return LookupFunctionProvider.of(dataLookupFunction);
             }
         }
     }
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java
index 562b8e75..8579a02f 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java
@@ -1,5 +1,7 @@
 package com.getindata.connectors.http.internal.table.lookup;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -8,17 +10,16 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.LookupFunction;
 
 import com.getindata.connectors.http.internal.PollingClient;
 import com.getindata.connectors.http.internal.PollingClientFactory;
 import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;
 
 @Slf4j
-public class HttpTableLookupFunction extends TableFunction<RowData> {
+public class HttpTableLookupFunction extends LookupFunction {
 
     private final PollingClientFactory<RowData> pollingClientFactory;
 
@@ -50,11 +51,9 @@ public HttpTableLookupFunction(
 
     @Override
     public void open(FunctionContext context) throws Exception {
-        super.open(context);
-
         this.responseSchemaDecoder.open(
-            SerializationSchemaUtils
-                .createDeserializationInitContext(HttpTableLookupFunction.class));
+            SerializationSchemaUtils
+                .createDeserializationInitContext(HttpTableLookupFunction.class));
 
         this.localHttpCallCounter = new AtomicInteger(0);
         this.client = pollingClientFactory
@@ -64,17 +63,10 @@ public void open(FunctionContext context) throws Exception {
             .gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
     }
 
-    /**
-     * This is a lookup method which is called by Flink framework in a runtime.
-     */
-    public void eval(Object... keys) {
-        lookupByKeys(keys)
-            .ifPresent(this::collect);
-    }
-
-    public Optional<RowData> lookupByKeys(Object[] keys) {
-        RowData rowData = GenericRowData.of(keys);
+    @Override
+    public Collection<RowData> lookup(RowData keyRow) {
         localHttpCallCounter.incrementAndGet();
-        return client.pull(rowData);
+        Optional<RowData> result = client.pull(keyRow);
+        return result.map(Collections::singletonList).orElse(Collections.emptyList());
     }
 }
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java
index 0b0f3040..ce3a31cc 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java
@@ -66,7 +66,7 @@ public JavaNetHttpPollingClient(
     @Override
     public Optional<RowData> pull(RowData lookupRow) {
         try {
-            log.debug("Optional pull with Rowdata=" + lookupRow);
+            log.debug("Optional pull with Rowdata={}.", lookupRow);
             return queryAndProcess(lookupRow);
         } catch (Exception e) {
             log.error("Exception during HTTP request.", e);
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/AsyncHttpTableLookupFunctionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/AsyncHttpTableLookupFunctionTest.java
index d66070bf..fb627480 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/AsyncHttpTableLookupFunctionTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/AsyncHttpTableLookupFunctionTest.java
@@ -5,7 +5,6 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -13,8 +12,11 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.FunctionContext;
 import org.junit.jupiter.api.BeforeEach;
@@ -82,7 +84,7 @@ void shouldEvaluateInAsyncWay() throws InterruptedException {
 
         assertThat(result.size()).isEqualTo(rowKeys.length);
         assertThat(threadNames.size()).isEqualTo(rowKeys.length);
-        verify(decorate, times(rowKeys.length)).lookupByKeys(any());
+        verify(decorate, times(rowKeys.length)).lookup(any());
     }
 
     @Test
@@ -111,7 +113,7 @@ void shouldHandleExceptionOnOneThread() throws InterruptedException {
                 throwable -> {
                     wasException.set(true);
                     latch.countDown();
-                    return Collections.emptyList();
+                    return emptyList();
                 });
         }
 
@@ -126,12 +128,12 @@ void shouldHandleExceptionOnOneThread() throws InterruptedException {
 
         // -1 since one will have an exception
         assertThat(result.size()).isEqualTo(rowKeys.length - 1);
         assertThat(threadNames.size()).isEqualTo(rowKeys.length);
-        verify(decorate, times(rowKeys.length)).lookupByKeys(any());
+        verify(decorate, times(rowKeys.length)).lookup(any());
     }
 
     @Test
-    void shouldHandleEmptyOptionalResult() throws InterruptedException {
-        mockPollingWithEmptyOptional();
+    void shouldHandleEmptyCollectionResult() throws InterruptedException {
+        mockPollingWithEmptyList();
 
         final List<RowData> result = Collections.synchronizedList(new ArrayList<>());
 
@@ -169,48 +171,48 @@ void shouldHandleEmptyOptionalResult() throws InterruptedException {
 
         // -1 since one will have one empty result.
         assertThat(result.size()).isEqualTo(rowKeys.length - 1);
         assertThat(threadNames.size()).isEqualTo(rowKeys.length);
-        verify(decorate, times(rowKeys.length)).lookupByKeys(any());
+        verify(decorate, times(rowKeys.length)).lookup(any());
     }
 
     private void mockPolling() {
-        when(decorate.lookupByKeys(any()))
+        when(decorate.lookup(any()))
             .thenAnswer(
                 invocationOnMock -> {
                     threadNames.add(Thread.currentThread().getName());
                     // make sure we pile up all keyRows on threads
                     barrier.await();
-                    return Optional.of(buildGenericRowData(
-                        Collections.singletonList(invocationOnMock.getArgument(0))));
+                    return singletonList(buildGenericRowData(
+                        singletonList(invocationOnMock.getArgument(0))));
                 });
     }
 
     private void mockPollingWithException() {
-        when(decorate.lookupByKeys(any()))
+        when(decorate.lookup(any()))
             .thenAnswer(
                 invocationOnMock -> {
                     threadNames.add(Thread.currentThread().getName());
                     // make sure we pile up all keyRows on threads
                     barrier.await();
-                    Integer argument = (Integer) ((Object[]) invocationOnMock.getArgument(0))[0];
+                    Integer argument = ((GenericRowData) invocationOnMock.getArgument(0)).getInt(0);
                     if (argument == 12) {
                         throw new RuntimeException("Exception On problematic item");
                     }
-                    return Optional.of(buildGenericRowData(Collections.singletonList(argument)));
+                    return singletonList(buildGenericRowData(singletonList(argument)));
                 });
     }
 
-    private void mockPollingWithEmptyOptional() {
-        when(decorate.lookupByKeys(any()))
+    private void mockPollingWithEmptyList() {
+        when(decorate.lookup(any()))
             .thenAnswer(
                 invocationOnMock -> {
                     threadNames.add(Thread.currentThread().getName());
                     // make sure we pile up all keyRows on threads
                     barrier.await();
-                    Integer argument = (Integer) ((Object[]) invocationOnMock.getArgument(0))[0];
+                    Integer argument = ((GenericRowData) invocationOnMock.getArgument(0)).getInt(0);
                     if (argument == 12) {
-                        return Optional.empty();
+                        return emptyList();
                     }
-                    return Optional.of(buildGenericRowData(Collections.singletonList(argument)));
+                    return singletonList(buildGenericRowData(singletonList(argument)));
                 });
    }
 }
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java
index 1d7c43e8..3b2f0b29 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java
@@ -4,6 +4,7 @@
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -24,6 +25,11 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager;
+import org.apache.flink.table.test.lookup.cache.LookupCacheAssert;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.StringUtils;
@@ -772,6 +778,67 @@ public void testNestedLookupJoinWithoutCast() throws Exception {
 
         assertThat(collectedRows.size()).isEqualTo(5);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testHttpLookupJoinWithCache(boolean isAsync) throws Exception {
+        // GIVEN
+        LookupCacheManager.keepCacheOnRelease(true);
+
+        setupServerStub(wireMockServer);
+
+        String lookupTable =
+            "CREATE TABLE Customers (" +
+                "id STRING," +
+                "id2 STRING," +
+                "msg STRING," +
+                "uuid STRING," +
+                "details ROW<" +
+                "isActive BOOLEAN," +
+                "nestedDetails ROW<" +
+                "balance STRING" +
+                ">" +
+                ">" +
+            ") WITH (" +
+                "'format' = 'json'," +
+                "'connector' = 'rest-lookup'," +
+                "'lookup-method' = 'GET'," +
+                "'url' = 'http://localhost:9090/client'," +
+                "'gid.connector.http.source.lookup.header.Content-Type' = 'application/json'," +
+                (isAsync ? "'asyncPolling' = 'true'," : "") +
+                "'lookup.cache' = 'partial'," +
+                "'lookup.partial-cache.max-rows' = '100'" +
+            ")";
+
+        // WHEN
+        SortedSet<Row> rows = testLookupJoin(lookupTable);
+
+        // THEN
+        try {
+            assertEnrichedRows(rows);
+
+            LookupCacheAssert.assertThat(getCache()).hasSize(4)
+                .containsKey(GenericRowData.of(
+                    BinaryStringData.fromString("3"), BinaryStringData.fromString("4")))
+                .containsKey(GenericRowData.of(
+                    BinaryStringData.fromString("4"), BinaryStringData.fromString("5")))
+                .containsKey(GenericRowData.of(
+                    BinaryStringData.fromString("1"), BinaryStringData.fromString("2")))
+                .containsKey(GenericRowData.of(
+                    BinaryStringData.fromString("2"), BinaryStringData.fromString("3")));
+        } finally {
+            LookupCacheManager.getInstance().checkAllReleased();
+            LookupCacheManager.getInstance().clear();
+            LookupCacheManager.keepCacheOnRelease(false);
+        }
+    }
+
+    private LookupCache getCache() {
+        Map<String, LookupCacheManager.RefCountedCache> managedCaches =
+            LookupCacheManager.getInstance().getManagedCaches();
+        assertThat(managedCaches).as("There should be only 1 shared cache registered").hasSize(1);
+        return managedCaches.get(managedCaches.keySet().iterator().next()).getCache();
+    }
+
     private @NotNull SortedSet<Row> testLookupJoin(String lookupTable) throws Exception {
 
         String sourceTable =
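A note on the keys asserted in the test above: the shared cache is keyed by the join-key row exactly as Flink materializes it at runtime, which is why the assertions build `GenericRowData` of `BinaryStringData` values rather than plain Java strings. A tiny sketch of constructing such a key (the field values are the ones the test expects):

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;

// Sketch: a cache key as the lookup runtime produces it. String columns
// arrive as BinaryStringData, so a key built from java.lang.String values
// would never match a cached entry.
public class CacheKeySketch {
    public static void main(String[] args) {
        RowData cacheKey = GenericRowData.of(
            BinaryStringData.fromString("1"),  // join key `id`
            BinaryStringData.fromString("2")); // join key `id2`
        System.out.println(cacheKey);
    }
}
```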
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceTest.java
index 8de98b25..86e81e32 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceTest.java
@@ -1,6 +1,11 @@
 package com.getindata.connectors.http.internal.table.lookup;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.metrics.groups.CacheMetricGroup;
@@ -8,9 +13,10 @@
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
-import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
 import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
+import org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
 import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
 import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
 import org.apache.flink.table.data.RowData;
@@ -76,9 +82,9 @@ void shouldCreateTableSourceWithParams() {
             (HttpLookupTableSource) createTableSource(SCHEMA, getOptions());
 
         LookupTableSource.LookupRuntimeProvider lookupProvider =
-            tableSource.getLookupRuntimeProvider(new LookupRuntimeProviderContext(lookupKey));
-        CachingHttpTableLookupFunction tableFunction = (CachingHttpTableLookupFunction)
-            ((LookupFunctionProvider) lookupProvider).createLookupFunction();
+            tableSource.getLookupRuntimeProvider(new LookupRuntimeProviderContext(lookupKey));
+        HttpTableLookupFunction tableFunction = (HttpTableLookupFunction)
+            ((LookupFunctionProvider) lookupProvider).createLookupFunction();
 
         LookupRow actualLookupRow = tableFunction.getLookupRow();
         assertThat(actualLookupRow).isNotNull();
@@ -103,36 +109,37 @@ void shouldCreateAsyncTableSourceWithParams() {
         Map<String, String> options = getOptionsWithAsync();
 
         HttpLookupTableSource tableSource =
-            (HttpLookupTableSource) createTableSource(SCHEMA, options);
+            (HttpLookupTableSource) createTableSource(SCHEMA, options);
 
-        AsyncTableFunctionProvider<RowData> lookupProvider =
-            (AsyncTableFunctionProvider<RowData>)
-                tableSource.getLookupRuntimeProvider(
-                    new LookupRuntimeProviderContext(lookupKey));
+        AsyncLookupFunctionProvider lookupProvider =
+            (AsyncLookupFunctionProvider)
+                tableSource.getLookupRuntimeProvider(
+                    new LookupRuntimeProviderContext(lookupKey));
 
         AsyncHttpTableLookupFunction tableFunction =
-            (AsyncHttpTableLookupFunction) lookupProvider.createAsyncTableFunction();
+            (AsyncHttpTableLookupFunction) lookupProvider.createAsyncLookupFunction();
 
         LookupRow actualLookupRow = tableFunction.getLookupRow();
         assertThat(actualLookupRow).isNotNull();
         assertThat(actualLookupRow.getLookupEntries()).isNotEmpty();
         assertThat(actualLookupRow.getLookupPhysicalRowDataType())
-            .isEqualTo(PHYSICAL_ROW_DATA_TYPE);
+            .isEqualTo(PHYSICAL_ROW_DATA_TYPE);
 
         HttpLookupConfig actualLookupConfig = tableFunction.getOptions();
         assertThat(actualLookupConfig).isNotNull();
         assertThat(actualLookupConfig.isUseAsync()).isTrue();
         assertThat(
-            actualLookupConfig.getReadableConfig().get(HttpLookupConnectorOptions.ASYNC_POLLING)
+            actualLookupConfig.getReadableConfig().get(HttpLookupConnectorOptions.ASYNC_POLLING)
         )
-            .withFailMessage(
-                "Readable config probably was not passed" +
-                    " from Table Factory or it is empty.")
-            .isTrue();
+            .withFailMessage(
+                "Readable config probably was not passed" +
+                    " from Table Factory or it is empty.")
+            .isTrue();
     }
 
+    @ParameterizedTest
     @MethodSource("configProvider")
-    void testgetLookupRuntimeProvider(TestSpec testSpec) throws Exception {
+    void testGetLookupRuntimeProvider(TestSpec testSpec) {
         LookupCache cache = new LookupCache() {
             @Override
             public void open(CacheMetricGroup cacheMetricGroup) {
@@ -170,7 +177,7 @@ public void close() throws Exception {
             .useAsync(testSpec.isAsync)
             .build();
         LookupTableSource.LookupRuntimeProvider lookupRuntimeProvider =
-            getLookupRuntimeProvider(testSpec.hasCache?cache:null, options);
+            getLookupRuntimeProvider(testSpec.hasCache ? cache : null, options);
 
         assertTrue(testSpec.expected.isInstance(lookupRuntimeProvider));
     }
 
@@ -194,7 +201,7 @@ private TestSpec(boolean hasCache,
         public String toString() {
             return "TestSpec{"
                 + "hasCache="
-                +hasCache
+                + hasCache
                 + ", isAsync="
                 + isAsync
                 + ", expected="
@@ -212,27 +219,10 @@ static Collection<TestSpec> configProvider() {
 
     @NotNull
     private static ImmutableList<TestSpec> getTestSpecs() {
         return ImmutableList.of(
-
-            new TestSpec(
-                false,
-                false,
-                LookupFunctionProvider.class
-            ),
-            new TestSpec(
-                true,
-                false,
-                PartialCachingLookupProvider.class
-            ),
-            new TestSpec(
-                false,
-                true,
-                AsyncTableFunctionProvider.class
-            ),
-            new TestSpec(
-                true,
-                true,
-                AsyncTableFunctionProvider.class
-            )
+            new TestSpec(false, false, LookupFunctionProvider.class),
+            new TestSpec(true, false, PartialCachingLookupProvider.class),
+            new TestSpec(false, true, AsyncLookupFunctionProvider.class),
+            new TestSpec(true, true, PartialCachingAsyncLookupProvider.class)
         );
     }
 
@@ -246,6 +236,7 @@ private static ImmutableList<TestSpec> getTestSpecs() {
             new LookupRuntimeProviderContext(lookupKeys);
 
         return tableSource.getLookupRuntimeProvider(null, null, null);
     }
+
     private Map<String, String> getOptionsWithAsync() {
         Map<String, String> options = getOptions();
         options = new HashMap<>(options);