
Commit bb38893

feat: use auto flush configuration from client config string (#22)
* feat: connector uses auto flush configuration from client config string
1 parent a3c3aad commit bb38893
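
In practice, this commit moves flush tuning into the connector's client.conf.string option. A hypothetical sink configuration (topic, host, and flush values are made up for illustration):

    name=questdb-sink
    connector.class=io.questdb.kafka.QuestDBSinkConnector
    topics=example-topic
    client.conf.string=http::addr=localhost:9000;auto_flush_rows=10000;auto_flush_interval=500;

With such a string, the connector lifts the two auto_flush parameters out of the config, remembers them in its own FlushConfig, and appends auto_flush=off before handing the rest to the client, so only the connector decides when to flush.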

File tree

8 files changed: +213 -88 lines changed

connector/src/main/java/io/questdb/kafka/ClientConfUtils.java

Lines changed: 93 additions & 39 deletions
@@ -2,65 +2,119 @@
 
 import io.questdb.client.impl.ConfStringParser;
 import io.questdb.std.Chars;
+import io.questdb.std.Misc;
+import io.questdb.std.Numbers;
+import io.questdb.std.NumericException;
 import io.questdb.std.str.StringSink;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.concurrent.TimeUnit;
 
 final class ClientConfUtils {
     private ClientConfUtils() {
     }
 
-    static boolean patchConfStr(String confStr, StringSink sink) {
-        int pos = ConfStringParser.of(confStr, sink);
+
+    static boolean patchConfStr(String confStr, StringSink sink, FlushConfig flushConfig) {
+        flushConfig.reset();
+
+        sink.clear();
+        StringSink tmpSink = Misc.getThreadLocalSink();
+        int pos = ConfStringParser.of(confStr, tmpSink);
         if (pos < 0) {
-            sink.clear();
             sink.put(confStr);
             return false;
         }
 
-        boolean isHttpTransport = Chars.equals(sink, "http") || Chars.equals(sink, "https");
-        boolean intervalFlushSetExplicitly = false;
-        boolean flushesDisabled = false;
-        boolean parseError = false;
-        boolean hasAtLeastOneParam = false;
+        boolean isHttpTransport = Chars.equals(tmpSink, "http") || Chars.equals(tmpSink, "https");
+        if (!isHttpTransport) {
+            sink.put(confStr);
+            // no patching for TCP transport
+            return false;
+        }
+        sink.put(tmpSink).put("::");
 
-        // disable interval based flushes
-        // unless they are explicitly set or auto_flush is entirely off
-        // why? the connector has its own mechanism to flush data in a timely manner
+        boolean hasAtLeastOneParam = false;
         while (ConfStringParser.hasNext(confStr, pos)) {
             hasAtLeastOneParam = true;
-            pos = ConfStringParser.nextKey(confStr, pos, sink);
+            pos = ConfStringParser.nextKey(confStr, pos, tmpSink);
             if (pos < 0) {
-                parseError = true;
-                break;
+                sink.clear();
+                sink.put(confStr);
+                return true;
             }
-            if (Chars.equals(sink, "auto_flush_interval")) {
-                intervalFlushSetExplicitly = true;
-                pos = ConfStringParser.value(confStr, pos, sink);
-            } else if (Chars.equals(sink, "auto_flush")) {
-                pos = ConfStringParser.value(confStr, pos, sink);
-                flushesDisabled = Chars.equals(sink, "off");
+            if (Chars.equals(tmpSink, "auto_flush_interval")) {
+                pos = ConfStringParser.value(confStr, pos, tmpSink);
+                if (pos < 0) {
+                    sink.clear();
+                    sink.put(confStr);
+                    // invalid config, let the real client parser fail
+                    return true;
+                }
+                if (Chars.equals(tmpSink, "off")) {
+                    throw new ConfigException("QuestDB Kafka connector cannot have auto_flush_interval disabled");
+                }
+                try {
+                    flushConfig.autoFlushNanos = TimeUnit.MILLISECONDS.toNanos(Numbers.parseLong(tmpSink));
+                } catch (NumericException e) {
+                    throw new ConfigException("Invalid auto_flush_interval value [auto_flush_interval=" + tmpSink + ']');
+                }
+            } else if (Chars.equals(tmpSink, "auto_flush_rows")) {
+                pos = ConfStringParser.value(confStr, pos, tmpSink);
+                if (pos < 0) {
+                    sink.clear();
+                    sink.put(confStr);
+                    return true;
+                }
+                if (Chars.equals(tmpSink, "off")) {
+                    throw new ConfigException("QuestDB Kafka connector cannot have auto_flush_rows disabled");
+                } else {
+                    try {
+                        flushConfig.autoFlushRows = Numbers.parseInt(tmpSink);
+                    } catch (NumericException e) {
+                        throw new ConfigException("Invalid auto_flush_rows value [auto_flush_rows=" + tmpSink + ']');
+                    }
+                }
+            } else if (Chars.equals(tmpSink, "auto_flush")) {
+                pos = ConfStringParser.value(confStr, pos, tmpSink);
+                if (pos < 0) {
+                    sink.clear();
+                    sink.put(confStr);
+                    return true;
+                }
+                if (Chars.equals(tmpSink, "off")) {
+                    throw new ConfigException("QuestDB Kafka connector cannot have auto_flush disabled");
+                } else if (!Chars.equals(tmpSink, "on")) {
+                    throw new ConfigException("Unknown auto_flush value [auto_flush=" + tmpSink + ']');
+                }
             } else {
-                pos = ConfStringParser.value(confStr, pos, sink); // skip other values
-            }
-            if (pos < 0) {
-                parseError = true;
-                break;
+                // copy other params
+                sink.put(tmpSink).put('=');
+                pos = ConfStringParser.value(confStr, pos, tmpSink);
+                if (pos < 0) {
+                    sink.clear();
+                    sink.put(confStr);
+                    return true;
+                }
+                for (int i = 0; i < tmpSink.length(); i++) {
+                    char ch = tmpSink.charAt(i);
+                    sink.put(ch);
+                    // re-escape semicolon
+                    if (ch == ';') {
+                        sink.put(';');
+                    }
+                }
+                sink.put(';');
             }
         }
-        sink.clear();
-        sink.put(confStr);
-        if (!parseError // we don't want to mess with the config if there was a parse error
-                && isHttpTransport // we only want to patch http transport
-                && !flushesDisabled // if auto-flush is disabled we don't need to do anything
-                && !intervalFlushSetExplicitly // if auto_flush_interval is set explicitly we don't want to override it
-                && hasAtLeastOneParam // no parameter is also an error since at least address should be set. we let client throw exception in this case
-        ) {
-            // if everything is ok, we set auto_flush_interval to max value
-            // this will effectively disable interval based flushes
-            // and the connector will flush data only when it is told to do so by Connector
-            // or if a row count limit is reached
-            sink.put("auto_flush_interval=").put(Integer.MAX_VALUE).put(';');
+        if (!hasAtLeastOneParam) {
+            // this is invalid, let the real client parser fail
+            sink.clear();
+            sink.put(confStr);
+            return true;
         }
+        sink.put("auto_flush=off;");
 
-        return isHttpTransport;
+        return true;
     }
 }
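
To make the new behavior concrete, here is a minimal sketch of what patchConfStr now produces for a given input. The addresses and flush values are made up for illustration, and since patchConfStr and FlushConfig (the new file below) are package-private, the sketch assumes code living in the io.questdb.kafka package:

    package io.questdb.kafka;

    import io.questdb.std.str.StringSink;

    final class PatchConfStrSketch {
        public static void main(String[] args) {
            StringSink sink = new StringSink();
            FlushConfig flushConfig = new FlushConfig();

            // HTTP transport: auto_flush params are lifted into flushConfig,
            // other params are copied verbatim, and auto_flush=off is appended
            // so the client never flushes on its own.
            boolean http = ClientConfUtils.patchConfStr(
                    "http::addr=localhost:9000;auto_flush_rows=1000;auto_flush_interval=500;",
                    sink, flushConfig);
            // http == true
            // sink -> "http::addr=localhost:9000;auto_flush=off;"
            // flushConfig.autoFlushRows == 1000
            // flushConfig.autoFlushNanos == 500 ms expressed in nanos

            // TCP transport: the config string is passed through unchanged.
            boolean tcp = ClientConfUtils.patchConfStr(
                    "tcp::addr=localhost:9009;", sink, flushConfig);
            // tcp == false
            // sink -> "tcp::addr=localhost:9009;"
            System.out.println(sink);
        }
    }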
connector/src/main/java/io/questdb/kafka/FlushConfig.java

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+package io.questdb.kafka;
+
+import java.util.concurrent.TimeUnit;
+
+final class FlushConfig {
+    int autoFlushRows;
+    long autoFlushNanos;
+
+    void reset() {
+        autoFlushRows = 75_000;
+        autoFlushNanos = TimeUnit.SECONDS.toNanos(1);
+    }
+}
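
Note: reset() seeds the connector-side policy with 75,000 rows and a 1-second interval, which mirror the QuestDB client's own auto-flush defaults; these values stay in effect unless the client config string overrides auto_flush_rows or auto_flush_interval.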

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 44 additions & 20 deletions
@@ -43,6 +43,9 @@ public final class QuestDBSinkTask extends SinkTask {
     private boolean kafkaTimestampsEnabled;
     private boolean httpTransport;
     private int allowedLag;
+    private long nextFlushNanos;
+    private int pendingRows;
+    private final FlushConfig flushConfig = new FlushConfig();
 
     @Override
     public String version() {
@@ -79,6 +82,7 @@ public void start(Map<String, String> map) {
         this.kafkaTimestampsEnabled = config.isDesignatedTimestampKafkaNative();
         this.timestampUnits = config.getTimestampUnitsOrNull();
         this.allowedLag = config.getAllowedLag();
+        this.nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
     }
 
     private Sender createRawSender() {
@@ -91,10 +95,13 @@ private Sender createRawSender() {
         if (confStr != null && !confStr.isEmpty()) {
             log.debug("Using client configuration string");
             StringSink sink = new StringSink();
-            httpTransport = ClientConfUtils.patchConfStr(confStr, sink);
+            httpTransport = ClientConfUtils.patchConfStr(confStr, sink, flushConfig);
+            if (!httpTransport) {
+                log.info("Using TCP transport, consider using HTTP transport for improved fault tolerance and error handling");
+            }
             return Sender.fromConfig(sink);
         }
-        log.debug("Using legacy client configuration");
+        log.warn("Configuration options 'host', 'tls', 'token' and 'username' are deprecated and will be removed in the future. Use 'client.conf.string' instead. See: https://questdb.io/docs/third-party-tools/kafka/questdb-kafka/#configuration-options");
         Sender.LineSenderBuilder builder = Sender.builder(Sender.Transport.TCP).address(config.getHost());
         if (config.isTls()) {
             builder.enableTls();
@@ -132,11 +139,7 @@ public void put(Collection<SinkRecord> collection) {
             // We do not want locally buffered row to be stuck in the buffer for too long. It increases
             // latency between the time the record is produced and the time it is visible in QuestDB.
             // If the local buffer is empty then flushing is a cheap no-op.
-            try {
-                sender.flush();
-            } catch (LineSenderException | HttpClientException e) {
-                onSenderException(e);
-            }
+            flushAndResetCounters();
         } else {
             log.debug("Received empty collection, nothing to do");
         }
@@ -156,7 +159,27 @@ public void put(Collection<SinkRecord> collection) {
             handleSingleRecord(record);
         }
 
-        if (!httpTransport) {
+        if (httpTransport) {
+            if (pendingRows >= flushConfig.autoFlushRows) {
+                log.debug("Flushing data to QuestDB due to auto_flush_rows limit [pending-rows={}, max-pending-rows={}]",
+                        pendingRows, flushConfig.autoFlushRows);
+                flushAndResetCounters();
+            } else {
+                long remainingNanos = nextFlushNanos - System.nanoTime();
+                long remainingMs = TimeUnit.NANOSECONDS.toMillis(remainingNanos);
+                if (remainingMs <= 0) {
+                    log.debug("Flushing data to QuestDB due to auto_flush_interval timeout");
+                    flushAndResetCounters();
+                } else if (allowedLag == 0) {
+                    log.debug("Flushing data to QuestDB due to zero allowed lag");
+                    flushAndResetCounters();
+                } else {
+                    log.debug("Flushing data to QuestDB in {} ms", remainingMs);
+                    long maxWaitTime = Math.min(remainingMs, allowedLag);
+                    context.timeout(maxWaitTime);
+                }
+            }
+        } else {
             log.debug("Sending {} records", collection.size());
             sender.flush();
             log.debug("Successfully sent {} records", collection.size());
@@ -177,18 +200,24 @@ public void put(Collection<SinkRecord> collection) {
         } catch (LineSenderException | HttpClientException e) {
             onSenderException(e);
         }
+    }
 
-        if (httpTransport) {
-            // we successfully added some rows to the local buffer.
-            // let's set a timeout so Kafka Connect will call us again in time even if there are
-            // no new records to send. this gives us a chance to flush the buffer.
-            context.timeout(allowedLag);
+    private void flushAndResetCounters() {
+        log.debug("Flushing data to QuestDB");
+        try {
+            sender.flush();
+            nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
+            pendingRows = 0;
+        } catch (LineSenderException | HttpClientException e) {
+            onSenderException(e);
         }
     }
 
     private void onSenderException(Exception e) {
         if (httpTransport) {
             closeSenderSilently();
+            nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
+            pendingRows = 0;
             throw new ConnectException("Failed to send data to QuestDB", e);
         }
 
@@ -239,6 +268,7 @@ private void handleSingleRecord(SinkRecord record) {
                 timestampColumnValue = Long.MIN_VALUE;
             }
         }
+        pendingRows++;
     }
 
     private void handleStruct(String parentName, Struct value, Schema schema) {
@@ -482,13 +512,7 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, Offs
     @Override
     public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
         if (httpTransport) {
-            try {
-                log.debug("Flushing data to QuestDB");
-                sender.flush();
-            } catch (LineSenderException | HttpClientException e) {
-                onSenderException(e);
-                throw new ConnectException("Failed to flush data to QuestDB", e);
-            }
+            flushAndResetCounters();
         }
         // TCP transport flushes after each batch so no need to flush here
     }
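
The flushing decision added to put() can be read as a small policy function. A simplified, self-contained sketch of that decision (the FlushPolicySketch class and decide() signature are invented for illustration; the real logic lives inline in put() on QuestDBSinkTask fields and calls context.timeout()):

    import java.util.concurrent.TimeUnit;

    final class FlushPolicySketch {
        // Returns -1 when the task should flush immediately, otherwise the
        // number of milliseconds to pass to context.timeout() so Kafka Connect
        // wakes the task again before the auto_flush_interval deadline.
        static long decide(int pendingRows, int autoFlushRows,
                           long nextFlushNanos, long nowNanos, int allowedLag) {
            if (pendingRows >= autoFlushRows) {
                return -1; // auto_flush_rows limit reached
            }
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(nextFlushNanos - nowNanos);
            if (remainingMs <= 0) {
                return -1; // auto_flush_interval elapsed
            }
            if (allowedLag == 0) {
                return -1; // zero allowed lag: flush on every put()
            }
            return Math.min(remainingMs, allowedLag);
        }

        public static void main(String[] args) {
            long now = System.nanoTime();
            // 500 buffered rows, 1000-row limit, 800 ms left, 1000 ms allowed lag:
            // no flush yet, ask to be woken again in ~800 ms.
            System.out.println(decide(500, 1000,
                    now + TimeUnit.MILLISECONDS.toNanos(800), now, 1000));
        }
    }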
