
Fix projected fields predicate evaluation #2029


Open · wants to merge 3 commits into main from projected_field_predicate

Conversation


@Erigara Erigara commented May 20, 2025

Closes #2028

Rationale for this change

Provide expected results aligned with the Spark implementation.

Are these changes tested?

I've checked it with the script attached to the issue, and a new test was added.

Are there any user-facing changes?

Somewhat, yes: the results of some scans are now different.

# Projected fields are only available for identity partition fields,
# which means partition pruning has already excluded the files whose
# partition field value makes this predicate false
elif field_name in self.projected_missing_fields:
    return AlwaysTrue()
@Erigara Erigara (Author) May 20, 2025

On second look, such an approach could lead to incorrect results for some complex predicates.
For example, given (P = x AND F = a) OR (P = y AND F = b), substituting every term on P with "always true" would yield the incorrect predicate F = a OR F = b.

The correct approach would be to substitute P with the concrete value extracted from the file's partition.

Not sure how to implement this.
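To make the point concrete, here is a minimal, self-contained sketch (plain Python, deliberately not pyiceberg's actual visitor/evaluator API) of the two substitution strategies: treating every projected partition term as always-true versus binding the concrete value from the file's partition. The tiny tuple-based expression language, the residual helper, and the column names P/F are illustrative only:

#!/usr/bin/env python
# Illustrative only: a tiny expression language and residual evaluator,
# not pyiceberg's real API.

def residual(pred, partition):
    """Reduce `pred` against a single file's partition values.

    `pred` is a nested tuple: ("eq", col, val), ("and", a, b) or ("or", a, b).
    `partition` maps a projected (identity-partitioned) column to the single
    value recorded for this data file.
    Returns True, False, or the residual predicate that still has to be
    applied to the rows of the file.
    """
    op = pred[0]
    if op == "eq":
        _, col, val = pred
        if col in partition:
            # projected column: the file carries exactly one value,
            # so the term collapses to a constant
            return partition[col] == val
        return pred  # non-partition term: keep it for row-level filtering
    if op == "and":
        left = residual(pred[1], partition)
        right = residual(pred[2], partition)
        if left is False or right is False:
            return False
        if left is True:
            return right
        return left if right is True else ("and", left, right)
    if op == "or":
        left = residual(pred[1], partition)
        right = residual(pred[2], partition)
        if left is True or right is True:
            return True
        if left is False:
            return right
        return left if right is False else ("or", left, right)
    raise ValueError(f"unknown op {op!r}")

# (P = x AND F = a) OR (P = y AND F = b)
pred = ("or",
        ("and", ("eq", "P", "x"), ("eq", "F", "a")),
        ("and", ("eq", "P", "y"), ("eq", "F", "b")))

# Substituting every P-term with "always true" would give F = a OR F = b for
# every file. Binding the concrete partition value gives the correct residual:
print(residual(pred, {"P": "x"}))  # ('eq', 'F', 'a')
print(residual(pred, {"P": "y"}))  # ('eq', 'F', 'b')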

@Erigara (Author)

Here is a script which illustrates my point:

bug.py
#!/usr/bin/env python
import polars as pl
from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg import expressions as expr
from pyiceberg.transforms import IdentityTransform
from pyiceberg.table import _parquet_files_to_data_files
from pyiceberg.table.name_mapping import NameMapping, MappedField
from pyiceberg.io.pyarrow import pyarrow_to_schema

catalog = load_catalog(
    "default",
    **{
        "type": "in-memory",
        "warehouse": "warehouse/",
    }
)

catalog.create_namespace_if_not_exists("default")

df = pl.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet")

# write filtered data 
file_vendor_1 = "warehouse/VendorID=1_yellow_tripdata_2025-01.parquet"
df.filter(pl.col("VendorID") == 1).drop("VendorID").write_parquet(file_vendor_1)

# create iceberg table
schema = df.to_arrow().schema
mapping = NameMapping([MappedField(field_id=i,names=[name]) for i, name in enumerate(schema.names, 1)])
schema = pyarrow_to_schema(schema, mapping)
table = catalog.create_table_if_not_exists(
    "default.taxi",
    schema=schema,
    partition_spec=PartitionSpec(
        PartitionField(source_id=schema.find_field("VendorID").field_id, field_id=schema.find_field("VendorID").field_id, transform=IdentityTransform(), name="VendorID"),
    ),
    properties={"schema.name-mapping.default": mapping.model_dump_json()},
)

# add_files
files = [file_vendor_1]
with table.transaction() as tx:
    with tx.update_snapshot().fast_append() as fast_append:
        data_files = _parquet_files_to_data_files(
            table_metadata=tx.table_metadata, file_paths=files, io=tx._table.io
        )
        for data_file in data_files:
            # set partition for VendorID
            # current file has only one partition anyway
            # VendorID = 1
            data_file.partition[0] = 1
            fast_append.append_data_file(data_file)

# query with projected field in predicate
scan = table.scan(row_filter=expr.And(
        expr.EqualTo("VendorID", 1),
        expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-01T12:00:00'),
        expr.LessThan("tpep_pickup_datetime", '2025-01-01T18:00:00'),
    ))
print(f"WITH PROJECTED COLUMN LEN: {len(scan.to_arrow())}")

# query without projected field in predicate
scan = table.scan(row_filter=expr.And(
        expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-01T12:00:00'),
        expr.LessThan("tpep_pickup_datetime", '2025-01-01T18:00:00'),
    ))
print(f"WITHOUT PROJECTED COLUM LEN: {len(scan.to_arrow())}")

# add second partition
file_vendor_2 = "warehouse/VendorID=2_yellow_tripdata_2025-01.parquet"
df.filter(pl.col("VendorID") == 2).drop("VendorID").write_parquet(file_vendor_2)

# add_files
files = [file_vendor_2]
with table.transaction() as tx:
    with tx.update_snapshot().fast_append() as fast_append:
        data_files = _parquet_files_to_data_files(
            table_metadata=tx.table_metadata, file_paths=files, io=tx._table.io
        )
        for data_file in data_files:
            # set partition for VendorID
            # current file has only one partition anyway
            # VendorID = 2
            data_file.partition[0] = 2
            fast_append.append_data_file(data_file)

expr_1 = expr.And(
    expr.EqualTo("VendorID", 1),
    expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-01T12:00:00'),
    expr.LessThan("tpep_pickup_datetime", '2025-01-01T18:00:00'),
)
scan = table.scan(row_filter=expr_1)
len_1 = len(scan.to_arrow())
print(f"1: VendorID = 1 AND tpep_pickup_datetime BETWEEN 2025-01-01T12:00:00 AND 2025-01-01T18:00:00: {len_1}")

expr_2 = expr.And(
    expr.EqualTo("VendorID", 2),
    expr.GreaterThanOrEqual("tpep_pickup_datetime", '2025-01-02T12:00:00'),
    expr.LessThan("tpep_pickup_datetime", '2025-01-02T18:00:00'),
)
scan = table.scan(row_filter=expr_2)
len_2 = len(scan.to_arrow())
print(f"2: VendorID = 2 AND tpep_pickup_datetime BETWEEN 2025-01-02T12:00:00 AND 2025-01-02T18:00:00: {len_2}")

expr_3 = expr.Or(expr_1, expr_2)
scan = table.scan(row_filter=expr_3)
len_3 = len(scan.to_arrow())
print(f"3: 1 OR 2: {len(scan.to_arrow())} (== {len_1 + len_2} is {len_3 == len_1 + len_2})")

@Fokko (Contributor)

@Erigara Could you condense this into a minimal test that shows the issue? Happy to take a look at it.

@Erigara (Author)

Ok, I will try to minimize this further.

@Erigara Erigara (Author) May 21, 2025

@Fokko here is as small an example as I could produce (it's still quite big in terms of LoC due to the setup).

Basically it works with the following Iceberg table:

| col1 | col2 |
|------|------|
| 1    | 1    |
| 1    | 2    |
| 2    | 1    |
| 2    | 2    |

Here col1 is used for partitioning and is absent from the parquet data files.

So the physical layout is the following:

  • col1 = 1 -> warehouse/1.parquet
  • col1 = 2 -> warehouse/2.parquet

Both files 1.parquet and 2.parquet are identical, with the following structure: {'col2': [1, 2]}.

#!/usr/bin/env python
import pyarrow as pa
import pyarrow.parquet as pq
from pyiceberg import expressions as expr
from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.io.pyarrow import parquet_file_to_data_file
from pyiceberg.transforms import IdentityTransform
from pyiceberg.schema import Schema
from pyiceberg.table.name_mapping import NameMapping, MappedField
from pyiceberg.types import (
    NestedField,
    LongType,
)

catalog = load_catalog(
    "default",
    **{
        "type": "in-memory",
        "warehouse": "warehouse/",
    }
)

catalog.create_namespace("default")

# create iceberg table
schema = Schema(
    NestedField(field_id=1, name="col1", field_type=LongType()),
    NestedField(field_id=2, name="col2", field_type=LongType()),
)
mapping = NameMapping([
    MappedField(field_id=1,names=["col1"]),
    MappedField(field_id=2,names=["col2"]),
])
table = catalog.create_table_if_not_exists(
    "default.table",
    schema=schema,
    partition_spec=PartitionSpec(
        PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="col1"),
    ),
    properties={"schema.name-mapping.default": mapping.model_dump_json()},
)

# write 2 parquet files: one for col1 partition values 1 and 2
file_1 = "warehouse/1.parquet"
file_2 = "warehouse/2.parquet"
df = pa.Table.from_arrays([[1, 2]], names=["col2"])
pq.write_table(df, file_1)
pq.write_table(df, file_2)

# add_files into iceberg table assign each file partition value
with table.transaction() as tx:
    with tx.update_snapshot().fast_append() as fast_append:
        data_file_1 = parquet_file_to_data_file(tx._table.io, tx.table_metadata, file_1)
        data_file_1.partition[0] = 1
        fast_append.append_data_file(data_file_1)
        data_file_2 = parquet_file_to_data_file(tx._table.io, tx.table_metadata, file_2)
        data_file_2.partition[0] = 2
        fast_append.append_data_file(data_file_2)

expr_1 = expr.And(expr.EqualTo("col1", 1), expr.EqualTo("col2", 1))
scan_1 = table.scan(row_filter=expr_1)
len_1 = len(scan_1.to_arrow())
assert len_1 == 1

expr_2 = expr.And(expr.EqualTo("col1", 2), expr.EqualTo("col2", 2))
scan_2 = table.scan(row_filter=expr_2)
len_2 = len(scan_2.to_arrow())
assert len_2 == 1

expr_3 = expr.Or(expr_1, expr_2)
scan_3 = table.scan(row_filter=expr_3)
len_3 = len(scan_3.to_arrow())
assert len_3 == len_1 + len_2

@Erigara (Author)

@Fokko hi, will you have time to look at the PR any time soon?

@Fokko (Contributor)

Hey @Erigara sorry for the long wait, and thanks for the elaborate example. While it is still quite a few lines of code, it shows the issue clearly. For the PR, the thing that's missing is a proper test. Do you see any chance to add a test that fails on current main, but passes after your patch?

@Erigara (Author)

I will try to come up with a test.

@Erigara Erigara (Author) Jun 12, 2025

Hello @Fokko, I've added a test which showcases what changed in terms of predicate evaluation.

However, strictly speaking it's not directly runnable on current main, because in the patch I've changed the signature of the translate_column_names method.
I've split my patch into 3 commits:

  1. just extend the signature and rearrange code to pass arguments (no behavior changes)
  2. add a test (test fails)
  3. add code to evaluate projected fields (test passes)

So does it count? :)
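For illustration, here is a conceptual sketch (pyarrow only, not the code in this patch) of what evaluating a projected field amounts to: the column absent from the data file is materialized from the file's partition value, after which the row filter, including the col1 term, evaluates correctly instead of being dropped or treated as always true. Column names and values mirror the minimal example above:

#!/usr/bin/env python
import pyarrow as pa
import pyarrow.compute as pc

# Data file contents: col1 is projected (absent), only col2 is stored.
file_table = pa.table({"col2": [1, 2]})
partition_value = 1  # this file's identity-partition value for col1

# Materialize the projected column from partition metadata ...
with_projected = file_table.append_column(
    "col1", pa.array([partition_value] * file_table.num_rows, type=pa.int64())
)

# ... so the row filter can be evaluated as usual on the file's rows.
mask = pc.and_(
    pc.equal(with_projected["col1"], 1),
    pc.equal(with_projected["col2"], 1),
)
print(with_projected.filter(mask).num_rows)  # 1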

@Erigara Erigara requested a review from Fokko May 21, 2025 17:57
@Erigara Erigara force-pushed the projected_field_predicate branch from 3990a7a to 5aa9940 on June 12, 2025 15:01
@Erigara Erigara force-pushed the projected_field_predicate branch from 5aa9940 to 3bb325f on June 12, 2025 15:26
Development

Successfully merging this pull request may close these issues.

Scan with filtering on projected field returns empty table