|
23 | 23 | import com.dtstack.chunjun.converter.IDeserializationConverter;
|
24 | 24 | import com.dtstack.chunjun.converter.ISerializationConverter;
|
25 | 25 | import com.dtstack.chunjun.throwable.UnsupportedTypeException;
|
| 26 | +import com.dtstack.chunjun.util.DateUtil; |
26 | 27 |
|
| 28 | +import org.apache.flink.table.data.DecimalData; |
27 | 29 | import org.apache.flink.table.data.GenericRowData;
|
28 | 30 | import org.apache.flink.table.data.RowData;
|
29 | 31 | import org.apache.flink.table.data.StringData;
|
| 32 | +import org.apache.flink.table.data.TimestampData; |
| 33 | +import org.apache.flink.table.types.logical.DecimalType; |
30 | 34 | import org.apache.flink.table.types.logical.LogicalType;
|
31 | 35 | import org.apache.flink.table.types.logical.RowType;
|
| 36 | +import org.apache.flink.table.types.logical.TimestampType; |
32 | 37 |
|
| 38 | +import org.apache.commons.lang3.StringUtils; |
| 39 | + |
| 40 | +import java.math.BigDecimal; |
| 41 | +import java.math.BigInteger; |
33 | 42 | import java.sql.Date;
|
34 | 43 | import java.sql.Time;
|
| 44 | +import java.sql.Timestamp; |
35 | 45 | import java.time.LocalDate;
|
| 46 | +import java.time.LocalDateTime; |
36 | 47 | import java.time.LocalTime;
|
37 | 48 |
|
38 | 49 | public class S3SqlConverter extends AbstractRowConverter<String[], RowData, String[], LogicalType> {
|
@@ -82,12 +93,41 @@ protected IDeserializationConverter createInternalConverter(LogicalType type) {
|
82 | 93 | return val -> Float.valueOf((String) val);
|
83 | 94 | case DOUBLE:
|
84 | 95 | return val -> Double.valueOf((String) val);
|
| 96 | + case DECIMAL: |
| 97 | + final int precision = ((DecimalType) type).getPrecision(); |
| 98 | + final int scale = ((DecimalType) type).getScale(); |
| 99 | + return val -> |
| 100 | + val instanceof BigInteger |
| 101 | + ? DecimalData.fromBigDecimal( |
| 102 | + new BigDecimal((BigInteger) val, 0), precision, scale) |
| 103 | + : DecimalData.fromBigDecimal( |
| 104 | + StringUtils.isNotEmpty(String.valueOf(val)) |
| 105 | + ? new BigDecimal(String.valueOf(val)) |
| 106 | + : BigDecimal.ZERO, |
| 107 | + precision, |
| 108 | + scale); |
| 109 | + case TIMESTAMP_WITH_TIME_ZONE: |
| 110 | + case TIMESTAMP_WITHOUT_TIME_ZONE: |
| 111 | + return val -> { |
| 112 | + if (val instanceof String) { |
| 113 | + return TimestampData.fromTimestamp(Timestamp.valueOf((String) val)); |
| 114 | + } else if (val instanceof LocalDateTime) { |
| 115 | + return TimestampData.fromTimestamp(Timestamp.valueOf((LocalDateTime) val)); |
| 116 | + } else { |
| 117 | + return TimestampData.fromTimestamp(((Timestamp) val)); |
| 118 | + } |
| 119 | + }; |
85 | 120 | case CHAR:
|
86 | 121 | case VARCHAR:
|
87 | 122 | return val -> StringData.fromString((String) val);
|
88 | 123 | case DATE:
|
89 |
| - return val -> |
90 |
| - (int) ((Date.valueOf(String.valueOf(val))).toLocalDate().toEpochDay()); |
| 124 | + return val -> { |
| 125 | + if (StringUtils.isEmpty(String.valueOf(val))) { |
| 126 | + return null; |
| 127 | + } |
| 128 | + Date date = new Date(DateUtil.stringToDate(String.valueOf(val)).getTime()); |
| 129 | + return (int) date.toLocalDate().toEpochDay(); |
| 130 | + }; |
91 | 131 | case TIME_WITHOUT_TIME_ZONE:
|
92 | 132 | return val ->
|
93 | 133 | (int)
|
@@ -124,6 +164,22 @@ protected ISerializationConverter<String[]> createExternalConverter(LogicalType
|
124 | 164 | output[index] =
|
125 | 165 | Time.valueOf(LocalTime.ofNanoOfDay(val.getInt(index) * 1_000_000L))
|
126 | 166 | .toString();
|
| 167 | + case DECIMAL: |
| 168 | + return (rowData, index, data) -> |
| 169 | + data[index] = |
| 170 | + String.valueOf( |
| 171 | + rowData.getDecimal( |
| 172 | + index, |
| 173 | + ((DecimalType) type).getPrecision(), |
| 174 | + ((DecimalType) type).getScale())); |
| 175 | + case TIMESTAMP_WITHOUT_TIME_ZONE: |
| 176 | + return (rowData, index, data) -> |
| 177 | + data[index] = |
| 178 | + String.valueOf( |
| 179 | + rowData.getTimestamp( |
| 180 | + index, |
| 181 | + ((TimestampType) type).getPrecision()) |
| 182 | + .toTimestamp()); |
127 | 183 | default:
|
128 | 184 | throw new UnsupportedTypeException(type.toString());
|
129 | 185 | }
|
|
0 commit comments