Skip to content

Commit 507e4cb

Browse files
committed
modify time parse position
1 parent 785811c commit 507e4cb

File tree

5 files changed

+78
-314
lines changed

5 files changed

+78
-314
lines changed

core/src/main/java/com/dtstack/flink/sql/util/DateUtil.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,18 @@
2323
import java.sql.Timestamp;
2424
import java.text.ParseException;
2525
import java.text.SimpleDateFormat;
26+
import java.time.Instant;
27+
import java.time.LocalDate;
28+
import java.time.LocalTime;
29+
import java.time.ZoneOffset;
2630
import java.util.Calendar;
2731
import java.util.Date;
2832
import java.util.Locale;
2933
import java.util.SimpleTimeZone;
34+
import java.util.TimeZone;
35+
import java.util.regex.Pattern;
36+
37+
import static java.time.format.DateTimeFormatter.ISO_INSTANT;
3038

3139

3240
/**
@@ -47,6 +55,12 @@ public class DateUtil {
4755
static final SimpleDateFormat dateFormatter = new SimpleDateFormat(dateFormat);
4856
static final SimpleDateFormat timeFormatter = new SimpleDateFormat(timeFormat);
4957

58+
private static final Pattern DATETIME = Pattern.compile("^\\d{4}-(?:0[0-9]|1[0-2])-[0-9]{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d{3,9})?Z$");
59+
private static final Pattern DATE = Pattern.compile("^\\d{4}-(?:0[0-9]|1[0-2])-[0-9]{2}$");
60+
61+
private static final int MILLIS_PER_SECOND = 1000;
62+
63+
5064
public static java.sql.Date columnToDate(Object column) {
5165
if(column instanceof String) {
5266
return new java.sql.Date(stringToDate((String)column).getTime());
@@ -770,4 +784,52 @@ public static String timestampToString(Date date) {
770784
return datetimeFormatter.format(date);
771785
}
772786

787+
788+
public static Timestamp getTimestampFromStr(String timeStr) {
789+
if (DATETIME.matcher(timeStr).matches()) {
790+
Instant instant = Instant.from(ISO_INSTANT.parse(timeStr));
791+
return new Timestamp(instant.getEpochSecond() * MILLIS_PER_SECOND);
792+
} else {
793+
java.sql.Date date = null;
794+
try {
795+
date = new java.sql.Date(datetimeFormatter.parse(timeStr).getTime());
796+
} catch (ParseException e) {
797+
throw new RuntimeException("getTimestampFromStr error data is " + timeStr);
798+
}
799+
return new Timestamp(date.getTime());
800+
}
801+
}
802+
803+
public static java.sql.Date getDateFromStr(String dateStr) {
804+
// 2020-01-01 format
805+
if (DATE.matcher(dateStr).matches()) {
806+
// convert from local date to instant
807+
Instant instant = LocalDate.parse(dateStr).atTime(LocalTime.of(0, 0, 0, 0)).toInstant(ZoneOffset.UTC);
808+
// calculate the timezone offset in millis
809+
int offset = TimeZone.getDefault().getOffset(instant.toEpochMilli());
810+
// need to remove the offset since time has no TZ component
811+
return new java.sql.Date(instant.toEpochMilli() - offset);
812+
} else if (DATETIME.matcher(dateStr).matches()) {
813+
// 2020-01-01T12:12:12Z format
814+
Instant instant = Instant.from(ISO_INSTANT.parse(dateStr));
815+
return new java.sql.Date(instant.toEpochMilli());
816+
} else {
817+
try {
818+
// 2020-01-01 12:12:12.0 format
819+
return new java.sql.Date(datetimeFormatter.parse(dateStr).getTime());
820+
} catch (ParseException e) {
821+
throw new RuntimeException("String convert to Date fail.");
822+
}
823+
}
824+
}
825+
826+
827+
public static String getStringFromTimestamp(Timestamp timestamp) {
828+
return datetimeFormatter.format(timestamp);
829+
}
830+
831+
public static String getStringFromDate(java.sql.Date date) {
832+
return dateFormatter.format(date);
833+
}
834+
773835
}

core/src/main/java/com/dtstack/flink/sql/util/MathUtil.java

Lines changed: 2 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,6 @@
4444
*/
4545

