Scan with filtering on projected field returns empty table #2028

Open
@Erigara

Description

Apache Iceberg version

0.9.1 (latest release)

Please describe the bug 🐞

I've noticed that when a scan filters on a projected field (one that is not stored in the Parquet data files), the scan returns an empty result.

The Spark implementation produces the correct result here.

I did a bit of digging, and the problem appears to be in the ds.Scanner.from_fragment call, which produces an empty batch iterator.
I noticed that translated_row_filter and bound_file_filter are both AlwaysFalse, because VendorID is missing from file_schema.
Maybe predicates on projected columns could be excluded from the filter that is pushed down to the file.
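
One way to picture that suggestion, as a minimal plain-Python sketch rather than PyIceberg's own expression classes: before the row filter is bound to the file schema, predicates on columns that are absent from the file but have a known constant value (for example the identity-partition value) are evaluated up front and replaced with AlwaysTrue/AlwaysFalse. The names `Const`, `EqualTo`, `And`, `ALWAYS_TRUE`, `ALWAYS_FALSE`, and `resolve_projected` are hypothetical stand-ins, not PyIceberg APIs.

```python
from dataclasses import dataclass
from typing import Any, Dict, Set, Union


# Hypothetical, minimal expression model (NOT PyIceberg's expression classes).
@dataclass(frozen=True)
class Const:
    value: bool


ALWAYS_TRUE = Const(True)
ALWAYS_FALSE = Const(False)


@dataclass(frozen=True)
class EqualTo:
    column: str
    literal: Any


@dataclass(frozen=True)
class And:
    left: "Expr"
    right: "Expr"


Expr = Union[Const, EqualTo, And]


def resolve_projected(expr: Expr, file_columns: Set[str], constants: Dict[str, Any]) -> Expr:
    """Replace predicates on columns missing from the file with their constant result."""
    if isinstance(expr, EqualTo):
        if expr.column in file_columns:
            return expr  # column is in the file: leave it for the file-level filter
        if expr.column in constants:
            # Column is projected (e.g. from partition metadata): evaluate it now.
            return ALWAYS_TRUE if constants[expr.column] == expr.literal else ALWAYS_FALSE
        # Column absent and no constant known: its value is null, so equality cannot hold.
        return ALWAYS_FALSE
    if isinstance(expr, And):
        left = resolve_projected(expr.left, file_columns, constants)
        right = resolve_projected(expr.right, file_columns, constants)
        if left == ALWAYS_FALSE or right == ALWAYS_FALSE:
            return ALWAYS_FALSE
        if left == ALWAYS_TRUE:
            return right
        if right == ALWAYS_TRUE:
            return left
        return And(left, right)
    return expr  # Const values pass through unchanged


# A file from partition VendorID=1 that does not store the VendorID column.
file_cols = {"passenger_count"}
partition_values = {"VendorID": 1}
print(resolve_projected(EqualTo("VendorID", 1), file_cols, partition_values))
print(resolve_projected(And(EqualTo("VendorID", 1), EqualTo("passenger_count", 2)), file_cols, partition_values))
```

With this kind of rewrite, a filter such as `EqualTo("VendorID", 1)` on a file from the VendorID=1 partition resolves to `ALWAYS_TRUE`, and only predicates on columns actually stored in the file remain to be pushed down. Where the constants would come from inside PyIceberg (presumably the data file's partition record) is left open here.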

A second issue might be that each batch is filtered first, and only then is the projected column added.
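
Why the order matters can be shown with a plain-Python sketch that uses rows as dictionaries. The helpers `add_constant_column` and `apply_filter` are hypothetical and are not PyIceberg functions. If the predicate runs before the projected column exists, every row sees a missing `VendorID` and is dropped.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def add_constant_column(rows: List[Row], name: str, value: Any) -> List[Row]:
    """Attach a projected (e.g. identity-partition) value to every row."""
    return [{**row, name: value} for row in rows]


def apply_filter(rows: List[Row], predicate: Callable[[Row], bool]) -> List[Row]:
    """Keep only the rows for which the predicate holds."""
    return [row for row in rows if predicate(row)]


def vendor_is_1(row: Row) -> bool:
    # A column that is not present in the row reads as None, so this is False.
    return row.get("VendorID") == 1


# Rows as stored in the data file: VendorID was dropped before writing.
file_rows = [{"passenger_count": 1}, {"passenger_count": 2}]

filter_then_project = add_constant_column(apply_filter(file_rows, vendor_is_1), "VendorID", 1)
project_then_filter = apply_filter(add_constant_column(file_rows, "VendorID", 1), vendor_is_1)

print(len(filter_then_project))  # 0
print(len(project_then_filter))  # 2
```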

I would like to contribute, but some guidance would be appreciated.

Here is a self-contained script to reproduce the issue:

#!/usr/bin/env python
import polars as pl
from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg import expressions as expr
from pyiceberg.transforms import IdentityTransform
from pyiceberg.table import _parquet_files_to_data_files
from pyiceberg.table.name_mapping import NameMapping, MappedField
from pyiceberg.io.pyarrow import pyarrow_to_schema

catalog = load_catalog(
    "default",
    **{
        "type": "in-memory",
        "warehouse": "warehouse/",
    }
)

catalog.create_namespace_if_not_exists("default")

# write only the VendorID == 1 rows, dropping the VendorID column, to a data file
file = "warehouse/VendorID=1_yellow_tripdata_2025-01.parquet"
df = pl.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet")
df.filter(pl.col("VendorID") == 1).drop("VendorID").write_parquet(file)

# create iceberg table
df = df.to_arrow()
schema = df.schema
mapping = NameMapping([MappedField(field_id=i,names=[name]) for i, name in enumerate(schema.names, 1)])
schema = pyarrow_to_schema(schema, mapping)
table = catalog.create_table_if_not_exists(
    "default.taxi",
    schema=schema,
    partition_spec=PartitionSpec(
        PartitionField(source_id=schema.find_field("VendorID").field_id, field_id=schema.find_field("VendorID").field_id, transform=IdentityTransform(), name="VendorID"),
    ),
    properties={"schema.name-mapping.default": mapping.model_dump_json()},
)

# add_files
files = [file]
with table.transaction() as tx:
    with tx.update_snapshot().fast_append() as fast_append:
        data_files = _parquet_files_to_data_files(
            table_metadata=tx.table_metadata, file_paths=files, io=tx._table.io
        )
        for data_file in data_files:
            # set partition for VendorID
            # current file has only one partition anyway
            # VendorID = 1
            data_file.partition[0] = 1
            fast_append.append_data_file(data_file)

# query with projected field in predicate
scan = table.scan(row_filter=expr.EqualTo("VendorID", 1))
print(f"WITH PROJECTED COLUMN LEN: {len(scan.to_arrow())}")

# query without projected field in predicate
scan = table.scan()
print(f"WITHOUT PROJECTED COLUMN LEN: {len(scan.to_arrow())}")

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
