Skip to content

Commit

Permalink
Python review Part 1
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Mar 7, 2025
1 parent dcc0b99 commit af24391
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ private static List<SortColumn> computeSortedColumns(
} else if (field.nullOrder() == NullOrder.NULLS_LAST && field.direction() == SortDirection.DESC) {
sortColumn = SortColumn.desc(columnName);
} else {
// TODO Check with Devin if this is okay, The assumption here is that deephaven sorts nulls first for
// ascending order and nulls last for descending, so if we don't have the correct nulls order, we
// cannot use the column as a sort column
break;
}
sortColumns.add(sortColumn);
Expand Down
100 changes: 52 additions & 48 deletions py/server/deephaven/experimental/iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

class IcebergUpdateMode(JObjectWrapper):
"""
:class:`.IcebergUpdateMode` specifies the update mode for an Iceberg table to be loaded into Deephaven. The modes
`IcebergUpdateMode` specifies the update mode for an Iceberg table to be loaded into Deephaven. The modes
are:
- :py:func:`static() <IcebergUpdateMode.static>`: The table is loaded once and does not change
Expand Down Expand Up @@ -87,7 +87,7 @@ def j_object(self) -> jpy.JType:

class IcebergReadInstructions(JObjectWrapper):
"""
:class:`.IcebergReadInstructions` specifies the instructions for reading an Iceberg table into Deephaven. These
`IcebergReadInstructions` specifies the instructions for reading an Iceberg table into Deephaven. These
include column rename instructions and table definitions, as well as special data instructions for loading data
files from the cloud.
"""
Expand Down Expand Up @@ -149,7 +149,7 @@ def j_object(self) -> jpy.JType:

class IcebergWriteInstructions(JObjectWrapper):
"""
:class:`.IcebergWriteInstructions` provides instructions intended for writing deephaven tables as partitions to Iceberg
`IcebergWriteInstructions` provides instructions intended for writing deephaven tables as partitions to Iceberg
tables.
"""

Expand All @@ -164,13 +164,13 @@ def __init__(self,
Args:
tables (Union[Table, Sequence[Table]]): The deephaven tables to write.
partition_paths (Optional[Union[str, Sequence[str]]]): The partition paths where each table will be written.
For example, if the iceberg table is partitioned by "year" and "month", a partition path could be
For example, if the Iceberg table is partitioned by "year" and "month", a partition path could be
"year=2021/month=01".
If writing to a partitioned iceberg table, users must provide partition path for each table in tables
If writing to a partitioned Iceberg table, users must provide partition path for each table in tables
argument in the same order.
Else when writing to a non-partitioned table, users should not provide any partition paths.
Defaults to `None`, which means the deephaven tables will be written to the root data directory of the
iceberg table.
Iceberg table.
Raises:
DHError: If unable to build the instructions object.
Expand Down Expand Up @@ -204,7 +204,7 @@ def j_object(self) -> jpy.JType:

class SchemaProvider(JObjectWrapper):
"""
:class:`.SchemaProvider` is used to extract the schema from an Iceberg table. Users can specify multiple ways to do
`SchemaProvider` is used to extract the schema from an Iceberg table. Users can specify multiple ways to do
so, for example, by schema ID, snapshot ID, current schema, etc. This can be useful for passing a schema when
writing to an Iceberg table.
"""
Expand All @@ -213,10 +213,10 @@ class SchemaProvider(JObjectWrapper):

def __init__(self, _j_object: jpy.JType):
"""
Initializes the :class:`.SchemaProvider` object.
Initializes the `SchemaProvider` object.
Args:
_j_object (SchemaProvider): the Java :class:`.SchemaProvider` object.
_j_object (SchemaProvider): the Java `SchemaProvider` object.
"""
self._j_object = _j_object

Expand All @@ -230,7 +230,7 @@ def from_current(cls) -> 'SchemaProvider':
Used for extracting the current schema from the table.
Returns:
the SchemaProvider object.
the `SchemaProvider` object.
"""
return cls(_JSchemaProvider.fromCurrent())

