Skip to content

Commit 0c08aac

Browse files
authored
feat: decrease latency when some rows are locally buffered (#17)
* feat: decrease latency when some rows are locally buffered. We instruct Kafka Connect to call us within a reasonable time even if no new messages are delivered. This gives us a chance to flush whatever we have buffered locally, which means users no longer have to decrease offset.commit.interval.ms explicitly.
1 parent f0350f5 commit 0c08aac

File tree

5 files changed

+39
-9
lines changed

5 files changed

+39
-9
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
6464
public static final String CONFIGURATION_STRING_CONFIG = "client.conf.string";
6565
public static final String CONFIGURATION_STRING_DOC = "Configuration string for QuestDB client";
6666

67+
public static final String ALLOWED_LAG_CONFIG = "allowed.lag";
68+
public static final String ALLOWED_LAG_DOC = "The maximum lag in milliseconds allowed for the connector to keep buffered data in memory " +
69+
"if there are no new records in Kafka topics. Higher lag allows more batching and improves throughput, but increase the time " +
70+
"it takes to detect new data in Kafka topics. Low lag reduces the time it takes to detect new data in Kafka topics, but may " +
71+
"reduce throughput. The default value is 1000 ms.";
72+
6773
public static final String RETRY_BACKOFF_MS = "retry.backoff.ms";
6874
private static final String RETRY_BACKOFF_MS_DOC = "The time in milliseconds to wait following an error before a retry attempt is made";
6975

@@ -104,13 +110,18 @@ public static ConfigDef conf() {
104110
.define(TIMESTAMP_STRING_FIELDS, Type.STRING, null, Importance.MEDIUM, TIMESTAMP_STRING_FIELDS_DOC)
105111
.define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC)
106112
.define(TLS_VALIDATION_MODE_CONFIG, Type.STRING, "default", ConfigDef.ValidString.in("default", "insecure"), Importance.LOW, TLS_VALIDATION_MODE_DOC)
107-
.define(CONFIGURATION_STRING_CONFIG, Type.PASSWORD, null, Importance.HIGH, CONFIGURATION_STRING_DOC);
113+
.define(CONFIGURATION_STRING_CONFIG, Type.PASSWORD, null, Importance.HIGH, CONFIGURATION_STRING_DOC)
114+
.define(ALLOWED_LAG_CONFIG, Type.INT, 1000, ConfigDef.Range.between(1, Integer.MAX_VALUE), Importance.LOW, ALLOWED_LAG_DOC);
108115
}
109116

110117
public Password getConfigurationString() {
111118
return getPassword(CONFIGURATION_STRING_CONFIG);
112119
}
113120

