Skip to content

Commit a3c3aad

Browse files
authored
fix: Fix NPE when Kafka Connect requests a commit right after an error (#23)
1 parent 8985184 commit a3c3aad

File tree

1 file changed

+12
-0
lines changed

1 file changed

+12
-0
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,18 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) {
467467
return false;
468468
}
469469

470+
@Override
471+
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
472+
if (sender != null) {
473+
flush(currentOffsets);
474+
return currentOffsets;
475+
} else {
476+
// null sender indicates there was an error and we cannot guarantee that the data was actually sent
477+
// returning empty map will cause the task to avoid committing offsets to Kafka
478+
return Collections.emptyMap();
479+
}
480+
}
481+
470482
@Override
471483
public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
472484
if (httpTransport) {

0 commit comments

Comments
 (0)