Skip to content

Commit cdb7448

Browse files
Added support for column renames
1 parent b6b76d1 commit cdb7448

File tree

2 files changed

+134
-7
lines changed

2 files changed

+134
-7
lines changed

extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocation.java

+16-5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
import io.deephaven.api.ColumnName;
77
import io.deephaven.api.SortColumn;
8+
import io.deephaven.engine.table.ColumnDefinition;
9+
import io.deephaven.engine.table.TableDefinition;
810
import io.deephaven.engine.table.impl.locations.TableKey;
911
import io.deephaven.engine.table.impl.locations.TableLocation;
1012
import io.deephaven.iceberg.util.IcebergTableAdapter;
@@ -34,7 +36,7 @@ public IcebergTableParquetLocation(
3436
@NotNull final IcebergTableParquetLocationKey tableLocationKey,
3537
@NotNull final ParquetInstructions readInstructions) {
3638
super(tableKey, tableLocationKey, readInstructions);
37-
sortedColumns = computeSortedColumns(tableAdapter, tableLocationKey.dataFile());
39+
sortedColumns = computeSortedColumns(tableAdapter, tableLocationKey.dataFile(), readInstructions);
3840
}
3941

4042
@Override
@@ -46,7 +48,8 @@ public List<SortColumn> getSortedColumns() {
4648
@Nullable
4749
private static List<SortColumn> computeSortedColumns(
4850
@NotNull final IcebergTableAdapter tableAdapter,
49-
@NotNull final DataFile dataFile) {
51+
@NotNull final DataFile dataFile,
52+
@NotNull final ParquetInstructions readInstructions) {
5053
final Integer sortOrderId = dataFile.sortOrderId();
5154
// If sort order is missing or unknown, we cannot determine the sorted columns from the metadata and will
5255
// check the underlying parquet file for the sorted columns, when the user asks for them.
@@ -67,12 +70,20 @@ private static List<SortColumn> computeSortedColumns(
6770
// TODO (DH-18160): Improve support for handling non-identity transforms
6871
break;
6972
}
70-
final ColumnName columnName = ColumnName.of(schema.findColumnName(field.sourceId()));
73+
final String icebergColName = schema.findColumnName(field.sourceId());
74+
final String dhColName = readInstructions.getColumnNameFromParquetColumnNameOrDefault(icebergColName);
75+
final TableDefinition tableDefinition = readInstructions.getTableDefinition().orElseThrow(
76+
() -> new IllegalStateException("Table definition is required for reading from Iceberg tables"));
77+
final ColumnDefinition<?> columnDef = tableDefinition.getColumn(dhColName);
78+
if (columnDef == null) {
79+
// Table definition provided by the user doesn't have this column, so stop here
80+
break;
81+
}
7182
final SortColumn sortColumn;
7283
if (field.nullOrder() == NullOrder.NULLS_FIRST && field.direction() == SortDirection.ASC) {
73-
sortColumn = SortColumn.asc(columnName);
84+
sortColumn = SortColumn.asc(ColumnName.of(dhColName));
7485
} else if (field.nullOrder() == NullOrder.NULLS_LAST && field.direction() == SortDirection.DESC) {
75-
sortColumn = SortColumn.desc(columnName);
86+
sortColumn = SortColumn.desc(ColumnName.of(dhColName));
7687
} else {
7788
break;
7889
}

extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java

+118-2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.iceberg.types.Types;
4949
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
5050
import org.apache.parquet.schema.MessageType;
51+
import org.jetbrains.annotations.NotNull;
5152
import org.jetbrains.annotations.Nullable;
5253
import org.junit.jupiter.api.AfterEach;
5354
import org.junit.jupiter.api.BeforeEach;
@@ -1061,6 +1062,15 @@ private void verifySortOrder(
10611062
final IcebergTableAdapter tableAdapter,
10621063
final TableIdentifier tableIdentifier,
10631064
final List<List<SortColumn>> expectedSortOrders) {
1065+
verifySortOrder(tableAdapter, tableIdentifier, expectedSortOrders,
1066+
ParquetInstructions.EMPTY.withTableDefinition(tableAdapter.definition()));
1067+
}
1068+
1069+
private void verifySortOrder(
1070+
@NotNull final IcebergTableAdapter tableAdapter,
1071+
@NotNull final TableIdentifier tableIdentifier,
1072+
@NotNull final List<List<SortColumn>> expectedSortOrders,
1073+
@NotNull final ParquetInstructions readInstructions) {
10641074
final org.apache.iceberg.Table icebergTable = tableAdapter.icebergTable();
10651075
final String uriScheme = locationUri(icebergTable).getScheme();
10661076
final SeekableChannelsProvider seekableChannelsProvider =
@@ -1078,9 +1088,9 @@ private void verifySortOrder(
10781088
StandaloneTableKey.getInstance(),
10791089
new IcebergTableParquetLocationKey(
10801090
null, null, tableIdentifier, manifestFile, dataFile,
1081-
dataFileUri(icebergTable, dataFile), 0, Map.of(), ParquetInstructions.EMPTY,
1091+
dataFileUri(icebergTable, dataFile), 0, Map.of(), readInstructions,
10821092
seekableChannelsProvider),
1083-
ParquetInstructions.EMPTY);
1093+
readInstructions);
10841094
actualSortOrders.add(tableLocation.getSortedColumns());
10851095
}
10861096
}
@@ -1347,6 +1357,112 @@ void testFailIfSortOrderUnmapped() {
13471357
assertTableEquals(source, fromIceberg);
13481358
}
13491359

