
Commit 0cf4745

feat: disable interval-based auto-flushes by default (#18)
The connector has its own mechanism to flush on inactivity, so we don't need the client to do this as well. We disable interval-based flushes only when the interval is not set explicitly, since a user who sets an explicit flush interval can be assumed to know what they are doing.
1 parent 0c08aac commit 0cf4745
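
For HTTP transports with no explicit flush configuration, the patch appends an effectively infinite auto_flush_interval to the client configuration string before it is handed to Sender.fromConfig(). An illustrative before/after (mirroring the new tests below; 2147483647 is Integer.MAX_VALUE):

    before:  http::addr=localhost:9000;
    after:   http::addr=localhost:9000;auto_flush_interval=2147483647;

TCP configuration strings, strings with auto_flush=off, and strings that already set auto_flush_interval are passed through unchanged.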

File tree

6 files changed: +140 -18 lines changed

connector/src/main/java/io/questdb/kafka/ClientConfUtils.java

Lines changed: 66 additions & 0 deletions

@@ -0,0 +1,66 @@
package io.questdb.kafka;

import io.questdb.client.impl.ConfStringParser;
import io.questdb.std.Chars;
import io.questdb.std.str.StringSink;

final class ClientConfUtils {
    private ClientConfUtils() {
    }

    static boolean patchConfStr(String confStr, StringSink sink) {
        int pos = ConfStringParser.of(confStr, sink);
        if (pos < 0) {
            sink.clear();
            sink.put(confStr);
            return false;
        }

        boolean isHttpTransport = Chars.equals(sink, "http") || Chars.equals(sink, "https");
        boolean intervalFlushSetExplicitly = false;
        boolean flushesDisabled = false;
        boolean parseError = false;
        boolean hasAtLeastOneParam = false;

        // disable interval-based flushes unless they are explicitly set
        // or auto_flush is entirely off.
        // why? the connector has its own mechanism to flush data in a timely manner
        while (ConfStringParser.hasNext(confStr, pos)) {
            hasAtLeastOneParam = true;
            pos = ConfStringParser.nextKey(confStr, pos, sink);
            if (pos < 0) {
                parseError = true;
                break;
            }
            if (Chars.equals(sink, "auto_flush_interval")) {
                intervalFlushSetExplicitly = true;
                pos = ConfStringParser.value(confStr, pos, sink);
            } else if (Chars.equals(sink, "auto_flush")) {
                pos = ConfStringParser.value(confStr, pos, sink);
                flushesDisabled = Chars.equals(sink, "off");
            } else {
                pos = ConfStringParser.value(confStr, pos, sink); // skip other values
            }
            if (pos < 0) {
                parseError = true;
                break;
            }
        }
        sink.clear();
        sink.put(confStr);
        if (!parseError // we don't want to mess with the config if there was a parse error
                && isHttpTransport // we only want to patch HTTP transport
                && !flushesDisabled // if auto-flush is disabled we don't need to do anything
                && !intervalFlushSetExplicitly // if auto_flush_interval is set explicitly we don't want to override it
                && hasAtLeastOneParam // no parameters at all is also an error, since at least the address should be set; we let the client throw in this case
        ) {
            // if everything is OK, set auto_flush_interval to the max value.
            // this effectively disables interval-based flushes: the client will
            // flush only when the connector tells it to, or when the row-count limit is reached
            sink.put("auto_flush_interval=").put(Integer.MAX_VALUE).put(';');
        }

        return isHttpTransport;
    }
}

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 13 additions & 11 deletions
@@ -3,11 +3,11 @@
 import io.questdb.client.Sender;
 import io.questdb.cutlass.http.client.HttpClientException;
 import io.questdb.cutlass.line.LineSenderException;
-import io.questdb.cutlass.line.http.LineHttpSender;
 import io.questdb.std.NumericException;
 import io.questdb.std.datetime.DateFormat;
 import io.questdb.std.datetime.microtime.Timestamps;
 import io.questdb.std.datetime.millitime.DateFormatUtils;
+import io.questdb.std.str.StringSink;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.types.Password;

@@ -89,9 +89,9 @@ private Sender createRawSender() {
         }
         if (confStr != null && !confStr.isEmpty()) {
             log.debug("Using client configuration string");
-            Sender s = Sender.fromConfig(confStr);
-            httpTransport = s instanceof LineHttpSender;
-            return s;
+            StringSink sink = new StringSink();
+            httpTransport = ClientConfUtils.patchConfStr(confStr, sink);
+            return Sender.fromConfig(sink);
         }
         log.debug("Using legacy client configuration");
         Sender.LineSenderBuilder builder = Sender.builder(Sender.Transport.TCP).address(config.getHost());

@@ -128,8 +128,8 @@ public void put(Collection<SinkRecord> collection) {
         if (httpTransport) {
             log.debug("Received empty collection, let's flush the buffer");
             // Ok, there are no new records to send. Let's flush! Why?
-            // We do not want locally buffered row to be stuck in the buffer for too long. Increases latency
-            // between the time the record is produced and the time it is visible in QuestDB.
+            // We do not want locally buffered rows to be stuck in the buffer for too long. It increases
+            // latency between the time the record is produced and the time it is visible in QuestDB.
             // If the local buffer is empty then flushing is a cheap no-op.
             try {
                 sender.flush();

@@ -140,11 +140,6 @@ public void put(Collection<SinkRecord> collection) {
             log.debug("Received empty collection, nothing to do");
         }
         return;
-        } if (httpTransport) {
-            // there are some records to send. good.
-            // let's set a timeout so Kafka Connect will call us again in time
-            // even if there are no new records to send. this gives us a chance to flush the buffer.
-            context.timeout(allowedLag);
         }

         if (log.isDebugEnabled()) {

@@ -181,6 +176,13 @@ public void put(Collection<SinkRecord> collection) {
         } catch (LineSenderException | HttpClientException e) {
             onSenderException(e);
         }
+
+        if (httpTransport) {
+            // we successfully added some rows to the local buffer.
+            // let's set a timeout so Kafka Connect will call us again in time even if there are
+            // no new records to send. this gives us a chance to flush the buffer.
+            context.timeout(allowedLag);
+        }
     }

     private void onSenderException(Exception e) {
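
Taken together, the put() hunks move the context.timeout(allowedLag) call from the empty-collection branch to the end of a successfully buffered batch. A minimal sketch of the resulting control flow (simplified; error handling and record conversion omitted):

    @Override
    public void put(Collection<SinkRecord> collection) {
        if (collection.isEmpty()) {
            if (httpTransport) {
                // flush whatever sits in the local buffer;
                // a cheap no-op when the buffer is empty
                sender.flush();
            }
            return;
        }
        // ... append each record to the sender's local buffer ...
        if (httpTransport) {
            // rows were buffered: ask Kafka Connect to call put() again
            // within allowedLag ms, even if no new records arrive, so the
            // buffer gets a chance to flush
            context.timeout(allowedLag);
        }
    }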
connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java

Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
package io.questdb.kafka;

import io.questdb.std.Chars;
import io.questdb.std.str.StringSink;
import org.junit.Test;

import static org.junit.jupiter.api.Assertions.*;

public class ClientConfUtilsTest {

    @Test
    public void testHttpTransportIsResolved() {
        StringSink sink = new StringSink();
        assertTrue(ClientConfUtils.patchConfStr("http::addr=localhost:9000;", sink));
        assertTrue(ClientConfUtils.patchConfStr("https::addr=localhost:9000;", sink));
        assertTrue(ClientConfUtils.patchConfStr("https::addr=localhost:9000;", sink));
        assertFalse(ClientConfUtils.patchConfStr("tcp::addr=localhost:9000;", sink));
        assertFalse(ClientConfUtils.patchConfStr("tcps::addr=localhost:9000;", sink));
    }

    @Test
    public void testHttpTransportTimeBasedFlushesDisabledByDefault() {
        assertConfStringIsPatched("http::addr=localhost:9000;");
        assertConfStringIsPatched("https::addr=localhost:9000;foo=bar;");
        assertConfStringIsPatched("https::addr=localhost:9000;auto_flush_rows=1;");
        assertConfStringIsPatched("https::addr=localhost:9000;auto_flush=on;");

        assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar;auto_flush_interval=100;");
        assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar;auto_flush=off;");
        assertConfStringIsNotPatched("https::addr=localhost:9000;foo=bar");
        assertConfStringIsNotPatched("https::addr");
        assertConfStringIsNotPatched("https");
        assertConfStringIsNotPatched("tcp::addr=localhost:9000;");
        assertConfStringIsNotPatched("tcps::addr=localhost:9000;foo=bar;");
        assertConfStringIsNotPatched("tcps::addr=localhost:9000;auto_flush_rows=1;");
        assertConfStringIsNotPatched("tcps::addr=localhost:9000;auto_flush=on;");
        assertConfStringIsNotPatched("unknown::addr=localhost:9000;auto_flush=on;");
    }

    private static void assertConfStringIsPatched(String confStr) {
        StringSink sink = new StringSink();
        ClientConfUtils.patchConfStr(confStr, sink);

        String expected = confStr + "auto_flush_interval=" + Integer.MAX_VALUE + ";";
        assertTrue(Chars.equals(expected, sink), "Conf string = " + confStr + ", expected = " + expected + ", actual = " + sink);
    }

    private static void assertConfStringIsNotPatched(String confStr) {
        StringSink sink = new StringSink();
        ClientConfUtils.patchConfStr(confStr, sink);

        assertEquals(confStr, sink.toString());
    }
}

connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java

Lines changed: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ static Map<String, String> baseConnectorProps(GenericContainer<?> questDBContain
             props.put("host", ilpIUrl);
         } else {
             int port = questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT);
-            confString = "http::addr="+host+":"+ port + ";";
+            confString = "http::addr=" + host + ":" + port + ";";
             props.put("client.conf.string", confString);
         }
         return props;

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 4 additions & 5 deletions
@@ -64,7 +64,7 @@ public final class QuestDBSinkConnectorEmbeddedTest {

     @BeforeAll
     public static void createContainer() {
-        questDBContainer = newQuestDbConnector();
+        questDBContainer = newQuestDbContainer();
     }

     @AfterAll

@@ -85,7 +85,7 @@ private static String questDBDirectory() {

     private static GenericContainer<?> questDBContainer;

-    private static GenericContainer<?> newQuestDbConnector() {
+    private static GenericContainer<?> newQuestDbContainer() {
         FixedHostPortGenericContainer<?> selfGenericContainer = new FixedHostPortGenericContainer<>(OFFICIAL_QUESTDB_DOCKER);
         if (httpPort != -1) {
             selfGenericContainer = selfGenericContainer.withFixedExposedPort(httpPort, QuestDBUtils.QUESTDB_HTTP_PORT);

@@ -120,7 +120,6 @@ public void setUp() {

         Map<String, String> props = new HashMap<>();
         props.put("connector.client.config.override.policy", "All");
-        props.put("offset.flush.interval.ms", "1000");
         connect = new EmbeddedConnectCluster.Builder()
                 .name("questdb-connect-cluster")
                 .workerProps(props)

@@ -300,7 +299,7 @@ public void testRetrying_recoversFromInfrastructureIssues(boolean useHttp) throw
         }

         // restart QuestDB
-        questDBContainer = newQuestDbConnector();
+        questDBContainer = newQuestDbContainer();
         for (int i = 0; i < 50; i++) {
             connect.kafka().produce(topicName, "key3", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":" + i + "}");
         }

@@ -541,7 +540,7 @@ public void testExactlyOnce_withDedup() throws BrokenBarrierException, Interrupt

     private static void restartQuestDB() {
         questDBContainer.stop();
-        questDBContainer = newQuestDbConnector();
+        questDBContainer = newQuestDbContainer();
     }

     @ParameterizedTest

integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java

Lines changed: 1 addition & 1 deletion
@@ -297,7 +297,7 @@ private static void startKillingRandomContainers(CyclicBarrier barrier) {
     }

     private static void startConnector() throws IOException, InterruptedException, URISyntaxException {
-        String confString = "http::addr=questdb:9000;auto_flush_rows=10000;auto_flush_interval=" + Integer.MAX_VALUE + ";retry_timeout=60000;";
+        String confString = "http::addr=questdb:9000;auto_flush_rows=10000;retry_timeout=60000;";

         String payload = "{\"name\":\"my-connector\",\"config\":{" +
                 "\"tasks.max\":\"4\"," +

0 commit comments
