Skip to content

feat: validate_deleted_data_files #1938

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
May 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
beff92b
feat: validation history
jayceslesar Apr 18, 2025
c369720
format
jayceslesar Apr 18, 2025
41bb8a4
almost a working test
jayceslesar Apr 18, 2025
763e9f4
allow content_filter in snapshot.manifests
jayceslesar Apr 18, 2025
f200beb
simplify order of arguments to validation_history
jayceslesar Apr 18, 2025
7f6bf9d
simplify return in snapshot.manifests
jayceslesar Apr 18, 2025
c63cc55
tests passing
jayceslesar Apr 19, 2025
f2f3a88
correct ancestors_between
jayceslesar Apr 19, 2025
74d5569
fix to/from logic and allow optional `to_snapshot` arg in `validation…
jayceslesar Apr 19, 2025
167f9e4
remove a level of nesting with smarter clause
jayceslesar Apr 19, 2025
efe50b4
fix bad accessor
jayceslesar Apr 19, 2025
0793713
fix docstring
jayceslesar Apr 19, 2025
89120aa
[wip] feat: `validate_deleted_data_files`
jayceslesar Apr 19, 2025
a39abd2
first dummy test case
jayceslesar Apr 19, 2025
cf5b061
first working tests (needs cleanup)
jayceslesar Apr 19, 2025
645e8df
Merge branch 'main' into feat/validate-deleted-data-files
jayceslesar May 1, 2025
caf7e36
bring back all code from silly merge
jayceslesar May 1, 2025
a52c422
last tweaks
jayceslesar May 1, 2025
d3df4a7
Merge branch 'main' into feat/validate-deleted-data-files
jayceslesar May 10, 2025
9eca29f
fix order of args in deleted_data_files
jayceslesar May 10, 2025
fc83d42
make `deleted_data_files` private
jayceslesar May 10, 2025
ee3959a
Update pyiceberg/table/update/validate.py
jayceslesar May 10, 2025
a51c2da
ise inclusive metrics evaluator and add rows cannot match check
jayceslesar May 10, 2025
ec86695
show what snapshot IDs conflict
jayceslesar May 10, 2025
0a6b781
maybe correct partition_spec impl?
jayceslesar May 10, 2025
a96da01
fix ordering to match
jayceslesar May 10, 2025
03b2913
make private and add test
jayceslesar May 15, 2025
aae22e6
Merge branch 'main' into feat/validate-deleted-data-files
jayceslesar May 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 82 additions & 1 deletion pyiceberg/table/update/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Iterator, Optional

from pyiceberg.exceptions import ValidationException
from pyiceberg.manifest import ManifestContent, ManifestFile
from pyiceberg.expressions import BooleanExpression
from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
from pyiceberg.typedef import Record

VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}


def validation_history(
Expand Down Expand Up @@ -69,3 +75,78 @@ def validation_history(
raise ValidationException("No matching snapshot found.")

return manifests_files, snapshots


def _deleted_data_files(
table: Table,
starting_snapshot: Snapshot,
data_filter: Optional[BooleanExpression],
partition_set: Optional[dict[int, set[Record]]],
parent_snapshot: Optional[Snapshot],
) -> Iterator[ManifestEntry]:
"""Find deleted data files matching a filter since a starting snapshot.

Args:
table: Table to validate
starting_snapshot: Snapshot current at the start of the operation
data_filter: Expression used to find deleted data files
partition_set: dict of {spec_id: set[partition]} to filter on
parent_snapshot: Ending snapshot on the branch being validated

Returns:
List of conflicting manifest-entries
"""
# if there is no current table state, no files have been deleted
if parent_snapshot is None:
return

manifests, snapshot_ids = validation_history(
table,
parent_snapshot,
starting_snapshot,
VALIDATE_DATA_FILES_EXIST_OPERATIONS,
ManifestContent.DATA,
)

if data_filter is not None:
evaluator = _InclusiveMetricsEvaluator(table.schema(), data_filter).eval

for manifest in manifests:
for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
if entry.snapshot_id not in snapshot_ids:
continue

if entry.status != ManifestEntryStatus.DELETED:
continue

if data_filter is not None and evaluator(entry.data_file) is ROWS_CANNOT_MATCH:
continue

if partition_set is not None:
spec_id = entry.data_file.spec_id
partition = entry.data_file.partition
if spec_id not in partition_set or partition not in partition_set[spec_id]:
continue

yield entry


def _validate_deleted_data_files(
table: Table,
starting_snapshot: Snapshot,
data_filter: Optional[BooleanExpression],
parent_snapshot: Snapshot,
) -> None:
"""Validate that no files matching a filter have been deleted from the table since a starting snapshot.

Args:
table: Table to validate
starting_snapshot: Snapshot current at the start of the operation
data_filter: Expression used to find deleted data files
parent_snapshot: Ending snapshot on the branch being validated

"""
conflicting_entries = _deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, I think style wise it is more elegant to switch over to keyword-arguments:

Suggested change
conflicting_entries = _deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot)
conflicting_entries = _deleted_data_files(table, starting_snapshot, data_filter, parent_snapshot=parent_snapshot)

if any(conflicting_entries):
conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries}
raise ValidationException(f"Deleted data files were found matching the filter for snapshots {conflicting_snapshots}!")
87 changes: 84 additions & 3 deletions tests/table/test_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

from pyiceberg.exceptions import ValidationException
from pyiceberg.io import FileIO
from pyiceberg.manifest import ManifestContent, ManifestFile
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation, Snapshot
from pyiceberg.table.update.validate import validation_history
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
from pyiceberg.table.update.validate import _deleted_data_files, _validate_deleted_data_files, validation_history


@pytest.fixture
Expand Down Expand Up @@ -136,3 +136,84 @@ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestF
{Operation.APPEND},
ManifestContent.DATA,
)


def test_deleted_data_files(
table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests

oldest_snapshot = table.snapshots()[0]
newest_snapshot = cast(Snapshot, table.current_snapshot())

def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
"""Mock the manifests method to use the snapshot_id for lookup."""
snapshot_id = self.snapshot_id
if snapshot_id in mock_manifests:
return mock_manifests[snapshot_id]
return []

# every snapshot is an append, so we should get nothing!
with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect):
result = list(
_deleted_data_files(
table=table,
starting_snapshot=newest_snapshot,
data_filter=None,
parent_snapshot=oldest_snapshot,
partition_set=None,
)
)

assert result == []

# modify second to last snapshot to be a delete
snapshots = table.snapshots()
altered_snapshot = snapshots[-2]
altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=Operation.DELETE)})
snapshots[-2] = altered_snapshot

table.metadata = table.metadata.model_copy(
update={"snapshots": snapshots},
)

my_entry = ManifestEntry.from_args(
status=ManifestEntryStatus.DELETED,
snapshot_id=altered_snapshot.snapshot_id,
)

with (
patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", return_value=[my_entry]),
):
result = list(
_deleted_data_files(
table=table,
starting_snapshot=newest_snapshot,
data_filter=None,
parent_snapshot=oldest_snapshot,
partition_set=None,
)
)

assert result == [my_entry]


def test_validate_deleted_data_files_raises_on_conflict(
table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
table, _ = table_v2_with_extensive_snapshots_and_manifests
oldest_snapshot = table.snapshots()[0]
newest_snapshot = cast(Snapshot, table.current_snapshot())

class DummyEntry:
snapshot_id = 123

with patch("pyiceberg.table.update.validate._deleted_data_files", return_value=[DummyEntry()]):
with pytest.raises(ValidationException):
_validate_deleted_data_files(
table=table,
starting_snapshot=newest_snapshot,
data_filter=None,
parent_snapshot=oldest_snapshot,
)