Commit 9650135

fix: DH-18740: Make the table_type argument default to None (deephaven#6656)
Fixes DH-18740

1. changed the default values to None
2. fixed a bug in cdc.py
3. black formatted consumer.py

Before: https://github.com/user-attachments/assets/e67e049b-f8e1-403a-a0a6-77d681cdbf55
After: https://github.com/user-attachments/assets/33d05fe7-1f20-4158-933d-361e660e0ba8
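With this change, callers that omit table_type still get a blink table; the default is simply resolved inside the function instead of in the signature. A minimal sketch of the updated call, assuming a running Deephaven server, a Kafka broker at a hypothetical localhost:9092, and a hypothetical "orders" topic:

    from deephaven import dtypes
    from deephaven.stream.kafka import consumer as kc

    # table_type is omitted, so it defaults to None and is resolved to
    # TableType.blink() inside consume(), matching the previous behavior.
    orders = kc.consume(
        {"bootstrap.servers": "localhost:9092"},   # hypothetical broker address
        "orders",                                   # hypothetical topic name
        key_spec=kc.KeyValueSpec.IGNORE,
        value_spec=kc.simple_spec("Payload", dtypes.string),
    )
    # Equivalent to passing table_type=kc.TableType.blink() explicitly.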
1 parent b32641f commit 9650135

1 file changed: py/server/deephaven/stream/kafka/consumer.py (+74, -61 lines)
@@ -72,7 +72,7 @@ class TableType(JObjectWrapper):
     j_object_type = jpy.get_type("io.deephaven.kafka.KafkaTools$TableType")

     @staticmethod
-    def blink():
+    def blink() -> 'TableType':
         """ Consume all partitions into a single interleaved blink table, which will present only newly-available rows
         to downstream operations and visualizations."""
         return TableType(TableType.j_object_type.blink())
@@ -85,12 +85,12 @@ def stream():
         return TableType.blink()

     @staticmethod
-    def append():
+    def append() -> 'TableType':
         """ Consume all partitions into a single interleaved in-memory append-only table."""
         return TableType(TableType.j_object_type.append())

     @staticmethod
-    def ring(capacity: int):
+    def ring(capacity: int) -> 'TableType':
         """ Consume all partitions into a single in-memory ring table."""
         return TableType(TableType.j_object_type.ring(capacity))

@@ -101,14 +101,18 @@ def __init__(self, j_table_type: jpy.JType):
     def j_object(self) -> jpy.JType:
         return self._j_table_type

+
 # TODO (https://github.com/deephaven/deephaven-core/issues/3853): Delete this attribute
 TableType.Stream = TableType.blink()
 """ Deprecated, prefer TableType.blink(). Consume all partitions into a single interleaved blink table, which will
 present only newly-available rows to downstream operations and visualizations."""

 # TODO (https://github.com/deephaven/deephaven-core/issues/3853): Delete this attribute
 TableType.Append = TableType.append()
-""" Deprecated, prefer TableType.append(). Consume all partitions into a single interleaved in-memory append-only table."""
+""" Deprecated, prefer TableType.append(). Consume all partitions into a single interleaved in-memory append-only
+table."""
+
+TableType.Blink = TableType.blink()


 def j_partitions(partitions):
@@ -134,8 +138,8 @@ def consume(
     offsets: Dict[int, int] = None,
     key_spec: KeyValueSpec = None,
     value_spec: KeyValueSpec = None,
-    table_type: TableType = TableType.blink(),
-) -> Table:
+    table_type: TableType = None,
+) -> Table:
     """Consume from Kafka to a Deephaven table.

     Args:
@@ -158,17 +162,19 @@ def consume(
             It can be the result of calling one of the functions: simple_spec(),avro_spec() or json_spec() in this
             module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None which
             works the same as KeyValueSpec.FROM_PROPERTIES, in which case, the kafka_config param should include values
-            for dictionary keys 'deephaven.value.column.name' and 'deephaven.value.column.type', for the single resulting
+            for dictionary keys 'deephaven.value.column.name' and 'deephaven.value.column.type', for the single
+            resulting
             column name and type
-        table_type (TableType): a TableType, default is TableType.blink()
+        table_type (TableType): a TableType, default is None, meaning to use TableType.blink()

     Returns:
         a Deephaven live table that will update based on Kafka messages consumed for the given topic

     Raises:
         DHError
     """
-
+    if table_type is None:
+        table_type = TableType.blink()
     return _consume(kafka_config, topic, partitions, offsets, key_spec, value_spec, table_type, to_partitioned=False)


@@ -179,8 +185,8 @@ def consume_to_partitioned_table(
     offsets: Dict[int, int] = None,
     key_spec: KeyValueSpec = None,
     value_spec: KeyValueSpec = None,
-    table_type: TableType = TableType.blink(),
-) -> PartitionedTable:
+    table_type: TableType = None,
+) -> PartitionedTable:
     """Consume from Kafka to a Deephaven partitioned table.

     Args:
@@ -203,10 +209,11 @@ def consume_to_partitioned_table(
             It can be the result of calling one of the functions: simple_spec(),avro_spec() or json_spec() in this
             module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None which
             works the same as KeyValueSpec.FROM_PROPERTIES, in which case, the kafka_config param should include values
-            for dictionary keys 'deephaven.value.column.name' and 'deephaven.value.column.type', for the single resulting
+            for dictionary keys 'deephaven.value.column.name' and 'deephaven.value.column.type', for the single
+            resulting
             column name and type
         table_type (TableType): a TableType, specifying the type of the expected result's constituent tables,
-            default is TableType.blink()
+            default is None, meaning to use TableType.blink()

     Returns:
         a Deephaven live partitioned table that will update based on Kafka messages consumed for the given topic,
@@ -216,7 +223,8 @@ def consume_to_partitioned_table(
     Raises:
         DHError
     """
-
+    if table_type is None:
+        table_type = TableType.blink()
     return _consume(kafka_config, topic, partitions, offsets, key_spec, value_spec, table_type, to_partitioned=True)


@@ -229,7 +237,7 @@ def _consume(
     value_spec: KeyValueSpec = None,
     table_type: TableType = TableType.blink(),
     to_partitioned: bool = False,
-) -> Union[Table, PartitionedTable]:
+) -> Union[Table, PartitionedTable]:
     try:
         partitions = j_partitions(partitions)

@@ -243,8 +251,8 @@ def _consume(
             partitions_array = jpy.array("int", list(offsets.keys()))
             offsets_array = jpy.array("long", list(offsets.values()))
             offsets = _JKafkaTools.partitionToOffsetFromParallelArrays(
-                partitions_array, offsets_array
-            )
+                partitions_array, offsets_array
+            )

         key_spec = KeyValueSpec.FROM_PROPERTIES if key_spec is None else key_spec
         value_spec = KeyValueSpec.FROM_PROPERTIES if value_spec is None else value_spec
@@ -255,29 +263,32 @@ def _consume(
         kafka_config = j_properties(kafka_config)
         if not to_partitioned:
             return Table(
-                j_table=_JKafkaTools.consumeToTable(
-                    kafka_config,
-                    topic,
-                    partitions,
-                    offsets,
-                    key_spec.j_object,
-                    value_spec.j_object,
-                    table_type.j_object,
-                )
-            )
+                j_table=_JKafkaTools.consumeToTable(
+                    kafka_config,
+                    topic,
+                    partitions,
+                    offsets,
+                    key_spec.j_object,
+                    value_spec.j_object,
+                    table_type.j_object,
+                )
+            )
         else:
-            return PartitionedTable(j_partitioned_table=_JKafkaTools.consumeToPartitionedTable(
-                kafka_config,
-                topic,
-                partitions,
-                offsets,
-                key_spec.j_object,
-                value_spec.j_object,
-                table_type.j_object,
-            ))
+            return PartitionedTable(
+                j_partitioned_table=_JKafkaTools.consumeToPartitionedTable(
+                    kafka_config,
+                    topic,
+                    partitions,
+                    offsets,
+                    key_spec.j_object,
+                    value_spec.j_object,
+                    table_type.j_object,
+                )
+            )
     except Exception as e:
         raise DHError(e, "failed to consume a Kafka stream.") from e

+
 class ProtobufProtocol(JObjectWrapper):
     """The protobuf serialization / deserialization protocol."""

@@ -310,7 +321,7 @@ def protobuf_spec(
     message_class: Optional[str] = None,
     include: Optional[List[str]] = None,
     protocol: Optional[ProtobufProtocol] = None,
-) -> KeyValueSpec:
+) -> KeyValueSpec:
     """Creates a spec for parsing a Kafka protobuf stream into a Deephaven table. Uses the schema, schema_version, and
     schema_message_name to fetch the schema from the schema registry; or uses message_class to to get the schema from
     the classpath.
@@ -344,13 +355,13 @@ def protobuf_spec(
     parser_options_builder = _JProtobufDescriptorParserOptions.builder()
     if include is not None:
         parser_options_builder.fieldOptions(
-            _JFieldOptions.includeIf(
-                _JFieldPath.anyMatches(j_array_list(include))
-            )
-        )
+            _JFieldOptions.includeIf(
+                _JFieldPath.anyMatches(j_array_list(include))
+            )
+        )
     pb_consume_builder = (
-        _JProtobufConsumeOptions.builder()
-        .parserOptions(parser_options_builder.build())
+        _JProtobufConsumeOptions.builder()
+        .parserOptions(parser_options_builder.build())
     )
     if message_class:
         if schema or schema_version or schema_message_name:
@@ -368,16 +379,16 @@ def protobuf_spec(
     if protocol:
         pb_consume_builder.protocol(protocol.j_object)
     return KeyValueSpec(
-        j_spec=_JKafkaTools_Consume.protobufSpec(pb_consume_builder.build())
-    )
+        j_spec=_JKafkaTools_Consume.protobufSpec(pb_consume_builder.build())
+    )


 def avro_spec(
     schema: str,
     schema_version: str = "latest",
     mapping: Dict[str, str] = None,
     mapped_only: bool = False,
-) -> KeyValueSpec:
+) -> KeyValueSpec:
     """Creates a spec for how to use an Avro schema when consuming a Kafka stream to a Deephaven table.

     Args:
@@ -388,7 +399,8 @@ def avro_spec(
             the value of the Schema Server URL for fetching the schema definition
         schema_version (str): the schema version to fetch from schema service, default is 'latest'
         mapping (Dict[str, str]): a mapping from Avro field name to Deephaven table column name; the fields specified in
-            the mapping will have their column names defined by it; if 'mapped_only' parameter is False, any other fields
+            the mapping will have their column names defined by it; if 'mapped_only' parameter is False,
+            any other fields
             not mentioned in the mapping will use the same Avro field name for Deephaven table column; otherwise, these
             unmapped fields will be ignored and will not be present in the resulting table. default is None
         mapped_only (bool): whether to ignore Avro fields not present in the 'mapping' argument, default is False
@@ -407,22 +419,22 @@ def avro_spec(
             jschema = _JKafkaTools.getAvroSchema(schema);
             if mapping:
                 return KeyValueSpec(
-                    j_spec=_JKafkaTools_Consume.avroSpec(jschema, mapping)
-                )
+                    j_spec=_JKafkaTools_Consume.avroSpec(jschema, mapping)
+                )
             else:
                 return KeyValueSpec(
-                    j_spec=_JKafkaTools_Consume.avroSpec(jschema)
-                )
+                    j_spec=_JKafkaTools_Consume.avroSpec(jschema)
+                )

         else:
             if mapping:
                 return KeyValueSpec(
-                    j_spec=_JKafkaTools_Consume.avroSpec(schema, schema_version, mapping)
-                )
+                    j_spec=_JKafkaTools_Consume.avroSpec(schema, schema_version, mapping)
+                )
             else:
                 return KeyValueSpec(
-                    j_spec=_JKafkaTools_Consume.avroSpec(schema, schema_version)
-                )
+                    j_spec=_JKafkaTools_Consume.avroSpec(schema, schema_version)
+                )
     except Exception as e:
         raise DHError(e, "failed to create a Kafka key/value spec") from e

@@ -457,10 +469,11 @@ def json_spec(col_defs: Union[TableDefinitionLike, List[Tuple[str, DType]]], map
         col_defs = [col.j_column_definition for col in table_def.values()]
     else:
         warn(
-            'json_spec col_defs for List[Tuple[str, DType]] is deprecated for removal, prefer TableDefinitionLike',
-            DeprecationWarning,
-            stacklevel=2,
-        )
+            'json_spec col_defs for List[Tuple[str, DType]] is deprecated for removal, '
+            'prefer TableDefinitionLike',
+            DeprecationWarning,
+            stacklevel=2,
+        )
         col_defs = [col_def(*t).j_column_definition for t in col_defs]

     if mapping is None:
@@ -489,8 +502,8 @@ def simple_spec(col_name: str, data_type: DType = None) -> KeyValueSpec:
         if data_type is None:
             return KeyValueSpec(j_spec=_JKafkaTools_Consume.simpleSpec(col_name))
         return KeyValueSpec(
-            j_spec=_JKafkaTools_Consume.simpleSpec(col_name, data_type.qst_type.clazz())
-        )
+            j_spec=_JKafkaTools_Consume.simpleSpec(col_name, data_type.qst_type.clazz())
+        )
     except Exception as e:
         raise DHError(e, "failed to create a Kafka key/value spec") from e

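Explicit table_type arguments behave exactly as before; only the default moved into the function body. A sketch of an explicit override, again with a hypothetical broker and topic, using TableType.ring(capacity), one of the factories that gained a return annotation in this change:

    from deephaven import dtypes
    from deephaven.stream.kafka import consumer as kc

    # Keep only the most recent 1024 rows by requesting a ring table
    # explicitly rather than relying on the blink-table default.
    recent_orders = kc.consume(
        {"bootstrap.servers": "localhost:9092"},   # hypothetical broker address
        "orders",                                   # hypothetical topic name
        key_spec=kc.KeyValueSpec.IGNORE,
        value_spec=kc.simple_spec("Payload", dtypes.string),
        table_type=kc.TableType.ring(1024),
    )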