Skip to content

Commit 6127a39

Browse files
feat: Ensure data read order reflects commit sequence in Iceberg tables (#6341)
1 parent ae5b8c6 commit 6127a39

File tree

16 files changed

+391
-74
lines changed

16 files changed

+391
-74
lines changed

engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocationKey.java

+12
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,16 @@ <PARTITION_VALUE_TYPE extends Comparable<PARTITION_VALUE_TYPE>> PARTITION_VALUE_
6262
* with any live TableLocation.
6363
*/
6464
default void clear() {}
65+
66+
/**
67+
* By default, compare fully qualified class names of the implementing classes. This method is a fallback for when the
68+
* implementing classes are not directly comparable, and should help establish a consistent ordering between
69+
* distinct implementations.
70+
* <p>
71+
* {@inheritDoc}
72+
*/
73+
@Override
74+
default int compareTo(@NotNull final TableLocationKey other) {
75+
return this.getClass().getName().compareTo(other.getClass().getName());
76+
}
6577
}

engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/PartitionedTableLocationKey.java

+46-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import io.deephaven.base.log.LogOutput;
77
import io.deephaven.base.log.LogOutputAppendable;
8+
import io.deephaven.engine.table.impl.locations.TableLocationKey;
89
import io.deephaven.util.compare.ObjectComparisons;
910
import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey;
1011
import io.deephaven.engine.table.impl.locations.UnknownPartitionKeyException;
@@ -15,9 +16,9 @@
1516

1617
/**
1718
* Base {@link ImmutableTableLocationKey} implementation for table locations that may be enclosed by partitions.
18-
* Sub-classes should be sure to invoke the partition-map comparator at higher priority than other comparisons when
19-
* implementing {@link #compareTo(Object)}, and to include the partitions in their {@link #equals(Object)}
20-
* implementations.
19+
* Subclasses should consider invoking the partition-map comparator at higher priority than other comparisons when
20+
* implementing {@link #compareTo(Object)}. They should also include the partitions in their {@link #equals(Object)} and
21+
* {@link #hashCode()} implementations if not calling {@code super.equals()} and {@code super.hashCode()}.
2122
*/
2223
public abstract class PartitionedTableLocationKey implements ImmutableTableLocationKey {
2324

@@ -26,6 +27,8 @@ public abstract class PartitionedTableLocationKey implements ImmutableTableLocat
2627

2728
protected final Map<String, Comparable<?>> partitions;
2829

30+
private int cachedHashCode;
31+
2932
/**
3033
* Construct a new PartitionedTableLocationKey for the supplied {@code partitions}.
3134
*
@@ -55,6 +58,46 @@ public final Set<String> getPartitionKeys() {
5558
return partitions.keySet();
5659
}
5760

61+
@Override
62+
public int compareTo(@NotNull final TableLocationKey other) {
63+
if (other instanceof PartitionedTableLocationKey) {
64+
final PartitionedTableLocationKey otherTyped = (PartitionedTableLocationKey) other;
65+
final int partitionComparisonResult =
66+
PartitionsComparator.INSTANCE.compare(partitions, otherTyped.partitions);
67+
if (partitionComparisonResult != 0) {
68+
return partitionComparisonResult;
69+
}
70+
}
71+
return ImmutableTableLocationKey.super.compareTo(other);
72+
}
73+
74+
@Override
75+
public int hashCode() {
76+
if (cachedHashCode == 0) {
77+
final int computedHashCode = partitions.hashCode();
78+
// Don't use 0; that's used by StandaloneTableLocationKey, and is also our sentinel for "not yet computed"
79+
if (computedHashCode == 0) {
80+
final int fallbackHashCode = PartitionedTableLocationKey.class.hashCode();
81+
cachedHashCode = fallbackHashCode == 0 ? 1 : fallbackHashCode;
82+
} else {
83+
cachedHashCode = computedHashCode;
84+
}
85+
}
86+
return cachedHashCode;
87+
}
88+
89+
@Override
90+
public boolean equals(@Nullable final Object other) {
91+
if (this == other) {
92+
return true;
93+
}
94+
if (!(other instanceof PartitionedTableLocationKey)) {
95+
return false;
96+
}
97+
final PartitionedTableLocationKey otherTyped = (PartitionedTableLocationKey) other;
98+
return partitions.equals(otherTyped.partitions);
99+
}
100+
58101
/**
59102
* Formats a map of partitions as key-value pairs.
60103
*/

engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/StandaloneTableLocationKey.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public int compareTo(@NotNull final TableLocationKey other) {
4848
if (other instanceof StandaloneTableLocationKey) {
4949
return 0;
5050
}
51-
throw new ClassCastException("Cannot compare " + getClass() + " to " + other.getClass());
51+
return ImmutableTableLocationKey.super.compareTo(other);
5252
}
5353

5454
@Override

engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/FileTableLocationKey.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,11 @@ public String toString() {
6666
}
6767

6868
/**
69-
* Precedence-wise this implementation compares {@code order}, then applies a {@link PartitionsComparator} to
70-
* {@code partitions}, then compares {@code file}.
71-
*
72-
* @inheritDoc
69+
* When comparing with another {@link FileTableLocationKey}, precedence-wise this implementation compares
70+
* {@code order}, then applies a {@link PartitionsComparator} to {@code partitions}, then compares {@code file}.
71+
* Otherwise, it delegates to the parent class.
72+
* <p>
73+
* {@inheritDoc}
7374
*/
7475
@Override
7576
public int compareTo(@NotNull final TableLocationKey other) {
@@ -86,7 +87,7 @@ public int compareTo(@NotNull final TableLocationKey other) {
8687
}
8788
return file.compareTo(otherTyped.file);
8889
}
89-
throw new ClassCastException("Cannot compare " + getClass() + " to " + other.getClass());
90+
return super.compareTo(other);
9091
}
9192