1360+
@Test
1361+
void testSortOrderWithColumnRename() {
1362+
final Table source = TableTools.newTable(
1363+
intCol("intCol", 15, 0, 32, 33, 19),
1364+
doubleCol("doubleCol", 10.5, 2.5, 3.5, 40.5, 0.5),
1365+
longCol("longCol", 20L, 50L, 0L, 10L, 5L));
1366+
final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable");
1367+
final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, source.getDefinition());
1368+
1369+
// Update the default sort order of the underlying iceberg table
1370+
final org.apache.iceberg.Table icebergTable = tableAdapter.icebergTable();
1371+
icebergTable.replaceSortOrder().asc("intCol").desc("doubleCol").commit();
1372+
1373+
// Append data to the table
1374+
final IcebergTableWriter tableWriterWithSorting = tableAdapter.tableWriter(writerOptionsBuilder()
1375+
.tableDefinition(source.getDefinition())
1376+
.build());
1377+
tableWriterWithSorting.append(IcebergWriteInstructions.builder()
1378+
.addTables(source)
1379+
.build());
1380+
1381+
// Now read a table with a column rename
1382+
final IcebergReadInstructions readInstructions = IcebergReadInstructions.builder()
1383+
.putColumnRenames("intCol", "renamedIntCol")
1384+
.build();
1385+
final Table fromIceberg = tableAdapter.table(readInstructions);
1386+
final Table expected = source.renameColumns("renamedIntCol = intCol")
1387+
.sort(List.of(SortColumn.asc(ColumnName.of("renamedIntCol")),
1388+
SortColumn.desc(ColumnName.of("doubleCol"))));
1389+
assertTableEquals(expected, fromIceberg);
1390+
1391+
// Verify that the sort order is still applied
1392+
final ParquetInstructions parquetInstructions = ParquetInstructions.builder()
1393+
.addColumnNameMapping("intCol", "renamedIntCol")
1394+
.setTableDefinition(expected.getDefinition())
1395+
.build();
1396+
verifySortOrder(tableAdapter, tableIdentifier, List.of(
1397+
List.of(SortColumn.asc(ColumnName.of("renamedIntCol")), SortColumn.desc(ColumnName.of("doubleCol")))),
1398+
parquetInstructions);
1399+
}
1400+
1401+
@Test
1402+
void testSortOrderWithTableDefinition() {
1403+
final Table source = TableTools.newTable(
1404+
intCol("intCol", 15, 0, 32, 33, 19),
1405+
doubleCol("doubleCol", 10.5, 2.5, 3.5, 40.5, 0.5),
1406+
longCol("longCol", 20L, 50L, 0L, 10L, 5L));
1407+
final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable");
1408+
final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, source.getDefinition());
1409+
1410+
// Update the default sort order of the underlying iceberg table
1411+
final org.apache.iceberg.Table icebergTable = tableAdapter.icebergTable();
1412+
icebergTable.replaceSortOrder().asc("intCol").desc("doubleCol").commit();
1413+
1414+
// Append data to the table
1415+
final IcebergTableWriter tableWriterWithSorting = tableAdapter.tableWriter(writerOptionsBuilder()
1416+
.tableDefinition(source.getDefinition())
1417+
.build());
1418+
tableWriterWithSorting.append(IcebergWriteInstructions.builder()
1419+
.addTables(source)
1420+
.build());
1421+
1422+
{
1423+
// Now read a table with a different table definition skipping the "doubleCol"
1424+
final TableDefinition tableDefinition = TableDefinition.of(
1425+
ColumnDefinition.ofInt("intCol"),
1426+
ColumnDefinition.ofLong("longCol"));
1427+
final IcebergReadInstructions readInstructions = IcebergReadInstructions.builder()
1428+
.tableDefinition(tableDefinition)
1429+
.build();
1430+
final Table fromIceberg = tableAdapter.table(readInstructions);
1431+
final Table expected = source.dropColumns("doubleCol")
1432+
.sort(List.of(SortColumn.asc(ColumnName.of("intCol"))));
1433+
assertTableEquals(expected, fromIceberg);
1434+
1435+
// Verify that the sort order is still applied for the first column
1436+
final ParquetInstructions parquetInstructions = ParquetInstructions.builder()
1437+
.setTableDefinition(tableDefinition)
1438+
.build();
1439+
verifySortOrder(tableAdapter, tableIdentifier, List.of(
1440+
List.of(SortColumn.asc(ColumnName.of("intCol")))),
1441+
parquetInstructions);
1442+
}
1443+
1444+
{
1445+
// Now read the table with a different table definition skipping the "intCol"
1446+
final TableDefinition tableDefinition = TableDefinition.of(
1447+
ColumnDefinition.ofDouble("doubleCol"),
1448+
ColumnDefinition.ofLong("longCol"));
1449+
final IcebergReadInstructions readInstructions = IcebergReadInstructions.builder()
1450+
.tableDefinition(tableDefinition)
1451+
.build();
1452+
final Table fromIceberg = tableAdapter.table(readInstructions);
1453+
final Table expected = source
1454+
.sort(List.of(SortColumn.asc(ColumnName.of("intCol")), SortColumn.desc(ColumnName.of("doubleCol"))))
1455+
.dropColumns("intCol");
1456+
assertTableEquals(expected, fromIceberg);
1457+
1458+
// Verify that the sort order is not applied for any columns since the first sorted column is skipped
1459+
final ParquetInstructions parquetInstructions = ParquetInstructions.builder()
1460+
.setTableDefinition(tableDefinition)
1461+
.build();
1462+
verifySortOrder(tableAdapter, tableIdentifier, List.of(List.of()), parquetInstructions);
1463+
}
1464+
}
1465+
13501466
@Test
13511467
void appendTableWithAndWithoutDataInstructionsTest() {
13521468
final Table source = TableTools.newTable(

0 commit comments

Comments (0)