Expand All @@ -243,7 +243,7 @@ def from_schema_id(cls, schema_id: int) -> 'SchemaProvider':
schema_id (int): the schema id to use.
Returns:
the :class:`.SchemaProvider` object.
the `SchemaProvider` object.
"""
return cls(_JSchemaProvider.fromSchemaId(schema_id))

Expand All @@ -256,7 +256,7 @@ def from_snapshot_id(cls, snapshot_id: int) -> 'SchemaProvider':
snapshot_id (int): the snapshot id to use.
Returns:
the :class:`.SchemaProvider` object.
the `SchemaProvider` object.
"""
return cls(_JSchemaProvider.fromSnapshotId(snapshot_id))

Expand All @@ -273,19 +273,20 @@ def from_current_snapshot(cls) -> 'SchemaProvider':

class SortOrderProvider(JObjectWrapper):
"""
:class:`.SortOrderProvider` is used for providing SortOrder to be used for sorting new data while writing to an
iceberg table using this writer. Users can specify multiple ways to do so, for example, by sort ID, table default,
etc.
`SortOrderProvider` is used to specify the sort order for new data when writing to an Iceberg table. More details
about sort order can be found in the Iceberg spec: https://iceberg.apache.org/spec/#sorting.
Users can specify the sort order in multiple ways, such as by providing a sort ID or using the table's default sort
order. This class consists of factory methods to create different sort order providers.
"""

j_object_type = _JSortOrderProvider

def __init__(self, _j_object: jpy.JType):
"""
Initializes the :class:`.SortOrderProvider` object.
Initializes the `SortOrderProvider` object.
Args:
_j_object (SortOrderProvider): the Java :class:`.SortOrderProvider` object.
_j_object (SortOrderProvider): the Java `SortOrderProvider` object.
"""
self._j_object = _j_object

Expand All @@ -296,10 +297,10 @@ def j_object(self) -> jpy.JType:
@classmethod
def unsorted(cls) -> 'SortOrderProvider':
"""
Used to disable sorting while writing new data to the iceberg table.
Used to disable sorting while writing new data to the Iceberg table.
Returns:
the SortOrderProvider object.
the `SortOrderProvider` object.
"""
return cls(_JSortOrderProvider.unsorted())