9293
@Override

engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URITableLocationKey.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class URITableLocationKey extends PartitionedTableLocationKey {
2828
private static final String IMPLEMENTATION_NAME = URITableLocationKey.class.getSimpleName();
2929

3030
protected final URI uri;
31-
private final int order;
31+
protected final int order;
3232

3333
private int cachedHashCode;
3434

@@ -72,10 +72,11 @@ public String toString() {
7272
}
7373

7474
/**
75-
* Precedence-wise this implementation compares {@code order}, then applies a {@link PartitionsComparator} to
76-
* {@code partitions}, then compares {@code uri}.
77-
*
78-
* @inheritDoc
75+
* When comparing with another {@link URITableLocationKey}, precedence-wise this implementation compares
76+
* {@code order}, then applies a {@link PartitionsComparator} to {@code partitions}, then compares {@code uri}.
77+
* Otherwise, it delegates to the parent class.
78+
* <p>
79+
* {@inheritDoc}
7980
*/
8081
@Override
8182
public int compareTo(@NotNull final TableLocationKey other) {
@@ -92,7 +93,7 @@ public int compareTo(@NotNull final TableLocationKey other) {
9293
}
9394
return uri.compareTo(otherTyped.uri);
9495
}
95-
throw new ClassCastException("Cannot compare " + getClass() + " to " + other.getClass());
96+
return super.compareTo(other);
9697
}
9798

9899
@Override

engine/table/src/test/java/io/deephaven/engine/table/impl/locations/impl/SimpleTableLocationKey.java

-21
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
package io.deephaven.engine.table.impl.locations.impl;
55

66
import io.deephaven.base.log.LogOutput;
7-
import io.deephaven.engine.table.impl.locations.TableLocationKey;
8-
import org.jetbrains.annotations.NotNull;
97
import org.jetbrains.annotations.Nullable;
108

119
import java.util.Map;
@@ -31,23 +29,4 @@ public LogOutput append(LogOutput logOutput) {
3129
return logOutput.append(getImplementationName()).append("[partitions=")
3230
.append(PartitionsFormatter.INSTANCE, partitions).append(']');
3331
}
34-
35-
@Override
36-
public int compareTo(@NotNull final TableLocationKey other) {
37-
if (other instanceof SimpleTableLocationKey) {
38-
return PartitionsComparator.INSTANCE.compare(partitions, ((SimpleTableLocationKey) other).partitions);
39-
}
40-
throw new ClassCastException("Cannot compare " + getClass() + " to " + other.getClass());
41-
}
42-
43-
@Override
44-
public int hashCode() {
45-
return partitions.hashCode();
46-
}
47-
48-
@Override
49-
public boolean equals(final Object other) {
50-
return other == this || (other instanceof SimpleTableLocationKey
51-
&& partitions.equals(((SimpleTableLocationKey) other).partitions));
52-
}
5332
}

