Skip to content

Commit

Permalink
add rowDataWithMeta and serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
PeatBoy committed May 15, 2024
1 parent e98ea69 commit 9cc44a7
Show file tree
Hide file tree
Showing 2 changed files with 389 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package org.apache.doris.flink.sink.batch;

import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.util.Preconditions;

public class RowDataWithMeta {
private String database;
private String table;
private String[] fieldNames;
private DataType[] dataTypes;
private RowData rowData;
private String type;
private ArrowSerializer arrowSerializer;
private ByteArrayOutputStream outputStream;

public RowDataWithMeta() {
}

private RowDataWithMeta(
String type,
String database,
String table,
String[] fieldNames,
DataType[] dataTypes,
RowData rowData) {
this.type = type;
this.database = database;
this.table = table;
this.fieldNames = fieldNames;
this.dataTypes = dataTypes;
this.rowData = rowData;
if (ARROW.equals(type)) {
LogicalType[] logicalTypes = TypeConversions.fromDataToLogicalType(dataTypes);
RowType rowType = RowType.of(logicalTypes, fieldNames);
arrowSerializer = new ArrowSerializer(rowType, rowType);
outputStream = new ByteArrayOutputStream();
try {
arrowSerializer.open(new ByteArrayInputStream(new byte[0]), outputStream);
} catch (Exception e) {
throw new RuntimeException("failed to open arrow serializer:", e);
}
}
}

public static RowDataWithMeta.Builder builder() {
return new RowDataWithMeta.Builder();
}

public static class Builder {
private String database;
private String table;
private String[] fieldNames;
private DataType[] dataTypes;
private RowData rowData;
private String type;

public RowDataWithMeta.Builder setDatabase(String database) {
this.database = database;
return this;
}

public RowDataWithMeta.Builder setTable(String table) {
this.table = table;
return this;
}

public RowDataWithMeta.Builder setFieldNames(String[] fieldNames) {
this.fieldNames = fieldNames;
return this;
}

public RowDataWithMeta.Builder setDataTypes(DataType[] dataTypes) {
this.dataTypes = dataTypes;
return this;
}

public RowDataWithMeta.Builder setRowData(RowData rowData) {
this.rowData = rowData;
return this;
}

public RowDataWithMeta.Builder setType(String type) {
this.type = type;
return this;
}

public RowDataWithMeta build() {
Preconditions.checkState(CSV.equals(type) || JSON.equals(type) || ARROW.equals(type));
Preconditions.checkNotNull(dataTypes);
Preconditions.checkNotNull(fieldNames);
return new RowDataWithMeta(type, database, table, fieldNames, dataTypes, rowData);
}
}

public String getDatabase() {
return database;
}

public String getTable() {
return table;
}

public void setTable(String table) {
this.table = table;
}

public RowData getRowData() {
return rowData;
}

public void setRowData(RowData rowData) {
this.rowData = rowData;
}

public String getTableIdentifier() {
return this.database + "." + this.table;
}

public String[] getFieldNames() {
return fieldNames;
}

public void setFieldNames(String[] fieldNames) {
Preconditions.checkNotNull(fieldNames);
this.fieldNames = fieldNames;
}

public DataType[] getDataTypes() {
return dataTypes;
}

public void setDataTypes(DataType[] dataTypes) {
Preconditions.checkNotNull(dataTypes);
this.dataTypes = dataTypes;
}

public ArrowSerializer getArrowSerializer() {
return arrowSerializer;
}

public ByteArrayOutputStream getOutputStream() {
return outputStream;
}

public void setDatabase(String database) {
this.database = database;
}

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}

public void setArrowSerializer(
ArrowSerializer arrowSerializer) {
this.arrowSerializer = arrowSerializer;
}

public void setOutputStream(ByteArrayOutputStream outputStream) {
this.outputStream = outputStream;
}

