Skip to content

Commit b262934

Browse files
committed
Merge branch 'main' into jh_auto_flush_improvements
2 parents 4ee853f + a3c3aad commit b262934

File tree

2 files changed

+55
-1
lines changed

2 files changed

+55
-1
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,18 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) {
496496
return false;
497497
}
498498

499+
@Override
500+
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
501+
if (sender != null) {
502+
flush(currentOffsets);
503+
return currentOffsets;
504+
} else {
505+
// null sender indicates there was an error and we cannot guarantee that the data was actually sent
506+
// returning empty map will cause the task to avoid committing offsets to Kafka
507+
return Collections.emptyMap();
508+
}
509+
}
510+
499511
@Override
500512
public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
501513
if (httpTransport) {

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -997,7 +997,7 @@ public void testDoNotIncludeKey(boolean useHttp) {
997997
}
998998

999999
@Test
1000-
public void testExtractKafkaIngestionTimestampAsField() {
1000+
public void testExtractKafkaIngestionTimestampAsField_designated() {
10011001
connect.kafka().createTopic(topicName, 1);
10021002
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
10031003
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth"); // the field is injected via InsertField SMT
@@ -1035,6 +1035,48 @@ public void testExtractKafkaIngestionTimestampAsField() {
10351035
httpPort);
10361036
}
10371037

1038+
@Test
1039+
public void testExtractKafkaIngestionTimestampAsField_nondesignated_schemaless() {
1040+
connect.kafka().createTopic(topicName, 1);
1041+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
1042+
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
1043+
props.put("value.converter.schemas.enable", "false");
1044+
props.put("transforms", "InsertField,TimestampConverter");
1045+
props.put("transforms.InsertField.type", "org.apache.kafka.connect.transforms.InsertField$Value");
1046+
props.put("transforms.InsertField.timestamp.field", "birth");
1047+
props.put("transforms.TimestampConverter.type", "org.apache.kafka.connect.transforms.TimestampConverter$Value");
1048+
props.put("transforms.TimestampConverter.field", "birth");
1049+
props.put("transforms.TimestampConverter.target.type", "Timestamp");
1050+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
1051+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
1052+
1053+
QuestDBUtils.assertSql(
1054+
"{\"ddl\":\"OK\"}",
1055+
"create table " + topicName + " (firstname string, lastname string, birth timestamp, ts timestamp) timestamp(ts) partition by day wal",
1056+
httpPort,
1057+
QuestDBUtils.Endpoint.EXEC);
1058+
1059+
// note: there is no birth field in the message payload
1060+
String personJson = "{\"firstname\":\"John\",\"lastname\":\"Doe\"}";
1061+
1062+
Map<String, Object> prodProps = new HashMap<>();
1063+
try (KafkaProducer<byte[], byte[]> producer = connect.kafka().createProducer(prodProps)) {
1064+
java.util.Date birth = new Calendar.Builder()
1065+
.setTimeZone(TimeZone.getTimeZone("UTC"))
1066+
.setDate(2022, 9, 23) // note: month is 0-based
1067+
.setTimeOfDay(13, 53, 59, 123)
1068+
.build().getTime();
1069+
long kafkaTimestamp = birth.getTime();
1070+
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topicName, null, kafkaTimestamp, "key".getBytes(), personJson.getBytes());
1071+
producer.send(producerRecord);
1072+
}
1073+
1074+
QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"birth\"\r\n"
1075+
+ "\"John\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n",
1076+
"select firstname, lastname, birth from " + topicName,
1077+
httpPort);
1078+
}
1079+
10381080

10391081
@ParameterizedTest
10401082
@ValueSource(booleans = {true, false})

0 commit comments

Comments
 (0)