Skip to content

Commit 994c2d1

Browse files
libailin, zoudaokoulife
authored and committed
[Feature-#1877][ftp] Supports reading Chinese paths and optimizing reading excel
[Feature-#1877][ftp] Supports reading Chinese paths and optimizing reading excel
1 parent 03ab05a commit 994c2d1

File tree

7 files changed

+105
-14
lines changed

7 files changed

+105
-14
lines changed

chunjun-connectors/chunjun-connector-ftp/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -63,7 +63,7 @@ under the License.
6363
<dependency>
6464
<groupId>com.alibaba</groupId>
6565
<artifactId>easyexcel</artifactId>
66-
<version>3.0.1</version>
66+
<version>3.2.0</version>
6767
</dependency>
6868

6969
<dependency>

chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelReadListener.java

Lines changed: 17 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -18,22 +18,35 @@
1818

1919
package com.dtstack.chunjun.connector.ftp.client.excel;
2020

21+
import com.dtstack.chunjun.util.DateUtil;
22+
2123
import com.alibaba.excel.context.AnalysisContext;
2224
import com.alibaba.excel.read.listener.ReadListener;
2325

26+
import java.time.LocalDateTime;
2427
import java.util.Map;
2528
import java.util.concurrent.BlockingQueue;
2629
import java.util.concurrent.LinkedBlockingQueue;
2730

28-
public class ExcelReadListener implements ReadListener<Map<Integer, String>> {
31+
public class ExcelReadListener implements ReadListener<Map<Integer, Object>> {
2932

3033
private final BlockingQueue<Row> queue = new LinkedBlockingQueue<>(4096);
3134

3235
@Override
33-
public void invoke(Map<Integer, String> data, AnalysisContext context) {
36+
public void invoke(Map<Integer, Object> data, AnalysisContext context) {
3437
String[] piece = new String[data.size()];
35-
for (Map.Entry<Integer, String> entry : data.entrySet()) {
36-
piece[entry.getKey()] = entry.getValue() == null ? "" : entry.getValue();
38+
for (Map.Entry<Integer, Object> entry : data.entrySet()) {
39+
String value = "";
40+
if (entry.getValue() != null) {
41+
if (entry.getValue() instanceof LocalDateTime) {
42+
value =
43+
DateUtil.timestampToString(
44+
DateUtil.localDateTimetoDate((LocalDateTime) entry.getValue()));
45+
} else {
46+
value = String.valueOf(entry.getValue());
47+
}
48+
}
49+
piece[entry.getKey()] = value;
3750
}
3851
Row row =
3952
new Row(

chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpSqlConverter.java

Lines changed: 63 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -19,13 +19,23 @@
1919
package com.dtstack.chunjun.connector.ftp.converter;
2020

2121
import com.dtstack.chunjun.converter.AbstractRowConverter;
22+
import com.dtstack.chunjun.converter.IDeserializationConverter;
2223

2324
import org.apache.flink.api.common.serialization.DeserializationSchema;
2425
import org.apache.flink.api.common.serialization.SerializationSchema;
26+
import org.apache.flink.table.data.DecimalData;
27+
import org.apache.flink.table.data.GenericRowData;
2528
import org.apache.flink.table.data.RowData;
29+
import org.apache.flink.table.data.StringData;
30+
import org.apache.flink.table.types.logical.DecimalType;
2631
import org.apache.flink.table.types.logical.LogicalType;
32+
import org.apache.flink.table.types.logical.RowType;
2733

28-
public class FtpSqlConverter extends AbstractRowConverter<String, String, String, LogicalType> {
34+
import java.math.BigDecimal;
35+
import java.sql.Date;
36+
import java.sql.Time;
37+
38+
public class FtpSqlConverter extends AbstractRowConverter<String[], String, String, LogicalType> {
2939

3040
private static final long serialVersionUID = 4127516611259169686L;
3141

@@ -41,10 +51,59 @@ public FtpSqlConverter(SerializationSchema<RowData> valueSerialization) {
4151
this.valueSerialization = valueSerialization;
4252
}
4353

54+
public FtpSqlConverter(RowType rowType) {
55+
super(rowType);
56+
for (int i = 0; i < rowType.getFieldCount(); i++) {
57+
toInternalConverters.add(
58+
wrapIntoNullableInternalConverter(
59+
createInternalConverter(rowType.getTypeAt(i))));
60+
}
61+
}
62+
63+
@Override
64+
protected IDeserializationConverter createInternalConverter(LogicalType type) {
65+
switch (type.getTypeRoot()) {
66+
case NULL:
67+
return val -> null;
68+
case INTEGER:
69+
return val -> Integer.valueOf((String) val);
70+
case BIGINT:
71+
return val -> Long.valueOf((String) val);
72+
case FLOAT:
73+
return val -> Float.valueOf((String) val);
74+
case DOUBLE:
75+
return val -> Double.valueOf((String) val);
76+
case DECIMAL:
77+
DecimalType decimalType = (DecimalType) type;
78+
final int precision = decimalType.getPrecision();
79+
final int scale = decimalType.getScale();
80+
return val -> {
81+
BigDecimal decimal = new BigDecimal(String.valueOf(val));
82+
return DecimalData.fromBigDecimal(decimal, precision, scale);
83+
};
84+
case CHAR:
85+
case VARCHAR:
86+
return val -> StringData.fromString((String) val);
87+
case DATE:
88+
return val ->
89+
(int) ((Date.valueOf(String.valueOf(val))).toLocalDate().toEpochDay());
90+
case TIME_WITHOUT_TIME_ZONE:
91+
return val ->
92+
(int)
93+
((Time.valueOf(String.valueOf(val))).toLocalTime().toNanoOfDay()
94+
/ 1_000_000L);
95+
default:
96+
throw new UnsupportedOperationException(type.toString());
97+
}
98+
}
99+
44100
@Override
45-
public RowData toInternal(String input) throws Exception {
46-
valueDeserialization.open(new DummyInitializationContext());
47-
return valueDeserialization.deserialize(input.getBytes());
101+
public RowData toInternal(String[] input) throws Exception {
102+
GenericRowData rowData = new GenericRowData(input.length);
103+
for (int i = 0; i < fieldTypes.length; i++) {
104+
rowData.setField(i, toInternalConverters.get(i).deserialize(input[i]));
105+
}
106+
return rowData;
48107
}
49108

50109
@Override

chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java

Lines changed: 14 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,7 @@ public class FtpHandler implements DTFtpHandler {
4646
private FTPClient ftpClient = null;
4747
private String controlEncoding;
4848
private FtpConfig ftpConfig;
49+
private boolean isEncodePath = true;
4950

5051
public FTPClient getFtpClient() {
5152
return ftpClient;
@@ -57,6 +58,8 @@ public void loginFtpServer(FtpConfig ftpConfig) {
5758
controlEncoding = ftpConfig.getControlEncoding();
5859
ftpClient = new FTPClient();
5960
try {
61+
// 设置编码: 解决中文路径问题, 需要在连接前设置编码
62+
ftpClient.setControlEncoding(controlEncoding);
6063
// 连接
6164
ftpClient.connect(ftpConfig.getHost(), ftpConfig.getPort());
6265
// 登录
@@ -87,12 +90,18 @@ public void loginFtpServer(FtpConfig ftpConfig) {
8790
log.error(message);
8891
throw new RuntimeException(message);
8992
}
90-
ftpClient.setControlEncoding(ftpConfig.getControlEncoding());
9193
ftpClient.setListHiddenFiles(ftpConfig.isListHiddenFiles());
9294
if (StringUtils.isNotEmpty(ftpConfig.getCompressType())) {
9395
// 设置文件传输类型为二进制
9496
ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
9597
}
98+
// 开启服务器对UTF-8的支持,解决读取中文路径或者中文文件名失败的问题
99+
if (FTPReply.isPositiveCompletion(ftpClient.sendCommand("OPTS UTF8", "ON"))) {
100+
log.info("ftp server support UTF-8");
101+
isEncodePath = false;
102+
} else {
103+
log.warn("ftp server not support UTF-8");
104+
}
96105
} catch (Exception e) {
97106
throw new RuntimeException(e);
98107
}
@@ -440,7 +449,10 @@ private boolean isExist(String path) {
440449
}
441450

442451
private String encodePath(String path) throws UnsupportedEncodingException {
443-
return new String(path.getBytes(controlEncoding), FTP.DEFAULT_CONTROL_ENCODING);
452+
if (isEncodePath) {
453+
return new String(path.getBytes(controlEncoding), FTP.DEFAULT_CONTROL_ENCODING);
454+
}
455+
return new String(path.getBytes(controlEncoding));
444456
}
445457

446458
public void reconnectFtp() {

chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/iformat/ExcelFileFormat.java

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,7 @@
4040
import java.util.concurrent.ThreadPoolExecutor;
4141
import java.util.concurrent.TimeUnit;
4242

43+
import static com.alibaba.excel.enums.ReadDefaultReturnEnum.ACTUAL_DATA;
4344
import static java.util.concurrent.TimeUnit.NANOSECONDS;
4445

4546
@Slf4j
@@ -62,13 +63,17 @@ public void open(File file, InputStream inputStream, IFormatConfig config) {
6263
ExcelReadListener listener = new ExcelReadListener();
6364
this.queue = listener.getQueue();
6465
this.ec = new ExcelSubExceptionCarrier();
65-
6666
ExcelReaderBuilder builder = EasyExcel.read(inputStream, listener);
6767
if (!config.isFirstLineHeader()) {
6868
builder.headRowNumber(0);
6969
}
7070
builder.ignoreEmptyRow(true);
7171
builder.autoCloseStream(true);
72+
// @since 3.2.0
73+
// STRING:会返回一个Map<Integer,String>的数组,返回值就是你在excel里面不点击单元格看到的内容
74+
// ACTUAL_DATA:会返回一个Map<Integer,Object>的数组,返回实际上存储的数据,会帮自动转换类型,Object类型为BigDecimal、Boolean、String、LocalDateTime、null,中的一个,
75+
// READ_CELL_DATA: 会返回一个Map<Integer,ReadCellData<?>>的数组,其中?类型参照ACTUAL_DATA的
76+
builder.readDefaultReturn(ACTUAL_DATA);
7277
ExcelReader reader = builder.build();
7378

7479
this.sheetNum = reader.excelExecutor().sheetList().size();

chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpDynamicTableSource.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -63,7 +63,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
6363
builder.setFtpConfig(ftpConfig);
6464
builder.setRowConverter(
6565
new FtpSqlConverter(
66-
decodingFormat.createRuntimeDecoder(runtimeProviderContext, dataType)));
66+
InternalTypeInfo.of(schema.toPhysicalRowDataType().getLogicalType())
67+
.toRowType()));
6768

6869
return ParallelSourceFunctionProvider.of(
6970
new DtInputFormatSourceFunction<>(builder.finish(), typeInformation),

chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputFormat.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -173,7 +173,8 @@ protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException
173173
}
174174

175175
if (rowConverter instanceof FtpSqlConverter) {
176-
rowData = rowConverter.toInternal(String.join(",", fields));
176+
// 解决数据里包含特殊符号(逗号、换行符)
177+
rowData = rowConverter.toInternal(fields);
177178
} else if (rowConverter instanceof FtpSyncConverter) {
178179

179180
List<FieldConfig> columns = ftpConfig.getColumn();

0 commit comments

Comments
 (0)