-
Notifications
You must be signed in to change notification settings - Fork 309
feat: validate_deleted_data_files
#1938
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: validate_deleted_data_files
#1938
Conversation
pyiceberg/table/update/validate.py
Outdated
if entry.snapshot_id not in new_snapshot_ids: | ||
continue | ||
|
||
if entry.status != ManifestEntryStatus.DELETED: | ||
continue | ||
|
||
if data_filter is not None and not evaluator(entry.data_file): | ||
continue | ||
|
||
if partition_set is not None and (entry.data_file.spec_id, entry.data_file.partition) not in partition_set: | ||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably cleaner to put these each on a line and wrap in an any
call
validate_deleted_data_files
validate_deleted_data_files
pyiceberg/table/update/validate.py
Outdated
parent_snapshot: Optional[Snapshot], | ||
partition_set: Optional[set[Record]], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks out of order:
parent_snapshot: Optional[Snapshot], | |
partition_set: Optional[set[Record]], | |
partition_set: Optional[set[Record]], | |
parent_snapshot: Optional[Snapshot], |
It looks like the function call in validate_deleted_data_files
is assuming that parent_snapshot
is the last argument
conflicting_entries = deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this needs some more work. Next to @sungwy's comment, we in the code below:
(entry.data_file.spec_id, entry.data_file.partition) not in partition_set
Where it checks if a tuple is in a partition_set
, but the partition_set
only contains the Record
according to the signature.
This triggered me, because if you do:
ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(pickup_timestamp);
-- and then
ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(dropoff_timestamp);
Both of the partitioning strategies will produce a Record[int]
because it will contain the number of days since epoch. But the meaning is completely different.
pyiceberg/table/update/validate.py
Outdated
starting_snapshot, | ||
parent_snapshot, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks out of order to, because validation_history
has to_snapshot
as the second argument, and from_snapshot
as the third argument.
starting_snapshot, | |
parent_snapshot, | |
starting_snapshot, | |
parent_snapshot, |
Do you think it would be better to update validation_history
function to use the following function signature instead? I think it's a lot more expected to have from_snapshot
then to_snapshot
def validation_history(
table: Table,
from_snapshot: Snapshot,
to_snapshot: Snapshot,
matching_operations: set[Operation],
manifest_content_filter: ManifestContent,
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you clarify a little more? Are you saying we should move to the terms starting_snapshot
(which is from_snapshot
) and parent_snapshot
(to_snapshot
)
pyiceberg/table/update/validate.py
Outdated
) | ||
|
||
if data_filter is not None: | ||
evaluator = _StrictMetricsEvaluator(table.schema(), data_filter).eval |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not too sure if this is correct, because ManifestGroup.entries
seems to be using inclusive projection.
Should we be using inclusive_projection
here instead?
Summoning @Fokko for a second review
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I agree that this should be inclusive projection, since we want to know if there are any matches. Inclusive projection returns rows_might_match
and rows_cannot_match
. If they cannot be matched, then we can skip it :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for following up on the previous PR @jayceslesar. This looks like a great start, but there are some missing elements. It would be good to add some more tests as well, since this is pretty critical code since it might affect correctness.
pyiceberg/table/update/validate.py
Outdated
parent_snapshot: Optional[Snapshot], | ||
partition_set: Optional[set[Record]], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this needs some more work. Next to @sungwy's comment, we in the code below:
(entry.data_file.spec_id, entry.data_file.partition) not in partition_set
Where it checks if a tuple is in a partition_set
, but the partition_set
only contains the Record
according to the signature.
This triggered me, because if you do:
ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(pickup_timestamp);
-- and then
ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(dropoff_timestamp);
Both of the partitioning strategies will produce a Record[int]
because it will contain the number of days since epoch. But the meaning is completely different.
pyiceberg/table/update/validate.py
Outdated
) | ||
|
||
if data_filter is not None: | ||
evaluator = _StrictMetricsEvaluator(table.schema(), data_filter).eval |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I agree that this should be inclusive projection, since we want to know if there are any matches. Inclusive projection returns rows_might_match
and rows_cannot_match
. If they cannot be matched, then we can skip it :)
Co-authored-by: Fokko Driesprong <fokko@apache.org>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @jayceslesar - thanks for adopting all of the review feedback!
I think this is good to merge, but I'd prefer to make validate_deleted_data_files
a private function
Waiting for the CI to pass |
parent_snapshot: Ending snapshot on the branch being validated | ||
|
||
""" | ||
conflicting_entries = _deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, I think style wise it is more elegant to switch over to keyword-arguments:
conflicting_entries = _deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) | |
conflicting_entries = _deleted_data_files(table, starting_snapshot, data_filter, parent_snapshot=parent_snapshot) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, thanks @jayceslesar for working on this, and thanks @sungwy for the review 🙌
Closes #1928
Rationale for this change
Add
validate_deleted_data_files
which depends on #1935Are these changes tested?
Added a test!
References
Java
deletedDataFiles
impl:https://github.com/apache/iceberg/blob/3a29199e73f2e9ae0f8f92a1a0732a338c66aa0d/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L678
Java
ManifestGroup.entries
impl:https://github.com/apache/iceberg/blob/3a29199e73f2e9ae0f8f92a1a0732a338c66aa0d/core/src/main/java/org/apache/iceberg/ManifestGroup.java#L242