Skip to content

validate added data files for snapshot compatibility #2050

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 106 additions & 21 deletions pyiceberg/table/update/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,19 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Iterator, Optional
from typing import Iterator, Optional, Set

from pyiceberg.exceptions import ValidationException
from pyiceberg.expressions import BooleanExpression
from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
from pyiceberg.typedef import Record

VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}
VALIDATE_DATA_FILES_EXIST_OPERATIONS: Set[Operation] = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}
VALIDATE_ADDED_DATA_FILES_OPERATIONS: Set[Operation] = {Operation.APPEND, Operation.OVERWRITE}


def _validation_history(
Expand Down Expand Up @@ -77,6 +79,47 @@ def _validation_history(
return manifests_files, snapshots


def _filter_manifest_entries(
    entry: ManifestEntry,
    snapshot_ids: set[int],
    data_filter: Optional[BooleanExpression],
    partition_set: Optional[dict[int, set[Record]]],
    entry_status: Optional[ManifestEntryStatus],
    schema: Schema,
) -> bool:
    """Filter manifest entries based on snapshot ids, entry status, data filter and partition set.

    Args:
        entry: Manifest entry to filter
        snapshot_ids: set of snapshot ids to match data files
        data_filter: Optional filter to match data files
        partition_set: Optional mapping of partition spec id to the set of partitions to match data files
        entry_status: Optional status to match data files; None accepts any status
        schema: Schema handed to the inclusive metrics evaluator together with the data filter

    Returns:
        True if the entry should be included, False otherwise
    """
    if entry.snapshot_id not in snapshot_ids:
        return False

    if entry_status is not None and entry.status != entry_status:
        return False

    # Check partition membership before the metrics evaluator: it is a plain lookup,
    # whereas the evaluator has to be constructed for every entry that reaches it.
    if partition_set is not None:
        partition = entry.data_file.partition
        spec_id = entry.data_file.spec_id
        if spec_id not in partition_set or partition not in partition_set[spec_id]:
            return False

    if data_filter is not None:
        evaluator = _InclusiveMetricsEvaluator(schema, data_filter)
        if evaluator.eval(entry.data_file) is ROWS_CANNOT_MATCH:
            return False

    return True


def _deleted_data_files(
table: Table,
starting_snapshot: Snapshot,
Expand Down Expand Up @@ -108,27 +151,12 @@ def _deleted_data_files(
ManifestContent.DATA,
)

if data_filter is not None:
evaluator = _InclusiveMetricsEvaluator(table.schema(), data_filter).eval

for manifest in manifests:
for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
if entry.snapshot_id not in snapshot_ids:
continue

if entry.status != ManifestEntryStatus.DELETED:
continue

if data_filter is not None and evaluator(entry.data_file) is ROWS_CANNOT_MATCH:
continue

if partition_set is not None:
spec_id = entry.data_file.spec_id
partition = entry.data_file.partition
if spec_id not in partition_set or partition not in partition_set[spec_id]:
continue

yield entry
if _filter_manifest_entries(
entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.DELETED, table.schema()
):
yield entry


def _validate_deleted_data_files(
Expand All @@ -150,3 +178,60 @@ def _validate_deleted_data_files(
if any(conflicting_entries):
conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries}
raise ValidationException(f"Deleted data files were found matching the filter for snapshots {conflicting_snapshots}!")


def _added_data_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: Optional[BooleanExpression],
    partition_set: Optional[dict[int, set[Record]]],
    parent_snapshot: Optional[Snapshot],
) -> Iterator[ManifestEntry]:
    """Return manifest entries for data files added between the starting snapshot and parent snapshot.

    Args:
        table: Table to get the history from
        starting_snapshot: Starting snapshot to get the history from
        data_filter: Optional filter to match data files
        partition_set: Optional set of partitions to match data files
        parent_snapshot: Parent snapshot to get the history from

    Returns:
        Iterator of manifest entries for added data files matching the conditions
    """
    if parent_snapshot is None:
        # Without a parent snapshot there is no history to inspect: yield nothing.
        return

    # The history helper is module-private, hence the leading underscore.
    manifests, snapshot_ids = _validation_history(
        table,
        parent_snapshot,
        starting_snapshot,
        VALIDATE_ADDED_DATA_FILES_OPERATIONS,
        ManifestContent.DATA,
    )

    # The schema does not change while iterating, so look it up once.
    schema = table.schema()

    for manifest in manifests:
        for entry in manifest.fetch_manifest_entry(table.io):
            # No status restriction (None): the snapshot-id check selects the relevant entries.
            if _filter_manifest_entries(entry, snapshot_ids, data_filter, partition_set, None, schema):
                yield entry


def _validate_added_data_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: Optional[BooleanExpression],
    parent_snapshot: Optional[Snapshot],
) -> None:
    """Validate that no files matching a filter have been added to the table since a starting snapshot.

    Args:
        table: Table to validate
        starting_snapshot: Snapshot current at the start of the operation
        data_filter: Expression used to find added data files
        parent_snapshot: Ending snapshot on the branch being validated

    Raises:
        ValidationException: If at least one added data file matching the filter is found
    """
    # _added_data_files is a generator. Materialize it first: probing it with any() would
    # consume the leading entry and drop its snapshot id from the error message.
    conflicting_entries = list(_added_data_files(table, starting_snapshot, data_filter, None, parent_snapshot))
    if conflicting_entries:
        conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries if entry.snapshot_id is not None}
        raise ValidationException(f"Added data files were found matching the filter for snapshots {conflicting_snapshots}!")
135 changes: 134 additions & 1 deletion tests/table/test_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
from pyiceberg.table.update.validate import _deleted_data_files, _validate_deleted_data_files, _validation_history
from pyiceberg.table.update.validate import (
_added_data_files,
_deleted_data_files,
_validate_added_data_files,
_validate_deleted_data_files,
_validation_history,
)


@pytest.fixture
Expand Down Expand Up @@ -217,3 +223,130 @@ class DummyEntry:
data_filter=None,
parent_snapshot=oldest_snapshot,
)


