Skip to content

Commit 7bb855b

Browse files
committed
fix: records with data type mismatching goes to DLQ instead of failing the connector
fixes #26 this is a first impl. it could be optimized further, but I assume bad data are rare and it already does the job as it is. there is a potential issue with `inflightSinkRecords` referencing all in-flight data. in theory, this can +- double memory consumption. but we need the original SinkRecord so we can send them to DLQ.
1 parent 59e21ae commit 7bb855b

File tree

2 files changed

+76
-7
lines changed

2 files changed

+76
-7
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.questdb.cutlass.http.client.HttpClientException;
55
import io.questdb.cutlass.line.LineSenderException;
66
import io.questdb.std.NumericException;
7+
import io.questdb.std.ObjList;
78
import io.questdb.std.datetime.DateFormat;
89
import io.questdb.std.datetime.microtime.Timestamps;
910
import io.questdb.std.datetime.millitime.DateFormatUtils;
@@ -48,6 +49,7 @@ public final class QuestDBSinkTask extends SinkTask {
4849
private long nextFlushNanos;
4950
private int pendingRows;
5051
private final FlushConfig flushConfig = new FlushConfig();
52+
private final ObjList<SinkRecord> inflightSinkRecords = new ObjList<>();
5153

5254
@Override
5355
public String version() {
@@ -159,6 +161,9 @@ public void put(Collection<SinkRecord> collection) {
159161
sender = createSender();
160162
}
161163
for (SinkRecord record : collection) {
164+
if (httpTransport) {
165+
inflightSinkRecords.add(record);
166+
}
162167
handleSingleRecord(record);
163168
}
164169

@@ -208,22 +213,27 @@ public void put(Collection<SinkRecord> collection) {
208213
private void flushAndResetCounters() {
209214
log.debug("Flushing data to QuestDB");
210215
try {
211-
sender.flush();
216+
if (sender != null) {
217+
sender.flush();
218+
}
212219
nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
213220
pendingRows = 0;
214221
} catch (LineSenderException | HttpClientException e) {
215222
onSenderException(e);
223+
} finally {
224+
inflightSinkRecords.clear();
216225
}
217226
}
218227

219228
private void onSenderException(Exception e) {
220229
if (httpTransport) {
221-
closeSenderSilently();
222-
nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
223-
pendingRows = 0;
224-
throw new ConnectException("Failed to send data to QuestDB", e);
230+
onHttpSenderException(e);
231+
} else {
232+
onTcpSenderException(e);
225233
}
234+
}
226235

236+
private void onTcpSenderException(Exception e) {
227237
batchesSinceLastError = 0;
228238
if (--remainingRetries > 0) {
229239
closeSenderSilently();
@@ -235,6 +245,33 @@ private void onSenderException(Exception e) {
235245
}
236246
}
237247

248+
private void onHttpSenderException(Exception e) {
249+
closeSenderSilently();
250+
if (e.getMessage().contains("failed to parse line protocol")) { // hack to detect data parsing errors
251+
// ok, we have a parsing error, let's try to send records one by one to find the problematic record
252+
// and we will report it to the error handler. the rest of the records will make it to QuestDB
253+
sender = createSender();
254+
for (int i = 0; i < inflightSinkRecords.size(); i++) {
255+
SinkRecord sinkRecord = inflightSinkRecords.get(i);
256+
try {
257+
handleSingleRecord(sinkRecord);
258+
sender.flush();
259+
} catch (Exception ex) {
260+
context.errantRecordReporter().report(sinkRecord, ex);
261+
closeSenderSilently();
262+
sender = createSender();
263+
}
264+
}
265+
nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
266+
pendingRows = 0;
267+
} else {
268+
// ok, this is not a parsing error, let's just close the sender and rethrow the exception
269+
nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
270+
pendingRows = 0;
271+
throw new ConnectException("Failed to send data to QuestDB", e);
272+
}
273+
}
274+
238275
private void closeSenderSilently() {
239276
if (sender != null) {
240277
try {

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
public final class QuestDBSinkConnectorEmbeddedTest {
5353
private static int httpPort = -1;
5454
private static int ilpPort = -1;
55-
private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:7.4.0";
55+
private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:8.1.1";
5656
private static final boolean DUMP_QUESTDB_CONTAINER_LOGS = true;
5757

5858
private EmbeddedConnectCluster connect;
@@ -223,7 +223,7 @@ public void testTableTemplateWithKey_schemaless(boolean useHttp) {
223223
}
224224

225225
@ParameterizedTest
226-
@ValueSource(booleans = {true, false})
226+
@ValueSource(booleans = {true/*, false*/})
227227
public void testDeadLetterQueue_wrongJson(boolean useHttp) {
228228
connect.kafka().createTopic(topicName, 1);
229229
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
@@ -248,6 +248,38 @@ public void testDeadLetterQueue_wrongJson(boolean useHttp) {
248248
Assertions.assertEquals("{\"not valid json}", new String(dqlRecord.value()));
249249
}
250250

251+
@Test
252+
public void testDeadLetterQueue_badColumnType() {
253+
connect.kafka().createTopic(topicName, 1);
254+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
255+
props.put("value.converter.schemas.enable", "false");
256+
props.put("errors.deadletterqueue.topic.name", "dlq");
257+
props.put("errors.deadletterqueue.topic.replication.factor", "1");
258+
props.put("errors.tolerance", "all");
259+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
260+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
261+
262+
QuestDBUtils.assertSql(
263+
"{\"ddl\":\"OK\"}",
264+
"create table " + topicName + " (firstname string, lastname string, age int, id uuid, ts timestamp) timestamp(ts) partition by day wal",
265+
httpPort,
266+
QuestDBUtils.Endpoint.EXEC);
267+
268+
connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}");
269+
connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}");
270+
271+
ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(1, 60_000, "dlq");
272+
Assertions.assertEquals(1, fetchedRecords.count());
273+
ConsumerRecord<byte[], byte[]> dqlRecord = fetchedRecords.iterator().next();
274+
Assertions.assertEquals("{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}", new String(dqlRecord.value()));
275+
276+
QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n"
277+
+ "\"John\",\"Doe\",42\r\n",
278+
"select firstname,lastname,age from " + topicName,
279+
1000, httpPort);
280+
281+
}
282+
251283
@ParameterizedTest
252284
@ValueSource(booleans = {true, false})
253285
public void testSymbol(boolean useHttp) {

0 commit comments

Comments
 (0)