diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RowDataWithMeta.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RowDataWithMeta.java
new file mode 100644
index 000000000..e83df78fa
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RowDataWithMeta.java
@@ -0,0 +1,198 @@
+package org.apache.doris.flink.sink.batch;
+
+import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
+import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
+import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.util.Preconditions;
+
+/** A {@link RowData} wrapper carrying the target database, table and schema metadata. */
+public class RowDataWithMeta {
+    private String database;
+    private String table;
+    private String[] fieldNames;
+    private DataType[] dataTypes;
+    private RowData rowData;
+    private String type;
+    private ArrowSerializer arrowSerializer;
+    private ByteArrayOutputStream outputStream;
+
+    public RowDataWithMeta() {}
+
+    private RowDataWithMeta(
+            String type,
+            String database,
+            String table,
+            String[] fieldNames,
+            DataType[] dataTypes,
+            RowData rowData) {
+        this.type = type;
+        this.database = database;
+        this.table = table;
+        this.fieldNames = fieldNames;
+        this.dataTypes = dataTypes;
+        this.rowData = rowData;
+        if (ARROW.equals(type)) {
+            // The arrow serializer is stateful, so each wrapper keeps its own
+            // serializer and output stream for the table it targets.
+            LogicalType[] logicalTypes = TypeConversions.fromDataToLogicalType(dataTypes);
+            RowType rowType = RowType.of(logicalTypes, fieldNames);
+            arrowSerializer = new ArrowSerializer(rowType, rowType);
+            outputStream = new ByteArrayOutputStream();
+            try {
+                arrowSerializer.open(new ByteArrayInputStream(new byte[0]), outputStream);
+            } catch (Exception e) {
+                throw new RuntimeException("failed to open arrow serializer:", e);
+            }
+        }
+    }
+
+    public static RowDataWithMeta.Builder builder() {
+        return new RowDataWithMeta.Builder();
+    }
+
+    /** Builder for RowDataWithMeta. */
+    public static class Builder {
+        private String database;
+        private String table;
+        private String[] fieldNames;
+        private DataType[] dataTypes;
+        private RowData rowData;
+        private String type;
+
+        public RowDataWithMeta.Builder setDatabase(String database) {
+            this.database = database;
+            return this;
+        }
+
+        public RowDataWithMeta.Builder setTable(String table) {
+            this.table = table;
+            return this;
+        }
+
+        public RowDataWithMeta.Builder setFieldNames(String[] fieldNames) {
+            this.fieldNames = fieldNames;
+            return this;
+        }
+
+        public RowDataWithMeta.Builder setDataTypes(DataType[] dataTypes) {
+            this.dataTypes = dataTypes;
+            return this;
+        }
+
+        public RowDataWithMeta.Builder setRowData(RowData rowData) {
+            this.rowData = rowData;
+            return this;
+        }
+
+        public RowDataWithMeta.Builder setType(String type) {
+            this.type = type;
+            return this;
+        }
+
+        public RowDataWithMeta build() {
+            Preconditions.checkState(CSV.equals(type) || JSON.equals(type) || ARROW.equals(type));
+            Preconditions.checkNotNull(dataTypes);
+            Preconditions.checkNotNull(fieldNames);
+            return new RowDataWithMeta(type, database, table, fieldNames, dataTypes, rowData);
+        }
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public RowData getRowData() {
+        return rowData;
+    }
+
+    public void setRowData(RowData rowData) {
+        this.rowData = rowData;
+    }
+
+    public String getTableIdentifier() {
+        return this.database + "." + this.table;
+    }
+
+    public String[] getFieldNames() {
+        return fieldNames;
+    }
+
+    public void setFieldNames(String[] fieldNames) {
+        Preconditions.checkNotNull(fieldNames);
+        this.fieldNames = fieldNames;
+    }
+
+    public DataType[] getDataTypes() {
+        return dataTypes;
+    }
+
+    public void setDataTypes(DataType[] dataTypes) {
+        Preconditions.checkNotNull(dataTypes);
+        this.dataTypes = dataTypes;
+    }
+
+    public ArrowSerializer getArrowSerializer() {
+        return arrowSerializer;
+    }
+
+    public ByteArrayOutputStream getOutputStream() {
+        return outputStream;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public void setArrowSerializer(ArrowSerializer arrowSerializer) {
+        this.arrowSerializer = arrowSerializer;
+    }
+
+    public void setOutputStream(ByteArrayOutputStream outputStream) {
+        this.outputStream = outputStream;
+    }
+
+    @Override
+    public String toString() {
+        return "RowDataWithMeta{"
+                + "database='"
+                + database
+                + '\''
+                + ", table='"
+                + table
+                + '\''
+                + ", fieldNames="
+                + Arrays.toString(fieldNames)
+                + ", dataTypes="
+                + Arrays.toString(dataTypes)
+                + ", rowData="
+                + rowData
+                + ", type='"
+                + type
+                + '\''
+                + '}';
    }
+}
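
For reference, a record destined for one table might be assembled like this. This is a minimal sketch, not part of the patch: the database, table, schema and row values are made up, and DataTypes, GenericRowData and StringData come from Flink's table API.

    import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;

    // Hypothetical example: wrap one csv-format row bound for db1.tbl1.
    RowDataWithMeta meta =
            RowDataWithMeta.builder()
                    .setType(CSV)
                    .setDatabase("db1")
                    .setTable("tbl1")
                    .setFieldNames(new String[] {"id", "name"})
                    .setDataTypes(new DataType[] {DataTypes.INT(), DataTypes.STRING()})
                    .setRowData(GenericRowData.of(1, StringData.fromString("doris")))
                    .build();
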
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataWithMetaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataWithMetaSerializer.java
new file mode 100644
index 000000000..6512fd279
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataWithMetaSerializer.java
@@ -0,0 +1,191 @@
+package org.apache.doris.flink.sink.writer.serializer;
+
+import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
+import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
+import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
+import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
+import static org.apache.doris.flink.sink.writer.LoadConstants.NULL_VALUE;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringJoiner;
+import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
+import org.apache.doris.flink.sink.EscapeHandler;
+import org.apache.doris.flink.sink.batch.RowDataWithMeta;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Serializer that converts {@link RowDataWithMeta} into csv, json or arrow format. */
+public class RowDataWithMetaSerializer implements DorisRecordSerializer<RowDataWithMeta> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RowDataWithMetaSerializer.class);
+    private final String type;
+    private ObjectMapper objectMapper;
+    private final String fieldDelimiter;
+    private final boolean enableDelete;
+    // Arrow rows are buffered and emitted as one batch once this many rows accumulate.
+    private final int arrowBatchCnt = 1000;
+    private int arrowWriteCnt = 0;
+    private ArrowSerializer arrowSerializer;
+    private ByteArrayOutputStream outputStream;
+
+    private RowDataWithMetaSerializer(String type, String fieldDelimiter, boolean enableDelete) {
+        this.type = type;
+        this.fieldDelimiter = fieldDelimiter;
+        this.enableDelete = enableDelete;
+        if (JSON.equals(type)) {
+            objectMapper = new ObjectMapper();
+        }
+    }
+
+    @Override
+    public DorisRecord serialize(RowDataWithMeta meta) throws IOException {
+        RowData record = meta.getRowData();
+        String[] fieldNames = meta.getFieldNames();
+        DataType[] dataTypes = meta.getDataTypes();
+        DorisRowConverter rowConverter = new DorisRowConverter().setExternalConverter(dataTypes);
+        int maxIndex = Math.min(record.getArity(), fieldNames.length);
+        String valString;
+        if (JSON.equals(type)) {
+            valString = buildJsonString(record, maxIndex, rowConverter, fieldNames);
+        } else if (CSV.equals(type)) {
+            valString = buildCSVString(record, maxIndex, rowConverter);
+        } else if (ARROW.equals(type)) {
+            arrowSerializer = meta.getArrowSerializer();
+            outputStream = meta.getOutputStream();
+            if (arrowSerializer == null || outputStream == null) {
+                throw new IllegalArgumentException(
+                        "The arrowSerializer or outputStream of rowDataWithMeta is null!");
+            }
+            arrowWriteCnt += 1;
+            arrowSerializer.write(record);
+            if (arrowWriteCnt < arrowBatchCnt) {
+                return DorisRecord.empty;
+            }
+            return arrowToDorisRecord();
+        } else {
+            throw new IllegalArgumentException("The type " + type + " is not supported!");
+        }
+        return DorisRecord.of(
+                meta.getDatabase(), meta.getTable(), valString.getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public DorisRecord flush() {
+        if (JSON.equals(type) || CSV.equals(type)) {
+            return DorisRecord.empty;
+        } else if (ARROW.equals(type)) {
+            // Emit whatever is buffered, even if the batch is not full.
+            return arrowToDorisRecord();
+        } else {
+            throw new IllegalArgumentException("The type " + type + " is not supported!");
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (ARROW.equals(type) && arrowSerializer != null) {
+            arrowSerializer.close();
+        }
+    }
+
+    public DorisRecord arrowToDorisRecord() {
+        if (arrowWriteCnt == 0) {
+            return DorisRecord.empty;
+        }
+        arrowWriteCnt = 0;
+        try {
+            arrowSerializer.finishCurrentBatch();
+            byte[] bytes = outputStream.toByteArray();
+            outputStream.reset();
+            arrowSerializer.resetWriter();
+            return DorisRecord.of(bytes);
+        } catch (Exception e) {
+            LOG.error("Failed to convert arrow batch:", e);
+        }
+        return DorisRecord.empty;
+    }
+
+    public String buildJsonString(
+            RowData record, int maxIndex, DorisRowConverter rowConverter, String[] fieldNames)
+            throws IOException {
+        int fieldIndex = 0;
+        Map<String, String> valueMap = new HashMap<>();
+        while (fieldIndex < maxIndex) {
+            Object field = rowConverter.convertExternal(record, fieldIndex);
+            String value = field != null ? field.toString() : null;
+            valueMap.put(fieldNames[fieldIndex], value);
+            fieldIndex++;
+        }
+        if (enableDelete) {
+            valueMap.put(DORIS_DELETE_SIGN, parseDeleteSign(record.getRowKind()));
+        }
+        return objectMapper.writeValueAsString(valueMap);
+    }
+
+    public String buildCSVString(RowData record, int maxIndex, DorisRowConverter rowConverter)
+            throws IOException {
+        int fieldIndex = 0;
+        StringJoiner joiner = new StringJoiner(fieldDelimiter);
+        while (fieldIndex < maxIndex) {
+            Object field = rowConverter.convertExternal(record, fieldIndex);
+            String value = field != null ? field.toString() : NULL_VALUE;
+            joiner.add(value);
+            fieldIndex++;
+        }
+        if (enableDelete) {
+            joiner.add(parseDeleteSign(record.getRowKind()));
+        }
+        return joiner.toString();
+    }
+
+    public String parseDeleteSign(RowKind rowKind) {
+        if (RowKind.INSERT.equals(rowKind) || RowKind.UPDATE_AFTER.equals(rowKind)) {
+            return "0";
+        } else if (RowKind.DELETE.equals(rowKind) || RowKind.UPDATE_BEFORE.equals(rowKind)) {
+            return "1";
+        } else {
+            throw new IllegalArgumentException("Unrecognized row kind:" + rowKind);
+        }
+    }
+
+    public static RowDataWithMetaSerializer.Builder builder() {
+        return new RowDataWithMetaSerializer.Builder();
+    }
+
+    /** Builder for RowDataWithMetaSerializer. */
+    public static class Builder {
+        private String type;
+        private String fieldDelimiter;
+        private boolean deletable;
+
+        public RowDataWithMetaSerializer.Builder setType(String type) {
+            this.type = type;
+            return this;
+        }
+
+        public RowDataWithMetaSerializer.Builder setFieldDelimiter(String fieldDelimiter) {
+            this.fieldDelimiter = EscapeHandler.escapeString(fieldDelimiter);
+            return this;
+        }
+
+        public RowDataWithMetaSerializer.Builder enableDelete(boolean deletable) {
+            this.deletable = deletable;
+            return this;
+        }
+
+        public RowDataWithMetaSerializer build() {
+            Preconditions.checkState(
+                    (CSV.equals(type) && fieldDelimiter != null)
+                            || JSON.equals(type)
+                            || ARROW.equals(type));
+            if (ARROW.equals(type)) {
+                // The arrow path writes rows directly, so the delete sign column is not supported.
+                Preconditions.checkArgument(!deletable);
+            }
+            return new RowDataWithMetaSerializer(type, fieldDelimiter, deletable);
+        }
+    }
+}
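
A sketch of how the serializer might be wired up, with hypothetical settings (`meta` is the RowDataWithMeta from the earlier sketch): build it once, call serialize per record, and call flush at the end of a batch so a partially filled arrow batch is not lost.

    // Hypothetical example: csv serializer with a comma delimiter and delete sign enabled.
    RowDataWithMetaSerializer serializer =
            RowDataWithMetaSerializer.builder()
                    .setType(CSV)
                    .setFieldDelimiter(",")
                    .enableDelete(true)
                    .build();

    // csv/json produce one DorisRecord per row; arrow returns DorisRecord.empty
    // until arrowBatchCnt (1000) rows have been buffered.
    DorisRecord record = serializer.serialize(meta);

    // For arrow, flush() emits whatever remains in the current batch.
    DorisRecord remainder = serializer.flush();
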