@pytest.mark.parametrize("operation", [Operation.APPEND, Operation.OVERWRITE])
def test_validate_added_data_files_conflicting_count(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
    operation: Operation,
) -> None:
    """Check how many entries _added_data_files yields when the recent snapshots use the given operation."""
    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests

    snapshot_history = 100
    snapshots = table.snapshots()
    # Rewrite the summary of the most recent `snapshot_history` snapshots to the operation under test.
    for offset in range(1, snapshot_history + 1):
        snapshots[-offset] = snapshots[-offset].model_copy(update={"summary": Summary(operation=operation)})

    table.metadata = table.metadata.model_copy(update={"snapshots": snapshots})

    oldest_snapshot = table.snapshots()[-snapshot_history]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    def fake_manifests(self: Snapshot, io: FileIO) -> list[ManifestFile]:
        """Look the manifests up by snapshot id instead of reading them."""
        return mock_manifests.get(self.snapshot_id, [])

    def fake_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
        """Produce a single ADDED entry attributed to the manifest's added snapshot id."""
        added_entry = ManifestEntry.from_args(
            status=ManifestEntryStatus.ADDED,
            snapshot_id=self.added_snapshot_id,
        )
        return [added_entry]

    with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=fake_manifests):
        with patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=fake_fetch_manifest_entry):
            entries = _added_data_files(
                table=table,
                starting_snapshot=newest_snapshot,
                data_filter=None,
                parent_snapshot=oldest_snapshot,
                partition_set=None,
            )
            result = list(entries)

    # since we only look at the ManifestContent.Data files
    assert len(result) == snapshot_history / 2


@pytest.mark.parametrize("operation", [Operation.DELETE, Operation.REPLACE])
def test_validate_added_data_files_non_conflicting_count(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
    operation: Operation,
) -> None:
    """Check that _added_data_files yields nothing when the recent snapshots use the given operation."""
    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests

    snapshot_history = 100
    snapshots = table.snapshots()
    # Rewrite the summary of the most recent `snapshot_history` snapshots to the operation under test.
    for offset in range(1, snapshot_history + 1):
        snapshots[-offset] = snapshots[-offset].model_copy(update={"summary": Summary(operation=operation)})

    table.metadata = table.metadata.model_copy(update={"snapshots": snapshots})

    oldest_snapshot = table.snapshots()[-snapshot_history]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    def fake_manifests(self: Snapshot, io: FileIO) -> list[ManifestFile]:
        """Look the manifests up by snapshot id instead of reading them."""
        return mock_manifests.get(self.snapshot_id, [])

    def fake_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
        """Produce a single ADDED entry attributed to the manifest's added snapshot id."""
        added_entry = ManifestEntry.from_args(
            status=ManifestEntryStatus.ADDED,
            snapshot_id=self.added_snapshot_id,
        )
        return [added_entry]

    with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=fake_manifests):
        with patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=fake_fetch_manifest_entry):
            entries = _added_data_files(
                table=table,
                starting_snapshot=newest_snapshot,
                data_filter=None,
                parent_snapshot=oldest_snapshot,
                partition_set=None,
            )
            result = list(entries)

    assert len(result) == 0


def test_validate_added_data_files_raises_on_conflict(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
    """Check that _validate_added_data_files raises when _added_data_files reports an entry."""
    table, _ = table_v2_with_extensive_snapshots_and_manifests
    all_snapshots = table.snapshots()
    oldest_snapshot = all_snapshots[0]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    class DummyEntry:
        # Only the attribute read while building the error message is provided.
        snapshot_id = 123

    stubbed_conflicts = [DummyEntry()]
    with patch("pyiceberg.table.update.validate._added_data_files", return_value=stubbed_conflicts), pytest.raises(
        ValidationException
    ):
        _validate_added_data_files(
            table=table,
            starting_snapshot=newest_snapshot,
            data_filter=None,
            parent_snapshot=oldest_snapshot,
        )
Loading