Skip to content

Commit 8fbe05a

Browse files
committed
chore: add a test with extracting a timestamp field from message metadata
1 parent ea0cbdd commit 8fbe05a

File tree

1 file changed

+40
-0
lines changed

1 file changed

+40
-0
lines changed

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -996,6 +996,46 @@ public void testDoNotIncludeKey(boolean useHttp) {
996996
httpPort);
997997
}
998998

999+
@Test
1000+
public void testExtractKafkaIngestionTimestampAsField() {
1001+
connect.kafka().createTopic(topicName, 1);
1002+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
1003+
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth"); // the field is injected via InsertField SMT
1004+
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
1005+
props.put("transforms", "InsertField");
1006+
props.put("transforms.InsertField.type", "org.apache.kafka.connect.transforms.InsertField$Value");
1007+
props.put("transforms.InsertField.timestamp.field", "birth");
1008+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
1009+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
1010+
1011+
// note: there is no birth field in the message payload
1012+
Schema schema = SchemaBuilder.struct().name("com.example.Person")
1013+
.field("firstname", Schema.STRING_SCHEMA)
1014+
.field("lastname", Schema.STRING_SCHEMA)
1015+
.build();
1016+
Struct struct = new Struct(schema)
1017+
.put("firstname", "John")
1018+
.put("lastname", "Doe");
1019+
1020+
Map<String, Object> prodProps = new HashMap<>();
1021+
try (KafkaProducer<byte[], byte[]> producer = connect.kafka().createProducer(prodProps)) {
1022+
java.util.Date birth = new Calendar.Builder()
1023+
.setTimeZone(TimeZone.getTimeZone("UTC"))
1024+
.setDate(2022, 9, 23) // note: month is 0-based
1025+
.setTimeOfDay(13, 53, 59, 123)
1026+
.build().getTime();
1027+
long kafkaTimestamp = birth.getTime();
1028+
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topicName, null, kafkaTimestamp, "key".getBytes(), new String(converter.fromConnectData(topicName, schema, struct)).getBytes());
1029+
producer.send(producerRecord);
1030+
}
1031+
1032+
QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"timestamp\"\r\n"
1033+
+ "\"John\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n",
1034+
"select * from " + topicName,
1035+
httpPort);
1036+
}
1037+
1038+
9991039
@ParameterizedTest
10001040
@ValueSource(booleans = {true, false})
10011041
public void testJsonNoSchema(boolean useHttp) {

0 commit comments

Comments
 (0)