Skip to content

Commit

Permalink
Merge branch 'master' into multi-db-sync
Browse files Browse the repository at this point in the history
# Conflicts:
#	flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
#	flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
  • Loading branch information
JNSimba committed Feb 29, 2024
2 parents e0295fc + 64fe57a commit 06064a6
Show file tree
Hide file tree
Showing 18 changed files with 223 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;

public class TableSchema {
public static final String DORIS_TABLE_REGEX = "^[a-zA-Z][a-zA-Z0-9-_]*$";
private String database;
private String table;
private String tableComment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ public String getAvailableBackend() {

public static boolean tryHttpConnection(String host) {
try {
LOG.info("try to connect host {}", host);
if (LOG.isDebugEnabled()) {
LOG.debug("try to connect host {}", host);
}
host = "http://" + host;
URL url = new URL(host);
HttpURLConnection co = (HttpURLConnection) url.openConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public static class Builder {
private DorisOptions dorisOptions;
private Pattern addDropDDLPattern;
private String sourceTableName;
private boolean newSchemaChange;
private boolean newSchemaChange = true;
private DorisExecutionOptions executionOptions;
private Map<String, String> tableMapping;
private Map<String, String> tableProperties;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink.writer.serializer.jsondebezium;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.doris.flink.sink.writer.ChangeEvent;
import org.apache.doris.flink.sink.writer.serializer.DorisRecord;

import java.io.IOException;
import java.util.Map;

/**
* Base class for handling data-change records that a CDC connector captures from a source
* database. Subclasses implement how a JSON-serialized change record is turned into a record for
* Doris and how the row contents are pulled out of it.
*/
public abstract class CdcDataChange implements ChangeEvent {

/**
* Serializes a single change record.
*
* @param record the change record as a string (presumably the raw JSON text of {@code
*     recordRoot} - confirm against callers)
* @param recordRoot the change record as a JSON tree
* @param op the operation carried by the record (presumably an operation code such as the
*     Debezium {@code op} value - confirm against implementations)
* @return the {@link DorisRecord} produced for this change
* @throws IOException declared for implementations that fail while serializing the record
*/
protected abstract DorisRecord serialize(String record, JsonNode recordRoot, String op)
throws IOException;

/**
* Extracts the row as it was before the change.
*
* @param record the change record as a JSON tree
* @return the extracted row as a map (presumably keyed by column name - confirm)
*/
protected abstract Map<String, Object> extractBeforeRow(JsonNode record);

/**
* Extracts the row as it is after the change.
*
* @param record the change record as a JSON tree
* @return the extracted row as a map (presumably keyed by column name - confirm)
*/
protected abstract Map<String, Object> extractAfterRow(JsonNode record);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink.writer.serializer.jsondebezium;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.doris.flink.sink.writer.ChangeEvent;

import java.io.IOException;

/**
* Base class for handling schema-change records that a CDC connector captures from a source
* database. Subclasses implement how a JSON-serialized schema change is synchronized to the Doris
* schema.
*/
public abstract class CdcSchemaChange implements ChangeEvent {

/**
* Extracts the source database name from a change record.
*
* @param record the change record as a JSON tree
* @return the database name
*/
protected abstract String extractDatabase(JsonNode record);

/**
* Extracts the source table name from a change record.
*
* @param record the change record as a JSON tree
* @return the table name
*/
protected abstract String extractTable(JsonNode record);

/**
* Handles a schema-change record.
*
* @param recordRoot the change record as a JSON tree
* @return a status flag (presumably {@code true} when the change was applied - confirm against
*     implementations)
* @throws IOException declared for implementations that fail while processing the change
*/
public abstract boolean schemaChange(JsonNode recordRoot) throws IOException;

/**
* Builds the identifier of the source table a change record belongs to.
*
* @param record the change record as a JSON tree
* @return the source table identifier (exact format is defined by the implementation)
*/
protected abstract String getCdcTableIdentifier(JsonNode record);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.writer.ChangeEvent;
import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,7 +40,7 @@
* into doris through stream load.<br>
* Supported data changes include: read, insert, update, delete.
*/
public class JsonDebeziumDataChange implements ChangeEvent {
public class JsonDebeziumDataChange extends CdcDataChange {
private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumDataChange.class);

private static final String OP_READ = "r"; // snapshot read
Expand Down Expand Up @@ -122,11 +121,13 @@ private byte[] extractUpdate(JsonNode recordRoot) throws JsonProcessingException
return updateRow.toString().getBytes(StandardCharsets.UTF_8);
}

private Map<String, Object> extractBeforeRow(JsonNode record) {
/**
 * Returns the row built from the {@code before} field of the given change record.
 *
 * @param record the change record as a JSON tree
 * @return the result of {@code extractRow} applied to the {@code before} node
 */
@Override
protected Map<String, Object> extractBeforeRow(JsonNode record) {
    JsonNode beforeImage = record.get("before");
    return extractRow(beforeImage);
}

private Map<String, Object> extractAfterRow(JsonNode record) {
/**
 * Returns the row built from the {@code after} field of the given change record.
 *
 * @param record the change record as a JSON tree
 * @return the result of {@code extractRow} applied to the {@code after} node
 */
@Override
protected Map<String, Object> extractAfterRow(JsonNode record) {
    JsonNode afterImage = record.get("after");
    return extractRow(afterImage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.apache.doris.flink.sink.writer.ChangeEvent;
import org.apache.doris.flink.tools.cdc.SourceSchema;

import java.util.Map;
Expand All @@ -43,7 +42,7 @@
* comment synchronization, supports multi-column changes, and supports column name rename. Need to
* be enabled by configuring use-new-schema-change.
*/
public abstract class JsonDebeziumSchemaChange implements ChangeEvent {
public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange {
protected static String addDropDDLRegex =
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
protected Pattern addDropDDLPattern;
Expand All @@ -69,6 +68,7 @@ protected boolean checkTable(JsonNode recordRoot) {
return sourceTableName.equals(dbTbl);
}

@Override
protected String extractDatabase(JsonNode record) {
if (record.get("source").has("schema")) {
// compatible with schema
Expand All @@ -78,6 +78,7 @@ protected String extractDatabase(JsonNode record) {
}
}

/**
 * Reads the {@code table} entry of the record's {@code source} node.
 *
 * @param record the change record as a JSON tree
 * @return whatever {@code extractJsonNode} yields for the {@code table} key
 */
@Override
protected String extractTable(JsonNode record) {
    JsonNode sourceNode = record.get("source");
    return extractJsonNode(sourceNode, "table");
}
Expand All @@ -102,6 +103,7 @@ protected Tuple2<String, String> getDorisTableTuple(JsonNode record) {
}

@VisibleForTesting
@Override
public String getCdcTableIdentifier(JsonNode record) {
String db = extractJsonNode(record.get("source"), "db");
String schema = extractJsonNode(record.get("source"), "schema");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ private static void syncDatabase(
String multiToOneTarget = params.get("multi-to-one-target");
boolean createTableOnly = params.has("create-table-only");
boolean ignoreDefaultValue = params.has("ignore-default-value");
boolean useNewSchemaChange = params.has("use-new-schema-change");
boolean singleSink = params.has("single-sink");

Preconditions.checkArgument(params.has("sink-conf"));
Expand All @@ -140,7 +139,6 @@ private static void syncDatabase(
.setSinkConfig(sinkConfig)
.setTableConfig(tableMap)
.setCreateTableOnly(createTableOnly)
.setNewSchemaChange(useNewSchemaChange)
.setSingleSink(singleSink)
.create();
databaseSync.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.WriteMode;
import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
import org.apache.doris.flink.table.DorisConfigOptions;
import org.slf4j.Logger;
Expand Down Expand Up @@ -74,7 +75,7 @@ public abstract class DatabaseSync {

public StreamExecutionEnvironment env;
private boolean createTableOnly = false;
private boolean newSchemaChange;
private boolean newSchemaChange = true;
protected String includingTables;
protected String excludingTables;
protected String multiToOneOrigin;
Expand Down Expand Up @@ -167,8 +168,7 @@ public void build() throws Exception {
if (singleSink) {
streamSource.sinkTo(buildDorisSink());
} else {
SingleOutputStreamOperator<Void> parsedStream =
streamSource.process(new ParsingProcessFunction(converter));
SingleOutputStreamOperator<Void> parsedStream = streamSource.process(buildProcessFunction());
for (Tuple2<String, String> dbTbl : dorisTables) {
OutputTag<String> recordOutputTag =
ParsingProcessFunction.createRecordOutputTag(dbTbl.f1);
Expand Down Expand Up @@ -209,16 +209,26 @@ public DorisSink<String> buildDorisSink() {
return buildDorisSink(null);
}

/**
 * Creates the process function used to parse incoming records, wired with this sync's table name
 * converter.
 *
 * @return a new {@link ParsingProcessFunction}
 */
public ParsingProcessFunction buildProcessFunction() {
    ParsingProcessFunction processFunction = new ParsingProcessFunction(converter);
    return processFunction;
}

/** create doris sink. */
public DorisSink<String> buildDorisSink(String tableIdentifier) {
String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
String jdbcUrl = sinkConfig.getString(DorisConfigOptions.JDBC_URL);

DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes(fenodes).setBenodes(benodes).setUsername(user).setPassword(passwd);
dorisBuilder
.setJdbcUrl(jdbcUrl)
.setFenodes(fenodes)
.setBenodes(benodes)
.setUsername(user)
.setPassword(passwd);
sinkConfig
.getOptional(DorisConfigOptions.AUTO_REDIRECT)
.ifPresent(dorisBuilder::setAutoRedirect);
Expand Down Expand Up @@ -293,21 +303,23 @@ public DorisSink<String> buildDorisSink(String tableIdentifier) {
DorisExecutionOptions executionOptions = executionBuilder.build();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionOptions)
.setSerializer(
JsonDebeziumSchemaSerializer.builder()
.setDorisOptions(dorisBuilder.build())
.setNewSchemaChange(newSchemaChange)
.setExecutionOptions(executionOptions)
.setTableMapping(tableMapping)
.setTableProperties(tableConfig)
.setTargetDatabase(database)
.setTargetTablePrefix(tablePrefix)
.setTargetTableSuffix(tableSuffix)
.build())
.setSerializer(buildSchemaSerializer(dorisBuilder, executionOptions))
.setDorisOptions(dorisBuilder.build());
return builder.build();
}

/**
* Builds the record serializer for the Doris sink from this sync's settings: the new-schema-change
* flag, the table mapping, the table properties and the target database.
*
* @param dorisBuilder builder whose built {@link DorisOptions} are handed to the serializer
* @param executionOptions execution options handed to the serializer
* @return the built {@link JsonDebeziumSchemaSerializer}
*/
public DorisRecordSerializer<String> buildSchemaSerializer(
DorisOptions.Builder dorisBuilder, DorisExecutionOptions executionOptions) {
// NOTE(review): no target table prefix/suffix is forwarded to the serializer builder here -
// confirm that this is intended.
return JsonDebeziumSchemaSerializer.builder()
.setDorisOptions(dorisBuilder.build())
.setNewSchemaChange(newSchemaChange)
.setExecutionOptions(executionOptions)
.setTableMapping(tableMapping)
.setTableProperties(tableConfig)
.setTargetDatabase(database)
.build();
}

/** Filter table that need to be synchronized. */
protected boolean isSyncNeeded(String tableName) {
boolean sync = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.tools.cdc;

import org.apache.doris.flink.catalog.doris.FieldSchema;

import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.LinkedHashMap;

/**
 * A {@link SourceSchema} whose column and primary-key metadata is read through JDBC {@link
 * DatabaseMetaData}. Subclasses supply the mapping from source column types to Doris types.
 */
public abstract class JdbcSourceSchema extends SourceSchema {

    /**
     * Loads the table's columns and then its primary keys from the given metadata.
     *
     * @param metaData JDBC metadata used to look up columns and primary keys
     * @param databaseName passed to the metadata lookups as the catalog argument
     * @param schemaName passed to the metadata lookups as the schema argument
     * @param tableName name of the table to describe
     * @param tableComment comment of the table, handed to the superclass
     * @throws Exception if a metadata lookup or the type conversion fails
     */
    public JdbcSourceSchema(
            DatabaseMetaData metaData,
            String databaseName,
            String schemaName,
            String tableName,
            String tableComment)
            throws Exception {
        super(databaseName, schemaName, tableName, tableComment);

        // Columns are stored in the order the driver reports them.
        fields = new LinkedHashMap<>();
        try (ResultSet columns = metaData.getColumns(databaseName, schemaName, tableName, null)) {
            while (columns.next()) {
                String columnName = columns.getString("COLUMN_NAME");
                String remarks = columns.getString("REMARKS");
                String typeName = columns.getString("TYPE_NAME");

                // getInt yields 0 for SQL NULL, so wasNull() must be consulted right after
                // each read to tell "absent" apart from a real zero.
                int columnSize = columns.getInt("COLUMN_SIZE");
                Integer precision = columns.wasNull() ? null : columnSize;

                int decimalDigits = columns.getInt("DECIMAL_DIGITS");
                Integer scale = columns.wasNull() ? null : decimalDigits;

                // Called while the constructor is still running: overrides must not depend on
                // subclass state that is initialized after super(...) returns.
                String dorisType = convertToDorisType(typeName, precision, scale);
                fields.put(columnName, new FieldSchema(columnName, dorisType, remarks));
            }
        }

        primaryKeys = new ArrayList<>();
        try (ResultSet keys = metaData.getPrimaryKeys(databaseName, schemaName, tableName)) {
            while (keys.next()) {
                primaryKeys.add(keys.getString("COLUMN_NAME"));
            }
        }
    }

    /**
     * Maps a source column type to the Doris type string.
     *
     * @param fieldType type name reported by the JDBC metadata ({@code TYPE_NAME})
     * @param precision reported {@code COLUMN_SIZE}, or {@code null} when it was SQL NULL
     * @param scale reported {@code DECIMAL_DIGITS}, or {@code null} when it was SQL NULL
     * @return the Doris type string for the column
     */
    public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.Map;

public class ParsingProcessFunction extends ProcessFunction<String, Void> {
private ObjectMapper objectMapper = new ObjectMapper();
protected ObjectMapper objectMapper = new ObjectMapper();
private transient Map<String, OutputTag<String>> recordOutputTags;
private DatabaseSync.TableNameConverter converter;

Expand All @@ -46,13 +46,17 @@ public void open(Configuration parameters) throws Exception {
public void processElement(
String record, ProcessFunction<String, Void>.Context context, Collector<Void> collector)
throws Exception {
JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
String tableName = extractJsonNode(recordRoot.get("source"), "table");
String tableName = getRecordTableName(record);
String dorisName = converter.convert(tableName);
context.output(getRecordOutputTag(dorisName), record);
}

private String extractJsonNode(JsonNode record, String key) {
/**
 * Parses the record as JSON and reads the {@code table} entry of its {@code source} node.
 *
 * @param record the change record as JSON text
 * @return the table name, or {@code null} when the {@code source} node or its {@code table} entry
 *     is missing
 * @throws Exception if the record cannot be parsed
 */
protected String getRecordTableName(String record) throws Exception {
    JsonNode sourceNode = objectMapper.readValue(record, JsonNode.class).get("source");
    return extractJsonNode(sourceNode, "table");
}

/**
 * Reads the value stored under {@code key} as text.
 *
 * @param record the node to read from; may be {@code null}
 * @param key the field name to look up
 * @return the field's text value, or {@code null} when the node or the field is missing
 */
protected String extractJsonNode(JsonNode record, String key) {
    if (record == null) {
        return null;
    }
    JsonNode value = record.get(key);
    if (value == null) {
        return null;
    }
    return value.asText();
}

Expand Down
Loading

0 comments on commit 06064a6

Please sign in to comment.