Skip to content

Commit 785811c

Browse files
committed
parse async side timestamp type
1 parent 08ecc68 commit 785811c

File tree

2 files changed

+328
-12
lines changed
  • core/src/main/java/com/dtstack/flink/sql/util
  • rdb/rdb-side/src/main/java/io/vertx/core/json

2 files changed

+328
-12
lines changed

core/src/main/java/com/dtstack/flink/sql/util/MathUtil.java

Lines changed: 71 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
* limitations under the License.
1717
*/
1818

19-
2019

2120
package com.dtstack.flink.sql.util;
2221

@@ -27,6 +26,16 @@
2726
import java.text.ParseException;
2827
import java.text.SimpleDateFormat;
2928

29+
30+
import java.time.Instant;
31+
import java.time.LocalDate;
32+
import java.time.LocalTime;
33+
import java.time.ZoneOffset;
34+
import java.util.TimeZone;
35+
import java.util.regex.Pattern;
36+
37+
import static java.time.format.DateTimeFormatter.ISO_INSTANT;
38+
3039
/**
3140
* Convert val to specified numeric type
3241
* Date: 2017/4/21
@@ -36,6 +45,13 @@
3645

3746
public class MathUtil {
3847

48+
private static final Pattern DATETIME = Pattern.compile("^\\d{4}-(?:0[0-9]|1[0-2])-[0-9]{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d{3,9})?Z$");
49+
private static final Pattern DATE = Pattern.compile("^\\d{4}-(?:0[0-9]|1[0-2])-[0-9]{2}$");
50+
private static final SimpleDateFormat TIMESTAMP_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
51+
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd");
52+
53+
private static final int MILLIS_PER_SECOND = 1000;
54+
3955
public static Long getLongVal(Object obj) {
4056
if (obj == null) {
4157
return null;
@@ -126,12 +142,12 @@ public static Double getDoubleVal(Object obj) {
126142
return Double.valueOf((String) obj);
127143
} else if (obj instanceof Float) {
128144
return ((Float) obj).doubleValue();
129-
} else if (obj instanceof Double){
145+
} else if (obj instanceof Double) {
130146
return (Double) obj;
131-
}else if (obj instanceof BigDecimal) {
147+
} else if (obj instanceof BigDecimal) {
132148
return ((BigDecimal) obj).doubleValue();
133-
}else if (obj instanceof Integer){
134-
return ((Integer)obj).doubleValue();
149+
} else if (obj instanceof Integer) {
150+
return ((Integer) obj).doubleValue();
135151
}
136152

137153
throw new RuntimeException("not support type of " + obj.getClass() + " convert to Double.");
@@ -229,12 +245,7 @@ public static Date getDate(Object obj) {
229245
return null;
230246
}
231247
if (obj instanceof String) {
232-
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
233-
try {
234-
return new Date(format.parse((String) obj).getTime());
235-
} catch (ParseException e) {
236-
throw new RuntimeException("String convert to Date fail.");
237-
}
248+
return getDateFromStr((String) obj);
238249
} else if (obj instanceof Timestamp) {
239250
return new Date(((Timestamp) obj).getTime());
240251
} else if (obj instanceof Date) {
@@ -243,6 +254,30 @@ public static Date getDate(Object obj) {
243254
throw new RuntimeException("not support type of " + obj.getClass() + " convert to Date.");
244255
}
245256

257+
258+
public static Date getDateFromStr(String dateStr) {
259+
// 2020-01-01 format
260+
if (DATE.matcher(dateStr).matches()) {
261+
// convert from local date to instant
262+
Instant instant = LocalDate.parse(dateStr).atTime(LocalTime.of(0, 0, 0, 0)).toInstant(ZoneOffset.UTC);
263+
// calculate the timezone offset in millis
264+
int offset = TimeZone.getDefault().getOffset(instant.toEpochMilli());
265+
// need to remove the offset since time has no TZ component
266+
return new Date(instant.toEpochMilli() - offset);
267+
} else if (DATETIME.matcher(dateStr).matches()) {
268+
// 2020-01-01T12:12:12Z format
269+
Instant instant = Instant.from(ISO_INSTANT.parse(dateStr));
270+
return new Date(instant.toEpochMilli());
271+
} else {
272+
try {
273+
// 2020-01-01 12:12:12.0 format
274+
return new Date(TIMESTAMP_FORMAT.parse(dateStr).getTime());
275+
} catch (ParseException e) {
276+
throw new RuntimeException("String convert to Date fail.");
277+
}
278+
}
279+
}
280+
246281
public static Timestamp getTimestamp(Object obj) {
247282
if (obj == null) {
248283
return null;
@@ -252,8 +287,32 @@ public static Timestamp getTimestamp(Object obj) {
252287
} else if (obj instanceof Date) {
253288
return new Timestamp(((Date) obj).getTime());
254289
} else if (obj instanceof String) {
255-
return new Timestamp(getDate(obj).getTime());
290+
return getTimestampFromStr(obj.toString());
256291
}
257292
throw new RuntimeException("not support type of " + obj.getClass() + " convert to Date.");
258293
}
294+
295+
public static Timestamp getTimestampFromStr(String timeStr) {
296+
if (DATETIME.matcher(timeStr).matches()) {
297+
Instant instant = Instant.from(ISO_INSTANT.parse(timeStr));
298+
return new Timestamp(instant.getEpochSecond() * MILLIS_PER_SECOND);
299+
} else {
300+
Date date = null;
301+
try {
302+
date = new Date(TIMESTAMP_FORMAT.parse(timeStr).getTime());
303+
} catch (ParseException e) {
304+
throw new RuntimeException("getTimestampFromStr error data is " + timeStr);
305+
}
306+
return new Timestamp(date.getTime());
307+
}
308+
}
309+
310+
public static String getStringFromTimestamp(Timestamp timestamp) {
311+
return TIMESTAMP_FORMAT.format(timestamp);
312+
}
313+
314+
public static String getStringFromDate(Date date) {
315+
return DATE_FORMAT.format(date);
316+
}
317+
259318
}
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
/*
2+
* Copyright (c) 2011-2017 Contributors to the Eclipse Foundation
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
*/
11+
12+
package io.vertx.core.json;
13+
14+
import com.dtstack.flink.sql.util.MathUtil;
15+
import com.fasterxml.jackson.core.JsonGenerator;
16+
import com.fasterxml.jackson.core.JsonParser;
17+
import com.fasterxml.jackson.core.type.TypeReference;
18+
import com.fasterxml.jackson.databind.*;
19+
import com.fasterxml.jackson.databind.module.SimpleModule;
20+
import io.netty.buffer.ByteBufInputStream;
21+
import io.vertx.core.buffer.Buffer;
22+
23+
import java.io.DataInput;
24+
import java.io.IOException;
25+
import java.io.InputStream;
26+
import java.math.BigDecimal;
27+
import java.sql.Timestamp;
28+
import java.time.Instant;
29+
import java.util.Base64;
30+
import java.util.Date;
31+
import java.util.Iterator;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.stream.Stream;
35+
import java.util.stream.StreamSupport;
36+
37+
import static java.time.format.DateTimeFormatter.ISO_INSTANT;
38+
39+
/**
40+
* @author <a href="http://tfox.org">Tim Fox</a>
41+
*/
42+
public class Json {
43+
44+
public static ObjectMapper mapper = new ObjectMapper();
45+
public static ObjectMapper prettyMapper = new ObjectMapper();
46+
47+
static {
48+
// Non-standard JSON but we allow C style comments in our JSON
49+
mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
50+
51+
prettyMapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
52+
prettyMapper.configure(SerializationFeature.INDENT_OUTPUT, true);
53+
54+
SimpleModule module = new SimpleModule();
55+
// custom types
56+
module.addSerializer(JsonObject.class, new JsonObjectSerializer());
57+
module.addSerializer(JsonArray.class, new JsonArraySerializer());
58+
// he have 2 extensions: RFC-7493
59+
module.addSerializer(Instant.class, new InstantSerializer());
60+
module.addSerializer(byte[].class, new ByteArraySerializer());
61+
62+
mapper.registerModule(module);
63+
prettyMapper.registerModule(module);
64+
}
65+
66+
/**
67+
* Encode a POJO to JSON using the underlying Jackson mapper.
68+
*
69+
* @param obj a POJO
70+
* @return a String containing the JSON representation of the given POJO.
71+
* @throws EncodeException if a property cannot be encoded.
72+
*/
73+
public static String encode(Object obj) throws EncodeException {
74+
try {
75+
return mapper.writeValueAsString(obj);
76+
} catch (Exception e) {
77+
throw new EncodeException("Failed to encode as JSON: " + e.getMessage());
78+
}
79+
}
80+
81+
/**
82+
* Encode a POJO to JSON using the underlying Jackson mapper.
83+
*
84+
* @param obj a POJO
85+
* @return a Buffer containing the JSON representation of the given POJO.
86+
* @throws EncodeException if a property cannot be encoded.
87+
*/
88+
public static Buffer encodeToBuffer(Object obj) throws EncodeException {
89+
try {
90+
return Buffer.buffer(mapper.writeValueAsBytes(obj));
91+
} catch (Exception e) {
92+
throw new EncodeException("Failed to encode as JSON: " + e.getMessage());
93+
}
94+
}
95+
96+
/**
97+
* Encode a POJO to JSON with pretty indentation, using the underlying Jackson mapper.
98+
*
99+
* @param obj a POJO
100+
* @return a String containing the JSON representation of the given POJO.
101+
* @throws EncodeException if a property cannot be encoded.
102+
*/
103+
public static String encodePrettily(Object obj) throws EncodeException {
104+
try {
105+
return prettyMapper.writeValueAsString(obj);
106+
} catch (Exception e) {
107+
throw new EncodeException("Failed to encode as JSON: " + e.getMessage());
108+
}
109+
}
110+
111+
/**
112+
* Decode a given JSON string to a POJO of the given class type.
113+
* @param str the JSON string.
114+
* @param clazz the class to map to.
115+
* @param <T> the generic type.
116+
* @return an instance of T
117+
* @throws DecodeException when there is a parsing or invalid mapping.
118+
*/
119+
public static <T> T decodeValue(String str, Class<T> clazz) throws DecodeException {
120+
try {
121+
return mapper.readValue(str, clazz);
122+
} catch (Exception e) {
123+
throw new DecodeException("Failed to decode: " + e.getMessage());
124+
}
125+
}
126+
127+
/**
128+
* Decode a given JSON string to a POJO of the given type.
129+
* @param str the JSON string.
130+
* @param type the type to map to.
131+
* @param <T> the generic type.
132+
* @return an instance of T
133+
* @throws DecodeException when there is a parsing or invalid mapping.
134+
*/
135+
public static <T> T decodeValue(String str, TypeReference<T> type) throws DecodeException {
136+
try {
137+
return mapper.readValue(str, type);
138+
} catch (Exception e) {
139+
throw new DecodeException("Failed to decode: " + e.getMessage(), e);
140+
}
141+
}
142+
143+
/**
144+
* Decode a given JSON buffer to a POJO of the given class type.
145+
* @param buf the JSON buffer.
146+
* @param type the type to map to.
147+
* @param <T> the generic type.
148+
* @return an instance of T
149+
* @throws DecodeException when there is a parsing or invalid mapping.
150+
*/
151+
public static <T> T decodeValue(Buffer buf, TypeReference<T> type) throws DecodeException {
152+
try {
153+
return mapper.readValue(new ByteBufInputStream(buf.getByteBuf()), type);
154+
} catch (Exception e) {
155+
throw new DecodeException("Failed to decode:" + e.getMessage(), e);
156+
}
157+
}
158+
159+
/**
160+
* Decode a given JSON buffer to a POJO of the given class type.
161+
* @param buf the JSON buffer.
162+
* @param clazz the class to map to.
163+
* @param <T> the generic type.
164+
* @return an instance of T
165+
* @throws DecodeException when there is a parsing or invalid mapping.
166+
*/
167+
public static <T> T decodeValue(Buffer buf, Class<T> clazz) throws DecodeException {
168+
try {
169+
return mapper.readValue((InputStream) new ByteBufInputStream(buf.getByteBuf()), clazz);
170+
} catch (Exception e) {
171+
throw new DecodeException("Failed to decode:" + e.getMessage(), e);
172+
}
173+
}
174+
175+
@SuppressWarnings("unchecked")
176+
static Object checkAndCopy(Object val, boolean copy) {
177+
if (val == null) {
178+
// OK
179+
} else if (val instanceof Number && !(val instanceof BigDecimal)) {
180+
// OK
181+
} else if (val instanceof Boolean) {
182+
// OK
183+
} else if (val instanceof String) {
184+
// OK
185+
} else if (val instanceof Character) {
186+
// OK
187+
} else if (val instanceof CharSequence) {
188+
val = val.toString();
189+
} else if (val instanceof JsonObject) {
190+
if (copy) {
191+
val = ((JsonObject) val).copy();
192+
}
193+
} else if (val instanceof JsonArray) {
194+
if (copy) {
195+
val = ((JsonArray) val).copy();
196+
}
197+
} else if (val instanceof Map) {
198+
if (copy) {
199+
val = (new JsonObject((Map)val)).copy();
200+
} else {
201+
val = new JsonObject((Map)val);
202+
}
203+
} else if (val instanceof List) {
204+
if (copy) {
205+
val = (new JsonArray((List)val)).copy();
206+
} else {
207+
val = new JsonArray((List)val);
208+
}
209+
} else if (val instanceof byte[]) {
210+
val = Base64.getEncoder().encodeToString((byte[]) val);
211+
} else if (val instanceof Instant) {
212+
val = ISO_INSTANT.format((Instant) val);
213+
} else if (val instanceof Timestamp) {
214+
val = MathUtil.getStringFromTimestamp((Timestamp) val);
215+
} else if (val instanceof Date) {
216+
val = MathUtil.getStringFromDate((java.sql.Date) val);
217+
} else {
218+
val = val.toString();
219+
}
220+
return val;
221+
}
222+
223+
static <T> Stream<T> asStream(Iterator<T> sourceIterator) {
224+
Iterable<T> iterable = () -> sourceIterator;
225+
return StreamSupport.stream(iterable.spliterator(), false);
226+
}
227+
228+
private static class JsonObjectSerializer extends JsonSerializer<JsonObject> {
229+
@Override
230+
public void serialize(JsonObject value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
231+
jgen.writeObject(value.getMap());
232+
}
233+
}
234+
235+
private static class JsonArraySerializer extends JsonSerializer<JsonArray> {
236+
@Override
237+
public void serialize(JsonArray value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
238+
jgen.writeObject(value.getList());
239+
}
240+
}
241+
242+
private static class InstantSerializer extends JsonSerializer<Instant> {
243+
@Override
244+
public void serialize(Instant value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
245+
jgen.writeString(ISO_INSTANT.format(value));
246+
}
247+
}
248+
249+
private static class ByteArraySerializer extends JsonSerializer<byte[]> {
250+
private final Base64.Encoder BASE64 = Base64.getEncoder();
251+
252+
@Override
253+
public void serialize(byte[] value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
254+
jgen.writeString(BASE64.encodeToString(value));
255+
}
256+
}
257+
}

0 commit comments

Comments
 (0)