From 5285cacfb832dc3639cd6bab1a61f8fe147c0150 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Tue, 27 May 2025 19:56:52 -0400 Subject: [PATCH 1/6] add validations for added data files --- pyiceberg/table/update/validate.py | 55 ++++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 55c34676e3..d10e8747fc 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -14,7 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from typing import Iterator, Optional +from typing import Iterator, Optional, Set from pyiceberg.exceptions import ValidationException from pyiceberg.expressions import BooleanExpression @@ -24,7 +24,8 @@ from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between from pyiceberg.typedef import Record -VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE} +VALIDATE_DATA_FILES_EXIST_OPERATIONS: Set[Operation] = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE} +VALIDATE_ADDED_FILES_OPERATIONS: Set[Operation] = {Operation.APPEND, Operation.OVERWRITE} def validation_history( @@ -150,3 +151,53 @@ def _validate_deleted_data_files( if any(conflicting_entries): conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries} raise ValidationException(f"Deleted data files were found matching the filter for snapshots {conflicting_snapshots}!") + +def _added_data_files( + table: Table, + starting_snapshot: Snapshot, + data_filter: Optional[BooleanExpression], + partition_set: Optional[dict[int, set[Record]]], + parent_snapshot: Optional[Snapshot], +) -> Iterator[ManifestEntry]: + if parent_snapshot is None: + return + + manifests, snapshot_ids = validation_history( + table, + parent_snapshot, + starting_snapshot, + VALIDATE_ADDED_FILES_OPERATIONS, + ManifestContent.DATA, + ) + + if data_filter is not None: + evaluator = _InclusiveMetricsEvaluator(table.schema(), data_filter) + + for manifest in manifests: + for entry in manifest.fetch_manifest_entry(table.io): + if entry.snapshot_id not in snapshot_ids: + continue + + if data_filter and evaluator.eval(entry.data_file) is ROWS_CANNOT_MATCH: + continue + + if partition_set is not None: + partition = entry.data_file.partition + spec_id = entry.data_file.spec_id + if spec_id not in partition_set or partition not in partition_set[spec_id]: + continue + + yield entry + + +def validate_added_data_files( + table: Table, + starting_snapshot: Snapshot, + data_filter: Optional[BooleanExpression], + parent_snapshot: Optional[Snapshot], +) -> None: + conflicting_entries = _added_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) + + if any(conflicting_entries): + conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries if entry.snapshot_id is not None} + raise ValidationException(f"Added data files were found matching the filter for snapshots {conflicting_snapshots}!") From 3dd8a74645e1635fcbf6b706d48b16d647fde637 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Tue, 27 May 2025 21:40:26 -0400 Subject: [PATCH 2/6] make function hidden in validate --- pyiceberg/table/update/validate.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index d10e8747fc..18b217a681 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -152,6 +152,7 @@ def _validate_deleted_data_files( conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries} raise ValidationException(f"Deleted data files were found matching the filter for snapshots {conflicting_snapshots}!") + def _added_data_files( table: Table, starting_snapshot: Snapshot, @@ -190,7 +191,7 @@ def _added_data_files( yield entry -def validate_added_data_files( +def _validate_added_data_files( table: Table, starting_snapshot: Snapshot, data_filter: Optional[BooleanExpression], From 38a4cdf224f1ac670730b461476208560c75bf2a Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Tue, 27 May 2025 21:40:50 -0400 Subject: [PATCH 3/6] add tests for added data files validations --- tests/table/test_validate.py | 135 ++++++++++++++++++++++++++++++++++- 1 file changed, 134 insertions(+), 1 deletion(-) diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py index 74a0b59566..15bda8eb7c 100644 --- a/tests/table/test_validate.py +++ b/tests/table/test_validate.py @@ -25,7 +25,13 @@ from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Snapshot, Summary -from pyiceberg.table.update.validate import _deleted_data_files, _validate_deleted_data_files, validation_history +from pyiceberg.table.update.validate import ( + _added_data_files, + _deleted_data_files, + _validate_added_data_files, + _validate_deleted_data_files, + validation_history, +) @pytest.fixture @@ -217,3 +223,130 @@ class DummyEntry: data_filter=None, parent_snapshot=oldest_snapshot, ) + + +@pytest.mark.parametrize("operation", [Operation.APPEND, Operation.OVERWRITE]) +def test_validate_added_data_files_conflicting_count( + table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]], + operation: Operation, +) -> None: + table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests + + snapshot_history = 100 + snapshots = table.snapshots() + for i in range(1, snapshot_history + 1): + altered_snapshot = snapshots[-i] + altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=operation)}) + snapshots[-i] = altered_snapshot + + table.metadata = table.metadata.model_copy( + update={"snapshots": snapshots}, + ) + + oldest_snapshot = table.snapshots()[-snapshot_history] + newest_snapshot = cast(Snapshot, table.current_snapshot()) + + def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]: + """Mock the manifests method to use the snapshot_id for lookup.""" + snapshot_id = self.snapshot_id + if snapshot_id in mock_manifests: + return mock_manifests[snapshot_id] + return [] + + def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]: + return [ + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=self.added_snapshot_id, + ) + ] + + with ( + patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect), + patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry), + ): + result = list( + _added_data_files( + table=table, + starting_snapshot=newest_snapshot, + data_filter=None, + parent_snapshot=oldest_snapshot, + partition_set=None, + ) + ) + + # since we only look at the ManifestContent.Data files + assert len(result) == snapshot_history / 2 + + +@pytest.mark.parametrize("operation", [Operation.DELETE, Operation.REPLACE]) +def test_validate_added_data_files_non_conflicting_count( + table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]], + operation: Operation, +) -> None: + table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests + + snapshot_history = 100 + snapshots = table.snapshots() + for i in range(1, snapshot_history + 1): + altered_snapshot = snapshots[-i] + altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=operation)}) + snapshots[-i] = altered_snapshot + + table.metadata = table.metadata.model_copy( + update={"snapshots": snapshots}, + ) + + oldest_snapshot = table.snapshots()[-snapshot_history] + newest_snapshot = cast(Snapshot, table.current_snapshot()) + + def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]: + """Mock the manifests method to use the snapshot_id for lookup.""" + snapshot_id = self.snapshot_id + if snapshot_id in mock_manifests: + return mock_manifests[snapshot_id] + return [] + + def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]: + return [ + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=self.added_snapshot_id, + ) + ] + + with ( + patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect), + patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry), + ): + result = list( + _added_data_files( + table=table, + starting_snapshot=newest_snapshot, + data_filter=None, + parent_snapshot=oldest_snapshot, + partition_set=None, + ) + ) + + assert len(result) == 0 + + +def test_validate_added_data_files_raises_on_conflict( + table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]], +) -> None: + table, _ = table_v2_with_extensive_snapshots_and_manifests + oldest_snapshot = table.snapshots()[0] + newest_snapshot = cast(Snapshot, table.current_snapshot()) + + class DummyEntry: + snapshot_id = 123 + + with patch("pyiceberg.table.update.validate._added_data_files", return_value=[DummyEntry()]): + with pytest.raises(ValidationException): + _validate_added_data_files( + table=table, + starting_snapshot=newest_snapshot, + data_filter=None, + parent_snapshot=oldest_snapshot, + ) From 98c47ef75067bf76c40f4e10cb84d8606472f711 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Sun, 1 Jun 2025 13:50:33 -0400 Subject: [PATCH 4/6] docs: add docstrings to the functions --- pyiceberg/table/update/validate.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 18b217a681..b05c40cc25 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -160,6 +160,18 @@ def _added_data_files( partition_set: Optional[dict[int, set[Record]]], parent_snapshot: Optional[Snapshot], ) -> Iterator[ManifestEntry]: + """Return manifest entries for data files added between the starting snapshot and parent snapshot. + + Args: + table: Table to get the history from + starting_snapshot: Starting snapshot to get the history from + data_filter: Optional filter to match data files + partition_set: Optional set of partitions to match data files + parent_snapshot: Parent snapshot to get the history from + + Returns: + Iterator of manifest entries for added data files matching the conditions + """ if parent_snapshot is None: return @@ -197,8 +209,16 @@ def _validate_added_data_files( data_filter: Optional[BooleanExpression], parent_snapshot: Optional[Snapshot], ) -> None: - conflicting_entries = _added_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) + """Validate that no files matching a filter have been added to the table since a starting snapshot. + Args: + table: Table to validate + starting_snapshot: Snapshot current at the start of the operation + data_filter: Expression used to find added data files + parent_snapshot: Ending snapshot on the branch being validated + + """ + conflicting_entries = _added_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) if any(conflicting_entries): conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries if entry.snapshot_id is not None} raise ValidationException(f"Added data files were found matching the filter for snapshots {conflicting_snapshots}!") From 5adb9269adb949402ffcd232eb75ae59d89d4d0f Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Sun, 1 Jun 2025 13:51:50 -0400 Subject: [PATCH 5/6] change variable name to be more specific --- pyiceberg/table/update/validate.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index b05c40cc25..4cea02ea35 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -25,7 +25,7 @@ from pyiceberg.typedef import Record VALIDATE_DATA_FILES_EXIST_OPERATIONS: Set[Operation] = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE} -VALIDATE_ADDED_FILES_OPERATIONS: Set[Operation] = {Operation.APPEND, Operation.OVERWRITE} +VALIDATE_ADDED_DATA_FILES_OPERATIONS: Set[Operation] = {Operation.APPEND, Operation.OVERWRITE} def validation_history( @@ -179,7 +179,7 @@ def _added_data_files( table, parent_snapshot, starting_snapshot, - VALIDATE_ADDED_FILES_OPERATIONS, + VALIDATE_ADDED_DATA_FILES_OPERATIONS, ManifestContent.DATA, ) From 9081c33bbe389b3f32db392e3e0a955e2b4e3d2c Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Sun, 1 Jun 2025 14:20:08 -0400 Subject: [PATCH 6/6] refactor code to avoid redundancies in validations --- pyiceberg/table/update/validate.py | 83 +++++++++++++++++------------- 1 file changed, 48 insertions(+), 35 deletions(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 4cea02ea35..858d0544b4 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -20,6 +20,7 @@ from pyiceberg.expressions import BooleanExpression from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile +from pyiceberg.schema import Schema from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between from pyiceberg.typedef import Record @@ -78,6 +79,47 @@ def validation_history( return manifests_files, snapshots +def _filter_manifest_entries( + entry: ManifestEntry, + snapshot_ids: set[int], + data_filter: Optional[BooleanExpression], + partition_set: Optional[dict[int, set[Record]]], + entry_status: Optional[ManifestEntryStatus], + schema: Schema, +) -> bool: + """Filter manifest entries based on data filter and partition set. + + Args: + entry: Manifest entry to filter + snapshot_ids: set of snapshot ids to match data files + data_filter: Optional filter to match data files + partition_set: Optional set of partitions to match data files + status: Optional status to match data files + table: Table containing the schema for filtering + + Returns: + True if the entry should be included, False otherwise + """ + if entry.snapshot_id not in snapshot_ids: + return False + + if entry_status is not None and entry.status != entry_status: + return False + + if data_filter is not None: + evaluator = _InclusiveMetricsEvaluator(schema, data_filter) + if evaluator.eval(entry.data_file) is ROWS_CANNOT_MATCH: + return False + + if partition_set is not None: + partition = entry.data_file.partition + spec_id = entry.data_file.spec_id + if spec_id not in partition_set or partition not in partition_set[spec_id]: + return False + + return True + + def _deleted_data_files( table: Table, starting_snapshot: Snapshot, @@ -109,27 +151,12 @@ def _deleted_data_files( ManifestContent.DATA, ) - if data_filter is not None: - evaluator = _InclusiveMetricsEvaluator(table.schema(), data_filter).eval - for manifest in manifests: for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False): - if entry.snapshot_id not in snapshot_ids: - continue - - if entry.status != ManifestEntryStatus.DELETED: - continue - - if data_filter is not None and evaluator(entry.data_file) is ROWS_CANNOT_MATCH: - continue - - if partition_set is not None: - spec_id = entry.data_file.spec_id - partition = entry.data_file.partition - if spec_id not in partition_set or partition not in partition_set[spec_id]: - continue - - yield entry + if _filter_manifest_entries( + entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.DELETED, table.schema() + ): + yield entry def _validate_deleted_data_files( @@ -183,24 +210,10 @@ def _added_data_files( ManifestContent.DATA, ) - if data_filter is not None: - evaluator = _InclusiveMetricsEvaluator(table.schema(), data_filter) - for manifest in manifests: for entry in manifest.fetch_manifest_entry(table.io): - if entry.snapshot_id not in snapshot_ids: - continue - - if data_filter and evaluator.eval(entry.data_file) is ROWS_CANNOT_MATCH: - continue - - if partition_set is not None: - partition = entry.data_file.partition - spec_id = entry.data_file.spec_id - if spec_id not in partition_set or partition not in partition_set[spec_id]: - continue - - yield entry + if _filter_manifest_entries(entry, snapshot_ids, data_filter, partition_set, None, table.schema()): + yield entry def _validate_added_data_files(