Skip to content

Commit 4961939

Browse files
authored
Merge pull request #7 from questdb/feat/timestamp_parsing
feat: native support for parsing string timestamps
2 parents b229a92 + d5d3aa8 commit 4961939

File tree

5 files changed

+145
-15
lines changed

5 files changed

+145
-15
lines changed

connector/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@
120120
<goal>kafka-connect</goal>
121121
</goals>
122122
<configuration>
123-
<title>Kafka Connect QuestDB</title>
123+
<title>QuestDB Kafka Connector</title>
124124
<documentationUrl>https://questdb.io/docs/third-party-tools/kafka/questdb-kafka/</documentationUrl>
125125
<description>
126126
QuestDB Sink connector for Apache Kafka. QuestDB is a columnar time-series database with

connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,11 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
3333
public static final String DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG = "timestamp.field.name";
3434
private static final String DESIGNATED_TIMESTAMP_COLUMN_NAME_DOC = "Designated timestamp field name";
3535

36+
public static final String TIMESTAMP_STRING_FIELDS = "timestamp.string.fields";
37+
private static final String TIMESTAMP_STRING_FIELDS_DOC = "Comma-separated list of string fields that should be parsed as timestamps.";
38+
3639
public static final String TIMESTAMP_UNITS_CONFIG = "timestamp.units";
37-
private static final String TIMESTAMP_UNITS_DOC = "Units of timestamp field. Possible values: auto, millis, micros, nanos";
40+
private static final String TIMESTAMP_UNITS_DOC = "Units of designated timestamp field. Possible values: auto, millis, micros, nanos";
3841

3942
public static final String INCLUDE_KEY_CONFIG = "include.key";
4043
private static final String INCLUDE_KEY_DOC = "Include key in the table";
@@ -60,6 +63,11 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
6063
public static final String MAX_RETRIES = "max.retries";
6164
private static final String MAX_RETRIES_DOC = "The maximum number of times to retry on errors before failing the task";
6265

66+
public static final String TIMESTAMP_FORMAT = "timestamp.string.format";
67+
private static final String TIMESTAMP_FORMAT_DOC = "Timestamp format. Used when parsing timestamp string fields";
68+
69+
private static final String DEFAULT_TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'";
70+
6371
public QuestDBSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
6472
super(config, parsedConfig);
6573
}
@@ -84,7 +92,9 @@ public static ConfigDef conf() {
8492
.define(TLS, Type.BOOLEAN, false, Importance.MEDIUM, TLS_DOC)
8593
.define(TIMESTAMP_UNITS_CONFIG, Type.STRING, "auto", ConfigDef.ValidString.in("auto", "millis", "micros", "nanos"), Importance.LOW, TIMESTAMP_UNITS_DOC, null, -1, ConfigDef.Width.NONE, TIMESTAMP_UNITS_CONFIG, Collections.emptyList(), TimestampUnitsRecommender.INSTANCE)
8694
.define(RETRY_BACKOFF_MS, Type.LONG, 3_000, Importance.LOW, RETRY_BACKOFF_MS_DOC)
87-
.define(MAX_RETRIES, Type.INT, 10, Importance.LOW, MAX_RETRIES_DOC);
95+
.define(MAX_RETRIES, Type.INT, 10, Importance.LOW, MAX_RETRIES_DOC)
96+
.define(TIMESTAMP_FORMAT, Type.STRING, DEFAULT_TIMESTAMP_FORMAT, TimestampFormatValidator.INSTANCE, Importance.MEDIUM, TIMESTAMP_FORMAT_DOC)
97+
.define(TIMESTAMP_STRING_FIELDS, Type.STRING, null, Importance.MEDIUM, TIMESTAMP_STRING_FIELDS_DOC);
8898
}
8999

90100
public String getHost() {
@@ -95,6 +105,14 @@ public String getTable() {
95105
return getString(TABLE_CONFIG);
96106
}
97107

108+
/**
 * Returns the timestamp pattern used when parsing string timestamp fields.
 * Defaults to {@code yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'} (see {@code DEFAULT_TIMESTAMP_FORMAT}).
 */
public String getTimestampFormat() {
    return getString(TIMESTAMP_FORMAT);
}

/**
 * Returns the raw comma-separated list of string field names that should be parsed
 * as timestamps, or {@code null} when {@code timestamp.string.fields} is not set.
 * Splitting/trimming of the list is left to the consumer (the sink task).
 */
public String getTimestampStringFields() {
    return getString(TIMESTAMP_STRING_FIELDS);
}
115+
98116
public String getKeyPrefix() {
99117
return getString(KEY_PREFIX_CONFIG);
100118
}
@@ -174,6 +192,16 @@ public boolean visible(String name, Map<String, Object> parsedConfig) {
174192
}
175193
}
176194

