Skip to content

Commit

Permalink
[Fix] Fix the issue with parsing MongoDB timestamp and array types (#547)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuqinghuang authored Feb 10, 2025
1 parent fac45c2 commit 7f09d02
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ChangeStreamConstant implements Serializable {
public static final String FIELD_DOCUMENT_KEY = "documentKey";

public static final String DATE_FIELD = "$date";
public static final String TIMESTAMP_FIELD = "$timestamp";

public static final String DECIMAL_FIELD = "$numberDecimal";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,29 @@
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.doris.flink.catalog.doris.DorisType;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.bson.BsonArray;
import org.bson.BsonTimestamp;
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;

import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MongoDBType {

public static final String DATE_TYPE = "$date";
public static final String DECIMAL_TYPE = "$numberDecimal";
public static final String LONG_TYPE = "$numberLong";
import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.DATE_FIELD;
import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.DECIMAL_FIELD;
import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.LONG_FIELD;
import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.TIMESTAMP_FIELD;

public class MongoDBType {
public static String toDorisType(Object value) {
if (value instanceof Integer) {
return DorisType.INT;
} else if (value instanceof Date) {
return DorisType.DATETIME_V2 + "(3)";
} else if (value instanceof BsonTimestamp) {
return DorisType.DATETIME_V2 + "(0)";
} else if (value instanceof Long) {
return DorisType.BIGINT;
} else if (value instanceof Double) {
Expand All @@ -60,8 +63,8 @@ public static String toDorisType(Object value) {
return DorisType.STRING;
} else if (value instanceof ObjectId) {
return DorisType.VARCHAR + "(30)";
} else if (value instanceof BsonArray) {
return DorisType.ARRAY;
} else if (value instanceof List) {
return DorisType.ARRAY + "<" + DorisType.STRING + ">";
} else if (value instanceof Decimal128) {
return checkAndRebuildBigDecimal(((Decimal128) value).bigDecimalValue());
} else {
Expand All @@ -77,19 +80,22 @@ public static String jsonNodeToDorisType(JsonNode value) {
} else if (value instanceof LongNode) {
return DorisType.BIGINT;
} else if (value instanceof DoubleNode) {
// When mongo double is in the JsonNode, it's actually a decimal type
return DorisType.DOUBLE;
} else if (value instanceof BooleanNode) {
return DorisType.BOOLEAN;
} else if (value instanceof ArrayNode) {
return DorisType.ARRAY;
return DorisType.ARRAY + "<" + DorisType.STRING + ">";
} else if (value instanceof DecimalNode) {
return checkAndRebuildBigDecimal(value.decimalValue());
} else if (value instanceof ObjectNode) {
if (value.size() == 1 && value.get(DATE_TYPE) != null) {
if (value.size() == 1 && value.get(DATE_FIELD) != null) {
return DorisType.DATETIME_V2 + "(3)";
} else if (value.size() == 1 && value.get(DECIMAL_TYPE) != null) {
return checkAndRebuildBigDecimal(new BigDecimal(value.get(DECIMAL_TYPE).asText()));
} else if (value.size() == 1 && value.get(LONG_TYPE) != null) {
} else if (value.size() == 1 && value.get(TIMESTAMP_FIELD) != null) {
return DorisType.DATETIME_V2 + "(0)";
} else if (value.size() == 1 && value.get(DECIMAL_FIELD) != null) {
return checkAndRebuildBigDecimal(new BigDecimal(value.get(DECIMAL_FIELD).asText()));
} else if (value.size() == 1 && value.get(LONG_FIELD) != null) {
return DorisType.BIGINT;
} else {
return DorisType.STRING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_NAMESPACE;
import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_TABLE;
import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.LONG_FIELD;
import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.TIMESTAMP_FIELD;

public class MongoJsonDebeziumSchemaChange extends CdcSchemaChange {

Expand All @@ -67,7 +68,7 @@ public class MongoJsonDebeziumSchemaChange extends CdcSchemaChange {
private final DorisOptions dorisOptions;

private final Set<String> specialFields =
new HashSet<>(Arrays.asList(DATE_FIELD, DECIMAL_FIELD, LONG_FIELD));
new HashSet<>(Arrays.asList(DATE_FIELD, TIMESTAMP_FIELD, DECIMAL_FIELD, LONG_FIELD));

public MongoJsonDebeziumSchemaChange(JsonDebeziumChangeContext changeContext) {
this.objectMapper = changeContext.getObjectMapper();
Expand Down Expand Up @@ -127,7 +128,12 @@ private void formatSpecialFieldData(JsonNode logData) {
if (specialFields.contains(fieldKey)) {
switch (fieldKey) {
case DATE_FIELD:
long timestamp = fieldNode.get(DATE_FIELD).asLong();
case TIMESTAMP_FIELD:
JsonNode jsonNode = fieldNode.get(fieldKey);
long timestamp =
fieldKey.equals(TIMESTAMP_FIELD)
? jsonNode.get("t").asLong() * 1000L
: jsonNode.asLong();
String formattedDate =
MongoDateConverter.convertTimestampToString(
timestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
package org.apache.doris.flink.tools.cdc.mongodb;

import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;
import org.junit.Test;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;

import static org.junit.Assert.assertEquals;
Expand All @@ -40,6 +44,35 @@ public void getCdcTableName() throws Exception {
assertEquals("db_TEST\\.test_table", mongoDBSchema.getCdcTableName());
}

/**
 * Builds a {@code MongoDBSchema} from a list of single-field sample documents, one per value
 * kind (ObjectId, string, boolean, int, long, double, Decimal128, Date, BsonTimestamp, nested
 * document and a mixed-element list), and checks the string form of the resulting field map.
 *
 * <p>The expected string pins both the order of the fields and the type string reported for
 * each of them, so the sample documents must be added in exactly this order.
 */
@Test
public void testMongoSampleDataFields() throws Exception {
    // Nested fixtures are prepared up front; they do not depend on the sample list.
    Map<String, String> nestedEntries = new LinkedHashMap<>();
    nestedEntries.put("key1", "value1");
    nestedEntries.put("key2", "value2");
    Document nestedDocument = new Document(nestedEntries);

    // Deliberately mixes strings and an integer in the same list.
    ArrayList<Object> mixedValues = new ArrayList<>();
    mixedValues.add("str1");
    mixedValues.add("str2");
    mixedValues.add(789);

    ArrayList<Document> samples = new ArrayList<>();
    samples.add(new Document("_id", new ObjectId("678643e649a4c9239b04297b")));
    samples.add(new Document("c_string", "Hello, MongoDB!"));
    samples.add(new Document("c_bool", true));
    samples.add(new Document("c_int", 123456));
    samples.add(new Document("c_long", 1234567890123456789L));
    samples.add(new Document("c_double", 123.45));
    samples.add(new Document("c_decimal", new Decimal128(BigDecimal.valueOf(12345.67))));
    samples.add(new Document("c_date", new Date(1234567890)));
    samples.add(new Document("c_timestamp", new BsonTimestamp(1334567890)));
    samples.add(new Document("c_object", nestedDocument));
    samples.add(new Document("c_array", mixedValues));

    MongoDBSchema schema = new MongoDBSchema(samples, "db_TEST", "test_table", "");

    String expectedFields =
            "{_id=FieldSchema{name='_id', typeString='VARCHAR(30)', defaultValue='null', comment='null'}, c_string=FieldSchema{name='c_string', typeString='STRING', defaultValue='null', comment='null'}, c_bool=FieldSchema{name='c_bool', typeString='BOOLEAN', defaultValue='null', comment='null'}, c_int=FieldSchema{name='c_int', typeString='INT', defaultValue='null', comment='null'}, c_long=FieldSchema{name='c_long', typeString='BIGINT', defaultValue='null', comment='null'}, c_double=FieldSchema{name='c_double', typeString='DECIMALV3(5,2)', defaultValue='null', comment='null'}, c_decimal=FieldSchema{name='c_decimal', typeString='DECIMALV3(7,2)', defaultValue='null', comment='null'}, c_date=FieldSchema{name='c_date', typeString='DATETIMEV2(3)', defaultValue='null', comment='null'}, c_timestamp=FieldSchema{name='c_timestamp', typeString='DATETIMEV2(0)', defaultValue='null', comment='null'}, c_object=FieldSchema{name='c_object', typeString='STRING', defaultValue='null', comment='null'}, c_array=FieldSchema{name='c_array', typeString='ARRAY<STRING>', defaultValue='null', comment='null'}}";
    assertEquals(expectedFields, schema.getFields().toString());
}

@Test
public void replaceDecimalTypeIfNeededTest1() throws Exception {
ArrayList<Document> documents = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.doris.flink.catalog.doris.DorisType;
import org.bson.BsonArray;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.BsonTimestamp;
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;
import org.junit.Test;
Expand All @@ -46,17 +49,21 @@ public class MongoDBTypeTest {
public void toDorisType() {
assertEquals(DorisType.INT, MongoDBType.toDorisType(new Integer(123)));
assertEquals(DorisType.DATETIME_V2 + "(3)", MongoDBType.toDorisType(new Date()));
assertEquals(DorisType.DATETIME_V2 + "(0)", MongoDBType.toDorisType(new BsonTimestamp()));
assertEquals(DorisType.BIGINT, MongoDBType.toDorisType(new Long(1234567891)));
assertEquals("DECIMALV3(6,2)", MongoDBType.toDorisType(new Double("1234.56")));
assertEquals(DorisType.BOOLEAN, MongoDBType.toDorisType(new Boolean(true)));
assertEquals(DorisType.STRING, MongoDBType.toDorisType("string"));
assertEquals(
DorisType.VARCHAR + "(30)",
MongoDBType.toDorisType(new ObjectId("66583533791a67a6f8c5a339")));
assertEquals(DorisType.ARRAY, MongoDBType.toDorisType(new BsonArray()));
assertEquals(
"DECIMALV3(10,5)",
MongoDBType.toDorisType(new Decimal128(new BigDecimal("12345.55555"))));
BsonArray bsonArray = new BsonArray();
bsonArray.add(new BsonString("string"));
bsonArray.add(new BsonInt64(123456789));
assertEquals(DorisType.ARRAY + "<STRING>", MongoDBType.toDorisType(bsonArray));
}

@Test
Expand All @@ -67,22 +74,27 @@ public void jsonNodeToDorisType() {
assertEquals(DorisType.DOUBLE, MongoDBType.jsonNodeToDorisType(new DoubleNode(1234.23)));
assertEquals(DorisType.BOOLEAN, MongoDBType.jsonNodeToDorisType(BooleanNode.TRUE));
assertEquals(
DorisType.ARRAY,
DorisType.ARRAY + "<STRING>",
MongoDBType.jsonNodeToDorisType(JsonNodeFactory.instance.arrayNode()));
assertEquals(
"DECIMALV3(6,2)",
MongoDBType.jsonNodeToDorisType(new DecimalNode(new BigDecimal("1234.23"))));

ObjectNode dateJsonNodes = JsonNodeFactory.instance.objectNode();
dateJsonNodes.put(MongoDBType.DATE_TYPE, "");
dateJsonNodes.put(ChangeStreamConstant.DATE_FIELD, "");
assertEquals(DorisType.DATETIME_V2 + "(3)", MongoDBType.jsonNodeToDorisType(dateJsonNodes));

ObjectNode timestampJsonNodes = JsonNodeFactory.instance.objectNode();
timestampJsonNodes.put(ChangeStreamConstant.TIMESTAMP_FIELD, "");
assertEquals(
DorisType.DATETIME_V2 + "(0)", MongoDBType.jsonNodeToDorisType(timestampJsonNodes));

ObjectNode decimalJsonNodes = JsonNodeFactory.instance.objectNode();
decimalJsonNodes.put(MongoDBType.DECIMAL_TYPE, "1234.23");
decimalJsonNodes.put(ChangeStreamConstant.DECIMAL_FIELD, "1234.23");
assertEquals("DECIMALV3(6,2)", MongoDBType.jsonNodeToDorisType(decimalJsonNodes));

ObjectNode longJsonNodes = JsonNodeFactory.instance.objectNode();
longJsonNodes.put(MongoDBType.LONG_TYPE, "1234234466");
longJsonNodes.put(ChangeStreamConstant.LONG_FIELD, "1234234466");
assertEquals(DorisType.BIGINT, MongoDBType.jsonNodeToDorisType(longJsonNodes));
}

Expand Down

0 comments on commit 7f09d02

Please sign in to comment.