engine/test-utils/src/main/java/io/deephaven/engine/testutil/locations/TableBackedTableLocationKey.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,16 @@ public String toString() {
4646

4747
@Override
4848
public int compareTo(@NotNull final TableLocationKey other) {
49-
// noinspection DataFlowIssue
50-
return Integer.compare(
51-
(int) table.getAttribute("ID"),
52-
(int) ((TableBackedTableLocationKey) other).table.getAttribute("ID"));
49+
if (other instanceof TableBackedTableLocationKey) {
50+
final TableBackedTableLocationKey otherTyped = (TableBackedTableLocationKey) other;
51+
// noinspection DataFlowIssue
52+
final int idComparisonResult =
53+
Integer.compare((int) table.getAttribute("ID"), (int) otherTyped.table.getAttribute("ID"));
54+
if (idComparisonResult != 0) {
55+
return idComparisonResult;
56+
}
57+
}
58+
return ImmutableTableLocationKey.super.compareTo(other);
5359
}
5460

5561
@Override

extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java

-9
Original file line numberDiff line numberDiff line change
@@ -704,15 +704,6 @@ public int hashCode() {
704704
return cachedHashCode;
705705
}
706706

707-
@Override
708-
public int compareTo(@NotNull final TableLocationKey other) {
709-
if (getClass() != other.getClass()) {
710-
throw new ClassCastException(String.format("Cannot compare %s to %s", getClass(), other.getClass()));
711-
}
712-
final TableLocationKeyImpl otherTableLocationKey = (TableLocationKeyImpl) other;
713-
return PartitionsComparator.INSTANCE.compare(partitions, otherTableLocationKey.partitions);
714-
}
715-
716707
@Override
717708
public LogOutput append(@NotNull final LogOutput logOutput) {
718709
return logOutput.append(getImplementationName())

extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java

+56-15
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@
1717
import io.deephaven.util.channel.SeekableChannelsProvider;
1818
import io.deephaven.util.channel.SeekableChannelsProviderLoader;
1919
import org.apache.iceberg.*;
20+
import org.apache.iceberg.catalog.Catalog;
21+
import org.apache.iceberg.catalog.TableIdentifier;
2022
import org.apache.iceberg.io.FileIO;
2123
import org.jetbrains.annotations.NotNull;
2224
import org.jetbrains.annotations.Nullable;
2325

2426
import java.net.URI;
25-
import java.util.HashMap;
2627
import java.util.List;
2728
import java.util.Map;
29+
import java.util.UUID;
2830
import java.util.function.Consumer;
2931

3032
public abstract class IcebergBaseLayout implements TableLocationKeyFinder<IcebergTableLocationKey> {
@@ -33,6 +35,22 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<Iceber
3335
*/
3436
final IcebergTableAdapter tableAdapter;
3537

38+
/**
39+
* The UUID of the table, if available.
40+
*/
41+
@Nullable
42+
private final UUID tableUuid;
43+
44+
/**
45+
* Name of the {@link Catalog} used to access this table, if available.
46+
*/
47+
@Nullable
48+
private final String catalogName;
49+
50+
/**
51+
* The table identifier used to access this table.
52+
*/
53+
private final TableIdentifier tableIdentifier;
3654
/**
3755
* The {@link TableDefinition} that will be used for life of this table. Although Iceberg table schema may change,
3856
* schema changes are not supported in Deephaven.
@@ -45,9 +63,9 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<Iceber
4563
private final String uriScheme;
4664

4765
/**
48-
* A cache of {@link IcebergTableLocationKey IcebergTableLocationKeys} keyed by the URI of the file they represent.
66+
* The {@link Snapshot} from which to discover data files.
4967
*/
50-
private final Map<URI, IcebergTableLocationKey> cache;
68+
Snapshot snapshot;
5169

5270
/**
5371
* The {@link ParquetInstructions} object that will be used to read any Parquet data files in this table.
@@ -60,17 +78,28 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<Iceber
6078
*/
6179
private final SeekableChannelsProvider channelsProvider;
6280

81+
6382
/**
64-
* The {@link Snapshot} from which to discover data files.
83+
* Create a new {@link IcebergTableLocationKey} for the given {@link ManifestFile}, {@link DataFile} and
84+
* {@link URI}.
85+
*
86+
* @param manifestFile The manifest file from which the data file was discovered
87+
* @param dataFile The data file that backs the keyed location
88+
* @param fileUri The {@link URI} for the file that backs the keyed location
89+
* @param partitions The table partitions enclosing the table location keyed by the returned key. If {@code null},
90+
* the location will be a member of no partitions.
91+
*
92+
* @return A new {@link IcebergTableLocationKey}
6593
*/
66-
Snapshot snapshot;
67-
6894
protected IcebergTableLocationKey locationKey(
69-
final org.apache.iceberg.FileFormat format,
70-
final URI fileUri,
95+
@NotNull final ManifestFile manifestFile,
96+
@NotNull final DataFile dataFile,
97+
@NotNull final URI fileUri,
7198
@Nullable final Map<String, Comparable<?>> partitions) {
99+
final org.apache.iceberg.FileFormat format = dataFile.format();
72100
if (format == org.apache.iceberg.FileFormat.PARQUET) {
73-
return new IcebergTableParquetLocationKey(fileUri, 0, partitions, parquetInstructions, channelsProvider);
101+
return new IcebergTableParquetLocationKey(catalogName, tableUuid, tableIdentifier, manifestFile, dataFile,
102+
fileUri, 0, partitions, parquetInstructions, channelsProvider);
74103
}
75104
throw new UnsupportedOperationException(String.format("%s:%d - an unsupported file format %s for URI '%s'",
76105
tableAdapter, snapshot.snapshotId(), format, fileUri));
@@ -85,6 +114,20 @@ public IcebergBaseLayout(
85114
@NotNull final IcebergReadInstructions instructions,
86115
@NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
87116
this.tableAdapter = tableAdapter;
117+
{
118+
UUID uuid;
119+
try {
120+
uuid = tableAdapter.icebergTable().uuid();
121+
} catch (final RuntimeException e) {
122+
// The UUID method is unsupported for v1 Iceberg tables since uuid is optional for v1 tables.
123+
uuid = null;
124+
}
125+
this.tableUuid = uuid;
126+
}
127+
128+
this.catalogName = tableAdapter.catalog().name();
129+
this.tableIdentifier = tableAdapter.tableIdentifier();
130+
88131
this.snapshot = tableAdapter.getSnapshot(instructions);
89132
this.tableDef = tableAdapter.definition(instructions);
90133
this.uriScheme = locationUri(tableAdapter.icebergTable()).getScheme();
@@ -111,10 +154,9 @@ public IcebergBaseLayout(
111154
this.parquetInstructions = builder.build();
112155
}
113156
this.channelsProvider = SeekableChannelsProviderLoader.getInstance().load(uriScheme, specialInstructions);
114-
this.cache = new HashMap<>();
115157
}
116158

117-
abstract IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri);
159+
abstract IcebergTableLocationKey keyFromDataFile(ManifestFile manifestFile, DataFile dataFile, URI fileUri);
118160

119161
private static String path(String path, FileIO io) {
120162
return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path;
@@ -145,15 +187,14 @@ public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKe
145187
table, snapshot.snapshotId(), manifestFile.content()));
146188
}
147189
try (final ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, table.io())) {
148-
for (DataFile df : reader) {
149-
final URI fileUri = dataFileUri(table, df);
190+
for (final DataFile dataFile : reader) {
191+
final URI fileUri = dataFileUri(table, dataFile);
150192
if (!uriScheme.equals(fileUri.getScheme())) {
151193
throw new TableDataException(String.format(
152194
"%s:%d - multiple URI schemes are not currently supported. uriScheme=%s, fileUri=%s",
153195
table, snapshot.snapshotId(), uriScheme, fileUri));
154196
}
155-
final IcebergTableLocationKey locationKey =
156-
cache.computeIfAbsent(fileUri, uri -> keyFromDataFile(df, fileUri));
197+
final IcebergTableLocationKey locationKey = keyFromDataFile(manifestFile, dataFile, fileUri);
157198
if (locationKey != null) {
158199
locationKeyObserver.accept(locationKey);
159200
}

extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ public String toString() {
3535
}
3636

3737
@Override
38-
IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri) {
39-
return locationKey(df.format(), fileUri, null);
38+
IcebergTableLocationKey keyFromDataFile(
39+
@NotNull final ManifestFile manifestFile,
40+
@NotNull final DataFile dataFile,
41+
@NotNull final URI fileUri) {
42+
return locationKey(manifestFile, dataFile, fileUri, null);
4043
}
4144
}

0 commit comments

Comments
 (0)