195+
private static class TimestampFormatValidator implements ConfigDef.Validator {
196+
private static final TimestampFormatValidator INSTANCE = new TimestampFormatValidator();
197+
@Override
198+
public void ensureValid(String name, Object value) {
199+
if (!(value instanceof String)) {
200+
throw new ConfigException(name, value, "Timestamp format must be a string");
201+
}
202+
}
203+
}
204+
177205
private static class TablenameValidator implements ConfigDef.Validator {
178206
private static final TablenameValidator INSTANCE = new TablenameValidator();
179207
@Override

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22

33
import io.questdb.client.Sender;
44
import io.questdb.cutlass.line.LineSenderException;
5+
import io.questdb.std.NumericException;
6+
import io.questdb.std.datetime.DateFormat;
57
import io.questdb.std.datetime.microtime.Timestamps;
8+
import io.questdb.std.datetime.millitime.DateFormatUtils;
69
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
710
import org.apache.kafka.common.TopicPartition;
811
import org.apache.kafka.connect.data.Date;
@@ -39,8 +42,10 @@ public final class QuestDBSinkTask extends SinkTask {
3942
private long timestampColumnValue = Long.MIN_VALUE;
4043
private TimeUnit timestampUnits;
4144
private Set<CharSequence> doubleColumns;
45+
private Set<String> stringTimestampColumns;
4246
private int remainingRetries;
4347
private long batchesSinceLastError = 0;
48+
private DateFormat dataFormat;
4449

4550
@Override
4651
public String version() {
@@ -50,6 +55,17 @@ public String version() {
5055
@Override
5156
public void start(Map<String, String> map) {
5257
this.config = new QuestDBSinkConnectorConfig(map);
58+
String timestampStringFields = config.getTimestampStringFields();
59+
if (timestampStringFields != null) {
60+
stringTimestampColumns = new HashSet<>();
61+
for (String symbolColumn : timestampStringFields.split(",")) {
62+
stringTimestampColumns.add(symbolColumn.trim());
63+
}
64+
dataFormat = TimestampParserCompiler.compilePattern(config.getTimestampFormat());
65+
} else {
66+
stringTimestampColumns = Collections.emptySet();
67+
}
68+
5369
String doubleColumnsConfig = config.getDoubleColumns();
5470
if (doubleColumnsConfig == null) {
5571
doubleColumns = Collections.emptySet();
@@ -233,6 +249,10 @@ private long resolveDesignatedTimestampColumnValue(Object value, Schema schema)
233249
log.debug("Timestamp column value is a java.util.Date");
234250
return TimeUnit.MILLISECONDS.toNanos(((java.util.Date) value).getTime());
235251
}
252+
if (value instanceof String) {
253+
log.debug("Timestamp column value is a string");
254+
return parseToMicros((String) value) * 1000;
255+
}
236256
if (!(value instanceof Long)) {
237257
throw new ConnectException("Unsupported timestamp column type: " + value.getClass());
238258
}
@@ -255,7 +275,13 @@ private void writePhysicalTypeWithoutSchema(String name, Object value, String fa
255275
}
256276
String actualName = name.isEmpty() ? fallbackName : sanitizeName(name);
257277
if (value instanceof String) {
258-
sender.stringColumn(actualName, (String) value);
278+
String stringVal = (String) value;
279+
if (stringTimestampColumns.contains(actualName)) {
280+
long timestamp = parseToMicros(stringVal);
281+
sender.timestampColumn(actualName, timestamp);
282+
} else {
283+
sender.stringColumn(actualName, stringVal);
284+
}
259285
} else if (value instanceof Long) {
260286
Long longValue = (Long) value;
261287
if (doubleColumns.contains(actualName)) {
@@ -284,6 +310,16 @@ private void writePhysicalTypeWithoutSchema(String name, Object value, String fa
284310
}
285311
}
286312

313+
private long parseToMicros(String timestamp) {
314+
try {
315+
return dataFormat.parse(timestamp, DateFormatUtils.enLocale);
316+
} catch (NumericException e) {
317+
throw new ConnectException("Cannot parse timestamp: " + timestamp + " with the configured format '" + config.getTimestampFormat() +"' use '"
318+
+ QuestDBSinkConnectorConfig.TIMESTAMP_FORMAT + "' to configure the right timestamp format. " +
319+
"See https://questdb.io/docs/reference/function/date-time/#date-and-timestamp-format for timestamp parser documentation. ", e);
320+
}
321+
}
322+
287323
private static String sanitizeName(String name) {
288324
// todo: proper implementation
289325
return name.replace('.', '_');
@@ -315,7 +351,12 @@ private boolean tryWritePhysicalTypeFromSchema(String name, Schema schema, Objec
315351
break;
316352
case STRING:
317353
String s = (String) value;
318-
sender.stringColumn(sanitizedName, s);
354+
if (stringTimestampColumns.contains(primitiveTypesName)) {
355+
long timestamp = parseToMicros(s);
356+
sender.timestampColumn(sanitizedName, timestamp);
357+
} else {
358+
sender.stringColumn(sanitizedName, s);
359+
}
319360
break;
320361
case STRUCT:
321362
handleStruct(name, (Struct) value, schema);
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.questdb.kafka;
2+
3+
import io.questdb.std.CharSequenceObjHashMap;
4+
import io.questdb.std.datetime.DateFormat;
5+
import io.questdb.std.datetime.microtime.TimestampFormatCompiler;
6+
7+
class TimestampParserCompiler {
8+
private static TimestampFormatCompiler compiler;
9+
private static final Object MUTEX = new Object();
10+
// we assume that there will just a few patterns hence to issue with unbounded cache growth
11+
private static CharSequenceObjHashMap<DateFormat> cache;
12+
13+
public static DateFormat compilePattern(String timestampPattern) {
14+
synchronized (MUTEX) {
15+
if (compiler == null) {
16+
compiler = new TimestampFormatCompiler();
17+
// DateFormat instances are thread-safe, so we can cache them and use for multiple workers
18+
cache = new CharSequenceObjHashMap<>();
19+
}
20+
DateFormat format = cache.get(timestampPattern);
21+
if (format != null) {
22+
return format;
23+
}
24+
format = compiler.compile(timestampPattern);
25+
cache.put(timestampPattern, format);
26+
return format;
27+
}
28+
}
29+
}

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -458,14 +458,14 @@ private void testTimestampUnitResolution0(String mode) {
458458
}
459459

460460
@Test
461-
public void testTimestampSMT_parseMicroseconds_schemaLess() {
461+
public void testTimestampSMT_parseTimestamp_schemaLess() {
462462
connect.kafka().createTopic(topicName, 1);
463463
Map<String, String> props = baseConnectorProps(topicName);
464464
props.put("value.converter.schemas.enable", "false");
465465
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born");
466466
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
467467

468-
String timestampFormat = "yyyy-MM-dd HH:mm:ss.SSSSSS z";
468+
String timestampFormat = "yyyy-MM-dd HH:mm:ss.SSS z";
469469
props.put("transforms", "Timestamp-born,Timestamp-death");
470470
props.put("transforms.Timestamp-born.type", "org.apache.kafka.connect.transforms.TimestampConverter$Value");
471471
props.put("transforms.Timestamp-born.field", "born");
@@ -485,8 +485,8 @@ public void testTimestampSMT_parseMicroseconds_schemaLess() {
485485
"create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born)",
486486
QuestDBUtils.Endpoint.EXEC);
487487

488-
String birthTimestamp = "1985-08-02 16:41:55.402095 UTC";
489-
String deadTimestamp = "2023-08-02 16:41:55.402095 UTC";
488+
String birthTimestamp = "1985-08-02 16:41:55.402 UTC";
489+
String deadTimestamp = "2023-08-02 16:41:55.402 UTC";
490490
connect.kafka().produce(topicName, "foo",
491491
"{\"firstname\":\"John\""
492492
+ ",\"lastname\":\"Doe\""
@@ -495,18 +495,18 @@ public void testTimestampSMT_parseMicroseconds_schemaLess() {
495495
);
496496

497497
QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"death\",\"born\"\r\n" +
498-
"\"John\",\"Doe\",\"2023-08-02T16:48:37.095000Z\",\"1985-08-02T16:48:37.095000Z\"\r\n",
498+
"\"John\",\"Doe\",\"2023-08-02T16:41:55.402000Z\",\"1985-08-02T16:41:55.402000Z\"\r\n",
499499
"select * from " + topicName);
500500
}
501501

502502
@Test
503-
public void testTimestampSMT_parseMicroseconds_withSchema() {
503+
public void testTimestampSMT_parseTimestamp_withSchema() {
504504
connect.kafka().createTopic(topicName, 1);
505505
Map<String, String> props = baseConnectorProps(topicName);
506506
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born");
507507
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
508508

509-
String timestampFormat = "yyyy-MM-dd HH:mm:ss.SSSSSS z";
509+
String timestampFormat = "yyyy-MM-dd HH:mm:ss.SSS z";
510510
props.put("transforms", "Timestamp-born,Timestamp-death");
511511
props.put("transforms.Timestamp-born.type", "org.apache.kafka.connect.transforms.TimestampConverter$Value");
512512
props.put("transforms.Timestamp-born.field", "born");
@@ -530,14 +530,14 @@ public void testTimestampSMT_parseMicroseconds_withSchema() {
530530
Struct struct = new Struct(schema)
531531
.put("firstname", "John")
532532
.put("lastname", "Doe")
533-
.put("born", "1985-08-02 16:41:55.402095 UTC")
534-
.put("death", "2023-08-02 16:41:55.402095 UTC");
533+
.put("born", "1985-08-02 16:41:55.402 UTC")
534+
.put("death", "2023-08-02 16:41:55.402 UTC");
535535

536536

537537
connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct)));
538538

539539
QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"death\",\"timestamp\"\r\n" +
540-
"\"John\",\"Doe\",\"2023-08-02T16:48:37.095000Z\",\"1985-08-02T16:48:37.095000Z\"\r\n",
540+
"\"John\",\"Doe\",\"2023-08-02T16:41:55.402000Z\",\"1985-08-02T16:41:55.402000Z\"\r\n",
541541
"select * from " + topicName);
542542
}
543543

@@ -740,6 +740,38 @@ public void testPrimitiveKey() {
740740
"select firstname, lastname, age, key from " + topicName);
741741
}
742742

743+
/**
 * End-to-end test of the new native string-timestamp parsing: with
 * {@code timestamp.string.fields=born,death} and a micro-precision pattern,
 * string values in the JSON payload must land as QuestDB timestamps with full
 * microsecond precision preserved (no SMT involved, schemaless JSON).
 */
@Test
public void testParsingStringTimestamp() {
    connect.kafka().createTopic(topicName, 1);
    Map<String, String> props = baseConnectorProps(topicName);
    props.put("value.converter.schemas.enable", "false");
    props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born");
    props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
    // SSSUUU = millis + micros; QuestDB's own parser supports this, unlike the SMT path
    props.put(QuestDBSinkConnectorConfig.TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss.SSSUUU z");
    props.put(QuestDBSinkConnectorConfig.TIMESTAMP_STRING_FIELDS, "born,death");

    connect.configureConnector(CONNECTOR_NAME, props);
    assertConnectorTaskRunningEventually();

    // pre-create the table so "born" is the designated timestamp column
    QuestDBUtils.assertSql(questDBContainer,
            "{\"ddl\":\"OK\"}\n",
            "create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born)",
            QuestDBUtils.Endpoint.EXEC);

    // microsecond-precision inputs — the assertion below checks micros survive intact
    String birthTimestamp = "1985-08-02 16:41:55.402095 UTC";
    String deadTimestamp = "2023-08-02 16:41:55.402095 UTC";
    connect.kafka().produce(topicName, "foo",
            "{\"firstname\":\"John\""
                    + ",\"lastname\":\"Doe\""
                    + ",\"death\":\"" + deadTimestamp + "\""
                    + ",\"born\":\"" + birthTimestamp + "\"}"
    );

    QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"death\",\"born\"\r\n" +
                    "\"John\",\"Doe\",\"2023-08-02T16:41:55.402095Z\",\"1985-08-02T16:41:55.402095Z\"\r\n",
            "select * from " + topicName);
}
774+
743775
@Test
744776
public void testCustomPrefixWithPrimitiveKeyAndValues() {
745777
connect.kafka().createTopic(topicName, 1);

0 commit comments

Comments
 (0)