@@ -172,7 +172,22 @@ public void put(Collection<SinkRecord> collection) {
            if (httpTransport) {
                inflightSinkRecords.add(record);
            }
-            handleSingleRecord(record);
+            try {
+                handleSingleRecord(record);
+            } catch (InvalidDataException ex) {
+                // data format error generated on the client side
+
+                if (httpTransport && reporter != null) {
+                    // we have a DLQ set, let's report this single record
+
+                    // remove the last item from the in-flight records
+                    inflightSinkRecords.setPos(inflightSinkRecords.size() - 1);
+                    context.errantRecordReporter().report(record, ex);
+                } else {
+                    // ok, no DLQ, let's fail the connector
+                    throw ex;
+                }
+            }
        }

        if (httpTransport) {
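
For context, `reporter` is Kafka Connect's ErrantRecordReporter (KIP-610): it is non-null only when the connector is deployed with error tolerance and a dead letter queue. The connector's actual start() is outside this diff, so the following is only a sketch of the guarded lookup the KIP-610 javadoc recommends:

    // Sketch: acquire the errant-record reporter defensively, so the task
    // still starts on pre-2.6 Kafka Connect runtimes that lack the API.
    private ErrantRecordReporter reporter;

    @Override
    public void start(Map<String, String> props) {
        // ... existing initialization ...
        try {
            reporter = context.errantRecordReporter(); // null when no DLQ is configured
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            reporter = null; // older runtime: KIP-610 API not available
        }
    }
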
@@ -257,7 +272,7 @@ private void onTcpSenderException(Exception e) {
    private void onHttpSenderException(Exception e) {
        closeSenderSilently();
        if (
-                (reporter != null && e.getMessage() != null) // hack to detect data parsing errors
+                (reporter != null && e.getMessage() != null) // hack to detect data parsing errors originating on the server side
                && (e.getMessage().contains("error in line") || e.getMessage().contains("failed to parse line protocol"))
        ) {
            // ok, we have a parsing error, let's try to send records one by one to find the problematic record
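
The record-by-record replay this comment refers to is not part of the hunk. A hypothetical sketch of the idea, assuming `inflightSinkRecords` supports indexed access and that flushing per record surfaces a server-side parse error for exactly that record:

    // Hypothetical illustration, not the connector's actual method: replay
    // buffered records individually so only the offending ones go to the DLQ.
    private void sendRecordsOneByOne() {
        for (int i = 0, n = inflightSinkRecords.size(); i < n; i++) {
            SinkRecord record = inflightSinkRecords.get(i);
            try {
                handleSingleRecord(record);
                sender.flush(); // force a per-record server response
            } catch (Exception ex) {
                reporter.report(record, ex);
            }
        }
    }
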
@@ -300,16 +315,27 @@ private void handleSingleRecord(SinkRecord record) {
        assert timestampColumnValue == Long.MIN_VALUE;

        CharSequence tableName = recordToTable.apply(record);
+        if (tableName == null || tableName.equals("")) {
+            throw new InvalidDataException("Table name cannot be empty");
+        }
        sender.table(tableName);

-        if (config.isIncludeKey()) {
-            handleObject(config.getKeyPrefix(), record.keySchema(), record.key(), PRIMITIVE_KEY_FALLBACK_NAME);
+        try {
+            if (config.isIncludeKey()) {
+                handleObject(config.getKeyPrefix(), record.keySchema(), record.key(), PRIMITIVE_KEY_FALLBACK_NAME);
+            }
+            handleObject(config.getValuePrefix(), record.valueSchema(), record.value(), PRIMITIVE_VALUE_FALLBACK_NAME);
+        } catch (InvalidDataException ex) {
+            if (httpTransport) {
+                sender.cancelRow();
+            }
+            throw ex;
        }
-        handleObject(config.getValuePrefix(), record.valueSchema(), record.value(), PRIMITIVE_VALUE_FALLBACK_NAME);

        if (kafkaTimestampsEnabled) {
            timestampColumnValue = TimeUnit.MILLISECONDS.toNanos(record.timestamp());
        }
+
        if (timestampColumnValue == Long.MIN_VALUE) {
            sender.atNow();
        } else {
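
With the HTTP transport, rows are buffered client-side, so a row that fails half-way through `handleObject()` would otherwise leave a partial row in the buffer that gets flushed along with later rows. `cancelRow()` discards just the unfinished row. A standalone sketch of the same pattern against QuestDB's Java `Sender` (table and column names are made up):

    sender.table("metrics"); // hypothetical table
    try {
        sender.symbol("host", "web-1")
              .doubleColumn("load", 0.42);
        sender.atNow();
    } catch (RuntimeException e) {
        sender.cancelRow(); // drop the half-built row; earlier rows stay intact
        throw e;
    }
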
@@ -338,7 +364,7 @@ private void handleMap(String name, Map<?, ?> value, String fallbackName) {
        for (Map.Entry<?, ?> entry : value.entrySet()) {
            Object mapKey = entry.getKey();
            if (!(mapKey instanceof String)) {
-                throw new ConnectException("Map keys must be strings");
+                throw new InvalidDataException("Map keys must be strings");
            }
            String mapKeyName = (String) mapKey;
            String entryName = name.isEmpty() ? mapKeyName : name + STRUCT_FIELD_SEPARATOR + mapKeyName;
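
`InvalidDataException` marks data errors detected on the client side, as opposed to transport failures. Its definition is not in this diff; a minimal sketch, assuming it extends Kafka Connect's `ConnectException` so the `throw ex` path in `put()` still fails the task exactly as the replaced throws did:

    // Minimal sketch; the (String, Throwable) constructor matches its use
    // in parseToMicros() below.
    public class InvalidDataException extends ConnectException {
        public InvalidDataException(String message) {
            super(message);
        }

        public InvalidDataException(String message, Throwable cause) {
            super(message, cause);
        }
    }
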
@@ -365,7 +391,7 @@ private void handleObject(String name, Schema schema, Object value, String fallbackName) {
        if (isDesignatedColumnName(name, fallbackName)) {
            assert timestampColumnValue == Long.MIN_VALUE;
            if (value == null) {
-                throw new ConnectException("Timestamp column value cannot be null");
+                throw new InvalidDataException("Timestamp column value cannot be null");
            }
            timestampColumnValue = resolveDesignatedTimestampColumnValue(value, schema);
            return;
@@ -393,7 +419,7 @@ private long resolveDesignatedTimestampColumnValue(Object value, Schema schema) {
            return parseToMicros((String) value) * 1000;
        }
        if (!(value instanceof Long)) {
-            throw new ConnectException("Unsupported timestamp column type: " + value.getClass());
+            throw new InvalidDataException("Unsupported timestamp column type: " + value.getClass());
        }
        long longValue = (Long) value;
        TimeUnit inputUnit;
@@ -453,7 +479,7 @@ private long parseToMicros(String timestamp) {
        try {
            return dataFormat.parse(timestamp, DateFormatUtils.EN_LOCALE);
        } catch (NumericException e) {
-            throw new ConnectException("Cannot parse timestamp: " + timestamp + " with the configured format '" + config.getTimestampFormat() + "' use '"
+            throw new InvalidDataException("Cannot parse timestamp: " + timestamp + " with the configured format '" + config.getTimestampFormat() + "' use '"
                    + QuestDBSinkConnectorConfig.TIMESTAMP_FORMAT + "' to configure the right timestamp format. " +
                    "See https://questdb.io/docs/reference/function/date-time/#date-and-timestamp-format for timestamp parser documentation. ", e);
        }
@@ -513,7 +539,7 @@ private void onUnsupportedType(String name, Object type) {
        if (config.isSkipUnsupportedTypes()) {
            log.debug("Skipping unsupported type: {}, name: {}", type, name);
        } else {
-            throw new ConnectException("Unsupported type: " + type + ", name: " + name);
+            throw new InvalidDataException("Unsupported type: " + type + ", name: " + name);
        }
    }
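
The `reporter != null` checks above only pass when the connector is deployed with a dead letter queue. An illustrative snippet of the standard Kafka Connect error-handling properties that enable it (topic name and replication factor are example values):

    errors.tolerance=all
    errors.deadletterqueue.topic.name=questdb-dlq
    errors.deadletterqueue.topic.replication.factor=1
    errors.deadletterqueue.context.headers.enable=true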