Skip to content

Commit

Permalink
Review with Devin Part 1
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Mar 5, 2025
1 parent 1b7fbda commit e029f67
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

public class IcebergTableParquetLocation extends ParquetTableLocation implements TableLocation {

private volatile List<SortColumn> sortedColumns;
@Nullable
private final List<SortColumn> sortedColumns;

public IcebergTableParquetLocation(
@NotNull final IcebergTableAdapter tableAdapter,
Expand All @@ -39,18 +40,7 @@ public IcebergTableParquetLocation(
@Override
@NotNull
public List<SortColumn> getSortedColumns() {
List<SortColumn> local;
if ((local = sortedColumns) != null) {
return local;
}
synchronized (this) {
if ((local = sortedColumns) != null) {
return local;
}
local = Collections.unmodifiableList(super.getSortedColumns());
sortedColumns = local;
return local;
}
return sortedColumns == null ? super.getSortedColumns() : sortedColumns;
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,39 @@
public interface SchemaProvider {

// Static factory methods for creating SchemaProvider instances

/**
* Use the current schema from the table.
*/
static SchemaProvider fromCurrent() {
return new SchemaProviderInternal.CurrentSchemaProvider();
return SchemaProviderInternal.CurrentSchemaProvider.INSTANCE;
}

/**
* Use the schema with the given ID from the table.
*/
static SchemaProvider fromSchemaId(final int id) {
return new SchemaProviderInternal.IdSchemaProvider(id);
}

/**
* Use the given schema directly.
*/
static SchemaProvider fromSchema(final Schema schema) {
return new SchemaProviderInternal.DirectSchemaProvider(schema);
}

/**
* Use the schema from the snapshot with the given ID.
*/
static SchemaProvider fromSnapshotId(final int snapshotId) {
return new SchemaProviderInternal.SnapshotIdSchemaProvider(snapshotId);
}

/**
* Use the schema from the current snapshot of the table.
*/
static SchemaProvider fromCurrentSnapshot() {
return new SchemaProviderInternal.CurrentSnapshotSchemaProvider();
return SchemaProviderInternal.CurrentSnapshotSchemaProvider.INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ interface SchemaProviderImpl {
}

// Implementations of SchemaProvider
static class CurrentSchemaProvider implements SchemaProvider, SchemaProviderImpl {
enum CurrentSchemaProvider implements SchemaProvider, SchemaProviderImpl {
INSTANCE;

@Override
public Schema getSchema(final Table table) {
return getCurrentSchema(table);
Expand Down Expand Up @@ -66,7 +68,9 @@ public Schema getSchema(final Table table) {
}
}

static class CurrentSnapshotSchemaProvider implements SchemaProvider, SchemaProviderImpl {
enum CurrentSnapshotSchemaProvider implements SchemaProvider, SchemaProviderImpl {
INSTANCE;

@Override
public Schema getSchema(final Table table) {
return getSchemaForCurrentSnapshot(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@ public interface SortOrderProvider {
/**
* Do not sort the data while writing new data to the iceberg table.
*/
static SortOrderProvider disableSorting() {
return SortOrderProviderInternal.DisableSorting.getInstance();
static SortOrderProvider unsorted() {
return SortOrderProviderInternal.DisableSorting.INSTANCE;
}

/**
* Use the default {@link org.apache.iceberg.Table#sortOrder()} of the table while writing new data.
* Use the default {@link org.apache.iceberg.Table#sortOrder()} of the table while writing new data. If no sort
* order is set on the table, no sorting will be done.
*/
static SortOrderProvider useTableDefault() {
return SortOrderProviderInternal.TableDefaultSortOrderProvider.getInstance();
return SortOrderProviderInternal.TableDefaultSortOrderProvider.INSTANCE;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.jetbrains.annotations.NotNull;

/**
* Internal class containing the implementations of {@link SortOrderProvider}.
Expand All @@ -15,39 +16,29 @@ interface SortOrderProviderImpl {
/**
* Returns the {@link SortOrder} to use when writing to the given table based on this {@link SortOrderProvider}.
*/
@NotNull
SortOrder getSortOrder(Table table);
}

// Implementations of SortOrderProvider
static class DisableSorting implements SortOrderProvider, SortOrderProviderImpl {

private static final DisableSorting INSTANCE = new DisableSorting();

private DisableSorting() {}

static DisableSorting getInstance() {
return INSTANCE;
}
enum DisableSorting implements SortOrderProvider, SortOrderProviderImpl {
INSTANCE;

@Override
@NotNull
public SortOrder getSortOrder(final Table table) {
return SortOrder.unsorted();
}
}

static class TableDefaultSortOrderProvider implements SortOrderProvider, SortOrderProviderImpl {

private static final TableDefaultSortOrderProvider INSTANCE = new TableDefaultSortOrderProvider();

private TableDefaultSortOrderProvider() {}

static TableDefaultSortOrderProvider getInstance() {
return INSTANCE;
}
enum TableDefaultSortOrderProvider implements SortOrderProvider, SortOrderProviderImpl {
INSTANCE;

@Override
@NotNull
public SortOrder getSortOrder(final Table table) {
return table.sortOrder();
final SortOrder sortOrder = table.sortOrder();
return sortOrder != null ? sortOrder : SortOrder.unsorted();
}
}

Expand All @@ -59,6 +50,7 @@ static class IdSortOrderProvider implements SortOrderProvider, SortOrderProvider
}

@Override
@NotNull
public SortOrder getSortOrder(final Table table) {
if (!table.sortOrders().containsKey(sortOrderId)) {
throw new IllegalArgumentException("Sort order with ID " + sortOrderId + " not found for table " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ public abstract class TableWriterOptions {
* {@link #fieldIdToColumnName()} to map Deephaven columns from {@link #tableDefinition()} to Iceberg columns. If
* {@link #fieldIdToColumnName()} is not provided, the mapping is done by column name.
* <p>
* Users can specify how to extract the schema in multiple ways (by schema ID, snapshot ID, etc.).
* <p>
* Defaults to {@link SchemaProvider#fromCurrent()}, which means use the current schema from the table.
* Defaults to {@link SchemaProvider#fromCurrent()}.
*/
@Value.Default
public SchemaProvider schemaProvider() {
Expand All @@ -63,12 +61,10 @@ Map<String, Integer> dhColumnNameToFieldId() {

/**
* Used for providing {@link org.apache.iceberg.SortOrder} to sort new data while writing to an iceberg table using
* this writer.
* <p>
* Users can specify how to extract the schema in multiple ways (like disable sorting, use table default, by ID).
* this writer. Note that we select the sort order of the Table at the time the writer is constructed, and it does
* not change if the table's sort order changes.
* <p>
* Defaults to {@link SortOrderProvider#useTableDefault()}, which means sort new data using the table's default sort
* order.
* Defaults to {@link SortOrderProvider#useTableDefault()}.
*/
@Value.Default
public SortOrderProvider sortOrderProvider() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,7 @@ void testApplySortOrderByID() {
{
final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder()
.tableDefinition(source.getDefinition())
.sortOrderProvider(SortOrderProvider.disableSorting())
.sortOrderProvider(SortOrderProvider.unsorted())
.build());
tableWriter.append(IcebergWriteInstructions.builder()
.addTables(source)
Expand Down
13 changes: 7 additions & 6 deletions py/server/deephaven/experimental/iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,19 +294,20 @@ def j_object(self) -> jpy.JType:
return self._j_object

@classmethod
def disable_sorting(cls) -> 'SortOrderProvider':
def unsorted(cls) -> 'SortOrderProvider':
"""
Used to disable sorting while writing new data to the iceberg table.
Returns:
the SortOrderProvider object.
"""
return cls(_JSortOrderProvider.disableSorting())
return cls(_JSortOrderProvider.unsorted())

@classmethod
def use_table_default(cls) -> 'SortOrderProvider':
"""
Use the default sort order of the table while writing new data.
Use the default sort order of the table while writing new data. If no sort order is set on the table, no sorting
will be done.
Returns:
the :class:`.SortOrderProvider` object.
Expand Down Expand Up @@ -355,7 +356,6 @@ def __init__(self,
schema_provider: Optional[SchemaProvider]: Used to extract a Schema from a iceberg table. This schema will
be used in conjunction with the field_id_to_column_name to map Deephaven columns from table_definition
to Iceberg columns.
Users can specify how to extract the schema in multiple ways (by ID, snapshot ID, initial schema, etc.).
Defaults to `None`, which means use the current schema from the table.
field_id_to_column_name: Optional[Dict[int, str]]: A one-to-one map from Iceberg field IDs from the
schema_spec to Deephaven column names from the table_definition.
Expand All @@ -372,8 +372,9 @@ def __init__(self,
target_page_size (Optional[int]): the target Parquet file page size in bytes, if not specified. Defaults to
`None`, which means use 2^20 bytes (1 MiB)
sort_order_provider: Optional[SortOrderProvider]: Used to provide SortOrder to be used for sorting new data
while writing to an iceberg table using this writer. Users can specify multiple ways to do so, for
example, by sort ID, table default, etc. Defaults to `None`, which means use the table's default sort order.
while writing to an iceberg table using this writer. Note that we select the sort order of the Table at
the time the writer is constructed, and it does not change if the table's sort order changes. Defaults
to `None`, which means use the table's default sort order.
Raises:
DHError: If unable to build the object.
Expand Down

0 comments on commit e029f67

Please sign in to comment.