4646
public class MathUtil {
47-
48-
private static final Pattern DATETIME = Pattern.compile("^\\d{4}-(?:0[0-9]|1[0-2])-[0-9]{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d{3,9})?Z$");
49-
private static final Pattern DATE = Pattern.compile("^\\d{4}-(?:0[0-9]|1[0-2])-[0-9]{2}$");
50-
private static final SimpleDateFormat TIMESTAMP_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
51-
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd");
52-
53-
private static final int MILLIS_PER_SECOND = 1000;
54-
5547
public static Long getLongVal(Object obj) {
5648
if (obj == null) {
5749
return null;
@@ -245,7 +237,7 @@ public static Date getDate(Object obj) {
245237
return null;
246238
}
247239
if (obj instanceof String) {
248-
return getDateFromStr((String) obj);
240+
return DateUtil.getDateFromStr((String) obj);
249241
} else if (obj instanceof Timestamp) {
250242
return new Date(((Timestamp) obj).getTime());
251243
} else if (obj instanceof Date) {
@@ -255,28 +247,6 @@ public static Date getDate(Object obj) {
255247
}
256248

257249

258-
public static Date getDateFromStr(String dateStr) {
259-
// 2020-01-01 format
260-
if (DATE.matcher(dateStr).matches()) {
261-
// convert from local date to instant
262-
Instant instant = LocalDate.parse(dateStr).atTime(LocalTime.of(0, 0, 0, 0)).toInstant(ZoneOffset.UTC);
263-
// calculate the timezone offset in millis
264-
int offset = TimeZone.getDefault().getOffset(instant.toEpochMilli());
265-
// need to remove the offset since time has no TZ component
266-
return new Date(instant.toEpochMilli() - offset);
267-
} else if (DATETIME.matcher(dateStr).matches()) {
268-
// 2020-01-01T12:12:12Z format
269-
Instant instant = Instant.from(ISO_INSTANT.parse(dateStr));
270-
return new Date(instant.toEpochMilli());
271-
} else {
272-
try {
273-
// 2020-01-01 12:12:12.0 format
274-
return new Date(TIMESTAMP_FORMAT.parse(dateStr).getTime());
275-
} catch (ParseException e) {
276-
throw new RuntimeException("String convert to Date fail.");
277-
}
278-
}
279-
}
280250

281251
public static Timestamp getTimestamp(Object obj) {
282252
if (obj == null) {
@@ -287,32 +257,9 @@ public static Timestamp getTimestamp(Object obj) {
287257
} else if (obj instanceof Date) {
288258
return new Timestamp(((Date) obj).getTime());
289259
} else if (obj instanceof String) {
290-
return getTimestampFromStr(obj.toString());
260+
return DateUtil.getTimestampFromStr(obj.toString());
291261
}
292262
throw new RuntimeException("not support type of " + obj.getClass() + " convert to Date.");
293263
}
294264

295-
public static Timestamp getTimestampFromStr(String timeStr) {
296-
if (DATETIME.matcher(timeStr).matches()) {
297-
Instant instant = Instant.from(ISO_INSTANT.parse(timeStr));
298-
return new Timestamp(instant.getEpochSecond() * MILLIS_PER_SECOND);
299-
} else {
300-
Date date = null;
301-
try {
302-
date = new Date(TIMESTAMP_FORMAT.parse(timeStr).getTime());
303-
} catch (ParseException e) {
304-
throw new RuntimeException("getTimestampFromStr error data is " + timeStr);
305-
}
306-
return new Timestamp(date.getTime());
307-
}
308-
}
309-
310-
public static String getStringFromTimestamp(Timestamp timestamp) {
311-
return TIMESTAMP_FORMAT.format(timestamp);
312-
}
313-
314-
public static String getStringFromDate(Date date) {
315-
return DATE_FORMAT.format(date);
316-
}
317-
318265
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(Properties co
6969
Configuration newConf = new Configuration();
7070
confProp.forEach((key, val) -> newConf.setString(key.toString(), val.toString()));
7171

72-
AbstractYarnClusterDescriptor clusterDescriptor = getClusterDescriptor(newConf, yarnConf, ".");
72+
AbstractYarnClusterDescriptor clusterDescriptor = getClusterDescriptor(newConf, yarnConf, launcherOptions.getFlinkconf());
7373

7474
if (StringUtils.isNotBlank(flinkJarPath)) {
7575
if (!new File(flinkJarPath).exists()) {

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.dtstack.flink.sql.side.*;
2424
import com.dtstack.flink.sql.side.cache.CacheObj;
2525
import com.dtstack.flink.sql.side.rdb.util.SwitchUtil;
26+
import com.dtstack.flink.sql.util.DateUtil;
27+
import com.dtstack.flink.sql.util.MathUtil;
2628
import io.vertx.core.Vertx;
2729
import io.vertx.core.VertxOptions;
2830
import io.vertx.core.json.JsonArray;
@@ -38,6 +40,7 @@
3840
import org.slf4j.LoggerFactory;
3941

4042
import java.sql.Timestamp;
43+
import java.util.Date;
4144
import java.util.List;
4245
import java.util.Map;
4346

@@ -84,7 +87,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
8487
dealMissKey(inputRow, resultFuture);
8588
return;
8689
}
87-
inputParams.add(equalObj);
90+
inputParams.add(convertDateType(equalObj));
8891
}
8992

9093
String key = buildCacheKey(inputParams);
@@ -148,6 +151,15 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
148151
});
149152
}
150153

154+
private Object convertDateType(Object val) {
155+
if (val instanceof Timestamp) {
156+
val = DateUtil.getStringFromTimestamp((Timestamp) val);
157+
} else if (val instanceof Date) {
158+
val = DateUtil.getStringFromDate((java.sql.Date) val);
159+
}
160+
return val;
161+
}
162+
151163
protected List<Row> getRows(Row inputRow, List<JsonArray> cacheContent, List<JsonArray> results) {
152164
List<Row> rowList = Lists.newArrayList();
153165
for (JsonArray line : results) {

0 commit comments

Comments
 (0)