Skip to content

Commit 9b2ac7e

Browse files
Review with Devin contd.
1 parent 809292b commit 9b2ac7e

File tree

5 files changed

+88
-125
lines changed

5 files changed

+88
-125
lines changed

extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java

+57-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
//
44
package io.deephaven.iceberg.layout;
55

6+
import io.deephaven.api.ColumnName;
7+
import io.deephaven.api.SortColumn;
8+
import io.deephaven.engine.table.ColumnDefinition;
69
import io.deephaven.engine.table.TableDefinition;
710
import io.deephaven.engine.table.impl.locations.TableDataException;
811
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
@@ -20,8 +23,11 @@
2023
import org.apache.iceberg.catalog.TableIdentifier;
2124
import org.jetbrains.annotations.NotNull;
2225
import org.jetbrains.annotations.Nullable;
26+
import org.jetbrains.annotations.VisibleForTesting;
2327

2428
import java.net.URI;
29+
import java.util.ArrayList;
30+
import java.util.Collections;
2531
import java.util.HashMap;
2632
import java.util.List;
2733
import java.util.Map;
@@ -107,8 +113,9 @@ protected IcebergTableLocationKey locationKey(
107113
@NotNull final SeekableChannelsProvider channelsProvider) {
108114
final org.apache.iceberg.FileFormat format = dataFile.format();
109115
if (format == org.apache.iceberg.FileFormat.PARQUET) {
110-
return new IcebergTableParquetLocationKey(catalogName, tableUuid, tableIdentifier, tableAdapter,
111-
manifestFile, dataFile, fileUri, 0, partitions, parquetInstructions, channelsProvider);
116+
return new IcebergTableParquetLocationKey(catalogName, tableUuid, tableIdentifier, manifestFile, dataFile,
117+
fileUri, 0, partitions, parquetInstructions, channelsProvider,
118+
computeSortedColumns(tableAdapter.icebergTable(), dataFile, parquetInstructions));
112119
}
113120
throw new UnsupportedOperationException(String.format("%s:%d - an unsupported file format %s for URI '%s'",
114121
tableAdapter, snapshot.snapshotId(), format, fileUri));
@@ -260,4 +267,52 @@ protected void updateSnapshot(@NotNull final Snapshot updateSnapshot) {
260267

261268
snapshot = updateSnapshot;
262269
}
270+
271+
@VisibleForTesting
272+
@NotNull
273+
public static List<SortColumn> computeSortedColumns(
274+
@NotNull final org.apache.iceberg.Table icebergTable,
275+
@NotNull final DataFile dataFile,
276+
@NotNull final ParquetInstructions readInstructions) {
277+
final Integer sortOrderId = dataFile.sortOrderId();
278+
// If sort order is missing or unknown, we cannot determine the sorted columns from the metadata, so we report
279+
// no sorted columns. The location returns this list as-is and does not consult the underlying parquet file.
280+
if (sortOrderId == null) {
281+
return Collections.emptyList();
282+
}
283+
final SortOrder sortOrder = icebergTable.sortOrders().get(sortOrderId);
284+
if (sortOrder == null) {
285+
return Collections.emptyList();
286+
}
287+
if (sortOrder.isUnsorted()) {
288+
return Collections.emptyList();
289+
}
290+
final Schema schema = sortOrder.schema();
291+
final List<SortColumn> sortColumns = new ArrayList<>(sortOrder.fields().size());
292+
for (final SortField field : sortOrder.fields()) {
293+
if (!field.transform().isIdentity()) {
294+
// TODO (DH-18160): Improve support for handling non-identity transforms
295+
break;
296+
}
297+
final String icebergColName = schema.findColumnName(field.sourceId());
298+
final String dhColName = readInstructions.getColumnNameFromParquetColumnNameOrDefault(icebergColName);
299+
final TableDefinition tableDefinition = readInstructions.getTableDefinition().orElseThrow(
300+
() -> new IllegalStateException("Table definition is required for reading from Iceberg tables"));
301+
final ColumnDefinition<?> columnDef = tableDefinition.getColumn(dhColName);
302+
if (columnDef == null) {
303+
// Table definition provided by the user doesn't have this column, so stop here
304+
break;
305+
}
306+
final SortColumn sortColumn;
307+
if (field.nullOrder() == NullOrder.NULLS_FIRST && field.direction() == SortDirection.ASC) {
308+
sortColumn = SortColumn.asc(ColumnName.of(dhColName));
309+
} else if (field.nullOrder() == NullOrder.NULLS_LAST && field.direction() == SortDirection.DESC) {
310+
sortColumn = SortColumn.desc(ColumnName.of(dhColName));
311+
} else {
312+
break;
313+
}
314+
sortColumns.add(sortColumn);
315+
}
316+
return Collections.unmodifiableList(sortColumns);
317+
}
263318
}

extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableLocationFactory.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,9 @@ public TableLocation makeLocation(
2525
@NotNull final IcebergTableLocationKey locationKey,
2626
@Nullable final TableDataRefreshService refreshService) {
2727
if (locationKey instanceof IcebergTableParquetLocationKey) {
28-
final IcebergTableParquetLocationKey tableParquetLocationKey = (IcebergTableParquetLocationKey) locationKey;
2928
return new IcebergTableParquetLocation(
30-
tableParquetLocationKey.tableAdapter(),
3129
tableKey,
32-
tableParquetLocationKey,
30+
(IcebergTableParquetLocationKey) locationKey,
3331
(ParquetInstructions) locationKey.readInstructions());
3432
}
3533
throw new UnsupportedOperationException("Unsupported location key type: " + locationKey.getClass());

extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocation.java

+3-62
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,15 @@
33
//
44
package io.deephaven.iceberg.location;
55

6-
import io.deephaven.api.ColumnName;
76
import io.deephaven.api.SortColumn;
8-
import io.deephaven.engine.table.ColumnDefinition;
9-
import io.deephaven.engine.table.TableDefinition;
7+
import io.deephaven.base.verify.Require;
108
import io.deephaven.engine.table.impl.locations.TableKey;
119
import io.deephaven.engine.table.impl.locations.TableLocation;
12-
import io.deephaven.iceberg.util.IcebergTableAdapter;
1310
import io.deephaven.parquet.table.ParquetInstructions;
1411
import io.deephaven.parquet.table.location.ParquetTableLocation;
15-
import org.apache.iceberg.DataFile;
16-
import org.apache.iceberg.NullOrder;
17-
import org.apache.iceberg.Schema;
18-
import org.apache.iceberg.SortDirection;
19-
import org.apache.iceberg.SortField;
20-
import org.apache.iceberg.SortOrder;
2112
import org.jetbrains.annotations.NotNull;
2213
import org.jetbrains.annotations.Nullable;
2314

24-
import java.util.ArrayList;
25-
import java.util.Collections;
2615
import java.util.List;
2716

2817
public class IcebergTableParquetLocation extends ParquetTableLocation implements TableLocation {
@@ -31,64 +20,16 @@ public class IcebergTableParquetLocation extends ParquetTableLocation implements
3120
private final List<SortColumn> sortedColumns;
3221

3322
public IcebergTableParquetLocation(
34-
@NotNull final IcebergTableAdapter tableAdapter,
3523
@NotNull final TableKey tableKey,
3624
@NotNull final IcebergTableParquetLocationKey tableLocationKey,
3725
@NotNull final ParquetInstructions readInstructions) {
3826
super(tableKey, tableLocationKey, readInstructions);
39-
sortedColumns = computeSortedColumns(tableAdapter, tableLocationKey.dataFile(), readInstructions);
27+
sortedColumns = Require.neqNull(tableLocationKey.sortedColumns(), "tableLocationKey.sortedColumns()");
4028
}
4129

4230
@Override
4331
@NotNull
4432
public List<SortColumn> getSortedColumns() {
45-
return sortedColumns == null ? super.getSortedColumns() : sortedColumns;
46-
}
47-
48-
@Nullable
49-
private static List<SortColumn> computeSortedColumns(
50-
@NotNull final IcebergTableAdapter tableAdapter,
51-
@NotNull final DataFile dataFile,
52-
@NotNull final ParquetInstructions readInstructions) {
53-
final Integer sortOrderId = dataFile.sortOrderId();
54-
// If sort order is missing or unknown, we cannot determine the sorted columns from the metadata and will
55-
// check the underlying parquet file for the sorted columns, when the user asks for them.
56-
if (sortOrderId == null) {
57-
return null;
58-
}
59-
final SortOrder sortOrder = tableAdapter.icebergTable().sortOrders().get(sortOrderId);
60-
if (sortOrder == null) {
61-
return null;
62-
}
63-
if (sortOrder.isUnsorted()) {
64-
return Collections.emptyList();
65-
}
66-
final Schema schema = sortOrder.schema();
67-
final List<SortColumn> sortColumns = new ArrayList<>(sortOrder.fields().size());
68-
for (final SortField field : sortOrder.fields()) {
69-
if (!field.transform().isIdentity()) {
70-
// TODO (DH-18160): Improve support for handling non-identity transforms
71-
break;
72-
}
73-
final String icebergColName = schema.findColumnName(field.sourceId());
74-
final String dhColName = readInstructions.getColumnNameFromParquetColumnNameOrDefault(icebergColName);
75-
final TableDefinition tableDefinition = readInstructions.getTableDefinition().orElseThrow(
76-
() -> new IllegalStateException("Table definition is required for reading from Iceberg tables"));
77-
final ColumnDefinition<?> columnDef = tableDefinition.getColumn(dhColName);
78-
if (columnDef == null) {
79-
// Table definition provided by the user doesn't have this column, so stop here
80-
break;
81-
}
82-
final SortColumn sortColumn;
83-
if (field.nullOrder() == NullOrder.NULLS_FIRST && field.direction() == SortDirection.ASC) {
84-
sortColumn = SortColumn.asc(ColumnName.of(dhColName));
85-
} else if (field.nullOrder() == NullOrder.NULLS_LAST && field.direction() == SortDirection.DESC) {
86-
sortColumn = SortColumn.desc(ColumnName.of(dhColName));
87-
} else {
88-
break;
89-
}
90-
sortColumns.add(sortColumn);
91-
}
92-
return Collections.unmodifiableList(sortColumns);
33+
return sortedColumns;
9334
}
9435
}

extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocationKey.java

+12-21
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
//
44
package io.deephaven.iceberg.location;
55

6+
import io.deephaven.api.SortColumn;
67
import io.deephaven.base.verify.Require;
78
import io.deephaven.engine.table.impl.locations.TableLocationKey;
8-
import io.deephaven.iceberg.util.IcebergTableAdapter;
99
import io.deephaven.parquet.table.ParquetInstructions;
1010
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
1111
import org.apache.iceberg.DataFile;
@@ -17,6 +17,7 @@
1717

1818
import java.net.URI;
1919
import java.util.Comparator;
20+
import java.util.List;
2021
import java.util.Map;
2122
import java.util.Objects;
2223
import java.util.UUID;
@@ -42,12 +43,6 @@ public class IcebergTableParquetLocationKey extends ParquetTableLocationKey impl
4243
@Nullable
4344
private final TableIdentifier tableIdentifier;
4445

45-
@NotNull
46-
private final IcebergTableAdapter tableAdapter;
47-
48-
@NotNull
49-
private final DataFile dataFile;
50-
5146
/**
5247
* The {@link DataFile#dataSequenceNumber()} of the data file backing this keyed location.
5348
*/
@@ -71,6 +66,9 @@ public class IcebergTableParquetLocationKey extends ParquetTableLocationKey impl
7166
@NotNull
7267
private final ParquetInstructions readInstructions;
7368

69+
@NotNull
70+
private final List<SortColumn> sortedColumns;
71+
7472
private int cachedHashCode;
7573

7674
/**
@@ -79,7 +77,6 @@ public class IcebergTableParquetLocationKey extends ParquetTableLocationKey impl
7977
* @param catalogName The name of the catalog using which the table is accessed
8078
* @param tableUuid The UUID of the table, or {@code null} if not available
8179
* @param tableIdentifier The table identifier used to access the table
82-
* @param tableAdapter The Iceberg table adapter for the table
8380
* @param manifestFile The manifest file from which the data file was discovered
8481
* @param dataFile The data file that backs the keyed location
8582
* @param fileUri The {@link URI} for the file that backs the keyed location
@@ -94,14 +91,14 @@ public IcebergTableParquetLocationKey(
9491
@Nullable final String catalogName,
9592
@Nullable final UUID tableUuid,
9693
@NotNull final TableIdentifier tableIdentifier,
97-
@NotNull final IcebergTableAdapter tableAdapter,
9894
@NotNull final ManifestFile manifestFile,
9995
@NotNull final DataFile dataFile,
10096
@NotNull final URI fileUri,
10197
final int order,
10298
@Nullable final Map<String, Comparable<?>> partitions,
10399
@NotNull final ParquetInstructions readInstructions,
104-
@NotNull final SeekableChannelsProvider channelsProvider) {
100+
@NotNull final SeekableChannelsProvider channelsProvider,
101+
@NotNull final List<SortColumn> sortedColumns) {
105102
super(fileUri, order, partitions, channelsProvider);
106103

107104
this.catalogName = catalogName;
@@ -111,10 +108,6 @@ public IcebergTableParquetLocationKey(
111108
// tableUUID was null
112109
this.tableIdentifier = tableUuid != null ? null : tableIdentifier;
113110

114-
this.tableAdapter = tableAdapter;
115-
116-
this.dataFile = dataFile;
117-
118111
// Files with unknown sequence numbers should be ordered first
119112
dataSequenceNumber = dataFile.dataSequenceNumber() != null ? dataFile.dataSequenceNumber() : Long.MIN_VALUE;
120113
fileSequenceNumber = dataFile.fileSequenceNumber() != null ? dataFile.fileSequenceNumber() : Long.MIN_VALUE;
@@ -125,6 +118,7 @@ public IcebergTableParquetLocationKey(
125118
manifestSequenceNumber = manifestFile.sequenceNumber();
126119

127120
this.readInstructions = readInstructions;
121+
this.sortedColumns = sortedColumns;
128122
}
129123

130124
@Override
@@ -138,13 +132,8 @@ public ParquetInstructions readInstructions() {
138132
}
139133

140134
@NotNull
141-
DataFile dataFile() {
142-
return dataFile;
143-
}
144-
145-
@NotNull
146-
IcebergTableAdapter tableAdapter() {
147-
return tableAdapter;
135+
List<SortColumn> sortedColumns() {
136+
return sortedColumns;
148137
}
149138

150139
/**
@@ -220,6 +209,7 @@ public boolean equals(@Nullable final Object other) {
220209
&& fileSequenceNumber == otherTyped.fileSequenceNumber
221210
&& dataFilePos == otherTyped.dataFilePos
222211
&& manifestSequenceNumber == otherTyped.manifestSequenceNumber
212+
&& sortedColumns.equals(otherTyped.sortedColumns)
223213
&& uri.equals(otherTyped.uri);
224214
}
225215

@@ -235,6 +225,7 @@ public int hashCode() {
235225
result = prime * result + Long.hashCode(fileSequenceNumber);
236226
result = prime * result + Long.hashCode(dataFilePos);
237227
result = prime * result + Long.hashCode(manifestSequenceNumber);
228+
result = prime * result + Objects.hashCode(sortedColumns);
238229
result = prime * result + uri.hashCode();
239230
// Don't use 0; that's used by StandaloneTableLocationKey, and also our sentinel for the need to compute
240231
if (result == 0) {

0 commit comments

Comments
 (0)