From 2e123bfa294b984f1009fd048c72e06b5894b536 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Fri, 23 Feb 2024 11:04:24 +0800 Subject: [PATCH 1/3] [improve]change use-new-schema-change config default to true (#318) --- .../main/java/org/apache/doris/flink/sink/BackendUtil.java | 4 +++- .../sink/writer/serializer/JsonDebeziumSchemaSerializer.java | 2 +- .../main/java/org/apache/doris/flink/tools/cdc/CdcTools.java | 2 -- .../java/org/apache/doris/flink/tools/cdc/DatabaseSync.java | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java index 9a45ff0d5..cb5b6f2ae 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java @@ -89,7 +89,9 @@ public String getAvailableBackend() { public static boolean tryHttpConnection(String host) { try { - LOG.info("try to connect host {}", host); + if (LOG.isDebugEnabled()) { + LOG.debug("try to connect host {}", host); + } host = "http://" + host; URL url = new URL(host); HttpURLConnection co = (HttpURLConnection) url.openConnection(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java index d34c1a360..c63326430 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java @@ -196,7 +196,7 @@ public static class Builder { private DorisOptions dorisOptions; private Pattern addDropDDLPattern; private String sourceTableName; - private boolean 
newSchemaChange; + private boolean newSchemaChange = true; private DorisExecutionOptions executionOptions; private Map tableMapping; private Map tableProperties; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index c76506aeb..e1a01ab13 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -117,7 +117,6 @@ private static void syncDatabase( String multiToOneTarget = params.get("multi-to-one-target"); boolean createTableOnly = params.has("create-table-only"); boolean ignoreDefaultValue = params.has("ignore-default-value"); - boolean useNewSchemaChange = params.has("use-new-schema-change"); boolean singleSink = params.has("single-sink"); Preconditions.checkArgument(params.has("sink-conf")); @@ -140,7 +139,6 @@ private static void syncDatabase( .setSinkConfig(sinkConfig) .setTableConfig(tableMap) .setCreateTableOnly(createTableOnly) - .setNewSchemaChange(useNewSchemaChange) .setSingleSink(singleSink) .create(); databaseSync.build(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index c211789fd..2aa09b681 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -73,7 +73,7 @@ public abstract class DatabaseSync { public StreamExecutionEnvironment env; private boolean createTableOnly = false; - private boolean newSchemaChange; + private boolean newSchemaChange = true; protected String includingTables; protected String excludingTables; protected String multiToOneOrigin; From 54c94a9e19cca392a622674af5144465413e973d Mon Sep 17 00:00:00 
2001 From: Petrichor <31833513+vinlee19@users.noreply.github.com> Date: Mon, 26 Feb 2024 15:36:28 +0800 Subject: [PATCH 2/3] [fix](cdc) add Oracle table name validation (#320) --- .../apache/doris/flink/catalog/doris/TableSchema.java | 1 + .../flink/tools/cdc/oracle/OracleDatabaseSync.java | 11 +++++++++++ .../tools/cdc/sqlserver/SqlServerDatabaseSync.java | 2 +- 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java index f3da96289..4cc9098fa 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java @@ -23,6 +23,7 @@ import java.util.Map; public class TableSchema { + public static final String DORIS_TABLE_REGEX = "^[a-zA-Z][a-zA-Z0-9-_]*$"; private String database; private String table; private String tableComment; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java index ef2e7ac84..8ca66e4d8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java @@ -32,6 +32,8 @@ import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import com.ververica.cdc.debezium.table.DebeziumOptions; import org.apache.doris.flink.catalog.doris.DataModel; +import org.apache.doris.flink.catalog.doris.TableSchema; +import org.apache.doris.flink.exception.CreateTableException; import org.apache.doris.flink.tools.cdc.DatabaseSync; import org.apache.doris.flink.tools.cdc.SourceSchema; import org.slf4j.Logger; @@ -118,6 +120,15 @@ public 
List getSchemaList() throws Exception { if (!isSyncNeeded(tableName)) { continue; } + // Oracle allows table names to contain special characters such as /, #, $, + // etc., as in 'A/B'. + // However, Doris does not support tables with these characters. + if (!tableName.matches(TableSchema.DORIS_TABLE_REGEX)) { + throw new CreateTableException( + String.format( + "The table name %s is invalid. Table names in Doris must match the regex pattern %s. Please consider renaming the table or use the 'excluding-tables' option to filter it out.", + tableName, TableSchema.DORIS_TABLE_REGEX)); + } SourceSchema sourceSchema = new OracleSchema( metaData, databaseName, schemaName, tableName, tableComment); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java index da96f0804..f4d6ba3f1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java @@ -125,7 +125,7 @@ public DataStreamSource buildCdcSource(StreamExecutionEnvironment env) { String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME); String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME); Preconditions.checkNotNull(databaseName, "database-name in sqlserver is required"); - Preconditions.checkNotNull(databaseName, "schema-name in sqlserver is required"); + Preconditions.checkNotNull(schemaName, "schema-name in sqlserver is required"); String tableName = config.get(JdbcSourceOptions.TABLE_NAME); String hostname = config.get(JdbcSourceOptions.HOSTNAME); From 64fe57a22e867e8f7e56de8832a4420cd4147802 Mon Sep 17 00:00:00 2001 From: bingquanzhao Date: Thu, 29 Feb 2024 14:52:41 +0800 Subject: [PATCH 3/3] [improve] adjust the code framework related to CDC. 
(#319) --- .../jsondebezium/CdcDataChange.java | 40 +++++++++++ .../jsondebezium/CdcSchemaChange.java | 39 ++++++++++ .../jsondebezium/JsonDebeziumDataChange.java | 9 +-- .../JsonDebeziumSchemaChange.java | 6 +- .../doris/flink/tools/cdc/DatabaseSync.java | 39 ++++++---- .../flink/tools/cdc/JdbcSourceSchema.java | 71 +++++++++++++++++++ .../tools/cdc/ParsingProcessFunction.java | 12 ++-- .../doris/flink/tools/cdc/SourceSchema.java | 52 ++------------ .../flink/tools/cdc/mysql/MysqlSchema.java | 4 +- .../flink/tools/cdc/oracle/OracleSchema.java | 4 +- .../tools/cdc/postgres/PostgresSchema.java | 4 +- .../tools/cdc/sqlserver/SqlServerSchema.java | 4 +- 12 files changed, 208 insertions(+), 76 deletions(-) create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java new file mode 100644 index 000000000..c344aae36 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.writer.serializer.jsondebezium; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.doris.flink.sink.writer.ChangeEvent; +import org.apache.doris.flink.sink.writer.serializer.DorisRecord; + +import java.io.IOException; +import java.util.Map; + +/** + * When a CDC connector captures data changes from the source database, extend this class to + * implement how those changes are serialized and synchronized to Doris. Supports data messages + * serialized as JSON. + */ +public abstract class CdcDataChange implements ChangeEvent { + + protected abstract DorisRecord serialize(String record, JsonNode recordRoot, String op) + throws IOException; + + protected abstract Map extractBeforeRow(JsonNode record); + + protected abstract Map extractAfterRow(JsonNode record); +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java new file mode 100644 index 000000000..858a5effd --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.writer.serializer.jsondebezium; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.doris.flink.sink.writer.ChangeEvent; + +import java.io.IOException; + +/** + * When a CDC connector captures schema changes from the source database, extend this class to + * synchronize the corresponding changes to the Doris schema. Supports data messages serialized as + * JSON. + */ +public abstract class CdcSchemaChange implements ChangeEvent { + + protected abstract String extractDatabase(JsonNode record); + + protected abstract String extractTable(JsonNode record); + + public abstract boolean schemaChange(JsonNode recordRoot) throws IOException; + + protected abstract String getCdcTableIdentifier(JsonNode record); +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java index 67aef0201..5075adf8c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.JsonNode; import 
com.fasterxml.jackson.databind.ObjectMapper; import org.apache.doris.flink.cfg.DorisOptions; -import org.apache.doris.flink.sink.writer.ChangeEvent; import org.apache.doris.flink.sink.writer.serializer.DorisRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +40,7 @@ * into doris through stream load.
* Supported data changes include: read, insert, update, delete. */ -public class JsonDebeziumDataChange implements ChangeEvent { +public class JsonDebeziumDataChange extends CdcDataChange { private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumDataChange.class); private static final String OP_READ = "r"; // snapshot read @@ -122,11 +121,13 @@ private byte[] extractUpdate(JsonNode recordRoot) throws JsonProcessingException return updateRow.toString().getBytes(StandardCharsets.UTF_8); } - private Map extractBeforeRow(JsonNode record) { + @Override + protected Map extractBeforeRow(JsonNode record) { return extractRow(record.get("before")); } - private Map extractAfterRow(JsonNode record) { + @Override + protected Map extractAfterRow(JsonNode record) { return extractRow(record.get("after")); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java index f449857c1..ccb204693 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java @@ -27,7 +27,6 @@ import com.fasterxml.jackson.databind.node.NullNode; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.sink.schema.SchemaChangeManager; -import org.apache.doris.flink.sink.writer.ChangeEvent; import org.apache.doris.flink.tools.cdc.SourceSchema; import java.util.Map; @@ -43,7 +42,7 @@ * comment synchronization, supports multi-column changes, and supports column name rename. Need to * be enabled by configuring use-new-schema-change. 
*/ -public abstract class JsonDebeziumSchemaChange implements ChangeEvent { +public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange { protected static String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*"; protected Pattern addDropDDLPattern; @@ -69,6 +68,7 @@ protected boolean checkTable(JsonNode recordRoot) { return sourceTableName.equals(dbTbl); } + @Override protected String extractDatabase(JsonNode record) { if (record.get("source").has("schema")) { // compatible with schema @@ -78,6 +78,7 @@ protected String extractDatabase(JsonNode record) { } } + @Override protected String extractTable(JsonNode record) { return extractJsonNode(record.get("source"), "table"); } @@ -102,6 +103,7 @@ protected Tuple2 getDorisTableTuple(JsonNode record) { } @VisibleForTesting + @Override public String getCdcTableIdentifier(JsonNode record) { String db = extractJsonNode(record.get("source"), "db"); String schema = extractJsonNode(record.get("source"), "schema"); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 2aa09b681..8627a476e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -35,6 +35,7 @@ import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.writer.WriteMode; +import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer; import org.apache.doris.flink.table.DorisConfigOptions; import org.slf4j.Logger; @@ -159,7 +160,7 @@ public void build() throws Exception { streamSource.sinkTo(buildDorisSink()); } else { SingleOutputStreamOperator 
parsedStream = - streamSource.process(new ParsingProcessFunction(converter)); + streamSource.process(buildProcessFunction()); for (String table : dorisTables) { OutputTag recordOutputTag = ParsingProcessFunction.createRecordOutputTag(table); @@ -200,16 +201,26 @@ public DorisSink buildDorisSink() { return buildDorisSink(null); } + public ParsingProcessFunction buildProcessFunction() { + return new ParsingProcessFunction(converter); + } + /** create doris sink. */ public DorisSink buildDorisSink(String table) { String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES); String benodes = sinkConfig.getString(DorisConfigOptions.BENODES); String user = sinkConfig.getString(DorisConfigOptions.USERNAME); String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, ""); + String jdbcUrl = sinkConfig.getString(DorisConfigOptions.JDBC_URL); DorisSink.Builder builder = DorisSink.builder(); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); - dorisBuilder.setFenodes(fenodes).setBenodes(benodes).setUsername(user).setPassword(passwd); + dorisBuilder + .setJdbcUrl(jdbcUrl) + .setFenodes(fenodes) + .setBenodes(benodes) + .setUsername(user) + .setPassword(passwd); sinkConfig .getOptional(DorisConfigOptions.AUTO_REDIRECT) .ifPresent(dorisBuilder::setAutoRedirect); @@ -284,21 +295,23 @@ public DorisSink buildDorisSink(String table) { DorisExecutionOptions executionOptions = executionBuilder.build(); builder.setDorisReadOptions(DorisReadOptions.builder().build()) .setDorisExecutionOptions(executionOptions) - .setSerializer( - JsonDebeziumSchemaSerializer.builder() - .setDorisOptions(dorisBuilder.build()) - .setNewSchemaChange(newSchemaChange) - .setExecutionOptions(executionOptions) - .setTableMapping(tableMapping) - .setTableProperties(tableConfig) - .setTargetDatabase(database) - .setTargetTablePrefix(tablePrefix) - .setTargetTableSuffix(tableSuffix) - .build()) + .setSerializer(buildSchemaSerializer(dorisBuilder, executionOptions)) 
.setDorisOptions(dorisBuilder.build()); return builder.build(); } + public DorisRecordSerializer buildSchemaSerializer( + DorisOptions.Builder dorisBuilder, DorisExecutionOptions executionOptions) { + return JsonDebeziumSchemaSerializer.builder() + .setDorisOptions(dorisBuilder.build()) + .setNewSchemaChange(newSchemaChange) + .setExecutionOptions(executionOptions) + .setTableMapping(tableMapping) + .setTableProperties(tableConfig) + .setTargetDatabase(database) + .build(); + } + /** Filter table that need to be synchronized. */ protected boolean isSyncNeeded(String tableName) { boolean sync = true; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java new file mode 100644 index 000000000..86d6336dd --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.flink.tools.cdc; + +import org.apache.doris.flink.catalog.doris.FieldSchema; + +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.LinkedHashMap; + +/** + * JdbcSourceSchema is a subclass of SourceSchema, used to build metadata about jdbc-related + * databases. + */ +public abstract class JdbcSourceSchema extends SourceSchema { + + public JdbcSourceSchema( + DatabaseMetaData metaData, + String databaseName, + String schemaName, + String tableName, + String tableComment) + throws Exception { + super(databaseName, schemaName, tableName, tableComment); + fields = new LinkedHashMap<>(); + try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) { + while (rs.next()) { + String fieldName = rs.getString("COLUMN_NAME"); + String comment = rs.getString("REMARKS"); + String fieldType = rs.getString("TYPE_NAME"); + Integer precision = rs.getInt("COLUMN_SIZE"); + + if (rs.wasNull()) { + precision = null; + } + Integer scale = rs.getInt("DECIMAL_DIGITS"); + if (rs.wasNull()) { + scale = null; + } + String dorisTypeStr = convertToDorisType(fieldType, precision, scale); + fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, comment)); + } + } + + primaryKeys = new ArrayList<>(); + try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName, tableName)) { + while (rs.next()) { + String fieldName = rs.getString("COLUMN_NAME"); + primaryKeys.add(fieldName); + } + } + } + + public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale); +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java index 95b687c31..787d0ae1a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java +++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java @@ -29,7 +29,7 @@ import java.util.Map; public class ParsingProcessFunction extends ProcessFunction { - private ObjectMapper objectMapper = new ObjectMapper(); + protected ObjectMapper objectMapper = new ObjectMapper(); private transient Map> recordOutputTags; private DatabaseSync.TableNameConverter converter; @@ -46,13 +46,17 @@ public void open(Configuration parameters) throws Exception { public void processElement( String record, ProcessFunction.Context context, Collector collector) throws Exception { - JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class); - String tableName = extractJsonNode(recordRoot.get("source"), "table"); + String tableName = getRecordTableName(record); String dorisName = converter.convert(tableName); context.output(getRecordOutputTag(dorisName), record); } - private String extractJsonNode(JsonNode record, String key) { + protected String getRecordTableName(String record) throws Exception { + JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class); + return extractJsonNode(recordRoot.get("source"), "table"); + } + + protected String extractJsonNode(JsonNode record, String key) { return record != null && record.get(key) != null ? 
record.get(key).asText() : null; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java index c52520251..e09eb00e2 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java @@ -23,8 +23,6 @@ import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.catalog.doris.TableSchema; -import java.sql.DatabaseMetaData; -import java.sql.ResultSet; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; @@ -33,59 +31,23 @@ import java.util.StringJoiner; public abstract class SourceSchema { - private final String databaseName; - private final String schemaName; - private final String tableName; - private final String tableComment; - private final LinkedHashMap fields; - public final List primaryKeys; + protected final String databaseName; + protected final String schemaName; + protected final String tableName; + protected final String tableComment; + protected LinkedHashMap fields; + public List primaryKeys; public DataModel model = DataModel.UNIQUE; public SourceSchema( - DatabaseMetaData metaData, - String databaseName, - String schemaName, - String tableName, - String tableComment) + String databaseName, String schemaName, String tableName, String tableComment) throws Exception { this.databaseName = databaseName; this.schemaName = schemaName; this.tableName = tableName; this.tableComment = tableComment; - - fields = new LinkedHashMap<>(); - try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) { - while (rs.next()) { - String fieldName = rs.getString("COLUMN_NAME"); - String comment = rs.getString("REMARKS"); - String fieldType = rs.getString("TYPE_NAME"); - String defaultValue = rs.getString("COLUMN_DEF"); - Integer 
precision = rs.getInt("COLUMN_SIZE"); - if (rs.wasNull()) { - precision = null; - } - - Integer scale = rs.getInt("DECIMAL_DIGITS"); - if (rs.wasNull()) { - scale = null; - } - String dorisTypeStr = convertToDorisType(fieldType, precision, scale); - fields.put( - fieldName, new FieldSchema(fieldName, dorisTypeStr, defaultValue, comment)); - } - } - - primaryKeys = new ArrayList<>(); - try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName, tableName)) { - while (rs.next()) { - String fieldName = rs.getString("COLUMN_NAME"); - primaryKeys.add(fieldName); - } - } } - public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale); - public String getTableIdentifier() { return getString(databaseName, schemaName, tableName); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java index bf0eee7f6..f84ca9431 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java @@ -17,11 +17,11 @@ package org.apache.doris.flink.tools.cdc.mysql; -import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.doris.flink.tools.cdc.JdbcSourceSchema; import java.sql.DatabaseMetaData; -public class MysqlSchema extends SourceSchema { +public class MysqlSchema extends JdbcSourceSchema { public MysqlSchema( DatabaseMetaData metaData, String databaseName, String tableName, String tableComment) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java index c0857542f..f843b6d25 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java +++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java @@ -17,11 +17,11 @@ package org.apache.doris.flink.tools.cdc.oracle; -import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.doris.flink.tools.cdc.JdbcSourceSchema; import java.sql.DatabaseMetaData; -public class OracleSchema extends SourceSchema { +public class OracleSchema extends JdbcSourceSchema { public OracleSchema( DatabaseMetaData metaData, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java index ecb6edfa2..32081164a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java @@ -17,11 +17,11 @@ package org.apache.doris.flink.tools.cdc.postgres; -import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.doris.flink.tools.cdc.JdbcSourceSchema; import java.sql.DatabaseMetaData; -public class PostgresSchema extends SourceSchema { +public class PostgresSchema extends JdbcSourceSchema { public PostgresSchema( DatabaseMetaData metaData, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java index ce060ea59..6d5ab9aac 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java @@ -17,11 +17,11 @@ package org.apache.doris.flink.tools.cdc.sqlserver; -import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.doris.flink.tools.cdc.JdbcSourceSchema; import java.sql.DatabaseMetaData; -public class 
SqlServerSchema extends SourceSchema { +public class SqlServerSchema extends JdbcSourceSchema { public SqlServerSchema( DatabaseMetaData metaData,