@@ -74,7 +74,7 @@ def consume_raw(
74
74
kafka_config : dict ,
75
75
cdc_spec : CDCSpec ,
76
76
partitions = None ,
77
- table_type : TableType = None ,
77
+ table_type : TableType = TableType . stream () ,
78
78
) -> Table :
79
79
""" Consume the raw events from a Change Data Capture (CDC) Kafka stream to a Deephaven table.
80
80
@@ -85,7 +85,7 @@ def consume_raw(
85
85
and/or value Avro necessary schemas are stored.
86
86
cdc_spec (CDCSpec): a CDCSpec obtained from calling either the cdc_long_spec or the cdc_short_spec function
87
87
partitions (List[int]): a list of integer partition numbers, default is None indicating all partitions
88
- table_type (TableType): a TableType, default is None, meaning to use TableType.blink ()
88
+ table_type (TableType): a TableType enum , default is TableType.stream ()
89
89
90
90
Returns:
91
91
a Deephaven live table for the raw CDC events
@@ -96,9 +96,8 @@ def consume_raw(
96
96
try :
97
97
partitions = j_partitions (partitions )
98
98
kafka_config = j_properties (kafka_config )
99
- if table_type is None :
100
- table_type = TableType .blink ()
101
- return Table (j_table = _JCdcTools .consumeRawToTable (kafka_config , cdc_spec .j_object , partitions , table_type .j_object ))
99
+ table_type_enum = table_type .value
100
+ return Table (j_table = _JCdcTools .consumeRawToTable (kafka_config , cdc_spec .j_object , partitions , table_type_enum ))
102
101
except Exception as e :
103
102
raise DHError (e , "failed to consume a raw CDC stream." ) from e
104
103
0 commit comments