Expand All @@ -310,54 +311,55 @@ def use_table_default(cls) -> 'SortOrderProvider':
will be done.
Returns:
the :class:`.SortOrderProvider` object.
the `SortOrderProvider` object.
"""
return cls(_JSortOrderProvider.useTableDefault())

@classmethod
def from_sort_id(cls, sort_order_id: int) -> 'SortOrderProvider':
"""
Use the sort order with the given ID to sort new data while writing to the iceberg table.
Use the sort order with the given ID to sort new data while writing to the Iceberg table.
Args:
sort_order_id (int): the id of the sort order to use.
Returns:
the :class:`.SortOrderProvider` object.
the `.SortOrderProvider` object.
"""
return cls(_JSortOrderProvider.fromSortId(sort_order_id))

def with_id(self, sort_order_id: int) -> 'SortOrderProvider':
"""
Returns a sort order provider that delegates to this provider for computing the columns to sort on, but writes a
different sort order ID to the iceberg table.
For example, this provider might return fields {A, B, C} to sort on, but the ID written to iceberg corresponds
to sort order with fields {A, B}.
Returns a sort order provider that uses the current provider to determine the columns to sort on, but writes a
different sort order ID to the Iceberg table.
For example, this provider might sort by columns {A, B, C}, but the ID written to Iceberg corresponds to a sort
order with columns {A, B}.
Args:
sort_order_id (int): the sort order ID to write to the iceberg table.
sort_order_id (int): the sort order ID to write to the Iceberg table.
Returns:
the :class:`.SortOrderProvider` object.
the `SortOrderProvider` object.
"""
return SortOrderProvider(self._j_object.withId(sort_order_id))

def with_fail_on_unmapped(self, fail_on_unmapped: bool) -> 'SortOrderProvider':
"""
Returns a sort order provider which will fail, if for any reason, the sort order cannot be applied to the
tables being written. By default, the provider will not fail if the sort order cannot be applied.
Returns a sort order provider that will fail if the sort order cannot be applied to the tables being written.
By default, if the sort order cannot be applied, the tables will be written without sorting.
Args:
fail_on_unmapped: whether to fail if the sort order cannot be applied to the tables being written
Returns:
the :class:`.SortOrderProvider` object.
the `SortOrderProvider` object.
"""
return SortOrderProvider(self._j_object.withFailOnUnmapped(fail_on_unmapped))


class TableParquetWriterOptions(JObjectWrapper):
"""
:class:`.TableParquetWriterOptions` provides specialized instructions for configuring :class:`.IcebergTableWriter`
`TableParquetWriterOptions` provides specialized instructions for configuring `IcebergTableWriter`
instances.
"""

Expand All @@ -380,7 +382,7 @@ def __init__(self,
table_definition: TableDefinitionLike: The table definition to use when writing Iceberg data files using
this writer instance. This definition can be used to skip some columns or add additional columns with
null values. The provided definition should have at least one column.
schema_provider: Optional[SchemaProvider]: Used to extract a Schema from a iceberg table. This schema will
schema_provider: Optional[SchemaProvider]: Used to extract a Schema from an Iceberg table. This schema will
be used in conjunction with the field_id_to_column_name to map Deephaven columns from table_definition
to Iceberg columns.
Defaults to `None`, which means use the current schema from the table.
Expand All @@ -398,10 +400,12 @@ def __init__(self,
`None`, which means use 2^20 (1,048,576)
target_page_size (Optional[int]): the target Parquet file page size in bytes, if not specified. Defaults to
`None`, which means use 2^20 bytes (1 MiB)
sort_order_provider: Optional[SortOrderProvider]: Used to provide SortOrder to be used for sorting new data
while writing to an iceberg table using this writer. Note that we select the sort order of the Table at
the time the writer is constructed, and it does not change if the table's sort order changes. Defaults
to `None`, which means use the table's default sort order.
sort_order_provider (Optional[SortOrderProvider]): Specifies the sort order to use for sorting new data
when writing to an Iceberg table with this writer. The sort order is determined at the time the writer
is created and does not change if the table's sort order changes later. Defaults to `None`, which means
the table's default sort order is used. More details about sort order can be found in the Iceberg
spec: https://iceberg.apache.org/spec/#sorting
Raises:
DHError: If unable to build the object.
Expand Down Expand Up @@ -449,7 +453,7 @@ def j_object(self) -> jpy.JType:

class IcebergTable(Table):
"""
:class:`.IcebergTable` is a subclass of Table that allows users to dynamically update the table with new snapshots
`IcebergTable` is a subclass of Table that allows users to dynamically update the table with new snapshots
from the Iceberg catalog.
"""
j_object_type = _JIcebergTable
Expand Down Expand Up @@ -488,8 +492,8 @@ def j_object(self) -> jpy.JType:

class IcebergTableWriter(JObjectWrapper):
"""
:class:`.IcebergTableWriter` is responsible for writing Deephaven tables to an Iceberg table. Each
:class:`.IcebergTableWriter` instance associated with a single :class:`.IcebergTableAdapter` and can be used to
`IcebergTableWriter` is responsible for writing Deephaven tables to an Iceberg table. Each
`IcebergTableWriter` instance associated with a single `IcebergTableAdapter` and can be used to
write multiple Deephaven tables to this Iceberg table.
"""
j_object_type = _JIcebergTableWriter or type(None)
Expand All @@ -504,7 +508,7 @@ def append(self, instructions: IcebergWriteInstructions):
partition paths where each table will be written using the :attr:`.IcebergWriteInstructions.partition_paths`
parameter.
This method will not perform any compatibility checks between the existing schema and the provided Deephaven
tables. All such checks happen at the time of creation of the :class:`.IcebergTableWriter` instance.
tables. All such checks happen at the time of creation of the `IcebergTableWriter` instance.
Args:
instructions (IcebergWriteInstructions): the customization instructions for write.
Expand All @@ -518,7 +522,7 @@ def j_object(self) -> jpy.JType:

class IcebergTableAdapter(JObjectWrapper):
"""
:class:`.IcebergTableAdapter` provides an interface for interacting with Iceberg tables. It allows the user to list
`IcebergTableAdapter` provides an interface for interacting with Iceberg tables. It allows the user to list
snapshots, retrieve table definitions and reading Iceberg tables into Deephaven tables.
"""
j_object_type = _JIcebergTableAdapter or type(None)
Expand Down Expand Up @@ -579,7 +583,7 @@ def table(self, instructions: Optional[IcebergReadInstructions] = None) -> Icebe

def table_writer(self, writer_options: TableParquetWriterOptions) -> IcebergTableWriter:
"""
Create a new :class:`.IcebergTableWriter` for this Iceberg table using the provided writer options.
Create a new `IcebergTableWriter` for this Iceberg table using the provided writer options.
This method will perform schema validation to ensure that the provided table definition from the writer options
is compatible with the Iceberg table schema. All further writes performed by the returned writer will not be
validated against the table's schema, and thus will be faster.
Expand All @@ -599,7 +603,7 @@ def j_object(self) -> jpy.JType:

class IcebergCatalogAdapter(JObjectWrapper):
"""
:class:`.IcebergCatalogAdapter` provides an interface for interacting with Iceberg catalogs. It allows listing
`IcebergCatalogAdapter` provides an interface for interacting with Iceberg catalogs. It allows listing
namespaces, tables and snapshots, as well as reading Iceberg tables into Deephaven tables.
"""
j_object_type = _JIcebergCatalogAdapter or type(None)
Expand Down Expand Up @@ -660,7 +664,7 @@ def create_table(self, table_identifier: str, table_definition: TableDefinitionL
table_definition (TableDefinitionLike): the table definition of the new table.
Returns:
:class:`.IcebergTableAdapter`: the table adapter for the new Iceberg table.
`IcebergTableAdapter`: the table adapter for the new Iceberg table.
"""

return IcebergTableAdapter(self.j_object.createTable(table_identifier,
Expand Down Expand Up @@ -700,7 +704,7 @@ def adapter_s3_rest(
need to set this; it is most useful when connecting to non-AWS, S3-compatible APIs.
Returns:
:class:`.IcebergCatalogAdapter`: the catalog adapter for the provided S3 REST catalog.
`IcebergCatalogAdapter`: the catalog adapter for the provided S3 REST catalog.
Raises:
DHError: If unable to build the catalog adapter.
Expand Down Expand Up @@ -738,7 +742,7 @@ def adapter_aws_glue(
catalog URI.
Returns:
:class:`.IcebergCatalogAdapter`: the catalog adapter for the provided AWS Glue catalog.
`IcebergCatalogAdapter`: the catalog adapter for the provided AWS Glue catalog.
Raises:
DHError: If unable to build the catalog adapter.
Expand Down Expand Up @@ -834,7 +838,7 @@ def adapter(
hadoop_config (Optional[Dict[str, str]]): hadoop configuration properties for the catalog to load
s3_instructions (Optional[s3.S3Instructions]): the S3 instructions if applicable
Returns:
:class:`.IcebergCatalogAdapter`: the catalog adapter created from the provided properties
`IcebergCatalogAdapter`: the catalog adapter created from the provided properties
Raises:
DHError: If unable to build the catalog adapter
Expand Down

0 comments on commit af24391

Please sign in to comment.