Skip to content

Commit e6f6018

Browse files
authored
feat: validation_history and ancestors_between (#1935)
1 parent 5bdb008 commit e6f6018

File tree

6 files changed

+293
-40
lines changed

6 files changed

+293
-40
lines changed

pyiceberg/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,7 @@ class CommitStateUnknownException(RESTError):
122122

123123
class WaitingForLockException(Exception):
124124
"""Need to wait for a lock, try again."""
125+
126+
127+
class ValidationException(Exception):
    """Signal that a validation check did not pass."""

pyiceberg/table/snapshots.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,3 +435,16 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta
435435
if snapshot.parent_snapshot_id is None:
436436
break
437437
snapshot = table_metadata.snapshot_by_id(snapshot.parent_snapshot_id)
438+
439+
440+
def ancestors_between(
    to_snapshot: Snapshot, from_snapshot: Optional[Snapshot], table_metadata: TableMetadata
) -> Iterable[Snapshot]:
    """Yield the ancestry of ``to_snapshot`` up to and including ``from_snapshot``.

    Snapshots are produced in the order ``ancestors_of`` emits them, starting
    with ``to_snapshot`` itself.

    Args:
        to_snapshot: Snapshot whose ancestry is walked.
        from_snapshot: Snapshot at which the walk stops (it is still yielded).
            When ``None``, the complete ancestry of ``to_snapshot`` is yielded.
        table_metadata: Metadata used to resolve parent snapshots.

    Yields:
        Each snapshot on the path from ``to_snapshot`` back to ``from_snapshot``.
        If ``from_snapshot`` is never encountered, the whole ancestry is yielded.
    """
    if from_snapshot is None:
        # No lower bound: hand over the full lineage unchanged.
        yield from ancestors_of(to_snapshot, table_metadata)
        return

    for ancestor in ancestors_of(to_snapshot, table_metadata):
        yield ancestor
        # The boundary snapshot is inclusive, so stop only after yielding it.
        if ancestor == from_snapshot:
            return

pyiceberg/table/update/validate.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from pyiceberg.exceptions import ValidationException
19+
from pyiceberg.manifest import ManifestContent, ManifestFile
20+
from pyiceberg.table import Table
21+
from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
22+
23+
24+
def validation_history(
    table: Table,
    to_snapshot: Snapshot,
    from_snapshot: Snapshot,
    matching_operations: set[Operation],
    manifest_content_filter: ManifestContent,
) -> tuple[list[ManifestFile], set[int]]:
    """Return the manifests added by matching snapshots between two snapshots, and those snapshots' IDs.

    The ancestry of ``to_snapshot`` is walked back to ``from_snapshot`` (both
    inclusive). For every visited snapshot whose operation is in
    ``matching_operations``, its ID is recorded together with the manifests that
    the snapshot itself added and whose content equals ``manifest_content_filter``.

    Args:
        table: Table to get the history from.
        to_snapshot: Snapshot at which the walk starts.
        from_snapshot: Ancestor snapshot at which the walk is expected to end.
        matching_operations: Operations to match on.
        manifest_content_filter: Manifest content type to keep.

    Raises:
        ValidationException: If a visited snapshot has no summary, or if the
            last visited snapshot is not ``from_snapshot``.

    Returns:
        The list of matching manifest files and the set of matching snapshot IDs.
    """
    manifests_files: list[ManifestFile] = []
    snapshots: set[int] = set()

    last_snapshot = None
    for snapshot in ancestors_between(to_snapshot, from_snapshot, table.metadata):
        last_snapshot = snapshot
        summary = snapshot.summary
        if summary is None:
            raise ValidationException(f"No summary found for snapshot {snapshot}!")
        if summary.operation not in matching_operations:
            continue

        snapshots.add(snapshot.snapshot_id)
        # TODO: Maybe do the IO in a separate thread at some point, and collect at the bottom
        # (we can easily merge the sets).
        # Only keep manifests written by this very snapshot; inherited ones belong to ancestors.
        manifests_files.extend(
            manifest
            for manifest in snapshot.manifests(table.io)
            if manifest.added_snapshot_id == snapshot.snapshot_id and manifest.content == manifest_content_filter
        )

    # The walk must terminate on from_snapshot; otherwise the two snapshots are not connected as expected.
    if last_snapshot is not None and last_snapshot.snapshot_id != from_snapshot.snapshot_id:
        raise ValidationException("No matching snapshot found.")

    return manifests_files, snapshots

tests/table/test_init.py

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
Snapshot,
5858
SnapshotLogEntry,
5959
Summary,
60-
ancestors_of,
6160
)
6261
from pyiceberg.table.sorting import (
6362
NullOrder,
@@ -225,44 +224,6 @@ def test_snapshot_by_timestamp(table_v2: Table) -> None:
225224
assert table_v2.snapshot_as_of_timestamp(1515100955770, inclusive=False) is None
226225

227226

228-
def test_ancestors_of(table_v2: Table) -> None:
229-
assert list(ancestors_of(table_v2.current_snapshot(), table_v2.metadata)) == [
230-
Snapshot(
231-
snapshot_id=3055729675574597004,
232-
parent_snapshot_id=3051729675574597004,
233-
sequence_number=1,
234-
timestamp_ms=1555100955770,
235-
manifest_list="s3://a/b/2.avro",
236-
summary=Summary(Operation.APPEND),
237-
schema_id=1,
238-
),
239-
Snapshot(
240-
snapshot_id=3051729675574597004,
241-
parent_snapshot_id=None,
242-
sequence_number=0,
243-
timestamp_ms=1515100955770,
244-
manifest_list="s3://a/b/1.avro",
245-
summary=Summary(Operation.APPEND),
246-
schema_id=None,
247-
),
248-
]
249-
250-
251-
def test_ancestors_of_recursive_error(table_v2_with_extensive_snapshots: Table) -> None:
252-
# Test RecursionError: maximum recursion depth exceeded
253-
assert (
254-
len(
255-
list(
256-
ancestors_of(
257-
table_v2_with_extensive_snapshots.current_snapshot(),
258-
table_v2_with_extensive_snapshots.metadata,
259-
)
260-
)
261-
)
262-
== 2000
263-
)
264-
265-
266227
def test_snapshot_by_id_does_not_exist(table_v2: Table) -> None:
267228
assert table_v2.snapshot_by_id(-1) is None
268229

tests/table/test_snapshots.py

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,23 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
# pylint:disable=redefined-outer-name,eval-used
18+
from typing import cast
19+
1820
import pytest
1921

2022
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile
2123
from pyiceberg.partitioning import PartitionField, PartitionSpec
2224
from pyiceberg.schema import Schema
23-
from pyiceberg.table.snapshots import Operation, Snapshot, SnapshotSummaryCollector, Summary, update_snapshot_summaries
25+
from pyiceberg.table import Table
26+
from pyiceberg.table.snapshots import (
27+
Operation,
28+
Snapshot,
29+
SnapshotSummaryCollector,
30+
Summary,
31+
ancestors_between,
32+
ancestors_of,
33+
update_snapshot_summaries,
34+
)
2435
from pyiceberg.transforms import IdentityTransform
2536
from pyiceberg.typedef import Record
2637
from pyiceberg.types import (
@@ -368,3 +379,58 @@ def test_invalid_type() -> None:
368379
)
369380

370381
assert "Could not parse summary property total-data-files to an int: abc" in str(e.value)
382+
383+
384+
def test_ancestors_of(table_v2: Table) -> None:
    """The ancestry of the current snapshot is that snapshot followed by its parent."""
    expected_current = Snapshot(
        snapshot_id=3055729675574597004,
        parent_snapshot_id=3051729675574597004,
        sequence_number=1,
        timestamp_ms=1555100955770,
        manifest_list="s3://a/b/2.avro",
        summary=Summary(Operation.APPEND),
        schema_id=1,
    )
    expected_parent = Snapshot(
        snapshot_id=3051729675574597004,
        parent_snapshot_id=None,
        sequence_number=0,
        timestamp_ms=1515100955770,
        manifest_list="s3://a/b/1.avro",
        summary=Summary(Operation.APPEND),
        schema_id=None,
    )

    lineage = list(ancestors_of(table_v2.current_snapshot(), table_v2.metadata))

    assert lineage == [expected_current, expected_parent]
405+
406+
407+
def test_ancestors_of_recursive_error(table_v2_with_extensive_snapshots: Table) -> None:
    """Walking a 2000-snapshot lineage must complete and return every snapshot."""
    # Test RecursionError: maximum recursion depth exceeded
    table = table_v2_with_extensive_snapshots
    lineage = list(ancestors_of(table.current_snapshot(), table.metadata))
    assert len(lineage) == 2000
420+
421+
422+
def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None:
    """Walking from the current snapshot back to the first listed snapshot yields 2000 snapshots."""
    table = table_v2_with_extensive_snapshots
    oldest_snapshot = table.snapshots()[0]
    current_snapshot = cast(Snapshot, table.current_snapshot())

    lineage = list(ancestors_between(current_snapshot, oldest_snapshot, table.metadata))

    assert len(lineage) == 2000

tests/table/test_validate.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
# pylint:disable=redefined-outer-name,eval-used
18+
from typing import cast
19+
from unittest.mock import patch
20+
21+
import pytest
22+
23+
from pyiceberg.exceptions import ValidationException
24+
from pyiceberg.io import FileIO
25+
from pyiceberg.manifest import ManifestContent, ManifestFile
26+
from pyiceberg.table import Table
27+
from pyiceberg.table.snapshots import Operation, Snapshot
28+
from pyiceberg.table.update.validate import validation_history
29+
30+
31+
@pytest.fixture
def table_v2_with_extensive_snapshots_and_manifests(
    table_v2_with_extensive_snapshots: Table,
) -> tuple[Table, dict[int, list[ManifestFile]]]:
    """Pair the extensive-snapshots table with one mock manifest per snapshot.

    The returned mapping is keyed by snapshot ID. Manifests alternate between
    DATA (even position) and DELETES (odd position) content, and each one is
    marked as added by the snapshot it is stored under.
    """
    table = table_v2_with_extensive_snapshots
    manifests_by_snapshot_id = {
        snapshot.snapshot_id: [
            ManifestFile.from_args(
                manifest_path=f"foo/bar/{position}",
                manifest_length=1,
                partition_spec_id=1,
                content=ManifestContent.DELETES if position % 2 else ManifestContent.DATA,
                sequence_number=1,
                min_sequence_number=1,
                added_snapshot_id=snapshot.snapshot_id,
            )
        ]
        for position, snapshot in enumerate(table.snapshots())
    }
    return table, manifests_by_snapshot_id
53+
54+
55+
def test_validation_history(table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]]) -> None:
    """validation_history returns exactly the DATA manifests added across the full lineage."""
    table, manifests_by_snapshot_id = table_v2_with_extensive_snapshots_and_manifests

    data_manifest_count = sum(
        1 for entries in manifests_by_snapshot_id.values() if entries[0].content == ManifestContent.DATA
    )

    oldest_snapshot = table.snapshots()[0]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    def fake_manifests(self: Snapshot, io: FileIO) -> list[ManifestFile]:
        """Stand in for Snapshot.manifests by looking the manifests up by snapshot ID."""
        return manifests_by_snapshot_id.get(self.snapshot_id, [])

    with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=fake_manifests):
        manifests, _ = validation_history(
            table,
            newest_snapshot,
            oldest_snapshot,
            {Operation.APPEND},
            ManifestContent.DATA,
        )

    assert len(manifests) == data_manifest_count
