Skip to content

Commit ffcb177

Browse files
committed
catch more cases of bad data
also - improved test
1 parent d15b910 commit ffcb177

File tree

2 files changed

+26
-13
lines changed

2 files changed

+26
-13
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ private void onTcpSenderException(Exception e) {
247247

248248
private void onHttpSenderException(Exception e) {
249249
closeSenderSilently();
250-
if (e.getMessage().contains("failed to parse line protocol")) { // hack to detect data parsing errors
250+
if (e.getMessage() != null && e.getMessage().contains("failed to parse line protocol") || e.getMessage().contains("cast error")) { // hack to detect data parsing errors
251251
// ok, we have a parsing error, let's try to send records one by one to find the problematic record
252252
// and we will report it to the error handler. the rest of the records will make it to QuestDB
253253
sender = createSender();

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -265,18 +265,31 @@ public void testDeadLetterQueue_badColumnType() {
265265
httpPort,
266266
QuestDBUtils.Endpoint.EXEC);
267267

268-
connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}");
269-
connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}");
270-
271-
ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 60_000, "dlq");
272-
Assertions.assertEquals(1, fetchedRecords.count());
273-
ConsumerRecord<byte[], byte[]> dqlRecord = fetchedRecords.iterator().next();
274-
Assertions.assertEquals("{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}", new String(dqlRecord.value()));
275-
276-
QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n"
277-
+ "\"John\",\"Doe\",42\r\n",
278-
"select firstname,lastname,age from " + topicName,
279-
1000, httpPort);
268+
String goodRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
269+
String goodRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d042\"}";
270+
String goodRecordC = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d043\"}";
271+
String badRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}";
272+
String badRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":\"not a number\",\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
273+
274+
// interleave good and bad records
275+
connect.kafka().produce(topicName, "key", goodRecordA);
276+
connect.kafka().produce(topicName, "key", badRecordA);
277+
connect.kafka().produce(topicName, "key", goodRecordB);
278+
connect.kafka().produce(topicName, "key", badRecordB);
279+
connect.kafka().produce(topicName, "key", goodRecordC);
280+
281+
ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(2, 60_000, "dlq");
282+
Assertions.assertEquals(2, fetchedRecords.count());
283+
Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
284+
Assertions.assertEquals(badRecordA, new String(iterator.next().value()));
285+
Assertions.assertEquals(badRecordB, new String(iterator.next().value()));
286+
287+
QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"id\"\r\n"
288+
+ "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d041\r\n"
289+
+ "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d042\r\n"
290+
+ "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d043\r\n",
291+
"select firstname,lastname,age, id from " + topicName,
292+
httpPort);
280293

281294
}
282295

0 commit comments

Comments
 (0)