Skip to content

Commit 2da9dd5

Browse files
feat: Don't fail when reading non-identity partitioning field (deephaven#6477)
1 parent f32a930 commit 2da9dd5

14 files changed

+211
-34
lines changed

extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java

+24-34
Original file line numberDiff line numberDiff line change
@@ -11,33 +11,31 @@
1111
import io.deephaven.iceberg.util.IcebergTableAdapter;
1212
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
1313
import io.deephaven.util.type.TypeUtils;
14-
import org.apache.commons.lang3.mutable.MutableInt;
1514
import org.apache.iceberg.*;
1615
import org.apache.iceberg.data.IdentityPartitionConverters;
1716
import org.jetbrains.annotations.NotNull;
1817

1918
import java.net.URI;
2019
import java.util.*;
21-
import java.util.stream.Collectors;
2220

2321
/**
2422
* Iceberg {@link TableLocationKeyFinder location finder} for tables with partitions that will discover data files from
2523
* a {@link Snapshot}
2624
*/
2725
public final class IcebergKeyValuePartitionedLayout extends IcebergBaseLayout {
28-
private static class ColumnData {
26+
private static class IdentityPartitioningColData {
2927
final String name;
3028
final Class<?> type;
31-
final int index;
29+
final int index; // position in the partition spec
3230

33-
public ColumnData(String name, Class<?> type, int index) {
31+
private IdentityPartitioningColData(String name, Class<?> type, int index) {
3432
this.name = name;
3533
this.type = type;
3634
this.index = index;
3735
}
3836
}
3937

40-
private final List<ColumnData> outputPartitioningColumns;
38+
private final List<IdentityPartitioningColData> identityPartitioningColumns;
4139

4240
/**
4341
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
@@ -53,33 +51,26 @@ public IcebergKeyValuePartitionedLayout(
5351

5452
// We can assume due to upstream validation that there are no duplicate names (after renaming) that are included
5553
// in the output definition, so we can ignore duplicates.
56-
final MutableInt icebergIndex = new MutableInt(0);
57-
final Map<String, Integer> availablePartitioningColumns = partitionSpec.fields().stream()
58-
.peek(partitionField -> {
59-
// TODO (deephaven-core#6438): Add support to handle non-identity transforms
60-
if (!partitionField.transform().isIdentity()) {
61-
throw new TableDataException("Partition field " + partitionField.name() + " has a " +
62-
"non-identity transform: " + partitionField.transform() + ", which is not supported");
63-
}
64-
})
65-
.map(PartitionField::name)
66-
.map(name -> instructions.columnRenames().getOrDefault(name, name))
67-
.collect(Collectors.toMap(
68-
name -> name,
69-
name -> icebergIndex.getAndIncrement(),
70-
(v1, v2) -> v1,
71-
LinkedHashMap::new));
54+
final List<PartitionField> partitionFields = partitionSpec.fields();
55+
final int numPartitionFields = partitionFields.size();
56+
identityPartitioningColumns = new ArrayList<>(numPartitionFields);
57+
for (int fieldId = 0; fieldId < numPartitionFields; ++fieldId) {
58+
final PartitionField partitionField = partitionFields.get(fieldId);
59+
if (!partitionField.transform().isIdentity()) {
60+
// TODO (DH-18160): Improve support for handling non-identity transforms
61+
continue;
62+
}
63+
final String icebergColName = partitionField.name();
64+
final String dhColName = instructions.columnRenames().getOrDefault(icebergColName, icebergColName);
65+
final ColumnDefinition<?> columnDef = tableDef.getColumn(dhColName);
66+
if (columnDef == null) {
67+
// Table definition provided by the user doesn't have this column, so skip.
68+
continue;
69+
}
70+
identityPartitioningColumns.add(new IdentityPartitioningColData(dhColName,
71+
TypeUtils.getBoxedType(columnDef.getDataType()), fieldId));
7272

73-
outputPartitioningColumns = tableDef.getColumnStream()
74-
.map((final ColumnDefinition<?> columnDef) -> {
75-
final Integer index = availablePartitioningColumns.get(columnDef.getName());
76-
if (index == null) {
77-
return null;
78-
}
79-
return new ColumnData(columnDef.getName(), TypeUtils.getBoxedType(columnDef.getDataType()), index);
80-
})
81-
.filter(Objects::nonNull)
82-
.collect(Collectors.toList());
73+
}
8374
}
8475

8576
@Override
@@ -95,12 +86,11 @@ IcebergTableLocationKey keyFromDataFile(
9586
final Map<String, Comparable<?>> partitions = new LinkedHashMap<>();
9687

9788
final PartitionData partitionData = (PartitionData) dataFile.partition();
98-
for (final ColumnData colData : outputPartitioningColumns) {
89+
for (final IdentityPartitioningColData colData : identityPartitioningColumns) {
9990
final String colName = colData.name;
10091
final Object colValue;
10192
final Object valueFromPartitionData = partitionData.get(colData.index);
10293
if (valueFromPartitionData != null) {
103-
// TODO (deephaven-core#6438): Assuming identity transform here
10494
colValue = IdentityPartitionConverters.convertConstant(
10595
partitionData.getType(colData.index), valueFromPartitionData);
10696
if (!colData.type.isAssignableFrom(colValue.getClass())) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
//
2+
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
3+
//
4+
package io.deephaven.iceberg;
5+
6+
import io.deephaven.engine.table.ColumnDefinition;
7+
import io.deephaven.engine.table.Table;
8+
import io.deephaven.engine.table.TableDefinition;
9+
import io.deephaven.engine.testutil.TstUtils;
10+
import io.deephaven.engine.util.TableTools;
11+
import io.deephaven.iceberg.sqlite.DbResource;
12+
import io.deephaven.iceberg.util.IcebergCatalogAdapter;
13+
import io.deephaven.iceberg.util.IcebergTableAdapter;
14+
import org.apache.iceberg.PartitionField;
15+
import org.apache.iceberg.PartitionSpec;
16+
import org.apache.iceberg.Snapshot;
17+
import org.apache.iceberg.catalog.Namespace;
18+
import org.apache.iceberg.catalog.TableIdentifier;
19+
import org.junit.jupiter.api.BeforeEach;
20+
import org.junit.jupiter.api.Tag;
21+
import org.junit.jupiter.api.Test;
22+
23+
import java.time.LocalDateTime;
24+
import java.net.URISyntaxException;
25+
import java.util.List;
26+
import static io.deephaven.util.QueryConstants.NULL_DOUBLE;
27+
28+
import static org.assertj.core.api.Assertions.assertThat;
29+
30+
/**
31+
* This test shows that we can integrate with data written by <a href="https://py.iceberg.apache.org/">pyiceberg</a>.
32+
* See TESTING.md and generate-pyiceberg-2.py for more details.
33+
*/
34+
@Tag("security-manager-allow")
35+
class Pyiceberg2Test {
36+
private static final Namespace NAMESPACE = Namespace.of("trading");
37+
private static final TableIdentifier TRADING_DATA = TableIdentifier.of(NAMESPACE, "data");
38+
39+
// This will need to be updated if the data is regenerated
40+
private static final long SNAPSHOT_1_ID = 2806418501596315192L;
41+
42+
private static final TableDefinition TABLE_DEFINITION = TableDefinition.of(
43+
ColumnDefinition.fromGenericType("datetime", LocalDateTime.class),
44+
ColumnDefinition.ofString("symbol").withPartitioning(),
45+
ColumnDefinition.ofDouble("bid"),
46+
ColumnDefinition.ofDouble("ask"));
47+
48+
private IcebergCatalogAdapter catalogAdapter;
49+
50+
@BeforeEach
51+
void setUp() throws URISyntaxException {
52+
catalogAdapter = DbResource.openCatalog("pyiceberg-2");
53+
}
54+
55+
@Test
56+
void catalogInfo() {
57+
assertThat(catalogAdapter.listNamespaces()).containsExactly(NAMESPACE);
58+
assertThat(catalogAdapter.listTables(NAMESPACE)).containsExactly(TRADING_DATA);
59+
60+
final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA);
61+
final List<Snapshot> snapshots = tableAdapter.listSnapshots();
62+
assertThat(snapshots).hasSize(1);
63+
{
64+
final Snapshot snapshot = snapshots.get(0);
65+
assertThat(snapshot.parentId()).isNull();
66+
assertThat(snapshot.schemaId()).isEqualTo(0);
67+
assertThat(snapshot.sequenceNumber()).isEqualTo(1L);
68+
assertThat(snapshot.snapshotId()).isEqualTo(SNAPSHOT_1_ID);
69+
}
70+
}
71+
72+
@Test
73+
void testDefinition() {
74+
final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA);
75+
final TableDefinition td = tableAdapter.definition();
76+
assertThat(td).isEqualTo(TABLE_DEFINITION);
77+
78+
// Check the partition spec
79+
final PartitionSpec partitionSpec = tableAdapter.icebergTable().spec();
80+
assertThat(partitionSpec.fields().size()).isEqualTo(2);
81+
final PartitionField firstPartitionField = partitionSpec.fields().get(0);
82+
assertThat(firstPartitionField.name()).isEqualTo("datetime_day");
83+
assertThat(firstPartitionField.transform().toString()).isEqualTo("day");
84+
85+
final PartitionField secondPartitionField = partitionSpec.fields().get(1);
86+
assertThat(secondPartitionField.name()).isEqualTo("symbol");
87+
assertThat(secondPartitionField.transform().toString()).isEqualTo("identity");
88+
}
89+
90+
@Test
91+
void testData() {
92+
final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(TRADING_DATA);
93+
final Table fromIceberg = tableAdapter.table();
94+
assertThat(fromIceberg.size()).isEqualTo(5);
95+
final Table expectedData = TableTools.newTable(TABLE_DEFINITION,
96+
TableTools.col("datetime",
97+
LocalDateTime.of(2024, 11, 27, 10, 0, 0),
98+
LocalDateTime.of(2024, 11, 27, 10, 0, 0),
99+
LocalDateTime.of(2024, 11, 26, 10, 1, 0),
100+
LocalDateTime.of(2024, 11, 26, 10, 2, 0),
101+
LocalDateTime.of(2024, 11, 28, 10, 3, 0)),
102+
TableTools.stringCol("symbol", "AAPL", "MSFT", "GOOG", "AMZN", "MSFT"),
103+
TableTools.doubleCol("bid", 150.25, 150.25, 2800.75, 3400.5, NULL_DOUBLE),
104+
TableTools.doubleCol("ask", 151.0, 151.0, 2810.5, 3420.0, 250.0));
105+
TstUtils.assertTableEquals(expectedData.sort("datetime", "symbol"),
106+
fromIceberg.sort("datetime", "symbol"));
107+
}
108+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
version https://git-lfs.github.com/spec/v1
2+
oid sha256:aef59cba214467bee2cb2d91118aadc6114718be02ab4f04d7742471f9436955
3+
size 1990
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
version https://git-lfs.github.com/spec/v1
2+
oid sha256:9481c165c6356f03b3d4fe5df967a03422557d8d2bfc1c58d8dc052fe18fec06
3+
size 1990
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
version https://git-lfs.github.com/spec/v1
2+
oid sha256:e1568c608054bbd0de727d4909b29a3bdb777e2b6723a74d1641c6326d3b35cd
3+
size 1990
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
version https://git-lfs.github.com/spec/v1
2+
oid sha256:79f26f9cb4e2b00548491122eb8a0efb04a8ff37d0f5bce65350c38b7b17af19
3+
size 1990
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
version https://git-lfs.github.com/spec/v1
2+
oid sha256:a6a8ee2286b0cdf702fef4745471375adf9b819883726d659129e0ba95e0c8dd
3+
size 1856
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"location":"catalogs/pyiceberg-2/trading.db/data","table-uuid":"d1f874d4-c065-432d-969b-39efb3e9eb1c","last-updated-ms":1733868694938,"last-column-id":4,"schemas":[{"type":"struct","fields":[{"id":1,"name":"datetime","type":"timestamp","required":false},{"id":2,"name":"symbol","type":"string","required":false},{"id":3,"name":"bid","type":"double","required":false},{"id":4,"name":"ask","type":"double","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"day","name":"datetime_day"},{"source-id":2,"field-id":1001,"transform":"identity","name":"symbol"}]}],"default-spec-id":0,"last-partition-id":1001,"properties":{},"snapshots":[],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":2,"last-sequence-number":0}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"location":"catalogs/pyiceberg-2/trading.db/data","table-uuid":"d1f874d4-c065-432d-969b-39efb3e9eb1c","last-updated-ms":1733868695120,"last-column-id":4,"schemas":[{"type":"struct","fields":[{"id":1,"name":"datetime","type":"timestamp","required":false},{"id":2,"name":"symbol","type":"string","required":false},{"id":3,"name":"bid","type":"double","required":false},{"id":4,"name":"ask","type":"double","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"day","name":"datetime_day"},{"source-id":2,"field-id":1001,"transform":"identity","name":"symbol"}]}],"default-spec-id":0,"last-partition-id":1001,"properties":{},"current-snapshot-id":2806418501596315192,"snapshots":[{"snapshot-id":2806418501596315192,"sequence-number":1,"timestamp-ms":1733868695120,"manifest-list":"catalogs/pyiceberg-2/trading.db/data/metadata/snap-2806418501596315192-0-d9c06748-9892-404f-a744-7bbfd06d0eeb.avro","summary":{"operation":"append","added-files-size":"9816","added-data-files":"5","added-records":"5","changed-partition-count":"5","total-data-files":"5","total-delete-files":"0","total-records":"5","total-files-size":"9816","total-position-deletes":"0","total-equality-deletes":"0"},"schema-id":0}],"snapshot-log":[{"snapshot-id":2806418501596315192,"timestamp-ms":1733868695120}],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{"main":{"snapshot-id":2806418501596315192,"type":"branch"}},"format-version":2,"last-sequence-number":1}

extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource/generate-pyiceberg-1.py

+4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
'''
2+
See TESTING.md for how to run this script.
3+
'''
4+
15
from pyiceberg.schema import Schema
26
from pyiceberg.types import NestedField, StringType, DoubleType
37
from pyiceberg.catalog.sql import SqlCatalog
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
'''
2+
See TESTING.md for how to run this script.
3+
'''
4+
5+
import pyarrow as pa
6+
from datetime import datetime
7+
from pyiceberg.catalog.sql import SqlCatalog
8+
from pyiceberg.schema import Schema
9+
from pyiceberg.types import TimestampType, FloatType, DoubleType, StringType, NestedField, StructType
10+
from pyiceberg.partitioning import PartitionSpec, PartitionField
11+
from pyiceberg.transforms import DayTransform, IdentityTransform
12+
13+
catalog = SqlCatalog(
14+
"pyiceberg-2",
15+
**{
16+
"uri": f"sqlite:///dh-iceberg-test.db",
17+
"warehouse": f"catalogs/pyiceberg-2",
18+
},
19+
)
20+
21+
schema = Schema(
22+
NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=False),
23+
NestedField(field_id=2, name="symbol", field_type=StringType(), required=False),
24+
NestedField(field_id=3, name="bid", field_type=DoubleType(), required=False),
25+
NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
26+
)
27+
28+
partition_spec = PartitionSpec(
29+
PartitionField(
30+
source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day",
31+
),
32+
PartitionField(
33+
source_id=2, field_id=1001, transform=IdentityTransform(), name="symbol",
34+
)
35+
)
36+
37+
catalog.create_namespace("trading")
38+
39+
tbl = catalog.create_table(
40+
identifier="trading.data",
41+
schema=schema,
42+
partition_spec=partition_spec,
43+
)
44+
45+
# Define the data according to your Iceberg schema
46+
data = [
47+
{"datetime": datetime(2024, 11, 27, 10, 0, 0), "symbol": "AAPL", "bid": 150.25, "ask": 151.0},
48+
{"datetime": datetime(2024, 11, 27, 10, 0, 0), "symbol": "MSFT", "bid": 150.25, "ask": 151.0},
49+
{"datetime": datetime(2024, 11, 26, 10, 1, 0), "symbol": "GOOG", "bid": 2800.75, "ask": 2810.5},
50+
{"datetime": datetime(2024, 11, 26, 10, 2, 0), "symbol": "AMZN", "bid": 3400.5, "ask": 3420.0},
51+
{"datetime": datetime(2024, 11, 28, 10, 3, 0), "symbol": "MSFT", "bid": None, "ask": 250.0},
52+
]
53+
54+
# Create a PyArrow Table
55+
table = pa.Table.from_pylist(data)
56+
57+
# Append the table to the Iceberg table
58+
tbl.append(table)

0 commit comments

Comments
 (0)