Skip to content

Commit 5c837dd

Browse files
Add blink_table option to input table factory method in pydeeephaven (deephaven#5317)
* Add blink_table to input table factory method * Update py/client/pydeephaven/session.py Co-authored-by: Chip Kent <5250374+chipkent@users.noreply.github.com> * Respond to review comments --------- Co-authored-by: Chip Kent <5250374+chipkent@users.noreply.github.com>
1 parent 2262acb commit 5c837dd

File tree

3 files changed

+94
-16
lines changed

3 files changed

+94
-16
lines changed

py/client/pydeephaven/_table_ops.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -606,17 +606,24 @@ def make_grpc_request_for_batch(self, result_id, source_id) -> Any:
606606

607607

608608
class CreateInputTableOp(TableOp):
609-
def __init__(self, schema: pa.schema, init_table: Any, key_cols: List[str] = None):
609+
def __init__(self, schema: pa.schema, init_table: Any, key_cols: List[str] = None, blink: bool = False):
610+
if blink and key_cols:
611+
raise ValueError("key columns are not supported for blink input tables.")
612+
610613
self.schema = schema
611614
self.init_table = init_table
612615
self.key_cols = key_cols
616+
self.blink = blink
613617

614618
@classmethod
615619
def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
616620
return table_service_stub.CreateInputTable
617621

618622
def make_grpc_request(self, result_id, source_id) -> Any:
619-
if self.key_cols:
623+
if self.blink:
624+
blink_ = table_pb2.CreateInputTableRequest.InputTableKind.Blink()
625+
input_table_kind = table_pb2.CreateInputTableRequest.InputTableKind(blink=blink_)
626+
elif self.key_cols:
620627
key_backed = table_pb2.CreateInputTableRequest.InputTableKind.InMemoryKeyBacked(
621628
key_columns=self.key_cols)
622629
input_table_kind = table_pb2.CreateInputTableRequest.InputTableKind(in_memory_key_backed=key_backed)

py/client/pydeephaven/session.py

+9-4
Original file line numberDiff line numberDiff line change
@@ -514,14 +514,16 @@ def query(self, table: Table) -> Query:
514514
return Query(self, table)
515515

516516
def input_table(self, schema: pa.Schema = None, init_table: Table = None,
517-
key_cols: Union[str, List[str]] = None) -> InputTable:
518-
"""Creates an InputTable from either Arrow schema or initial table. When key columns are
519-
provided, the InputTable will be keyed, otherwise it will be append-only.
517+
key_cols: Union[str, List[str]] = None, blink_table: bool = False) -> InputTable:
518+
"""Creates an InputTable from either Arrow schema or initial table. When blink_table is True, the InputTable
519+
will be a blink table. When blink_table is False (default), the InputTable will be
520+
keyed if key columns are provided, otherwise it will be append-only.
520521
521522
Args:
522523
schema (pa.Schema): the schema for the InputTable
523524
init_table (Table): the initial table
524525
key_cols (Union[str, Sequence[str]): the name(s) of the key column(s)
526+
blink_table (bool): whether the InputTable should be a blink table, default is False
525527
526528
Returns:
527529
an InputTable
@@ -534,7 +536,10 @@ def input_table(self, schema: pa.Schema = None, init_table: Table = None,
534536
elif schema and init_table:
535537
raise ValueError("both arrow schema and init table are provided.")
536538

537-
table_op = CreateInputTableOp(schema=schema, init_table=init_table, key_cols=to_list(key_cols))
539+
if blink_table and key_cols:
540+
raise ValueError("key columns are not supported for blink input tables.")
541+
542+
table_op = CreateInputTableOp(schema=schema, init_table=init_table, key_cols=to_list(key_cols), blink=blink_table)
538543
input_table = self.table_service.grpc_table_op(None, table_op, table_class=InputTable)
539544
input_table.key_cols = key_cols
540545
return input_table

py/client/tests/test_session.py

+76-10
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#
22
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
33
#
4-
54
import unittest
65
from time import sleep
76

@@ -54,26 +53,21 @@ def test_time_table(self):
5453
t = session.time_table(period=100000)
5554
self.assertFalse(t.is_static)
5655
session.bind_table("t", t)
57-
session.run_script("""
56+
57+
console_script = ("""
5858
from deephaven import empty_table
5959
try:
6060
del t1
6161
except NameError:
6262
pass
6363
t1 = empty_table(0) if t.is_blink else None
6464
""")
65+
session.run_script(console_script)
6566
self.assertNotIn("t1", session.tables)
6667

6768
t = session.time_table(period=100000, blink_table=True)
6869
session.bind_table("t", t)
69-
session.run_script("""
70-
from deephaven import empty_table
71-
try:
72-
del t1
73-
except NameError:
74-
pass
75-
t1 = empty_table(0) if t.is_blink else None
76-
""")
70+
session.run_script(console_script)
7771
self.assertIn("t1", session.tables)
7872

7973
def test_merge_tables(self):
@@ -273,6 +267,78 @@ def test_auto_close(self):
273267
session = None
274268
self.assertIsNone(session)
275269

270+
def test_blink_input_table(self):
271+
pa_types = [
272+
pa.bool_(),
273+
pa.int8(),
274+
pa.int16(),
275+
pa.int32(),
276+
pa.int64(),
277+
pa.timestamp('ns', tz='UTC'),
278+
pa.float32(),
279+
pa.float64(),
280+
pa.string(),
281+
]
282+
pa_data = [
283+
pa.array([True, False]),
284+
pa.array([2 ** 7 - 1, -2 ** 7 + 1]),
285+
pa.array([2 ** 15 - 1, -2 ** 15 + 1]),
286+
pa.array([2 ** 31 - 1, -2 ** 31 + 1]),
287+
pa.array([2 ** 63 - 1, -2 ** 63 + 1]),
288+
pa.array([pd.Timestamp('2017-01-01T12:01:01', tz='UTC'),
289+
pd.Timestamp('2017-01-01T11:01:01', tz='Europe/Paris')]),
290+
pa.array([1.1, 2.2], pa.float32()),
291+
pa.array([1.1, 2.2], pa.float64()),
292+
pa.array(["foo", "bar"]),
293+
]
294+
fields = [pa.field(f"f{i}", ty) for i, ty in enumerate(pa_types)]
295+
schema = pa.schema(fields)
296+
pa_table = pa.table(pa_data, schema=schema)
297+
with Session() as session:
298+
dh_table = session.import_table(pa_table)
299+
300+
with self.subTest("Create blink Input Table"):
301+
with self.assertRaises(ValueError):
302+
session.input_table(schema=schema, key_cols="f1", blink_table=True)
303+
blink_input_table = session.input_table(schema=schema, blink_table=True)
304+
pa_table = blink_input_table.to_arrow()
305+
self.assertEqual(schema, pa_table.schema)
306+
session.bind_table("t", blink_input_table)
307+
console_script = ("""
308+
from deephaven import empty_table
309+
try:
310+
del t1
311+
except NameError:
312+
pass
313+
t1 = empty_table(0) if t.is_blink else None
314+
""")
315+
session.run_script(console_script)
316+
self.assertIn("t1", session.tables)
317+
318+
with self.assertRaises(ValueError):
319+
session.input_table(schema=schema, init_table=blink_input_table, blink_table=True)
320+
with self.assertRaises(ValueError):
321+
session.input_table(key_cols="f0", blink_table=True)
322+
323+
with self.subTest("blink InputTable ops"):
324+
session.bind_table("dh_table", dh_table)
325+
console_script = ("""
326+
from deephaven import empty_table
327+
try:
328+
del t1
329+
except NameError:
330+
pass
331+
t.add(dh_table)
332+
t.await_update()
333+
t1 = empty_table(0) if t.size == 2 else None
334+
""")
335+
session.run_script(console_script)
336+
self.assertIn("t1", session.tables)
337+
338+
with self.assertRaises(PermissionError):
339+
blink_input_table.delete(dh_table.select(["f1"]))
340+
341+
276342

277343
if __name__ == '__main__':
278344
unittest.main()

0 commit comments

Comments
 (0)