Skip to content

Commit 01dac19

Browse files
gaoliangzoudaokoulife
gaoliang
authored andcommitted
[Feature-#1899][connector][mysql] The connector supports MySQL Driver 8.x
1 parent 3a3f015 commit 01dac19

File tree

2 files changed

+395
-0
lines changed

2 files changed

+395
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,381 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.chunjun.connector.mysql.converter;
20+
21+
import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
22+
import com.dtstack.chunjun.converter.AbstractRowConverter;
23+
import com.dtstack.chunjun.converter.IDeserializationConverter;
24+
import com.dtstack.chunjun.converter.ISerializationConverter;
25+
import com.dtstack.chunjun.util.ExternalDataUtil;
26+
import com.dtstack.chunjun.util.GsonUtil;
27+
28+
import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
29+
import org.apache.flink.table.data.*;
30+
import org.apache.flink.table.types.logical.*;
31+
import org.apache.flink.table.types.utils.TypeConversions;
32+
33+
import io.vertx.core.json.JsonArray;
34+
import lombok.extern.slf4j.Slf4j;
35+
36+
import java.math.BigDecimal;
37+
import java.math.BigInteger;
38+
import java.sql.*;
39+
import java.time.LocalDate;
40+
import java.time.LocalDateTime;
41+
import java.time.LocalTime;
42+
import java.util.HashMap;
43+
import java.util.List;
44+
import java.util.Map;
45+
46+
/** mysql sql converter */
47+
@Slf4j
48+
public class MysqlSqlConverter
49+
extends AbstractRowConverter<
50+
ResultSet, JsonArray, FieldNamedPreparedStatement, LogicalType> {
51+
52+
private static final long serialVersionUID = -931022595406994406L;
53+
54+
public MysqlSqlConverter(RowType rowType) {
55+
super(rowType);
56+
List<RowType.RowField> fields = rowType.getFields();
57+
for (RowType.RowField field : fields) {
58+
toInternalConverters.add(
59+
wrapIntoNullableInternalConverter(createInternalConverter(field)));
60+
toExternalConverters.add(
61+
wrapIntoNullableExternalConverter(
62+
createExternalConverter(field), field.getType()));
63+
}
64+
}
65+
66+
@Override
67+
protected ISerializationConverter<FieldNamedPreparedStatement>
68+
wrapIntoNullableExternalConverter(
69+
ISerializationConverter<FieldNamedPreparedStatement> serializationConverter,
70+
LogicalType type) {
71+
int sqlType = 0;
72+
try {
73+
// Exclude nested data types, such as ROW(id int,data ROW(id string))
74+
sqlType =
75+
JdbcTypeUtil.typeInformationToSqlType(
76+
TypeConversions.fromDataTypeToLegacyInfo(
77+
TypeConversions.fromLogicalToDataType(type)));
78+
} catch (IllegalArgumentException e) {
79+
log.warn(e.getMessage());
80+
}
81+
int finalSqlType = sqlType;
82+
return (val, index, statement) -> {
83+
if (val == null
84+
|| val.isNullAt(index)
85+
|| LogicalTypeRoot.NULL.equals(type.getTypeRoot())) {
86+
statement.setNull(index, finalSqlType);
87+
} else {
88+
serializationConverter.serialize(val, index, statement);
89+
}
90+
};
91+
}
92+
93+
@Override
94+
public RowData toInternal(ResultSet resultSet) throws Exception {
95+
GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount());
96+
for (int pos = 0; pos < rowType.getFieldCount(); pos++) {
97+
Object field = resultSet.getObject(pos + 1);
98+
if (resultSet.getMetaData().getColumnTypeName(pos + 1).equals("BIGINT UNSIGNED")
99+
&& field != null) {
100+
field = ((BigInteger) field).longValue();
101+
}
102+
if (resultSet.getMetaData().getColumnTypeName(pos + 1).equals("INT UNSIGNED")
103+
&& field != null) {
104+
field = ((Long) field).intValue();
105+
}
106+
genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(field));
107+
}
108+
return genericRowData;
109+
}
110+
111+
@Override
112+
public RowData toInternalLookup(JsonArray jsonArray) throws Exception {
113+
GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount());
114+
for (int pos = 0; pos < rowType.getFieldCount(); pos++) {
115+
Object field = jsonArray.getValue(pos);
116+
// 当sql里声明的字段类型为BIGINT时,将BigInteger (BIGINT UNSIGNED) 转换为Long
117+
if (rowType.getFields()
118+
.get(pos)
119+
.getType()
120+
.getTypeRoot()
121+
.name()
122+
.equalsIgnoreCase("BIGINT")
123+
&& field instanceof BigInteger) {
124+
field = ((BigInteger) field).longValue();
125+
}
126+
// 当sql里声明的字段类型为INT时,将Long (INT UNSIGNED) 转换为Integer
127+
if (rowType.getFields()
128+
.get(pos)
129+
.getType()
130+
.getTypeRoot()
131+
.name()
132+
.equalsIgnoreCase("INTEGER")
133+
&& field instanceof Long) {
134+
field = ((Long) field).intValue();
135+
}
136+
genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(field));
137+
}
138+
return genericRowData;
139+
}
140+
141+
@Override
142+
public FieldNamedPreparedStatement toExternal(
143+
RowData rowData, FieldNamedPreparedStatement statement) throws Exception {
144+
for (int index = 0; index < fieldTypes.length; index++) {
145+
toExternalConverters.get(index).serialize(rowData, index, statement);
146+
}
147+
return statement;
148+
}
149+
150+
protected IDeserializationConverter createInternalConverter(RowType.RowField rowField) {
151+
LogicalType type = rowField.getType();
152+
switch (type.getTypeRoot()) {
153+
case NULL:
154+
return val -> null;
155+
case BOOLEAN:
156+
case FLOAT:
157+
case DOUBLE:
158+
case INTERVAL_YEAR_MONTH:
159+
case INTERVAL_DAY_TIME:
160+
case INTEGER:
161+
case BIGINT:
162+
return val -> val;
163+
case TINYINT:
164+
return val -> ((Integer) val).byteValue();
165+
case SMALLINT:
166+
// Converter for small type that casts value to int and then return short value,
167+
// since
168+
// JDBC 1.0 use int type for small values.
169+
return val -> val instanceof Integer ? ((Integer) val).shortValue() : val;
170+
case DECIMAL:
171+
final int precision = ((DecimalType) type).getPrecision();
172+
final int scale = ((DecimalType) type).getScale();
173+
// using decimal(20, 0) to support db type bigint unsigned, user should define
174+
// decimal(20, 0) in SQL,
175+
// but other precision like decimal(30, 0) can work too from lenient consideration.
176+
return val ->
177+
val instanceof BigInteger
178+
? DecimalData.fromBigDecimal(
179+
new BigDecimal((BigInteger) val, 0), precision, scale)
180+
: DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
181+
case DATE:
182+
return val ->
183+
val instanceof Timestamp
184+
? (int)
185+
(((Timestamp) val)
186+
.toLocalDateTime()
187+
.toLocalDate()
188+
.toEpochDay())
189+
: (int)
190+
((Date.valueOf(String.valueOf(val)))
191+
.toLocalDate()
192+
.toEpochDay());
193+
case TIME_WITHOUT_TIME_ZONE:
194+
return val ->
195+
(int)
196+
((Time.valueOf(String.valueOf(val))).toLocalTime().toNanoOfDay()
197+
/ 1_000_000L);
198+
case TIMESTAMP_WITH_TIME_ZONE:
199+
case TIMESTAMP_WITHOUT_TIME_ZONE:
200+
return val -> {
201+
if (val instanceof LocalDateTime) {
202+
return TimestampData.fromTimestamp(Timestamp.valueOf((LocalDateTime) val));
203+
}
204+
return TimestampData.fromTimestamp((Timestamp) val);
205+
};
206+
case CHAR:
207+
case VARCHAR:
208+
return val -> val == null ? null : StringData.fromString(val.toString());
209+
case BINARY:
210+
case VARBINARY:
211+
return val -> (byte[]) val;
212+
case ARRAY:
213+
return (val) -> {
214+
Array val1 = (Array) val;
215+
Object[] array = (Object[]) val1.getArray();
216+
Object[] result = new Object[array.length];
217+
LogicalType logicalType = type.getChildren().get(0);
218+
RowType.RowField rowField1 = new RowType.RowField("", logicalType, "");
219+
IDeserializationConverter internalConverter =
220+
createInternalConverter(rowField1);
221+
for (int i = 0; i < array.length; i++) {
222+
Object value = internalConverter.deserialize(array[i]);
223+
result[i] = value;
224+
}
225+
return new GenericArrayData(result);
226+
};
227+
228+
case ROW:
229+
return val -> {
230+
List<RowType.RowField> childrenFields = ((RowType) type).getFields();
231+
HashMap childrenData = GsonUtil.GSON.fromJson(val.toString(), HashMap.class);
232+
GenericRowData genericRowData = new GenericRowData(childrenFields.size());
233+
for (int i = 0; i < childrenFields.size(); i++) {
234+
Object value =
235+
createInternalConverter(childrenFields.get(i))
236+
.deserialize(
237+
childrenData.get(childrenFields.get(i).getName()));
238+
genericRowData.setField(i, value);
239+
}
240+
return genericRowData;
241+
};
242+
case MAP:
243+
return val -> {
244+
if (val == null) {
245+
return null;
246+
}
247+
HashMap<Object, Object> resultMap = new HashMap<>();
248+
Map map = GsonUtil.GSON.fromJson(val.toString(), Map.class);
249+
LogicalType keyType = ((MapType) type).getKeyType();
250+
LogicalType valueType = ((MapType) type).getValueType();
251+
RowType.RowField keyRowField = new RowType.RowField("", keyType, "");
252+
RowType.RowField valueRowField = new RowType.RowField("", valueType, "");
253+
IDeserializationConverter keyInternalConverter =
254+
createInternalConverter(keyRowField);
255+
IDeserializationConverter valueInternalConverter =
256+
createInternalConverter(valueRowField);
257+
for (Object key : map.keySet()) {
258+
resultMap.put(
259+
keyInternalConverter.deserialize(key),
260+
valueInternalConverter.deserialize(map.get(key)));
261+
}
262+
263+
return new GenericMapData(resultMap);
264+
};
265+
case STRUCTURED_TYPE:
266+
case RAW:
267+
default:
268+
throw new UnsupportedOperationException("Unsupported type:" + type);
269+
}
270+
}
271+
272+
protected ISerializationConverter<FieldNamedPreparedStatement> createExternalConverter(
273+
RowType.RowField rowField) {
274+
LogicalType type = rowField.getType();
275+
switch (type.getTypeRoot()) {
276+
case BOOLEAN:
277+
return (val, index, statement) ->
278+
statement.setBoolean(index, val.getBoolean(index));
279+
case TINYINT:
280+
return (val, index, statement) -> statement.setByte(index, val.getByte(index));
281+
case SMALLINT:
282+
return (val, index, statement) -> statement.setShort(index, val.getShort(index));
283+
case INTEGER:
284+
case INTERVAL_YEAR_MONTH:
285+
return (val, index, statement) -> statement.setInt(index, val.getInt(index));
286+
case BIGINT:
287+
case INTERVAL_DAY_TIME:
288+
return (val, index, statement) -> statement.setLong(index, val.getLong(index));
289+
case FLOAT:
290+
return (val, index, statement) -> statement.setFloat(index, val.getFloat(index));
291+
case DOUBLE:
292+
return (val, index, statement) -> statement.setDouble(index, val.getDouble(index));
293+
case CHAR:
294+
case VARCHAR:
295+
// value is BinaryString
296+
return (val, index, statement) ->
297+
statement.setString(index, val.getString(index).toString());
298+
case BINARY:
299+
case VARBINARY:
300+
return (val, index, statement) -> statement.setBytes(index, val.getBinary(index));
301+
case DATE:
302+
return (val, index, statement) ->
303+
statement.setDate(
304+
index, Date.valueOf(LocalDate.ofEpochDay(val.getInt(index))));
305+
case TIME_WITHOUT_TIME_ZONE:
306+
return (val, index, statement) ->
307+
statement.setTime(
308+
index,
309+
Time.valueOf(
310+
LocalTime.ofNanoOfDay(val.getInt(index) * 1_000_000L)));
311+
case TIMESTAMP_WITH_TIME_ZONE:
312+
case TIMESTAMP_WITHOUT_TIME_ZONE:
313+
final int timestampPrecision = ((TimestampType) type).getPrecision();
314+
return (val, index, statement) ->
315+
statement.setTimestamp(
316+
index, val.getTimestamp(index, timestampPrecision).toTimestamp());
317+
case DECIMAL:
318+
final int decimalPrecision = ((DecimalType) type).getPrecision();
319+
final int decimalScale = ((DecimalType) type).getScale();
320+
return (val, index, statement) ->
321+
statement.setBigDecimal(
322+
index,
323+
val.getDecimal(index, decimalPrecision, decimalScale)
324+
.toBigDecimal());
325+
case ROW:
326+
return (val, index, statement) -> {
327+
List<RowType.RowField> fields = ((RowType) type).getFields();
328+
HashMap<String, Object> map = new HashMap<>();
329+
for (int i = 0; i < fields.size(); i++) {
330+
ExternalDataUtil.rowDataToExternal(
331+
val.getRow(index, fields.size()),
332+
i,
333+
fields.get(i).getType(),
334+
map,
335+
fields.get(i).getName());
336+
}
337+
338+
statement.setObject(index, GsonUtil.GSON.toJson(map));
339+
};
340+
341+
case ARRAY:
342+
return (val, index, statement) -> {
343+
Connection connection = statement.getConnection();
344+
ArrayData array = val.getArray(index);
345+
Object[] obj = new Object[array.size()];
346+
ExternalDataUtil.arrayDataToExternal(type.getChildren().get(0), obj, array);
347+
Array result =
348+
connection.createArrayOf(
349+
type.getChildren().get(0).getTypeRoot().name(), obj);
350+
statement.setArray(index, result);
351+
};
352+
case MULTISET:
353+
return (val, index, statement) -> {
354+
Connection connection = statement.getConnection();
355+
MapData map = val.getMap(index);
356+
ArrayData arrayData = map.keyArray();
357+
Object[] obj = new Object[arrayData.size()];
358+
ExternalDataUtil.arrayDataToExternal(type.getChildren().get(0), obj, arrayData);
359+
Array result =
360+
connection.createArrayOf(
361+
type.getChildren().get(0).getTypeRoot().name(), obj);
362+
statement.setArray(index, result);
363+
};
364+
case MAP:
365+
return (val, index, statement) -> {
366+
MapData map = val.getMap(index);
367+
Map<Object, Object> resultMap = new HashMap<>();
368+
ExternalDataUtil.mapDataToExternal(
369+
map,
370+
((MapType) type).getKeyType(),
371+
((MapType) type).getValueType(),
372+
resultMap);
373+
statement.setObject(index, resultMap);
374+
};
375+
case STRUCTURED_TYPE:
376+
case RAW:
377+
default:
378+
throw new UnsupportedOperationException("Unsupported type:" + type);
379+
}
380+
}
381+
}

0 commit comments

Comments
 (0)