Skip to content

Commit 4cb82b6

Browse files
committed
debug value removed
1 parent 7b387b6 commit 4cb82b6

File tree

5 files changed

+20
-15
lines changed

5 files changed

+20
-15
lines changed

connector/src/main/java/io/questdb/kafka/ClientConfUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import io.questdb.std.str.StringSink;
99
import org.apache.kafka.common.config.ConfigException;
1010

11+
import java.util.concurrent.TimeUnit;
12+
1113
final class ClientConfUtils {
1214
private ClientConfUtils() {
1315
}
@@ -53,7 +55,7 @@ static boolean patchConfStr(String confStr, StringSink sink, FlushConfig flushCo
5355
throw new ConfigException("QuestDB Kafka connector cannot have auto_flush_interval disabled");
5456
}
5557
try {
56-
flushConfig.autoFlushNanos = Numbers.parseLong(tmpSink);
58+
flushConfig.autoFlushNanos = TimeUnit.MILLISECONDS.toNanos(Numbers.parseLong(tmpSink));
5759
} catch (NumericException e) {
5860
throw new ConfigException("Invalid auto_flush_interval value [auto_flush_interval=" + tmpSink + ']');
5961
}

connector/src/main/java/io/questdb/kafka/FlushConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import java.util.concurrent.TimeUnit;
44

5-
class FlushConfig {
5+
final class FlushConfig {
66
int autoFlushRows;
77
long autoFlushNanos;
88

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ public final class QuestDBSinkTask extends SinkTask {
2828
private static final char STRUCT_FIELD_SEPARATOR = '_';
2929
private static final String PRIMITIVE_KEY_FALLBACK_NAME = "key";
3030
private static final String PRIMITIVE_VALUE_FALLBACK_NAME = "value";
31-
private static final long FLUSH_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);
3231

3332
private static final Logger log = LoggerFactory.getLogger(QuestDBSinkTask.class);
3433
private Sender sender;
@@ -46,8 +45,7 @@ public final class QuestDBSinkTask extends SinkTask {
4645
private int allowedLag;
4746
private long nextFlushNanos;
4847
private int pendingRows;
49-
private final int maxPendingRows = 75_000;
50-
private FlushConfig flushConfig = new FlushConfig();
48+
private final FlushConfig flushConfig = new FlushConfig();
5149

5250
@Override
5351
public String version() {
@@ -84,7 +82,7 @@ public void start(Map<String, String> map) {
8482
this.kafkaTimestampsEnabled = config.isDesignatedTimestampKafkaNative();
8583
this.timestampUnits = config.getTimestampUnitsOrNull();
8684
this.allowedLag = config.getAllowedLag();
87-
this.nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS;
85+
this.nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
8886
}
8987

9088
private Sender createRawSender() {
@@ -98,9 +96,12 @@ private Sender createRawSender() {
9896
log.debug("Using client configuration string");
9997
StringSink sink = new StringSink();
10098
httpTransport = ClientConfUtils.patchConfStr(confStr, sink, flushConfig);
99+
if (!httpTransport) {
100+
log.info("Using TCP transport, consider using HTTP transport for improved fault tolerance and error handling");
101+
}
101102
return Sender.fromConfig(sink);
102103
}
103-
log.debug("Using legacy client configuration");
104+
log.warn("Configuration options 'host', 'tsl', 'token' and 'username' are deprecated and will be removed in the future. Use 'client.conf.string' instead. See: https://questdb.io/docs/third-party-tools/kafka/questdb-kafka/#configuration-options");
104105
Sender.LineSenderBuilder builder = Sender.builder(Sender.Transport.TCP).address(config.getHost());
105106
if (config.isTls()) {
106107
builder.enableTls();
@@ -159,9 +160,9 @@ public void put(Collection<SinkRecord> collection) {
159160
}
160161

161162
if (httpTransport) {
162-
if (pendingRows >= maxPendingRows) {
163+
if (pendingRows >= flushConfig.autoFlushRows) {
163164
log.debug("Flushing data to QuestDB due to auto_flush_rows limit [pending-rows={}, max-pending-rows={}]",
164-
pendingRows, maxPendingRows);
165+
pendingRows, flushConfig.autoFlushRows);
165166
flushAndResetCounters();
166167
} else {
167168
long remainingNanos = nextFlushNanos - System.nanoTime();
@@ -205,7 +206,7 @@ private void flushAndResetCounters() {
205206
log.debug("Flushing data to QuestDB");
206207
try {
207208
sender.flush();
208-
nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS;
209+
nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
209210
pendingRows = 0;
210211
} catch (LineSenderException | HttpClientException e) {
211212
onSenderException(e);
@@ -215,7 +216,7 @@ private void flushAndResetCounters() {
215216
private void onSenderException(Exception e) {
216217
if (httpTransport) {
217218
closeSenderSilently();
218-
nextFlushNanos = System.nanoTime() + FLUSH_INTERVAL_NANOS;
219+
nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
219220
pendingRows = 0;
220221
throw new ConnectException("Failed to send data to QuestDB", e);
221222
}

connector/src/test/java/io/questdb/kafka/ClientConfUtilsTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ private static void assertConfStringIsPatched(String confStr, String expectedPat
6565
ClientConfUtils.patchConfStr(confStr, sink, flushConfig);
6666

6767
Assert.assertEquals(expectedPatchedConfStr, sink.toString());
68+
Assert.assertEquals(expectedMaxPendingRows, flushConfig.autoFlushRows);
69+
Assert.assertEquals(expectedFlushNanos, flushConfig.autoFlushNanos);
6870
}
6971

7072
private static void assertConfStringIsNotPatched(String confStr) {

connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,13 @@ static Map<String, String> baseConnectorProps(GenericContainer<?> questDBContain
5252
props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
5353

5454
String confString;
55-
if (!useHttp) {
56-
String ilpIUrl = host + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT);
57-
props.put("host", ilpIUrl);
58-
} else {
55+
if (useHttp) {
5956
int port = questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT);
6057
confString = "http::addr=" + host + ":" + port + ";";
6158
props.put("client.conf.string", confString);
59+
} else {
60+
String ilpIUrl = host + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT);
61+
props.put("host", ilpIUrl);
6262
}
6363
return props;
6464
}

0 commit comments

Comments
 (0)