Skip to content

Commit eec4dfa

Browse files
committed
(feat): skip tombstone records
1 parent e104300 commit eec4dfa

File tree

2 files changed

+43
-1
lines changed

2 files changed

+43
-1
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,12 @@ private void closeSenderSilently() {
314314
private void handleSingleRecord(SinkRecord record) {
315315
assert timestampColumnValue == Long.MIN_VALUE;
316316

317+
Object recordValue = record.value();
318+
if (recordValue == null) {
319+
// ignore tombstones
320+
return;
321+
}
322+
317323
CharSequence tableName = recordToTable.apply(record);
318324
if (tableName == null || tableName.equals("")) {
319325
throw new InvalidDataException("Table name cannot be empty");
@@ -324,7 +330,7 @@ private void handleSingleRecord(SinkRecord record) {
324330
if (config.isIncludeKey()) {
325331
handleObject(config.getKeyPrefix(), record.keySchema(), record.key(), PRIMITIVE_KEY_FALLBACK_NAME);
326332
}
327-
handleObject(config.getValuePrefix(), record.valueSchema(), record.value(), PRIMITIVE_VALUE_FALLBACK_NAME);
333+
handleObject(config.getValuePrefix(), record.valueSchema(), recordValue, PRIMITIVE_VALUE_FALLBACK_NAME);
328334
} catch (InvalidDataException ex) {
329335
if (httpTransport) {
330336
sender.cancelRow();

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,42 @@ public void testTableTemplateWithKey_withSchema(boolean useHttp) {
198198
httpPort);
199199
}
200200

201+
@Test
202+
public void testTombstoneRecordFilter() {
203+
connect.kafka().createTopic(topicName, 1);
204+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
205+
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
206+
207+
// this FILTER transform is no longer needed since the connector filters out tombstone records by default
208+
// props.put("transforms", "filterTombstones");
209+
// props.put("transforms.filterTombstones.type", "org.apache.kafka.connect.transforms.Filter");
210+
// props.put("transforms.filterTombstones.predicate", "isTombstone");
211+
// props.put("predicates", "isTombstone");
212+
// props.put("predicates.isTombstone.type", "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone");
213+
214+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
215+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
216+
217+
Schema schema = SchemaBuilder.struct().name("com.example.Person")
218+
.field("firstname", Schema.STRING_SCHEMA)
219+
.field("lastname", Schema.STRING_SCHEMA)
220+
.field("age", Schema.INT8_SCHEMA)
221+
.build();
222+
223+
Struct struct = new Struct(schema)
224+
.put("firstname", "John")
225+
.put("lastname", "Doe")
226+
.put("age", (byte) 42);
227+
228+
connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct)));
229+
connect.kafka().produce(topicName, "key", null);
230+
231+
QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
232+
+ "\"John\",\"Doe\",42\r\n",
233+
"select firstname,lastname,age from " + topicName,
234+
httpPort);
235+
}
236+
201237
@ParameterizedTest
202238
@ValueSource(booleans = {true, false})
203239
public void testTableTemplateWithKey_schemaless(boolean useHttp) {

0 commit comments

Comments
 (0)