Apache Iceberg version
0.9.1 (latest release)
Please describe the bug 🐞
I've noticed that when a filter is applied to projected fields (fields not included in the Parquet data files), the scan returns an empty result.
The Spark implementation produces the correct result here.
I did a bit of digging, and it looks like the problem is somewhere in the ds.Scanner.from_fragment function call here, because it produces an empty batch iterator.
I've noticed that translated_row_filter and bound_file_filter are AlwaysFalse (due to VendorID missing from file_schema).
Maybe filters on projected columns could be excluded from the file-level filter somehow.
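To make that idea concrete, here is a minimal pure-Python sketch of how a predicate on a partition-only column could be decided once per file instead of being pushed into the file scan. It is a toy model, not pyiceberg code: the EqualTo class, the resolve_against_partition helper, and the example column names are all hypothetical, and null handling is ignored.

```python
from dataclasses import dataclass
from typing import Any, Dict, Set, Union


@dataclass(frozen=True)
class EqualTo:
    """Toy equality predicate (hypothetical stand-in, not pyiceberg's class)."""
    column: str
    value: Any


def resolve_against_partition(
    pred: EqualTo, file_columns: Set[str], partition_values: Dict[str, Any]
) -> Union[bool, EqualTo]:
    """Return True/False when the partition value alone decides the predicate,
    or the predicate itself when the data file has to evaluate it row by row."""
    if pred.column in file_columns:
        # The column is stored in the file, so keep the predicate for the scan.
        return pred
    if pred.column in partition_values:
        # The value is constant for the whole file: decide once per file.
        return partition_values[pred.column] == pred.value
    # Column is known nowhere: treat as "no match" (simplification).
    return False


# Assumed example: the file lacks VendorID, which is supplied by the partition.
file_columns = {"trip_distance", "fare_amount"}
partition_values = {"VendorID": 1}

keep_file = resolve_against_partition(EqualTo("VendorID", 1), file_columns, partition_values)
skip_file = resolve_against_partition(EqualTo("VendorID", 2), file_columns, partition_values)
pushed_down = resolve_against_partition(EqualTo("fare_amount", 10.0), file_columns, partition_values)
```

In this sketch a True result means the file is read without that predicate, a False result means the file can be skipped entirely, and anything else is still handed to the file scan.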
A second issue might be that the batch is filtered first, and only then is the projected column added.
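The ordering concern can be illustrated with a small pure-Python sketch. It is only a toy model under stated assumptions: rows are plain dicts, the helper names are hypothetical, and a filter on a missing column matches nothing, which mimics the AlwaysFalse behaviour described above.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def read_file_rows() -> List[Row]:
    # Rows as stored in the data file: VendorID was dropped before writing.
    return [{"fare_amount": 12.5}, {"fare_amount": 7.0}]


def project(rows: List[Row], constants: Row) -> List[Row]:
    # Add the partition constants to every row as projected columns.
    return [{**row, **constants} for row in rows]


def apply_filter(rows: List[Row], column: str, value: Any) -> List[Row]:
    # A row whose column is missing never matches (AlwaysFalse-like behaviour).
    return [row for row in rows if column in row and row[column] == value]


constants = {"VendorID": 1}

# Filtering before the projected column exists drops every row.
filter_then_project = project(apply_filter(read_file_rows(), "VendorID", 1), constants)

# Projecting first lets the filter see the constant column.
project_then_filter = apply_filter(project(read_file_rows(), constants), "VendorID", 1)

print(len(filter_then_project), len(project_then_filter))
```

Projecting before filtering fixes the result in this toy model, but it gives up predicate pushdown into the file scan for every column, so resolving partition-only predicates up front may be the cheaper option.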
I would like to contribute, but some direction would be nice.
Here is a self-contained script to reproduce the issue:
#!/usr/bin/env python
import polars as pl
from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg import expressions as expr
from pyiceberg.transforms import IdentityTransform
from pyiceberg.table import _parquet_files_to_data_files
from pyiceberg.table.name_mapping import NameMapping, MappedField
from pyiceberg.io.pyarrow import pyarrow_to_schema

catalog = load_catalog(
    "default",
    **{
        "type": "in-memory",
        "warehouse": "warehouse/",
    }
)
catalog.create_namespace_if_not_exists("default")

# write filtered data
file = "warehouse/VendorID=1_yellow_tripdata_2025-01.parquet"
df = pl.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet")
df.filter(pl.col("VendorID") == 1).drop("VendorID").write_parquet(file)

# create iceberg table
df = df.to_arrow()
schema = df.schema
mapping = NameMapping([MappedField(field_id=i, names=[name]) for i, name in enumerate(schema.names, 1)])
schema = pyarrow_to_schema(schema, mapping)
table = catalog.create_table_if_not_exists(
    "default.taxi",
    schema=schema,
    partition_spec=PartitionSpec(
        PartitionField(
            source_id=schema.find_field("VendorID").field_id,
            field_id=schema.find_field("VendorID").field_id,
            transform=IdentityTransform(),
            name="VendorID",
        ),
    ),
    properties={"schema.name-mapping.default": mapping.model_dump_json()},
)

# add_files
files = [file]
with table.transaction() as tx:
    with tx.update_snapshot().fast_append() as fast_append:
        data_files = _parquet_files_to_data_files(
            table_metadata=tx.table_metadata, file_paths=files, io=tx._table.io
        )
        for data_file in data_files:
            # set partition for VendorID
            # current file has only one partition anyway
            # VendorID = 1
            data_file.partition[0] = 1
            fast_append.append_data_file(data_file)

# query with projected field in predicate
scan = table.scan(row_filter=expr.EqualTo("VendorID", 1))
print(f"WITH PROJECTED COLUMN LEN: {len(scan.to_arrow())}")
# query without projected field in predicate
scan = table.scan()
print(f"WITHOUT PROJECTED COLUMN LEN: {len(scan.to_arrow())}")
Willingness to contribute
- I can contribute a fix for this bug independently
- I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- I cannot contribute a fix for this bug at this time