81+
82+
83+
def test_validation_history_fails_on_snapshot_with_no_summary(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
    """validation_history raises when a snapshot in the lineage has no summary."""
    table, _ = table_v2_with_extensive_snapshots_and_manifests
    oldest_snapshot = table.snapshots()[0]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    # Create a snapshot with no summary. Integer IDs and the ``parent_snapshot_id``
    # keyword match how Snapshot is constructed elsewhere in the test suite; the
    # operation lives on the summary, so it is not passed separately here.
    snapshot_with_no_summary = Snapshot(
        snapshot_id=1234,
        parent_snapshot_id=5678,
        timestamp_ms=0,
        summary=None,
        manifest_list="foo/bar",
    )

    # Bypass the real ancestry walk so the summary-less snapshot is the only one visited.
    with patch("pyiceberg.table.update.validate.ancestors_between", return_value=[snapshot_with_no_summary]):
        with pytest.raises(ValidationException):
            validation_history(
                table,
                newest_snapshot,
                oldest_snapshot,
                {Operation.APPEND},
                ManifestContent.DATA,
            )
109+
110+
111+
def test_validation_history_fails_on_from_snapshot_not_matching_last_snapshot(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
    """validation_history raises when the walked lineage does not end on from_snapshot."""
    table, manifests_by_snapshot_id = table_v2_with_extensive_snapshots_and_manifests

    oldest_snapshot = table.snapshots()[0]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    def fake_manifests(self: Snapshot, io: FileIO) -> list[ManifestFile]:
        """Stand in for Snapshot.manifests by looking the manifests up by snapshot ID."""
        return manifests_by_snapshot_id.get(self.snapshot_id, [])

    # Drop the first listed snapshot so the mocked walk never reaches oldest_snapshot.
    ancestors_without_oldest = table.snapshots()[1:]

    manifests_patch = patch("pyiceberg.table.snapshots.Snapshot.manifests", new=fake_manifests)
    ancestors_patch = patch("pyiceberg.table.update.validate.ancestors_between", return_value=ancestors_without_oldest)
    with manifests_patch, ancestors_patch, pytest.raises(ValidationException):
        validation_history(
            table,
            newest_snapshot,
            oldest_snapshot,
            {Operation.APPEND},
            ManifestContent.DATA,
        )

0 commit comments

Comments
 (0)