@Override
public String toString() {
return "RowDataWithMeta{"
+ "database='"
+ database
+ '\''
+ ", table='"
+ table
+ '\''
+ ", fieldNames="
+ Arrays.toString(fieldNames)
+ ", dataTypes="
+ Arrays.toString(dataTypes)
+ ", rowData="
+ rowData
+ ", type='"
+ type
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package org.apache.doris.flink.sink.writer.serializer;

import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
import static org.apache.doris.flink.sink.writer.LoadConstants.NULL_VALUE;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;
import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
import org.apache.doris.flink.sink.EscapeHandler;
import org.apache.doris.flink.sink.batch.RowDataWithMeta;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RowDataWithMetaSerializer implements DorisRecordSerializer<RowDataWithMeta> {

private static final Logger LOG = LoggerFactory.getLogger(RowDataWithMetaSerializer.class);
String type;
private ObjectMapper objectMapper;
private final String fieldDelimiter;
private final boolean enableDelete;
private final int arrowBatchCnt = 1000;
private int arrowWriteCnt = 0;
private ArrowSerializer arrowSerializer;
ByteArrayOutputStream outputStream;

private RowDataWithMetaSerializer(String type, String fieldDelimiter, boolean enableDelete) {
this.type = type;
this.fieldDelimiter = fieldDelimiter;
this.enableDelete = enableDelete;
if (JSON.equals(type)) {
objectMapper = new ObjectMapper();
}
}

@Override
public DorisRecord serialize(RowDataWithMeta meta) throws IOException {
RowData record = meta.getRowData();
String[] fieldNames = meta.getFieldNames();
DataType[] dataTypes = meta.getDataTypes();
DorisRowConverter rowConverter = new DorisRowConverter().setExternalConverter(dataTypes);
int maxIndex = Math.min(record.getArity(), fieldNames.length);
String valString;
if (JSON.equals(type)) {
valString = buildJsonString(record, maxIndex, rowConverter, fieldNames);
} else if (CSV.equals(type)) {
valString = buildCSVString(record, maxIndex, rowConverter);
} else if (ARROW.equals(type)) {
arrowSerializer = meta.getArrowSerializer();
outputStream = meta.getOutputStream();
if (arrowSerializer == null || outputStream == null) {
throw new IllegalArgumentException("rowDataWithMeta arrowSerializer is null!");
}
arrowWriteCnt += 1;
arrowSerializer.write(record);
if (arrowWriteCnt < arrowBatchCnt) {
return DorisRecord.empty;
}
return arrowToDorisRecord();
} else {
throw new IllegalArgumentException("The type " + type + " is not supported!");
}
return DorisRecord.of(
meta.getDatabase(), meta.getTable(), valString.getBytes(StandardCharsets.UTF_8));
}

@Override
public DorisRecord flush() {
if (JSON.equals(type) || CSV.equals(type)) {
return DorisRecord.empty;
} else if (ARROW.equals(type)) {
return arrowToDorisRecord();
} else {
throw new IllegalArgumentException("The type " + type + " is not supported!");
}
}

@Override
public void close() throws Exception {
if (ARROW.equals(type) && arrowSerializer != null) {
arrowSerializer.close();
}
}

public DorisRecord arrowToDorisRecord() {
if (arrowWriteCnt == 0) {
return DorisRecord.empty;
}
arrowWriteCnt = 0;
try {
arrowSerializer.finishCurrentBatch();
byte[] bytes = outputStream.toByteArray();
outputStream.reset();
arrowSerializer.resetWriter();
return DorisRecord.of(bytes);
} catch (Exception e) {
LOG.error("Failed to convert arrow batch:", e);
}
return DorisRecord.empty;
}

public String buildJsonString(
RowData record, int maxIndex, DorisRowConverter rowConverter, String[] fieldNames)
throws IOException {
int fieldIndex = 0;
Map<String, String> valueMap = new HashMap<>();
while (fieldIndex < maxIndex) {
Object field = rowConverter.convertExternal(record, fieldIndex);
String value = field != null ? field.toString() : null;
valueMap.put(fieldNames[fieldIndex], value);
fieldIndex++;
}
if (enableDelete) {
valueMap.put(DORIS_DELETE_SIGN, parseDeleteSign(record.getRowKind()));
}
return objectMapper.writeValueAsString(valueMap);
}

public String buildCSVString(RowData record, int maxIndex, DorisRowConverter rowConverter)
throws IOException {
int fieldIndex = 0;
StringJoiner joiner = new StringJoiner(fieldDelimiter);
while (fieldIndex < maxIndex) {
Object field = rowConverter.convertExternal(record, fieldIndex);
String value = field != null ? field.toString() : NULL_VALUE;
joiner.add(value);
fieldIndex++;
}
if (enableDelete) {
joiner.add(parseDeleteSign(record.getRowKind()));
}
return joiner.toString();
}

public String parseDeleteSign(RowKind rowKind) {
if (RowKind.INSERT.equals(rowKind) || RowKind.UPDATE_AFTER.equals(rowKind)) {
return "0";
} else if (RowKind.DELETE.equals(rowKind) || RowKind.UPDATE_BEFORE.equals(rowKind)) {
return "1";
} else {
throw new IllegalArgumentException("Unrecognized row kind:" + rowKind.toString());
}
}

public static RowDataWithMetaSerializer.Builder builder() {
return new RowDataWithMetaSerializer.Builder();
}

/** Builder for RowDataWithMetaSerializer. */
public static class Builder {
private String type;
private String fieldDelimiter;
private boolean deletable;

public RowDataWithMetaSerializer.Builder setType(String type) {
this.type = type;
return this;
}

public RowDataWithMetaSerializer.Builder setFieldDelimiter(String fieldDelimiter) {
this.fieldDelimiter = EscapeHandler.escapeString(fieldDelimiter);
return this;
}

public RowDataWithMetaSerializer.Builder enableDelete(boolean deletable) {
this.deletable = deletable;
return this;
}

public RowDataWithMetaSerializer build() {
Preconditions.checkState(
CSV.equals(type) && fieldDelimiter != null || JSON.equals(type) || ARROW.equals(type));
if (ARROW.equals(type)) {
Preconditions.checkArgument(!deletable);
}
return new RowDataWithMetaSerializer(type, fieldDelimiter, deletable);
}
}
}

0 comments on commit 9cc44a7

Please sign in to comment.