Skip to content

Commit 703f527

Browse files
feat!: Updated API for Iceberg read operations (deephaven#6268)
1 parent fa27a8e commit 703f527

File tree

11 files changed

+313
-336
lines changed

11 files changed

+313
-336
lines changed

extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java

+42-40
Large diffs are not rendered by default.

extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java

+6-9
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import io.deephaven.iceberg.location.IcebergTableLocationKey;
1111
import io.deephaven.iceberg.location.IcebergTableParquetLocationKey;
1212
import io.deephaven.iceberg.relative.RelativeFileIO;
13-
import io.deephaven.iceberg.util.IcebergInstructions;
13+
import io.deephaven.iceberg.util.IcebergReadInstructions;
1414
import io.deephaven.iceberg.util.IcebergTableAdapter;
1515
import io.deephaven.parquet.table.ParquetInstructions;
1616
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
@@ -40,7 +40,7 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<Iceber
4040
/**
4141
* The instructions for customizations while reading.
4242
*/
43-
final IcebergInstructions instructions;
43+
final IcebergReadInstructions instructions;
4444

4545
/**
4646
* A cache of {@link IcebergTableLocationKey IcebergTableLocationKeys} keyed by the URI of the file they represent.
@@ -83,7 +83,7 @@ protected IcebergTableLocationKey locationKey(
8383
}
8484
}
8585

86-
// Add the data instructions if provided as part of the IcebergInstructions.
86+
// Add the data instructions if provided as part of the IcebergReadInstructions.
8787
if (instructions.dataInstructions().isPresent()) {
8888
builder.setSpecialInstructions(instructions.dataInstructions().get());
8989
} else {
@@ -104,20 +104,17 @@ protected IcebergTableLocationKey locationKey(
104104

105105
/**
106106
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
107-
* @param tableSnapshot The {@link Snapshot} from which to discover data files.
108107
* @param instructions The instructions for customizations while reading.
109108
*/
110109
public IcebergBaseLayout(
111110
@NotNull final IcebergTableAdapter tableAdapter,
112-
@Nullable final Snapshot tableSnapshot,
113-
@NotNull final IcebergInstructions instructions,
111+
@NotNull final IcebergReadInstructions instructions,
114112
@NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
115113
this.tableAdapter = tableAdapter;
116-
this.snapshot = tableSnapshot;
114+
this.snapshot = tableAdapter.getSnapshot(instructions);
117115
this.instructions = instructions;
118116
this.dataInstructionsProvider = dataInstructionsProvider;
119-
120-
this.tableDef = tableAdapter.definition(tableSnapshot, instructions);
117+
this.tableDef = tableAdapter.definition(instructions);
121118

122119
this.cache = new HashMap<>();
123120
}

extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,11 @@
55

66
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
77
import io.deephaven.iceberg.location.IcebergTableLocationKey;
8-
import io.deephaven.iceberg.util.IcebergInstructions;
8+
import io.deephaven.iceberg.util.IcebergReadInstructions;
99
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
1010
import io.deephaven.iceberg.util.IcebergTableAdapter;
1111
import org.apache.iceberg.*;
1212
import org.jetbrains.annotations.NotNull;
13-
import org.jetbrains.annotations.Nullable;
1413

1514
import java.net.URI;
1615

@@ -21,15 +20,13 @@
2120
public final class IcebergFlatLayout extends IcebergBaseLayout {
2221
/**
2322
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
24-
* @param tableSnapshot The {@link Snapshot} from which to discover data files.
2523
* @param instructions The instructions for customizations while reading.
2624
*/
2725
public IcebergFlatLayout(
2826
@NotNull final IcebergTableAdapter tableAdapter,
29-
@Nullable final Snapshot tableSnapshot,
30-
@NotNull final IcebergInstructions instructions,
27+
@NotNull final IcebergReadInstructions instructions,
3128
@NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
32-
super(tableAdapter, tableSnapshot, instructions, dataInstructionsProvider);
29+
super(tableAdapter, instructions, dataInstructionsProvider);
3330
}
3431

3532
@Override

extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,13 @@
77
import io.deephaven.engine.table.impl.locations.TableDataException;
88
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
99
import io.deephaven.iceberg.location.IcebergTableLocationKey;
10-
import io.deephaven.iceberg.util.IcebergInstructions;
10+
import io.deephaven.iceberg.util.IcebergReadInstructions;
1111
import io.deephaven.iceberg.util.IcebergTableAdapter;
1212
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
1313
import io.deephaven.util.type.TypeUtils;
1414
import org.apache.commons.lang3.mutable.MutableInt;
1515
import org.apache.iceberg.*;
1616
import org.jetbrains.annotations.NotNull;
17-
import org.jetbrains.annotations.Nullable;
1817

1918
import java.net.URI;
2019
import java.util.*;
@@ -41,17 +40,15 @@ public ColumnData(String name, Class<?> type, int index) {
4140

4241
/**
4342
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
44-
* @param tableSnapshot The {@link Snapshot} from which to discover data files.
4543
* @param partitionSpec The Iceberg {@link PartitionSpec partition spec} for the table.
4644
* @param instructions The instructions for customizations while reading.
4745
*/
4846
public IcebergKeyValuePartitionedLayout(
4947
@NotNull final IcebergTableAdapter tableAdapter,
50-
@Nullable final Snapshot tableSnapshot,
5148
@NotNull final PartitionSpec partitionSpec,
52-
@NotNull final IcebergInstructions instructions,
49+
@NotNull final IcebergReadInstructions instructions,
5350
@NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
54-
super(tableAdapter, tableSnapshot, instructions, dataInstructionsProvider);
51+
super(tableAdapter, instructions, dataInstructionsProvider);
5552

5653
// We can assume due to upstream validation that there are no duplicate names (after renaming) that are included
5754
// in the output definition, so we can ignore duplicates.

extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java

-81
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
//
2+
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
3+
//
4+
package io.deephaven.iceberg.util;
5+
6+
import io.deephaven.annotations.CopyableStyle;
7+
import io.deephaven.engine.table.TableDefinition;
8+
import org.apache.iceberg.Snapshot;
9+
import org.immutables.value.Value;
10+
import org.immutables.value.Value.Immutable;
11+
12+
import java.util.Map;
13+
import java.util.Optional;
14+
import java.util.OptionalLong;
15+
16+
/**
17+
* This class provides instructions intended for reading Iceberg catalogs and tables. The default values documented in
18+
* this class may change in the future. As such, callers may wish to explicitly set the values.
19+
*/
20+
@Immutable
21+
@CopyableStyle
22+
public abstract class IcebergReadInstructions {
23+
/**
24+
* The default {@link IcebergReadInstructions} to use when reading Iceberg data files. Providing this will use
25+
* system defaults for cloud provider-specific parameters.
26+
*/
27+
@SuppressWarnings("unused")
28+
public static final IcebergReadInstructions DEFAULT = builder().build();
29+
30+
public static Builder builder() {
31+
return ImmutableIcebergReadInstructions.builder();
32+
}
33+
34+
/**
35+
* The {@link TableDefinition} to use when reading Iceberg data files.
36+
*/
37+
public abstract Optional<TableDefinition> tableDefinition();
38+
39+
/**
40+
* The data instructions to use for reading the Iceberg data files (might be S3Instructions or other cloud
41+
* provider-specific instructions).
42+
*/
43+
public abstract Optional<Object> dataInstructions();
44+
45+
/**
46+
* A {@link Map map} of rename instructions from Iceberg to Deephaven column names to use when reading the Iceberg
47+
* data files.
48+
*/
49+
public abstract Map<String, String> columnRenames();
50+
51+
/**
52+
* Return a copy of this instructions object with the column renames replaced by {@code entries}.
53+
*/
54+
public abstract IcebergReadInstructions withColumnRenames(Map<String, ? extends String> entries);
55+
56+
/**
57+
* The {@link IcebergUpdateMode} to use when reading the Iceberg data files. Default is
58+
* {@link IcebergUpdateMode#staticMode()}.
59+
*/
60+
@Value.Default
61+
public IcebergUpdateMode updateMode() {
62+
return IcebergUpdateMode.staticMode();
63+
}
64+
65+
/**
66+
* The identifier of the snapshot to load for reading. If both this and {@link #snapshot()} are provided, the
67+
* {@link Snapshot#snapshotId()} must match this. Otherwise, only one of them should be provided. If neither is
68+
* provided, the latest snapshot will be loaded.
69+
*/
70+
public abstract OptionalLong snapshotId();
71+
72+
/**
73+
* Return a copy of this instructions object with the snapshot ID replaced by {@code value}.
74+
*/
75+
public abstract IcebergReadInstructions withSnapshotId(long value);
76+
77+
/**
78+
* The snapshot to load for reading. If both this and {@link #snapshotId()} are provided, the
79+
* {@link Snapshot#snapshotId()} must match the {@link #snapshotId()}. Otherwise, only one of them should be
80+
* provided. If neither is provided, the latest snapshot will be loaded.
81+
*/
82+
public abstract Optional<Snapshot> snapshot();
83+
84+
/**
85+
* Return a copy of this instructions object with the snapshot replaced by {@code value}.
86+
*/
87+
public abstract IcebergReadInstructions withSnapshot(Snapshot value);
88+
89+
public interface Builder {
90+
Builder tableDefinition(TableDefinition tableDefinition);
91+
92+
Builder dataInstructions(Object s3Instructions);
93+
94+
Builder putColumnRenames(String key, String value);
95+
96+
Builder putAllColumnRenames(Map<String, ? extends String> entries);
97+
98+
Builder updateMode(IcebergUpdateMode updateMode);
99+
100+
Builder snapshotId(long snapshotId);
101+
102+
Builder snapshot(Snapshot snapshot);
103+
104+
IcebergReadInstructions build();
105+
}
106+
107+
@Value.Check
108+
final void checkSnapshotId() {
109+
if (snapshotId().isPresent() && snapshot().isPresent() &&
110+
snapshotId().getAsLong() != snapshot().get().snapshotId()) {
111+
throw new IllegalArgumentException("If both snapshotID and snapshot are provided, the snapshot Ids " +
112+
"must match, found " + snapshotId().getAsLong() + " and " + snapshot().get().snapshotId());
113+
}
114+
}
115+
}

extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTable.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
public interface IcebergTable extends Table {
1111
/**
12-
* When the {@link IcebergInstructions#updateMode() update mode} for this table is
12+
* When the {@link IcebergReadInstructions#updateMode() update mode} for this table is
1313
* {@link IcebergUpdateMode#manualRefreshingMode()}, this call will update the table with the latest snapshot from
1414
* the catalog.
1515
* <p>
@@ -18,7 +18,7 @@ public interface IcebergTable extends Table {
1818
void update();
1919

2020
/**
21-
* When the {@link IcebergInstructions#updateMode() update mode} for this table is
21+
* When the {@link IcebergReadInstructions#updateMode() update mode} for this table is
2222
* {@link IcebergUpdateMode#manualRefreshingMode()}, this call will update the table with a specific snapshot from
2323
* the catalog. If the {@code snapshotId} is not found in the list of snapshots for the table, an
2424
* {@link IllegalArgumentException} is thrown. The input snapshot must also be newer (higher in sequence number)
@@ -31,7 +31,7 @@ public interface IcebergTable extends Table {
3131
void update(final long snapshotId);
3232

3333
/**
34-
* When the {@link IcebergInstructions#updateMode() update mode} for this table is
34+
* When the {@link IcebergReadInstructions#updateMode() update mode} for this table is
3535
* {@link IcebergUpdateMode#manualRefreshingMode()}, this call will update the table with a specific snapshot from
3636
* the catalog. The input snapshot must be newer (higher in sequence number) than the current snapshot or an
3737
* {@link IllegalArgumentException} is thrown.

0 commit comments

Comments
 (0)