Skip to content

Commit e98f685

Browse files
jayceslesarFokko
andauthored
feat: validate_deleted_data_files (#1938)
Closes #1928 # Rationale for this change Add `validate_deleted_data_files` which depends on #1935 # Are these changes tested? Added a test! # References Java `deletedDataFiles` impl: https://github.com/apache/iceberg/blob/3a29199e73f2e9ae0f8f92a1a0732a338c66aa0d/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L678 Java `ManifestGroup.entries` impl: https://github.com/apache/iceberg/blob/3a29199e73f2e9ae0f8f92a1a0732a338c66aa0d/core/src/main/java/org/apache/iceberg/ManifestGroup.java#L242 --------- Co-authored-by: Fokko Driesprong <fokko@apache.org>
1 parent a149018 commit e98f685

File tree

2 files changed

+166
-4
lines changed

2 files changed

+166
-4
lines changed

pyiceberg/table/update/validate.py

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,17 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
from typing import Iterator, Optional
1718

1819
from pyiceberg.exceptions import ValidationException
19-
from pyiceberg.manifest import ManifestContent, ManifestFile
20+
from pyiceberg.expressions import BooleanExpression
21+
from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator
22+
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
2023
from pyiceberg.table import Table
2124
from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
25+
from pyiceberg.typedef import Record
26+
27+
# Snapshot operations passed to validation_history when searching for deleted data files.
VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}
2228

2329

2430
def validation_history(
@@ -69,3 +75,78 @@ def validation_history(
6975
raise ValidationException("No matching snapshot found.")
7076

7177
return manifests_files, snapshots
78+
79+
80+
def _deleted_data_files(
81+
table: Table,
82+
starting_snapshot: Snapshot,
83+
data_filter: Optional[BooleanExpression],
84+
partition_set: Optional[dict[int, set[Record]]],
85+
parent_snapshot: Optional[Snapshot],
86+
) -> Iterator[ManifestEntry]:
87+
"""Find deleted data files matching a filter since a starting snapshot.
88+
89+
Args:
90+
table: Table to validate
91+
starting_snapshot: Snapshot current at the start of the operation
92+
data_filter: Expression used to find deleted data files
93+
partition_set: dict of {spec_id: set[partition]} to filter on
94+
parent_snapshot: Ending snapshot on the branch being validated
95+
96+
Returns:
97+
List of conflicting manifest-entries
98+
"""
99+
# if there is no current table state, no files have been deleted
100+
if parent_snapshot is None:
101+
return
102+
103+
manifests, snapshot_ids = validation_history(
104+
table,
105+
parent_snapshot,
106+
starting_snapshot,
107+
VALIDATE_DATA_FILES_EXIST_OPERATIONS,
108+
ManifestContent.DATA,
109+
)
110+
111+
if data_filter is not None:
112+
evaluator = _InclusiveMetricsEvaluator(table.schema(), data_filter).eval
113+
114+
for manifest in manifests:
115+
for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
116+
if entry.snapshot_id not in snapshot_ids:
117+
continue
118+
119+
if entry.status != ManifestEntryStatus.DELETED:
120+
continue
121+
122+
if data_filter is not None and evaluator(entry.data_file) is ROWS_CANNOT_MATCH:
123+
continue
124+
125+
if partition_set is not None:
126+
spec_id = entry.data_file.spec_id
127+
partition = entry.data_file.partition
128+
if spec_id not in partition_set or partition not in partition_set[spec_id]:
129+
continue
130+
131+
yield entry
132+
133+
134+
def _validate_deleted_data_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: Optional[BooleanExpression],
    parent_snapshot: Snapshot,
) -> None:
    """Validate that no files matching a filter have been deleted from the table since a starting snapshot.

    Args:
        table: Table to validate
        starting_snapshot: Snapshot current at the start of the operation
        data_filter: Expression used to find deleted data files
        parent_snapshot: Ending snapshot on the branch being validated

    Raises:
        ValidationException: If at least one matching deleted data file is found.
    """
    conflicting_entries = _deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot)

    # _deleted_data_files returns a single-pass iterator. Probing it with any()
    # would consume the first conflicting entry, so its snapshot id would be
    # missing from the error message. Collect the ids in one pass instead.
    conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries}
    if conflicting_snapshots:
        raise ValidationException(f"Deleted data files were found matching the filter for snapshots {conflicting_snapshots}!")

tests/table/test_validate.py

Lines changed: 84 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222

2323
from pyiceberg.exceptions import ValidationException
2424
from pyiceberg.io import FileIO
25-
from pyiceberg.manifest import ManifestContent, ManifestFile
25+
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
2626
from pyiceberg.table import Table
27-
from pyiceberg.table.snapshots import Operation, Snapshot
28-
from pyiceberg.table.update.validate import validation_history
27+
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
28+
from pyiceberg.table.update.validate import _deleted_data_files, _validate_deleted_data_files, validation_history
2929

3030

3131
@pytest.fixture
@@ -136,3 +136,84 @@ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestF
136136
{Operation.APPEND},
137137
ManifestContent.DATA,
138138
)
139+
140+
141+
def test_deleted_data_files(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
    """Check that _deleted_data_files yields nothing for appends and yields DELETED entries for a delete snapshot."""
    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests

    oldest_snapshot = table.snapshots()[0]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
        """Mock the manifests method to use the snapshot_id for lookup."""
        return mock_manifests.get(self.snapshot_id, [])

    def collect_deleted() -> list[ManifestEntry]:
        """Run _deleted_data_files with the same arguments for both scenarios."""
        return list(
            _deleted_data_files(
                table=table,
                starting_snapshot=newest_snapshot,
                data_filter=None,
                parent_snapshot=oldest_snapshot,
                partition_set=None,
            )
        )

    # every snapshot is an append, so we should get nothing!
    with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect):
        result = collect_deleted()

    assert result == []

    # modify second to last snapshot to be a delete
    all_snapshots = table.snapshots()
    delete_snapshot = all_snapshots[-2].model_copy(update={"summary": Summary(operation=Operation.DELETE)})
    all_snapshots[-2] = delete_snapshot

    table.metadata = table.metadata.model_copy(
        update={"snapshots": all_snapshots},
    )

    my_entry = ManifestEntry.from_args(
        status=ManifestEntryStatus.DELETED,
        snapshot_id=delete_snapshot.snapshot_id,
    )

    with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect):
        with patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", return_value=[my_entry]):
            result = collect_deleted()

    assert result == [my_entry]
200+
201+
202+
def test_validate_deleted_data_files_raises_on_conflict(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
    """Check that _validate_deleted_data_files raises when a conflicting entry is reported."""
    table, _ = table_v2_with_extensive_snapshots_and_manifests
    oldest_snapshot = table.snapshots()[0]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    class StubEntry:
        # Only snapshot_id is read by the function under test.
        snapshot_id = 123

    # Patch the lookup so it reports exactly one conflicting entry.
    with (
        patch("pyiceberg.table.update.validate._deleted_data_files", return_value=[StubEntry()]),
        pytest.raises(ValidationException),
    ):
        _validate_deleted_data_files(
            table=table,
            starting_snapshot=newest_snapshot,
            data_filter=None,
            parent_snapshot=oldest_snapshot,
        )

0 commit comments

Comments
 (0)