Fix projected fields predicate evaluation #2029
Conversation
pyiceberg/expressions/visitors.py
Outdated
```python
# Projected fields are only available for identity partition fields,
# which means that partition pruning already excluded partition fields that evaluate to false
elif field_name in self.projected_missing_fields:
    return AlwaysTrue()
```
On second look, such an approach could lead to incorrect results for some complex predicates.
For example, for `(P = x AND F = a) OR (P = y AND F = b)`, substituting every `P = ...` term with `AlwaysTrue()` yields the incorrect predicate `F = a OR F = b`.
The correct approach here should be to substitute `P` with the concrete value extracted from the partition.
Not sure how to implement this.
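To make the failure mode concrete, here is a minimal sketch in plain Python (not the pyiceberg visitor API); the predicate shape and the values `x`, `a`, `b` are just the ones from the example above:

```python
# Contrast the two substitution strategies for the predicate
#   (P = x AND F = a) OR (P = y AND F = b)
# when evaluating rows of a data file that belongs to partition P = x and does
# not materialize the P column itself.

def ground_truth(p_value: str, f_value: str) -> bool:
    # What the predicate would return if P were stored in the file.
    return (p_value == "x" and f_value == "a") or (p_value == "y" and f_value == "b")

def substitute_always_true(f_value: str) -> bool:
    # Current patch: every term on the missing projected field P becomes AlwaysTrue(),
    # so the predicate collapses to F = a OR F = b.
    return (True and f_value == "a") or (True and f_value == "b")

def substitute_partition_value(partition_p: str, f_value: str) -> bool:
    # Suggested fix: take the concrete value of P from the file's partition tuple
    # and evaluate the P terms against it.
    return (partition_p == "x" and f_value == "a") or (partition_p == "y" and f_value == "b")

# A row with F = b inside the P = x file:
assert ground_truth("x", "b") is False                 # the row must be filtered out
assert substitute_always_true("b") is True             # AlwaysTrue() wrongly keeps it
assert substitute_partition_value("x", "b") is False   # binding P = x gives the right answer
```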
Here is a script which illustrates my point:
bug.py
```python
#!/usr/bin/env python
import polars as pl
from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg import expressions as expr
from pyiceberg.transforms import IdentityTransform
from pyiceberg.table import _parquet_files_to_data_files
from pyiceberg.table.name_mapping import NameMapping, MappedField
from pyiceberg.io.pyarrow import pyarrow_to_schema
catalog = load_catalog(
"default",
**{
"type": "in-memory",
"warehouse": "warehouse/",
}
)
catalog.create_namespace_if_not_exists("default")
df = pl.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet")
# write filtered data
file_vendor_1 = "warehouse/VendorID=1_yellow_tripdata_2025-01.parquet"
df.filter(pl.col("VendorID") == 1).drop("VendorID").write_parquet(file_vendor_1)
# create iceberg table
schema = df.to_arrow().schema
mapping = NameMapping([MappedField(field_id=i,names=[name]) for i, name in enumerate(schema.names, 1)])
schema = pyarrow_to_schema(schema, mapping)
table = catalog.create_table_if_not_exists(
"default.taxi",
schema=schema,
partition_spec=PartitionSpec(
PartitionField(source_id=schema.find_field("VendorID").field_id, field_id=schema.find_field("VendorID").field_id, transform=IdentityTransform(), name="VendorID"),
),
properties={"schema.name-mapping.default": mapping.model_dump_json()},
)
# add_files
files = [file_vendor_1]
with table.transaction() as tx:
with tx.update_snapshot().fast_append() as fast_append:
data_files = _parquet_files_to_data_files(
table_metadata=tx.table_metadata, file_paths=files, io=tx._table.io
)
for data_file in data_files:
# set partition for VendorID
# current file has only one partition anyway
# VendorID = 1
data_file.partition[0] = 1
fast_append.append_data_file(data_file)
# query with projected field in predicate
scan = table.scan(row_filter=expr.And(
expr.EqualTo("VendorID", 1),
expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-01T12:00:00'),
expr.LessThan("tpep_pickup_datetime", '2025-01-01T18:00:00'),
))
print(f"WITH PROJECTED COLUMN LEN: {len(scan.to_arrow())}")
# query without projected field in predicate
scan = table.scan(row_filter=expr.And(
expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-01T12:00:00'),
expr.LessThan("tpep_pickup_datetime", '2025-01-01T18:00:00'),
))
print(f"WITHOUT PROJECTED COLUM LEN: {len(scan.to_arrow())}")
# add second partition
file_vendor_2 = "warehouse/VendorID=2_yellow_tripdata_2025-01.parquet"
df.filter(pl.col("VendorID") == 2).drop("VendorID").write_parquet(file_vendor_2)
# add_files
files = [file_vendor_2]
with table.transaction() as tx:
with tx.update_snapshot().fast_append() as fast_append:
data_files = _parquet_files_to_data_files(
table_metadata=tx.table_metadata, file_paths=files, io=tx._table.io
)
for data_file in data_files:
# set partition for VendorID
# current file has only one partition anyway
# VendorID = 2
data_file.partition[0] = 2
fast_append.append_data_file(data_file)
expr_1 = expr.And(
expr.EqualTo("VendorID", 1),
expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-01T12:00:00'),
expr.LessThan("tpep_pickup_datetime", '2025-01-01T18:00:00'),
)
scan = table.scan(row_filter=expr_1)
len_1 = len(scan.to_arrow())
print(f"1: VendorID = 1 AND tpep_pickup_datetime BETWEEN 2025-01-01T12:00:00 AND 2025-01-01T18:00:00: {len_1}")
expr_2 = expr.And(
expr.EqualTo("VendorID", 2),
expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-02T12:00:00'),
expr.LessThan("tpep_pickup_datetime", '2025-01-02T18:00:00'),
)
scan = table.scan(row_filter=expr_2)
len_2 = len(scan.to_arrow())
print(f"2: VendorID = 2 AND tpep_pickup_datetime BETWEEN 2025-01-02T12:00:00 AND 2025-01-02T18:00:00: {len_2}")
expr_3 = expr.Or(expr_1, expr_2)
scan = table.scan(row_filter=expr_3)
len_3 = len(scan.to_arrow())
print(f"3: 1 OR 2: {len(scan.to_arrow())} (== {len_1 + len_2} is {len_3 == len_1 + len_2})")
@Erigara Could you condense this into a minimal test that shows the issue? Happy to take a look at it.
Ok, I will try to minimize this further.
@Fokko here is as small an example as I could produce (it is still quite big in terms of LoC due to setup).
Basically it works with the following Iceberg table:
| col1 | col2 |
|------|------|
| 1    | 1    |
| 1    | 2    |
| 2    | 1    |
| 2    | 2    |
Here `col1` is used for partitioning and is absent from the Parquet data files, so the physical layout is the following:
col1 = 1 -> warehouse/1.parquet
col1 = 2 -> warehouse/2.parquet
Both files `1.parquet` and `2.parquet` are identical, with the structure `{'col2': [1, 2]}`.
```python
#!/usr/bin/env python
import pyarrow as pa
import pyarrow.parquet as pq
from pyiceberg import expressions as expr
from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.io.pyarrow import parquet_file_to_data_file
from pyiceberg.transforms import IdentityTransform
from pyiceberg.schema import Schema
from pyiceberg.table.name_mapping import NameMapping, MappedField
from pyiceberg.types import (
NestedField,
LongType,
)
catalog = load_catalog(
"default",
**{
"type": "in-memory",
"warehouse": "warehouse/",
}
)
catalog.create_namespace("default")
# create iceberg table
schema = Schema(
NestedField(field_id=1, name="col1", field_type=LongType()),
NestedField(field_id=2, name="col2", field_type=LongType()),
)
mapping = NameMapping([
MappedField(field_id=1,names=["col1"]),
MappedField(field_id=2,names=["col2"]),
])
table = catalog.create_table_if_not_exists(
"default.table",
schema=schema,
partition_spec=PartitionSpec(
PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="col1"),
),
properties={"schema.name-mapping.default": mapping.model_dump_json()},
)
# write 2 identical parquet files, one for each col1 partition value (1 and 2)
file_1 = "warehouse/1.parquet"
file_2 = "warehouse/2.parquet"
df = pa.Table.from_arrays([[1, 2]], names=["col2"])
pq.write_table(df, file_1)
pq.write_table(df, file_2)
# add_files into iceberg table assign each file partition value
with table.transaction() as tx:
with tx.update_snapshot().fast_append() as fast_append:
data_file_1 = parquet_file_to_data_file(tx._table.io, tx.table_metadata, file_1)
data_file_1.partition[0] = 1
fast_append.append_data_file(data_file_1)
    data_file_2 = parquet_file_to_data_file(tx._table.io, tx.table_metadata, file_2)
data_file_2.partition[0] = 2
fast_append.append_data_file(data_file_2)
expr_1 = expr.And(expr.EqualTo("col1", 1), expr.EqualTo("col2", 1))
scan_1 = table.scan(row_filter=expr_1)
len_1 = len(scan_1.to_arrow())
assert len_1 == 1
expr_2 = expr.And(expr.EqualTo("col1", 2), expr.EqualTo("col2", 2))
scan_2 = table.scan(row_filter=expr_2)
len_2 = len(scan_2.to_arrow())
assert len_2 == 1
expr_3 = expr.Or(expr_1, expr_2)
scan_3 = table.scan(row_filter=expr_3)
len_3 = len(scan_3.to_arrow())
assert len_3 == len_1 + len_2
```
@Fokko hi, will you have time to look at the PR any time soon?
Hey @Erigara sorry for the long wait, and thanks for the elaborate example. While it is still quite a few lines of code, it shows the issue clearly. For the PR, the thing that's missing is a proper test. Do you see any chance to add a test that fails on current main, but passes after your patch?
I will try to come up with a test.
Hello @Fokko, I've added a test which showcases what changed in terms of predicate evaluation.
However, strictly speaking, it's not directly runnable on current main, because in the patch I've extended the signature of the translate_column_names method.
I've split my patch into 3 commits:
- just extend the signature and rearrange code to pass arguments (no behavior differences)
- add the test (the test fails)
- add code to evaluate projected fields (the test passes)
So does it count? :)
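For reference, here is a condensed pytest-style sketch of such a test, based on the minimal reproduction script above; the test name, the use of the `tmp_path` fixture, and the exact assertions are assumptions, and the test actually added in the PR may differ:

```python
import pyarrow as pa
import pyarrow.parquet as pq

from pyiceberg import expressions as expr
from pyiceberg.catalog import load_catalog
from pyiceberg.io.pyarrow import parquet_file_to_data_file
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.name_mapping import MappedField, NameMapping
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import LongType, NestedField


def test_projected_partition_field_in_or_predicate(tmp_path):
    """An OR of two (partition = v AND col = w) conjunctions must not keep rows
    from the wrong partition (see the reproduction script above)."""
    catalog = load_catalog("default", type="in-memory", warehouse=str(tmp_path))
    catalog.create_namespace("default")

    schema = Schema(
        NestedField(field_id=1, name="col1", field_type=LongType()),
        NestedField(field_id=2, name="col2", field_type=LongType()),
    )
    mapping = NameMapping([
        MappedField(field_id=1, names=["col1"]),
        MappedField(field_id=2, names=["col2"]),
    ])
    table = catalog.create_table(
        "default.table",
        schema=schema,
        partition_spec=PartitionSpec(
            PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="col1"),
        ),
        properties={"schema.name-mapping.default": mapping.model_dump_json()},
    )

    # Two identical data files; col1 only exists as a partition value, not as a column.
    paths = [str(tmp_path / "1.parquet"), str(tmp_path / "2.parquet")]
    for path in paths:
        pq.write_table(pa.Table.from_arrays([[1, 2]], names=["col2"]), path)

    with table.transaction() as tx:
        with tx.update_snapshot().fast_append() as fast_append:
            for partition_value, path in zip((1, 2), paths):
                data_file = parquet_file_to_data_file(tx._table.io, tx.table_metadata, path)
                data_file.partition[0] = partition_value
                fast_append.append_data_file(data_file)

    expr_1 = expr.And(expr.EqualTo("col1", 1), expr.EqualTo("col2", 1))
    expr_2 = expr.And(expr.EqualTo("col1", 2), expr.EqualTo("col2", 2))
    assert len(table.scan(row_filter=expr_1).to_arrow()) == 1
    assert len(table.scan(row_filter=expr_2).to_arrow()) == 1
    # On current main the OR returns 4 rows because the col1 terms are dropped.
    assert len(table.scan(row_filter=expr.Or(expr_1, expr_2)).to_arrow()) == 2
```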
Closes #2028
Rationale for this change
Provide expected results aligned with the Spark implementation.
Are these changes tested?
I've checked it with the script attached to the issue, and a new test was added.
Are there any user-facing changes?
Kind of, yes: the results of some scans are now different.