121+
public int getAllowedLag() {
122+
return getInt(ALLOWED_LAG_CONFIG);
123+
}
124+
114125
public String getTlsValidationMode() {
115126
return getString(TLS_VALIDATION_MODE_CONFIG).toLowerCase(Locale.ENGLISH);
116127
}

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public final class QuestDBSinkTask extends SinkTask {
4242
private DateFormat dataFormat;
4343
private boolean kafkaTimestampsEnabled;
4444
private boolean httpTransport;
45+
private int allowedLag;
4546

4647
@Override
4748
public String version() {
@@ -76,6 +77,7 @@ public void start(Map<String, String> map) {
7677
this.timestampColumnName = config.getDesignatedTimestampColumnName();
7778
this.kafkaTimestampsEnabled = config.isDesignatedTimestampKafkaNative();
7879
this.timestampUnits = config.getTimestampUnitsOrNull();
80+
this.allowedLag = config.getAllowedLag();
7981
}
8082

8183
private Sender createRawSender() {
@@ -123,9 +125,28 @@ private Sender createSender() {
123125
@Override
124126
public void put(Collection<SinkRecord> collection) {
125127
if (collection.isEmpty()) {
126-
log.debug("Received empty collection, ignoring");
128+
if (httpTransport) {
129+
log.debug("Received empty collection, let's flush the buffer");
130+
// Ok, there are no new records to send. Let's flush! Why?
131+
// We do not want locally buffered rows to be stuck in the buffer for too long: that increases the latency
132+
// between the time the record is produced and the time it is visible in QuestDB.
133+
// If the local buffer is empty then flushing is a cheap no-op.
134+
try {
135+
sender.flush();
136+
} catch (LineSenderException | HttpClientException e) {
137+
onSenderException(e);
138+
}
139+
} else {
140+
log.debug("Received empty collection, nothing to do");
141+
}
127142
return;
143+
} if (httpTransport) {
144+
// there are some records to send. good.
145+
// let's set a timeout so Kafka Connect will call us again in time
146+
// even if there are no new records to send. this gives us a chance to flush the buffer.
147+
context.timeout(allowedLag);
128148
}
149+
129150
if (log.isDebugEnabled()) {
130151
SinkRecord record = collection.iterator().next();
131152
log.debug("Received {} records. First record kafka coordinates:({}-{}-{}). ",

integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public class QuestDBSinkConnectorIT {
6666
.withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1")
6767
.withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1")
6868
.withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1")
69-
.withEnv("QDB_CLIENT_CONF", "http::addr=questdb;auto_flush_rows=1;")
69+
.withEnv("QDB_CLIENT_CONF", "http::addr=questdb;auto_flush=off;") // intentionally disabled auto-flush
7070
.withNetwork(network)
7171
.withExposedPorts(8083)
7272
.withCopyFileToContainer(MountableFile.forHostPath(connectorJarResolver.getJarPath()), "/usr/share/java/kafka/questdb-connector.jar")
@@ -97,6 +97,7 @@ public void test() throws Exception {
9797
.with("key.converter", "org.apache.kafka.connect.storage.StringConverter")
9898
.with("value.converter", "org.apache.kafka.connect.storage.StringConverter")
9999
.with("topics", topicName);
100+
// ilp client conf string set via environment variable
100101

101102
connectContainer.registerConnector("my-connector", connector);
102103

integration-tests/debezium/src/test/java/kafka/DebeziumIT.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,11 @@ public class DebeziumIT {
7070
private final GenericContainer<?> questDBContainer = new GenericContainer<>("questdb/questdb:7.4.0")
7171
.withNetwork(network)
7272
.withExposedPorts(QuestDBUtils.QUESTDB_HTTP_PORT)
73-
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")))
74-
.withEnv("QDB_CAIRO_COMMIT_LAG", "100")
75-
.withEnv("JAVA_OPTS", "-Djava.locale.providers=JRE,SPI");
73+
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")));
7674

7775

7876
private ConnectorConfiguration newQuestSinkBaseConfig(String questTableName) {
79-
String confString = "http::addr=" + questDBContainer.getNetworkAliases().get(0) + ":9000;auto_flush_rows=1000;";
77+
String confString = "http::addr=" + questDBContainer.getNetworkAliases().get(0) + ":9000;";
8078
return ConnectorConfiguration.create()
8179
.with("connector.class", QuestDBSinkConnector.class.getName())
8280
.with("client.conf.string", confString)

integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,6 @@ private static GenericContainer<?> newConnectContainer(int id) {
172172
.withEnv("CONNECT_BOOTSTRAP_SERVERS", "kafka0:9092")
173173
.withEnv("CONNECT_GROUP_ID", "test")
174174
.withEnv("CONNECT_OFFSET_FLUSH_INTERVAL_MS", "5000")
175-
.withEnv("CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY", "All")
176175
.withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "connect-storage-topic")
177176
.withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "connect-config-topic")
178177
.withEnv("CONNECT_STATUS_STORAGE_TOPIC", "connect-status-topic")
@@ -298,7 +297,7 @@ private static void startKillingRandomContainers(CyclicBarrier barrier) {
298297
}
299298

300299
private static void startConnector() throws IOException, InterruptedException, URISyntaxException {
301-
String confString = "http::addr=questdb:9000;auto_flush_rows=10000;auto_flush_interval=1200000;retry_timeout=60000;";
300+
String confString = "http::addr=questdb:9000;auto_flush_rows=10000;auto_flush_interval=" + Integer.MAX_VALUE + ";retry_timeout=60000;";
302301

303302
String payload = "{\"name\":\"my-connector\",\"config\":{" +
304303
"\"tasks.max\":\"4\"," +

0 commit comments

Comments
 (0)