
Commit 1645bf4

Merge pull request #8 from questdb/feat/kafka_native_timestamps
feat: optionally use Kafka timestamps as designated timestamps
2 parents: 4961939 + 4e537c9

4 files changed: +84 -3 lines
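
For context, here is a hypothetical sketch of connector properties that enable the new option. Only "timestamp.kafka.native" and "timestamp.field.name" are introduced by this PR; "connector.class" and "topics" are standard Kafka Connect keys, and the topic name is a placeholder.

import java.util.HashMap;
import java.util.Map;

public class KafkaNativeTimestampConfigSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", "io.questdb.kafka.QuestDBSinkConnector");
        props.put("topics", "example-topic"); // placeholder topic name
        props.put("timestamp.kafka.native", "true"); // new option: use Kafka record timestamps
        // Setting "timestamp.field.name" as well would fail validation; the two
        // options are mutually exclusive (see QuestDBSinkConnector.validate() below).
        System.out.println(props);
    }
}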

connector/src/main/java/io/questdb/kafka/QuestDBSinkConnector.java

Lines changed: 11 additions & 0 deletions

@@ -1,5 +1,6 @@
 package io.questdb.kafka;
 
+import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -43,4 +44,14 @@ public void stop() {
     public ConfigDef config() {
         return QuestDBSinkConnectorConfig.conf();
     }
+
+    @Override
+    public Config validate(Map<String, String> connectorConfigs) {
+        String s = connectorConfigs.get(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG);
+        if (Boolean.parseBoolean(s) && connectorConfigs.get(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG) != null) {
+            throw new IllegalArgumentException("Cannot use " + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG
+                    + " with " + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG + ". These options are mutually exclusive.");
+        }
+        return super.validate(connectorConfigs);
+    }
 }
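
A standalone sketch of the validation rule above, with the config constants inlined for illustration. Note that Boolean.parseBoolean(null) returns false, so the check is skipped entirely when timestamp.kafka.native is absent from the config.

import java.util.HashMap;
import java.util.Map;

public class MutualExclusionCheckSketch {
    // Mirrors the validate() logic above, constants inlined.
    static void check(Map<String, String> cfg) {
        String s = cfg.get("timestamp.kafka.native"); // parseBoolean(null) == false
        if (Boolean.parseBoolean(s) && cfg.get("timestamp.field.name") != null) {
            throw new IllegalArgumentException(
                    "Cannot use timestamp.field.name with timestamp.kafka.native. "
                            + "These options are mutually exclusive.");
        }
    }

    public static void main(String[] args) {
        Map<String, String> ok = new HashMap<>();
        ok.put("timestamp.kafka.native", "true");
        check(ok); // passes: no explicit designated-timestamp field configured

        Map<String, String> bad = new HashMap<>(ok);
        bad.put("timestamp.field.name", "born");
        try {
            check(bad);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // the options are mutually exclusive
        }
    }
}

Throwing from validate() makes the misconfiguration fail at configuration time rather than when a task first runs; as the embedded test below shows, it surfaces as a ConnectException from configureConnector().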

connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java

Lines changed: 9 additions & 1 deletion

@@ -33,6 +33,9 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
     public static final String DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG = "timestamp.field.name";
     private static final String DESIGNATED_TIMESTAMP_COLUMN_NAME_DOC = "Designated timestamp field name";
 
+    public static final String DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG = "timestamp.kafka.native";
+    private static final String DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC = "Use Kafka record timestamps as designated timestamp. Mutually exclusive with timestamp.field.name";
+
     public static final String TIMESTAMP_STRING_FIELDS = "timestamp.string.fields";
     private static final String TIMESTAMP_STRING_FIELDS_DOC = "Comma-separated list of string fields that should be parsed as timestamps.";
 
@@ -94,7 +97,8 @@ public static ConfigDef conf() {
                 .define(RETRY_BACKOFF_MS, Type.LONG, 3_000, Importance.LOW, RETRY_BACKOFF_MS_DOC)
                 .define(MAX_RETRIES, Type.INT, 10, Importance.LOW, MAX_RETRIES_DOC)
                 .define(TIMESTAMP_FORMAT, Type.STRING, DEFAULT_TIMESTAMP_FORMAT, TimestampFormatValidator.INSTANCE, Importance.MEDIUM, TIMESTAMP_FORMAT_DOC)
-                .define(TIMESTAMP_STRING_FIELDS, Type.STRING, null, Importance.MEDIUM, TIMESTAMP_STRING_FIELDS_DOC);
+                .define(TIMESTAMP_STRING_FIELDS, Type.STRING, null, Importance.MEDIUM, TIMESTAMP_STRING_FIELDS_DOC)
+                .define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC);
     }
 
     public String getHost() {
@@ -121,6 +125,10 @@ public String getValuePrefix() {
         return getString(VALUE_PREFIX_CONFIG);
     }
 
+    public boolean isDesignatedTimestampKafkaNative() {
+        return getBoolean(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG);
+    }
+
     public boolean isSkipUnsupportedTypes() {
         return getBoolean(SKIP_UNSUPPORTED_TYPES_CONFIG);
     }
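
A minimal sketch (not the connector's actual ConfigDef) of why isDesignatedTimestampKafkaNative() returns false when the option is not set: the define() call above registers a BOOLEAN with a default of false, so existing configurations keep their current behavior.

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

import java.util.Map;

public class ConfigDefaultSketch {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef()
                .define("timestamp.kafka.native", Type.BOOLEAN, false,
                        Importance.MEDIUM, "Use Kafka record timestamps as designated timestamp.");
        // No user overrides supplied, so the registered default applies.
        AbstractConfig cfg = new AbstractConfig(def, Map.of());
        System.out.println(cfg.getBoolean("timestamp.kafka.native")); // prints: false
    }
}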

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 11 additions & 2 deletions

@@ -46,6 +46,7 @@ public final class QuestDBSinkTask extends SinkTask {
     private int remainingRetries;
     private long batchesSinceLastError = 0;
     private DateFormat dataFormat;
+    private boolean kafkaTimestampsEnabled;
 
     @Override
     public String version() {
@@ -78,6 +79,7 @@ public void start(Map<String, String> map) {
         this.sender = createSender();
         this.remainingRetries = config.getMaxRetries();
         this.timestampColumnName = config.getDesignatedTimestampColumnName();
+        this.kafkaTimestampsEnabled = config.isDesignatedTimestampKafkaNative();
         this.timestampUnits = config.getTimestampUnitsOrNull();
     }
 
@@ -176,11 +178,17 @@ private void handleSingleRecord(SinkRecord record) {
         }
         handleObject(config.getValuePrefix(), record.valueSchema(), record.value(), PRIMITIVE_VALUE_FALLBACK_NAME);
 
+        if (kafkaTimestampsEnabled) {
+            timestampColumnValue = TimeUnit.MILLISECONDS.toNanos(record.timestamp());
+        }
         if (timestampColumnValue == Long.MIN_VALUE) {
             sender.atNow();
         } else {
-            sender.at(timestampColumnValue);
-            timestampColumnValue = Long.MIN_VALUE;
+            try {
+                sender.at(timestampColumnValue);
+            } finally {
+                timestampColumnValue = Long.MIN_VALUE;
+            }
         }
     }
@@ -223,6 +231,7 @@ private boolean isDesignatedColumnName(String name, String fallbackName) {
 
     private void handleObject(String name, Schema schema, Object value, String fallbackName) {
         assert !name.isEmpty() || !fallbackName.isEmpty();
+
        if (isDesignatedColumnName(name, fallbackName)) {
            assert timestampColumnValue == Long.MIN_VALUE;
            if (value == null) {
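
Two details in handleSingleRecord() are worth spelling out. First, Kafka record timestamps are epoch milliseconds, while the sender.at() call used here expects epoch nanoseconds, hence TimeUnit.MILLISECONDS.toNanos(...). Second, the new try/finally resets timestampColumnValue to its Long.MIN_VALUE sentinel even when sender.at() throws, so a failed record cannot leak its timestamp into the next one. A small sketch of the conversion, using the same instant the embedded test below produces:

import java.util.concurrent.TimeUnit;

public class TimestampConversionSketch {
    public static void main(String[] args) {
        // 2022-10-23T13:53:59.123Z as epoch milliseconds (sample value from the test).
        long kafkaTimestampMillis = 1_666_533_239_123L;
        long nanos = TimeUnit.MILLISECONDS.toNanos(kafkaTimestampMillis);
        System.out.println(nanos); // 1666533239123000000
    }
}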

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 53 additions & 0 deletions

@@ -2,15 +2,19 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.connect.data.Date;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.data.Time;
 import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.runtime.AbstractStatus;
+import org.apache.kafka.connect.runtime.Connect;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.storage.Converter;
@@ -46,6 +50,8 @@
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @Testcontainers
@@ -457,6 +463,53 @@ private void testTimestampUnitResolution0(String mode) {
                 "select firstname,lastname,timestamp from " + topicName);
     }
 
+    @Test
+    public void testKafkaNativeTimestampsAndExplicitDesignatedFieldTimestampMutuallyExclusive() {
+        Map<String, String> props = baseConnectorProps(topicName);
+        props.put("value.converter.schemas.enable", "false");
+        props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born");
+        props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, "true");
+        try {
+            connect.configureConnector(CONNECTOR_NAME, props);
+            fail("Expected ConnectException");
+        } catch (ConnectException e) {
+            assertThat(e.getMessage(), containsString("timestamp.field.name with timestamp.kafka.native"));
+        }
+    }
+
+    @Test
+    public void testKafkaNativeTimestamp() {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = baseConnectorProps(topicName);
+        props.put("value.converter.schemas.enable", "false");
+        props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, "true");
+
+        connect.configureConnector(CONNECTOR_NAME, props);
+        assertConnectorTaskRunningEventually();
+
+        QuestDBUtils.assertSql(questDBContainer,
+                "{\"ddl\":\"OK\"}\n",
+                "create table " + topicName + " (firstname string, lastname string, born timestamp) timestamp(born)",
+                QuestDBUtils.Endpoint.EXEC);
+
+        java.util.Date birth = new Calendar.Builder()
+                .setTimeZone(TimeZone.getTimeZone("UTC"))
+                .setDate(2022, 9, 23) // note: month is 0-based, so it's October and not November
+                .setTimeOfDay(13, 53, 59, 123)
+                .build().getTime();
+
+        Map<String, Object> prodProps = new HashMap<>();
+        try (KafkaProducer<byte[], byte[]> producer = connect.kafka().createProducer(prodProps)) {
+            String val = "{\"firstname\":\"John\",\"lastname\":\"Doe\"}";
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicName, null, birth.getTime(), null, val.getBytes());
+            producer.send(record);
+        }
+
+        QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"born\"\r\n" +
+                        "\"John\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n",
+                "select * from " + topicName);
+    }
+
     @Test
     public void testTimestampSMT_parseTimestamp_schemaLess() {
         connect.kafka().createTopic(topicName, 1);
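
One pitfall in the test above: Calendar.Builder.setDate() takes a 0-based month, so 9 means October. A java.time equivalent with 1-based months, included here as a hypothetical cross-check that yields the same epoch value:

import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class BirthTimestampSketch {
    public static void main(String[] args) {
        // Same instant as the test's Calendar.Builder; months are 1-based here.
        long millis = OffsetDateTime.of(2022, 10, 23, 13, 53, 59, 123_000_000, ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();
        System.out.println(millis); // 1666533239123, the record timestamp the test sends
    }
}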
