diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java index b44680611..ff6ad475b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java @@ -24,21 +24,14 @@ public interface ConfigurationOptions { String DORIS_DEFAULT_CLUSTER = "default_cluster"; String TABLE_IDENTIFIER = "table.identifier"; - String DORIS_TABLE_IDENTIFIER = "doris.table.identifier"; String DORIS_READ_FIELD = "doris.read.field"; String DORIS_FILTER_QUERY = "doris.filter.query"; - String DORIS_FILTER_QUERY_IN_MAX_COUNT = "doris.filter.query.in.max.count"; - Integer DORIS_FILTER_QUERY_IN_VALUE_UPPER_LIMIT = 10000; - String DORIS_USER = "username"; String DORIS_PASSWORD = "password"; - - String DORIS_REQUEST_AUTH_USER = "doris.request.auth.user"; - String DORIS_REQUEST_AUTH_PASSWORD = "doris.request.auth.password"; String DORIS_REQUEST_RETRIES = "doris.request.retries"; - String DORIS_REQUEST_CONNECT_TIMEOUT_MS = "doris.request.connect.timeout.ms"; - String DORIS_REQUEST_READ_TIMEOUT_MS = "doris.request.read.timeout.ms"; - String DORIS_REQUEST_QUERY_TIMEOUT_S = "doris.request.query.timeout.s"; + String DORIS_REQUEST_CONNECT_TIMEOUT_MS = "doris.request.connect.timeout"; + String DORIS_REQUEST_READ_TIMEOUT_MS = "doris.request.read.timeout"; + String DORIS_REQUEST_QUERY_TIMEOUT_S = "doris.request.query.timeout"; Integer DORIS_REQUEST_RETRIES_DEFAULT = 3; Integer DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000; Integer DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000; @@ -53,12 +46,9 @@ public interface ConfigurationOptions { String DORIS_EXEC_MEM_LIMIT = "doris.exec.mem.limit"; Long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L; - - String DORIS_VALUE_READER_CLASS = "doris.value.reader.class"; - + String DORIS_EXEC_MEM_LIMIT_DEFAULT_STR = "2048mb"; String DORIS_DESERIALIZE_ARROW_ASYNC = "doris.deserialize.arrow.async"; Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false; - String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size"; Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java index c6988716d..b91b04bac 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java @@ -19,6 +19,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.table.factories.FactoryUtil; import org.apache.doris.flink.sink.writer.WriteMode; @@ -30,7 +31,7 @@ import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT_STR; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT; @@ -95,47 +96,52 @@ public class DorisConfigOptions { ConfigOptions.key("doris.request.tablet.size") .intType() .defaultValue(DORIS_TABLET_SIZE_DEFAULT) - .withDescription(""); - public static final ConfigOption DORIS_REQUEST_CONNECT_TIMEOUT_MS = - ConfigOptions.key("doris.request.connect.timeout.ms") - .intType() - .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT) - .withDescription(""); - public static final ConfigOption DORIS_REQUEST_READ_TIMEOUT_MS = - ConfigOptions.key("doris.request.read.timeout.ms") - .intType() - .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) - .withDescription(""); - public static final ConfigOption DORIS_REQUEST_QUERY_TIMEOUT_S = - ConfigOptions.key("doris.request.query.timeout.s") - .intType() - .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT) - .withDescription(""); + .withDescription( + "The number of Doris Tablets corresponding to a Partition. The smaller this value is set, the more Partitions will be generated. This improves the parallelism on the Flink side, but at the same time puts more pressure on Doris."); + public static final ConfigOption DORIS_REQUEST_CONNECT_TIMEOUT_MS = + ConfigOptions.key("doris.request.connect.timeout") + .durationType() + .defaultValue(Duration.ofMillis(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)) + .withDescription("Connection timeout for sending requests to Doris"); + public static final ConfigOption DORIS_REQUEST_READ_TIMEOUT_MS = + ConfigOptions.key("doris.request.read.timeout") + .durationType() + .defaultValue(Duration.ofMillis(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)) + .withDescription("Read timeout for sending requests to Doris"); + public static final ConfigOption DORIS_REQUEST_QUERY_TIMEOUT_S = + ConfigOptions.key("doris.request.query.timeout") + .durationType() + .defaultValue(Duration.ofSeconds(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)) + .withDescription( + "The timeout time for querying Doris, the default value is 1 hour, -1 means no timeout limit"); public static final ConfigOption DORIS_REQUEST_RETRIES = ConfigOptions.key("doris.request.retries") .intType() .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT) - .withDescription(""); + .withDescription("Number of retries to send requests to Doris"); public static final ConfigOption DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions.key("doris.deserialize.arrow.async") .booleanType() .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT) - .withDescription(""); + .withDescription( + "Whether to support asynchronous conversion of Arrow format to RowBatch needed for connector iterations"); public static final ConfigOption DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions.key("doris.deserialize.queue.size") .intType() .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT) - .withDescription(""); + .withDescription( + "Asynchronous conversion of internal processing queue in Arrow format, effective when doris.deserialize.arrow.async is true"); public static final ConfigOption DORIS_BATCH_SIZE = ConfigOptions.key("doris.batch.size") .intType() .defaultValue(DORIS_BATCH_SIZE_DEFAULT) - .withDescription(""); - public static final ConfigOption DORIS_EXEC_MEM_LIMIT = + .withDescription( + "The maximum number of rows to read data from BE at a time. Increasing this value reduces the number of connections established between Flink and Doris. Thereby reducing the additional time overhead caused by network delay."); + public static final ConfigOption DORIS_EXEC_MEM_LIMIT = ConfigOptions.key("doris.exec.mem.limit") - .longType() - .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT) - .withDescription(""); + .memoryType() + .defaultValue(MemorySize.parse(DORIS_EXEC_MEM_LIMIT_DEFAULT_STR)) + .withDescription("Memory limit for a single query. The default is 2048mb."); public static final ConfigOption SOURCE_USE_OLD_API = ConfigOptions.key("source.use-old-api") .booleanType() @@ -198,20 +204,20 @@ public class DorisConfigOptions { .defaultValue(true) .withDescription("enable 2PC while loading"); - public static final ConfigOption SINK_CHECK_INTERVAL = + public static final ConfigOption SINK_CHECK_INTERVAL = ConfigOptions.key("sink.check-interval") - .intType() - .defaultValue(10000) + .durationType() + .defaultValue(Duration.ofMillis(10000)) .withDescription("check exception with the interval while loading"); public static final ConfigOption SINK_MAX_RETRIES = ConfigOptions.key("sink.max-retries") .intType() .defaultValue(3) .withDescription("the max retry times if writing records to database failed."); - public static final ConfigOption SINK_BUFFER_SIZE = + public static final ConfigOption SINK_BUFFER_SIZE = ConfigOptions.key("sink.buffer-size") - .intType() - .defaultValue(1024 * 1024) + .memoryType() + .defaultValue(MemorySize.parse("1mb")) .withDescription("the buffer size to cache data for stream load."); public static final ConfigOption SINK_BUFFER_COUNT = ConfigOptions.key("sink.buffer-count") @@ -263,10 +269,10 @@ public class DorisConfigOptions { .withDescription( "The maximum number of flush items in each batch, the default is 5w"); - public static final ConfigOption SINK_BUFFER_FLUSH_MAX_BYTES = + public static final ConfigOption SINK_BUFFER_FLUSH_MAX_BYTES = ConfigOptions.key("sink.buffer-flush.max-bytes") - .intType() - .defaultValue(10 * 1024 * 1024) + .memoryType() + .defaultValue(MemorySize.parse("10mb")) .withDescription( "The maximum number of bytes flushed in each batch, the default is 10MB"); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index bf5cd8ce3..f327b9c07 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -204,13 +204,16 @@ private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) { final DorisReadOptions.Builder builder = DorisReadOptions.builder(); builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC)) .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE)) - .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT)) + .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT).getBytes()) .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY)) .setReadFields(readableConfig.get(DORIS_READ_FIELD)) - .setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S)) + .setRequestQueryTimeoutS( + (int) readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S).getSeconds()) .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE)) - .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS)) - .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS)) + .setRequestConnectTimeoutMs( + (int) readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS).toMillis()) + .setRequestReadTimeoutMs( + (int) readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS).toMillis()) .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES)) .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE)) .setUseOldApi(readableConfig.get(SOURCE_USE_OLD_API)); @@ -220,9 +223,9 @@ private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) { private DorisExecutionOptions getDorisExecutionOptions( ReadableConfig readableConfig, Properties streamLoadProp) { final DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder(); - builder.setCheckInterval(readableConfig.get(SINK_CHECK_INTERVAL)); + builder.setCheckInterval((int) readableConfig.get(SINK_CHECK_INTERVAL).toMillis()); builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES)); - builder.setBufferSize(readableConfig.get(SINK_BUFFER_SIZE)); + builder.setBufferSize((int) readableConfig.get(SINK_BUFFER_SIZE).getBytes()); builder.setBufferCount(readableConfig.get(SINK_BUFFER_COUNT)); builder.setLabelPrefix(readableConfig.get(SINK_LABEL_PREFIX)); builder.setStreamLoadProp(streamLoadProp); @@ -245,7 +248,8 @@ private DorisExecutionOptions getDorisExecutionOptions( } builder.setFlushQueueSize(readableConfig.get(SINK_FLUSH_QUEUE_SIZE)); builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS)); - builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES)); + builder.setBufferFlushMaxBytes( + (int) readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES).getBytes()); builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis()); builder.setUseCache(readableConfig.get(SINK_USE_CACHE)); return builder.build(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 6b0a0fdb3..14d3fbb93 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -260,10 +260,10 @@ public DorisSink buildDorisSink(String tableIdentifier) { .ifPresent(executionBuilder::setBufferCount); sinkConfig .getOptional(DorisConfigOptions.SINK_BUFFER_SIZE) - .ifPresent(executionBuilder::setBufferSize); + .ifPresent(v -> executionBuilder.setBufferSize((int) v.getBytes())); sinkConfig .getOptional(DorisConfigOptions.SINK_CHECK_INTERVAL) - .ifPresent(executionBuilder::setCheckInterval); + .ifPresent(v -> executionBuilder.setCheckInterval((int) v.toMillis())); sinkConfig .getOptional(DorisConfigOptions.SINK_MAX_RETRIES) .ifPresent(executionBuilder::setMaxRetries); @@ -289,7 +289,7 @@ public DorisSink buildDorisSink(String tableIdentifier) { .ifPresent(executionBuilder::setBufferFlushMaxRows); sinkConfig .getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES) - .ifPresent(executionBuilder::setBufferFlushMaxBytes); + .ifPresent(v -> executionBuilder.setBufferFlushMaxBytes((int) v.getBytes())); sinkConfig .getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL) .ifPresent(v -> executionBuilder.setBufferFlushIntervalMs(v.toMillis()));