From 7f09d02831e3c0f595ddb7742aa481e8060a06b5 Mon Sep 17 00:00:00 2001 From: Qinghuang Xu <781240052@qq.com> Date: Mon, 10 Feb 2025 10:47:59 +0800 Subject: [PATCH] [Fix] Fix the issue with parsing MongoDB timestamp and array types (#547) --- .../cdc/mongodb/ChangeStreamConstant.java | 1 + .../flink/tools/cdc/mongodb/MongoDBType.java | 32 ++++++++++-------- .../MongoJsonDebeziumSchemaChange.java | 10 ++++-- .../tools/cdc/mongodb/MongoDBSchemaTest.java | 33 +++++++++++++++++++ .../tools/cdc/mongodb/MongoDBTypeTest.java | 22 ++++++++++--- 5 files changed, 78 insertions(+), 20 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java index f8772c9f7..4e0d8e532 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java @@ -35,6 +35,7 @@ public class ChangeStreamConstant implements Serializable { public static final String FIELD_DOCUMENT_KEY = "documentKey"; public static final String DATE_FIELD = "$date"; + public static final String TIMESTAMP_FIELD = "$timestamp"; public static final String DECIMAL_FIELD = "$numberDecimal"; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java index 578a407cd..cb4896ab2 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java @@ -30,26 +30,29 @@ import com.fasterxml.jackson.databind.node.TextNode; import org.apache.doris.flink.catalog.doris.DorisType; import org.apache.doris.flink.exception.DorisRuntimeException; -import org.bson.BsonArray; +import org.bson.BsonTimestamp; import org.bson.types.Decimal128; import org.bson.types.ObjectId; import java.math.BigDecimal; import java.util.Date; +import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; -public class MongoDBType { - - public static final String DATE_TYPE = "$date"; - public static final String DECIMAL_TYPE = "$numberDecimal"; - public static final String LONG_TYPE = "$numberLong"; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.DATE_FIELD; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.DECIMAL_FIELD; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.LONG_FIELD; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.TIMESTAMP_FIELD; +public class MongoDBType { public static String toDorisType(Object value) { if (value instanceof Integer) { return DorisType.INT; } else if (value instanceof Date) { return DorisType.DATETIME_V2 + "(3)"; + } else if (value instanceof BsonTimestamp) { + return DorisType.DATETIME_V2 + "(0)"; } else if (value instanceof Long) { return DorisType.BIGINT; } else if (value instanceof Double) { @@ -60,8 +63,8 @@ public static String toDorisType(Object value) { return DorisType.STRING; } else if (value instanceof ObjectId) { return DorisType.VARCHAR + "(30)"; - } else if (value instanceof BsonArray) { - return DorisType.ARRAY; + } else if (value instanceof List) { + return DorisType.ARRAY + "<" + DorisType.STRING + ">"; } else if (value instanceof Decimal128) { return checkAndRebuildBigDecimal(((Decimal128) value).bigDecimalValue()); } else { @@ -77,19 +80,22 @@ public static String jsonNodeToDorisType(JsonNode value) { } else if (value instanceof LongNode) { return DorisType.BIGINT; } else if (value instanceof DoubleNode) { + // When mongo double is in the JsonNode, it's actually a decimal type return DorisType.DOUBLE; } else if (value instanceof BooleanNode) { return DorisType.BOOLEAN; } else if (value instanceof ArrayNode) { - return DorisType.ARRAY; + return DorisType.ARRAY + "<" + DorisType.STRING + ">"; } else if (value instanceof DecimalNode) { return checkAndRebuildBigDecimal(value.decimalValue()); } else if (value instanceof ObjectNode) { - if (value.size() == 1 && value.get(DATE_TYPE) != null) { + if (value.size() == 1 && value.get(DATE_FIELD) != null) { return DorisType.DATETIME_V2 + "(3)"; - } else if (value.size() == 1 && value.get(DECIMAL_TYPE) != null) { - return checkAndRebuildBigDecimal(new BigDecimal(value.get(DECIMAL_TYPE).asText())); - } else if (value.size() == 1 && value.get(LONG_TYPE) != null) { + } else if (value.size() == 1 && value.get(TIMESTAMP_FIELD) != null) { + return DorisType.DATETIME_V2 + "(0)"; + } else if (value.size() == 1 && value.get(DECIMAL_FIELD) != null) { + return checkAndRebuildBigDecimal(new BigDecimal(value.get(DECIMAL_FIELD).asText())); + } else if (value.size() == 1 && value.get(LONG_FIELD) != null) { return DorisType.BIGINT; } else { return DorisType.STRING; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java index c3a4a7d8d..c67e856e9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java @@ -50,6 +50,7 @@ import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_NAMESPACE; import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_TABLE; import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.LONG_FIELD; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.TIMESTAMP_FIELD; public class MongoJsonDebeziumSchemaChange extends CdcSchemaChange { @@ -67,7 +68,7 @@ public class MongoJsonDebeziumSchemaChange extends CdcSchemaChange { private final DorisOptions dorisOptions; private final Set specialFields = - new HashSet<>(Arrays.asList(DATE_FIELD, DECIMAL_FIELD, LONG_FIELD)); + new HashSet<>(Arrays.asList(DATE_FIELD, TIMESTAMP_FIELD, DECIMAL_FIELD, LONG_FIELD)); public MongoJsonDebeziumSchemaChange(JsonDebeziumChangeContext changeContext) { this.objectMapper = changeContext.getObjectMapper(); @@ -127,7 +128,12 @@ private void formatSpecialFieldData(JsonNode logData) { if (specialFields.contains(fieldKey)) { switch (fieldKey) { case DATE_FIELD: - long timestamp = fieldNode.get(DATE_FIELD).asLong(); + case TIMESTAMP_FIELD: + JsonNode jsonNode = fieldNode.get(fieldKey); + long timestamp = + fieldKey.equals(TIMESTAMP_FIELD) + ? jsonNode.get("t").asLong() * 1000L + : jsonNode.asLong(); String formattedDate = MongoDateConverter.convertTimestampToString( timestamp); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java index a9cc8bbc3..2f0956084 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java @@ -18,12 +18,16 @@ package org.apache.doris.flink.tools.cdc.mongodb; import org.apache.doris.flink.catalog.doris.FieldSchema; +import org.bson.BsonTimestamp; import org.bson.Document; import org.bson.types.Decimal128; +import org.bson.types.ObjectId; import org.junit.Test; import java.math.BigDecimal; import java.util.ArrayList; +import java.util.Date; +import java.util.LinkedHashMap; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -40,6 +44,35 @@ public void getCdcTableName() throws Exception { assertEquals("db_TEST\\.test_table", mongoDBSchema.getCdcTableName()); } + @Test + public void testMongoSampleDataFields() throws Exception { + ArrayList sampleData = new ArrayList<>(); + sampleData.add(new Document("_id", new ObjectId("678643e649a4c9239b04297b"))); + sampleData.add(new Document("c_string", "Hello, MongoDB!")); + sampleData.add(new Document("c_bool", true)); + sampleData.add(new Document("c_int", 123456)); + sampleData.add(new Document("c_long", 1234567890123456789L)); + sampleData.add(new Document("c_double", 123.45)); + sampleData.add(new Document("c_decimal", new Decimal128(BigDecimal.valueOf(12345.67)))); + sampleData.add(new Document("c_date", new Date(1234567890))); + sampleData.add(new Document("c_timestamp", new BsonTimestamp(1334567890))); + Map map = new LinkedHashMap<>(); + map.put("key1", "value1"); + map.put("key2", "value2"); + sampleData.add(new Document("c_object", new Document(map))); + ArrayList array = new ArrayList<>(); + array.add("str1"); + array.add("str2"); + array.add(789); + sampleData.add(new Document("c_array", array)); + + MongoDBSchema mongoDBSchema = new MongoDBSchema(sampleData, "db_TEST", "test_table", ""); + + assertEquals( + "{_id=FieldSchema{name='_id', typeString='VARCHAR(30)', defaultValue='null', comment='null'}, c_string=FieldSchema{name='c_string', typeString='STRING', defaultValue='null', comment='null'}, c_bool=FieldSchema{name='c_bool', typeString='BOOLEAN', defaultValue='null', comment='null'}, c_int=FieldSchema{name='c_int', typeString='INT', defaultValue='null', comment='null'}, c_long=FieldSchema{name='c_long', typeString='BIGINT', defaultValue='null', comment='null'}, c_double=FieldSchema{name='c_double', typeString='DECIMALV3(5,2)', defaultValue='null', comment='null'}, c_decimal=FieldSchema{name='c_decimal', typeString='DECIMALV3(7,2)', defaultValue='null', comment='null'}, c_date=FieldSchema{name='c_date', typeString='DATETIMEV2(3)', defaultValue='null', comment='null'}, c_timestamp=FieldSchema{name='c_timestamp', typeString='DATETIMEV2(0)', defaultValue='null', comment='null'}, c_object=FieldSchema{name='c_object', typeString='STRING', defaultValue='null', comment='null'}, c_array=FieldSchema{name='c_array', typeString='ARRAY', defaultValue='null', comment='null'}}", + mongoDBSchema.getFields().toString()); + } + @Test public void replaceDecimalTypeIfNeededTest1() throws Exception { ArrayList documents = new ArrayList<>(); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java index ee511ce24..4e273ab85 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java @@ -29,6 +29,9 @@ import com.fasterxml.jackson.databind.node.TextNode; import org.apache.doris.flink.catalog.doris.DorisType; import org.bson.BsonArray; +import org.bson.BsonInt64; +import org.bson.BsonString; +import org.bson.BsonTimestamp; import org.bson.types.Decimal128; import org.bson.types.ObjectId; import org.junit.Test; @@ -46,6 +49,7 @@ public class MongoDBTypeTest { public void toDorisType() { assertEquals(DorisType.INT, MongoDBType.toDorisType(new Integer(123))); assertEquals(DorisType.DATETIME_V2 + "(3)", MongoDBType.toDorisType(new Date())); + assertEquals(DorisType.DATETIME_V2 + "(0)", MongoDBType.toDorisType(new BsonTimestamp())); assertEquals(DorisType.BIGINT, MongoDBType.toDorisType(new Long(1234567891))); assertEquals("DECIMALV3(6,2)", MongoDBType.toDorisType(new Double("1234.56"))); assertEquals(DorisType.BOOLEAN, MongoDBType.toDorisType(new Boolean(true))); @@ -53,10 +57,13 @@ public void toDorisType() { assertEquals( DorisType.VARCHAR + "(30)", MongoDBType.toDorisType(new ObjectId("66583533791a67a6f8c5a339"))); - assertEquals(DorisType.ARRAY, MongoDBType.toDorisType(new BsonArray())); assertEquals( "DECIMALV3(10,5)", MongoDBType.toDorisType(new Decimal128(new BigDecimal("12345.55555")))); + BsonArray bsonArray = new BsonArray(); + bsonArray.add(new BsonString("string")); + bsonArray.add(new BsonInt64(123456789)); + assertEquals(DorisType.ARRAY + "", MongoDBType.toDorisType(bsonArray)); } @Test @@ -67,22 +74,27 @@ public void jsonNodeToDorisType() { assertEquals(DorisType.DOUBLE, MongoDBType.jsonNodeToDorisType(new DoubleNode(1234.23))); assertEquals(DorisType.BOOLEAN, MongoDBType.jsonNodeToDorisType(BooleanNode.TRUE)); assertEquals( - DorisType.ARRAY, + DorisType.ARRAY + "", MongoDBType.jsonNodeToDorisType(JsonNodeFactory.instance.arrayNode())); assertEquals( "DECIMALV3(6,2)", MongoDBType.jsonNodeToDorisType(new DecimalNode(new BigDecimal("1234.23")))); ObjectNode dateJsonNodes = JsonNodeFactory.instance.objectNode(); - dateJsonNodes.put(MongoDBType.DATE_TYPE, ""); + dateJsonNodes.put(ChangeStreamConstant.DATE_FIELD, ""); assertEquals(DorisType.DATETIME_V2 + "(3)", MongoDBType.jsonNodeToDorisType(dateJsonNodes)); + ObjectNode timestampJsonNodes = JsonNodeFactory.instance.objectNode(); + timestampJsonNodes.put(ChangeStreamConstant.TIMESTAMP_FIELD, ""); + assertEquals( + DorisType.DATETIME_V2 + "(0)", MongoDBType.jsonNodeToDorisType(timestampJsonNodes)); + ObjectNode decimalJsonNodes = JsonNodeFactory.instance.objectNode(); - decimalJsonNodes.put(MongoDBType.DECIMAL_TYPE, "1234.23"); + decimalJsonNodes.put(ChangeStreamConstant.DECIMAL_FIELD, "1234.23"); assertEquals("DECIMALV3(6,2)", MongoDBType.jsonNodeToDorisType(decimalJsonNodes)); ObjectNode longJsonNodes = JsonNodeFactory.instance.objectNode(); - longJsonNodes.put(MongoDBType.LONG_TYPE, "1234234466"); + longJsonNodes.put(ChangeStreamConstant.LONG_FIELD, "1234234466"); assertEquals(DorisType.BIGINT, MongoDBType.jsonNodeToDorisType(longJsonNodes)); }