Commit 4ee853f

feat: connectors uses auto flush configuration from client config string
1 parent 8fbe05a commit 4ee853f

6 files changed, +202 -80 lines changed

connector/src/main/java/io/questdb/kafka/ClientConfUtils.java

Lines changed: 91 additions & 39 deletions
@@ -2,65 +2,117 @@
 import io.questdb.client.impl.ConfStringParser;
 import io.questdb.std.Chars;
+import io.questdb.std.Misc;
+import io.questdb.std.Numbers;
+import io.questdb.std.NumericException;
 import io.questdb.std.str.StringSink;
+import org.apache.kafka.common.config.ConfigException;

 final class ClientConfUtils {
     private ClientConfUtils() {
     }

-    static boolean patchConfStr(String confStr, StringSink sink) {
-        int pos = ConfStringParser.of(confStr, sink);
+
+    static boolean patchConfStr(String confStr, StringSink sink, FlushConfig flushConfig) {
+        flushConfig.reset();
+
+        sink.clear();
+        StringSink tmpSink = Misc.getThreadLocalSink();
+        int pos = ConfStringParser.of(confStr, tmpSink);
         if (pos < 0) {
-            sink.clear();
             sink.put(confStr);
             return false;
         }

-        boolean isHttpTransport = Chars.equals(sink, "http") || Chars.equals(sink, "https");
-        boolean intervalFlushSetExplicitly = false;
-        boolean flushesDisabled = false;
-        boolean parseError = false;
-        boolean hasAtLeastOneParam = false;
+        boolean isHttpTransport = Chars.equals(tmpSink, "http") || Chars.equals(tmpSink, "https");
+        if (!isHttpTransport) {
+            sink.put(confStr);
+            // no patching for TCP transport
+            return false;
+        }
+        sink.put(tmpSink).put("::");

-        // disable interval based flushes
-        // unless they are explicitly set or auto_flush is entirely off
-        // why? the connector has its own mechanism to flush data in a timely manner
+        boolean hasAtLeastOneParam = false;
         while (ConfStringParser.hasNext(confStr, pos)) {
             hasAtLeastOneParam = true;
-            pos = ConfStringParser.nextKey(confStr, pos, sink);
+            pos = ConfStringParser.nextKey(confStr, pos, tmpSink);
             if (pos < 0) {
-                parseError = true;
-                break;
+                sink.clear();
+                sink.put(confStr);
+                return true;
             }
-            if (Chars.equals(sink, "auto_flush_interval")) {
-                intervalFlushSetExplicitly = true;
-                pos = ConfStringParser.value(confStr, pos, sink);
-            } else if (Chars.equals(sink, "auto_flush")) {
-                pos = ConfStringParser.value(confStr, pos, sink);
-                flushesDisabled = Chars.equals(sink, "off");
+            if (Chars.equals(tmpSink, "auto_flush_interval")) {
+                pos = ConfStringParser.value(confStr, pos, tmpSink);
+                if (pos < 0) {
+                    sink.clear();
+                    sink.put(confStr);
+                    // invalid config, let the real client parser fail
+                    return true;
+                }
+                if (Chars.equals(tmpSink, "off")) {
+                    throw new ConfigException("QuestDB Kafka connector cannot have auto_flush_interval disabled");
+                }
+                try {
+                    flushConfig.autoFlushNanos = Numbers.parseLong(tmpSink);
+                } catch (NumericException e) {
+                    throw new ConfigException("Invalid auto_flush_interval value [auto_flush_interval=" + tmpSink + ']');
+                }
+            } else if (Chars.equals(tmpSink, "auto_flush_rows")) {
+                pos = ConfStringParser.value(confStr, pos, tmpSink);
+                if (pos < 0) {
+                    sink.clear();
+                    sink.put(confStr);
+                    return true;
+                }
+                if (Chars.equals(tmpSink, "off")) {
+                    throw new ConfigException("QuestDB Kafka connector cannot have auto_flush_rows disabled");
+                } else {
+                    try {
+                        flushConfig.autoFlushRows = Numbers.parseInt(tmpSink);
+                    } catch (NumericException e) {
+                        throw new ConfigException("Invalid auto_flush_rows value [auto_flush_rows=" + tmpSink + ']');
+                    }
+                }
+            } else if (Chars.equals(tmpSink, "auto_flush")) {
+                pos = ConfStringParser.value(confStr, pos, tmpSink);
+                if (pos < 0) {
+                    sink.clear();
+                    sink.put(confStr);
+                    return true;
+                }
+                if (Chars.equals(tmpSink, "off")) {
+                    throw new ConfigException("QuestDB Kafka connector cannot have auto_flush disabled");
+                } else if (!Chars.equals(tmpSink, "on")) {
+                    throw new ConfigException("Unknown auto_flush value [auto_flush=" + tmpSink + ']');
+                }
             } else {
-                pos = ConfStringParser.value(confStr, pos, sink); // skip other values
-            }
-            if (pos < 0) {
-                parseError = true;
-                break;
+                // copy other params
+                sink.put(tmpSink).put('=');
+                pos = ConfStringParser.value(confStr, pos, tmpSink);
+                if (pos < 0) {
+                    sink.clear();
+                    sink.put(confStr);
+                    return true;
+                }
+                for (int i = 0; i < tmpSink.length(); i++) {
+                    char ch = tmpSink.charAt(i);
+                    sink.put(ch);
+                    // re-escape semicolon
+                    if (ch == ';') {
+                        sink.put(';');
+                    }
+                }
+                sink.put(';');
             }
         }
-        sink.clear();
-        sink.put(confStr);
-        if (!parseError // we don't want to mess with the config if there was a parse error
-                && isHttpTransport // we only want to patch http transport
-                && !flushesDisabled // if auto-flush is disabled we don't need to do anything
-                && !intervalFlushSetExplicitly // if auto_flush_interval is set explicitly we don't want to override it
-                && hasAtLeastOneParam // no parameter is also an error since at least address should be set. we let the client throw in this case
-        ) {
-            // if everything is ok, we set auto_flush_interval to max value
-            // this will effectively disable interval based flushes
-            // and the connector will flush data only when it is told to do so by Connect
-            // or if a row count limit is reached
-            sink.put("auto_flush_interval=").put(Integer.MAX_VALUE).put(';');
+        if (!hasAtLeastOneParam) {
+            // this is invalid, let the real client parser fail
+            sink.clear();
+            sink.put(confStr);
+            return true;
         }
+        sink.put("auto_flush=off;");

-        return isHttpTransport;
+        return true;
     }
 }
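To make the patching concrete, here is a minimal sketch of how patchConfStr behaves after this change. It is not part of the commit: the PatchConfStrExample class and the sample config strings are invented for illustration, and the sketch must live in the io.questdb.kafka package because ClientConfUtils and FlushConfig are package-private.

    package io.questdb.kafka;

    import io.questdb.std.str.StringSink;

    // Illustrative sketch only; exercises the patched ClientConfUtils above.
    class PatchConfStrExample {
        public static void main(String[] args) {
            FlushConfig flushConfig = new FlushConfig();
            StringSink sink = new StringSink();

            // HTTP transport: auto_flush_rows is captured into flushConfig and removed
            // from the string, and "auto_flush=off;" is appended so the client library
            // never flushes on its own -- the connector now controls flushing.
            boolean http = ClientConfUtils.patchConfStr(
                    "http::addr=localhost:9000;auto_flush_rows=10000;", sink, flushConfig);
            // http == true
            // sink now holds "http::addr=localhost:9000;auto_flush=off;"
            // flushConfig.autoFlushRows == 10000

            // TCP transport: the config string is passed through unchanged.
            boolean tcp = ClientConfUtils.patchConfStr(
                    "tcp::addr=localhost:9009;", sink, flushConfig);
            // tcp == false
        }
    }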
connector/src/main/java/io/questdb/kafka/FlushConfig.java (new file)

Lines changed: 13 additions & 0 deletions

@@ -0,0 +1,13 @@
+package io.questdb.kafka;
+
+import java.util.concurrent.TimeUnit;
+
+class FlushConfig {
+    int autoFlushRows;
+    long autoFlushNanos;
+
+    void reset() {
+        autoFlushRows = 75_000;
+        autoFlushNanos = TimeUnit.SECONDS.toNanos(1);
+    }
+}
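For reference, reset() restores the connector-side defaults, 75,000 rows and a 1-second interval, matching the QuestDB ILP client's own auto-flush defaults. A quick illustration (the assertions are illustrative, not from the commit):

    FlushConfig cfg = new FlushConfig();
    cfg.reset();
    assert cfg.autoFlushRows == 75_000;                       // default row threshold
    assert cfg.autoFlushNanos == TimeUnit.SECONDS.toNanos(1); // default interval, in nanos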

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 42 additions & 19 deletions
@@ -28,6 +28,7 @@ public final class QuestDBSinkTask extends SinkTask {
     private static final char STRUCT_FIELD_SEPARATOR = '_';
     private static final String PRIMITIVE_KEY_FALLBACK_NAME = "key";
     private static final String PRIMITIVE_VALUE_FALLBACK_NAME = "value";
+    private static final long FLUSH_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);

     private static final Logger log = LoggerFactory.getLogger(QuestDBSinkTask.class);
     private Sender sender;
@@ -43,6 +44,10 @@ public final class QuestDBSinkTask extends SinkTask {
     private boolean kafkaTimestampsEnabled;
     private boolean httpTransport;
     private int allowedLag;
+    private long nextFlushNanos;
+    private int pendingRows;
+    private final int maxPendingRows = 75_000;
+    private FlushConfig flushConfig = new FlushConfig();

     @Override
     public String version() {
@@ -79,6 +84,7 @@ public void start(Map<String, String> map) {
         this.kafkaTimestampsEnabled = config.isDesignatedTimestampKafkaNative();
         this.timestampUnits = config.getTimestampUnitsOrNull();
         this.allowedLag = config.getAllowedLag();
+        this.nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS;
     }

     private Sender createRawSender() {
@@ -91,7 +97,7 @@ private Sender createRawSender() {
         if (confStr != null && !confStr.isEmpty()) {
             log.debug("Using client configuration string");
             StringSink sink = new StringSink();
-            httpTransport = ClientConfUtils.patchConfStr(confStr, sink);
+            httpTransport = ClientConfUtils.patchConfStr(confStr, sink, flushConfig);
             return Sender.fromConfig(sink);
         }
         log.debug("Using legacy client configuration");
@@ -132,11 +138,7 @@ public void put(Collection<SinkRecord> collection) {
             // We do not want locally buffered rows to be stuck in the buffer for too long. It increases
             // latency between the time the record is produced and the time it is visible in QuestDB.
             // If the local buffer is empty then flushing is a cheap no-op.
-            try {
-                sender.flush();
-            } catch (LineSenderException | HttpClientException e) {
-                onSenderException(e);
-            }
+            flushAndResetCounters();
         } else {
             log.debug("Received empty collection, nothing to do");
         }
@@ -156,7 +158,27 @@ public void put(Collection<SinkRecord> collection) {
             handleSingleRecord(record);
         }

-        if (!httpTransport) {
+        if (httpTransport) {
+            if (pendingRows >= maxPendingRows) {
+                log.debug("Flushing data to QuestDB due to auto_flush_rows limit [pending-rows={}, max-pending-rows={}]",
+                        pendingRows, maxPendingRows);
+                flushAndResetCounters();
+            } else {
+                long remainingNanos = nextFlushNanos - System.nanoTime();
+                long remainingMs = TimeUnit.NANOSECONDS.toMillis(remainingNanos);
+                if (remainingMs <= 0) {
+                    log.debug("Flushing data to QuestDB due to auto_flush_interval timeout");
+                    flushAndResetCounters();
+                } else if (allowedLag == 0) {
+                    log.debug("Flushing data to QuestDB due to zero allowed lag");
+                    flushAndResetCounters();
+                } else {
+                    log.debug("Flushing data to QuestDB in {} ms", remainingMs);
+                    long maxWaitTime = Math.min(remainingMs, allowedLag);
+                    context.timeout(maxWaitTime);
+                }
+            }
+        } else {
             log.debug("Sending {} records", collection.size());
             sender.flush();
             log.debug("Successfully sent {} records", collection.size());
@@ -177,18 +199,24 @@ public void put(Collection<SinkRecord> collection) {
         } catch (LineSenderException | HttpClientException e) {
             onSenderException(e);
         }
+    }

-        if (httpTransport) {
-            // we successfully added some rows to the local buffer.
-            // let's set a timeout so Kafka Connect will call us again in time even if there are
-            // no new records to send. this gives us a chance to flush the buffer.
-            context.timeout(allowedLag);
+    private void flushAndResetCounters() {
+        log.debug("Flushing data to QuestDB");
+        try {
+            sender.flush();
+            nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS;
+            pendingRows = 0;
+        } catch (LineSenderException | HttpClientException e) {
+            onSenderException(e);
         }
     }

     private void onSenderException(Exception e) {
         if (httpTransport) {
             closeSenderSilently();
+            nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS;
+            pendingRows = 0;
             throw new ConnectException("Failed to send data to QuestDB", e);
         }
@@ -239,6 +267,7 @@ private void handleSingleRecord(SinkRecord record) {
                 timestampColumnValue = Long.MIN_VALUE;
             }
         }
+        pendingRows++;
     }

     private void handleStruct(String parentName, Struct value, Schema schema) {
@@ -470,13 +499,7 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) {
     @Override
     public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
         if (httpTransport) {
-            try {
-                log.debug("Flushing data to QuestDB");
-                sender.flush();
-            } catch (LineSenderException | HttpClientException e) {
-                onSenderException(e);
-                throw new ConnectException("Failed to flush data to QuestDB", e);
-            }
+            flushAndResetCounters();
         }
         // TCP transport flushes after each batch so no need to flush here
     }
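Taken together, the new put() logic is a three-way decision made after every batch: flush when the row threshold is reached, flush when the interval deadline has passed or no lag is allowed, otherwise ask Kafka Connect to call back in time for the next deadline. A condensed sketch of just that decision (field and method names come from the diff; the maybeFlush wrapper itself is illustrative, not in the commit):

    // Condensed view of the flush decision in put(); not a drop-in replacement.
    private void maybeFlush() {
        if (pendingRows >= maxPendingRows) {
            flushAndResetCounters();          // row threshold reached
            return;
        }
        long remainingMs = TimeUnit.NANOSECONDS.toMillis(nextFlushNanos - System.nanoTime());
        if (remainingMs <= 0) {
            flushAndResetCounters();          // interval deadline passed
        } else if (allowedLag == 0) {
            flushAndResetCounters();          // zero allowed lag: flush every batch
        } else {
            // schedule a Connect callback so the buffer is flushed even if no new records arrive
            context.timeout(Math.min(remainingMs, allowedLag));
        }
    }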
