
Commit e8be5ab

move to snapshot
1 parent 3797e9e commit e8be5ab

File tree

3 files changed: +43 -75 lines changed


pyiceberg/manifest.py

Lines changed: 1 addition & 4 deletions
@@ -551,10 +551,7 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
 
     def __eq__(self, other: Any) -> bool:
         """Return the equality of two instances of the ManifestFile class."""
-        if not isinstance(other, ManifestFile):
-            return False
-        else:
-            return self.manifest_path == other.manifest_path
+        return self.manifest_path == other.manifest_path if isinstance(other, ManifestFile) else False
 
     def __hash__(self) -> int:
         """Return the hash of manifest_path."""

pyiceberg/table/__init__.py

Lines changed: 17 additions & 71 deletions
@@ -113,6 +113,8 @@
     SnapshotLogEntry,
     SnapshotSummaryCollector,
     Summary,
+    ancestors_between,
+    is_parent_ancestor_of,
     update_snapshot_summaries,
 )
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -1703,10 +1705,6 @@ def snapshot(self) -> Optional[Snapshot]:
             return self.table_metadata.snapshot_by_id(self.snapshot_id)
         return self.table_metadata.current_snapshot()
 
-    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)
-
     def projection(self) -> Schema:
         current_schema = self.table_metadata.schema()
         if self.snapshot_id is not None:
@@ -1727,41 +1725,6 @@ def projection(self) -> Schema:
 
         return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
 
-    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        partition_type = spec.partition_type(self.table_metadata.schema())
-        partition_schema = Schema(*partition_type.fields)
-        partition_expr = self.partition_filters[spec_id]
-
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _EvaluatorExpression methods bound to a single
-        # shared instance across multiple threads.
-        return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
-
-    def _check_sequence_number(self, min_data_sequence_number: int, manifest: ManifestFile) -> bool:
-        """Ensure that no manifests are loaded that contain deletes that are older than the data.
-
-        Args:
-            min_data_sequence_number (int): The minimal sequence number.
-            manifest (ManifestFile): A ManifestFile that can be either data or deletes.
-
-        Returns:
-            Boolean indicating if it is either a data file, or a relevant delete file.
-        """
-        return manifest.content == ManifestContent.DATA or (
-            # Not interested in deletes that are older than the data
-            manifest.content == ManifestContent.DELETES
-            and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_data_sequence_number
-        )
-
-    def use_ref(self: S, name: str) -> S:
-        if self.snapshot_id:  # type: ignore
-            raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")  # type: ignore
-        if snapshot := self.table_metadata.snapshot_by_name(name):
-            return self.update(snapshot_id=snapshot.snapshot_id)
-
-        raise ValueError(f"Cannot scan unknown ref={name}")
-
     def plan_files(self) -> Iterable[FileScanTask]:
         """Plans the relevant files by filtering on the PartitionSpecs.
 
@@ -1849,6 +1812,14 @@ def to_arrow(self) -> pa.Table:
     def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
         return self.to_arrow().to_pandas(**kwargs)
 
+    def use_ref(self: S, name: str) -> S:
+        if self.snapshot_id:  # type: ignore
+            raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")  # type: ignore
+        if snapshot := self.table_metadata.snapshot_by_name(name):
+            return self.update(snapshot_id=snapshot.snapshot_id)
+
+        raise ValueError(f"Cannot scan unknown ref={name}")
+
     def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
         import duckdb
 
@@ -1864,6 +1835,13 @@ def to_ray(self) -> ray.data.dataset.Dataset:
 
 
 class BaseIncrementalScan(TableScan):
+    """Base class for incremental scans.
+
+    Args:
+        to_snapshot_id: The end snapshot ID (inclusive).
+        from_snapshot_id_exclusive: The start snapshot ID (exclusive).
+    """
+
     to_snapshot_id: Optional[int]
     from_snapshot_id_exclusive: Optional[int]
 
@@ -4015,35 +3993,3 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
     table_partitions: List[TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions)
 
     return table_partitions
-
-
-def ancestors_between(to_snapshot: int, from_snapshot: Optional[int], table_metadata: TableMetadata) -> Iterable[Snapshot]:
-    if from_snapshot is not None:
-        for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata):  # type: ignore
-            if snapshot.snapshot_id == from_snapshot:
-                break
-            yield snapshot
-    else:
-        yield from ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata)  # type: ignore
-
-
-def is_parent_ancestor_of(snapshot_id: int, ancestor_parent_snapshot_id: int, table_metadata: TableMetadata) -> bool:
-    for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata):  # type: ignore
-        if snapshot.parent_snapshot_id and snapshot.parent_snapshot_id == ancestor_parent_snapshot_id:
-            return True
-    return False
-
-
-def oldest_ancestor_of(snapshot_id: int, table_metadata: TableMetadata) -> Optional[int]:
-    last_snapshot = None
-    for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata):  # type: ignore
-        last_snapshot = snapshot.snapshot_id
-    return last_snapshot
-
-
-def ancestors_of(latest_snapshot: Snapshot, table_metadata: TableMetadata) -> Iterable[Snapshot]:
-    if latest_snapshot:
-        yield latest_snapshot
-        if latest_snapshot.parent_snapshot_id:
-            if parent := table_metadata.snapshot_by_id(latest_snapshot.parent_snapshot_id):
-                yield from ancestors_of(parent, table_metadata)
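
use_ref itself is unchanged by this commit; it only moves below to_pandas in the scan class. A hedged usage sketch, assuming a catalog named "default" is already configured and that the table and branch names used here (which are hypothetical) exist:

from pyiceberg.catalog import load_catalog

# Hypothetical catalog, table, and ref names; assumes a configured "default" catalog.
catalog = load_catalog("default")
table = catalog.load_table("examples.events")

# use_ref resolves a named branch or tag to its snapshot id. Calling it on a scan
# that already has a snapshot_id raises ValueError, as does an unknown ref name.
scan = table.scan().use_ref("audit-branch")
print(scan.snapshot_id)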

pyiceberg/table/snapshots.py

Lines changed: 25 additions & 0 deletions
@@ -426,3 +426,28 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta
         if current_snapshot.parent_snapshot_id is not None:
             if parent := table_metadata.snapshot_by_id(current_snapshot.parent_snapshot_id):
                 yield from ancestors_of(parent, table_metadata)
+
+
+def ancestors_between(to_snapshot: int, from_snapshot: Optional[int], table_metadata: TableMetadata) -> Iterable[Snapshot]:
+    if from_snapshot is not None:
+        for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata):  # type: ignore
+            if snapshot.snapshot_id == from_snapshot:
+                break
+            yield snapshot
+    else:
+        yield from ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata)  # type: ignore
+
+
+def is_parent_ancestor_of(snapshot_id: int, ancestor_parent_snapshot_id: int, table_metadata: TableMetadata) -> bool:
+    for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata):  # type: ignore
+        if snapshot.parent_snapshot_id and snapshot.parent_snapshot_id == ancestor_parent_snapshot_id:
+            return True
+    return False
+
+
+def oldest_ancestor_of(snapshot_id: int, table_metadata: TableMetadata) -> Optional[int]:
+    last_snapshot = None
+    for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata):  # type: ignore
+        last_snapshot = snapshot.snapshot_id
+    return last_snapshot
+
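
The helpers moved into snapshots.py only walk the parent chain via snapshot_by_id, so their behaviour can be illustrated without a real table. A standalone sketch of the traversal, using hypothetical FakeSnapshot/FakeMetadata stand-ins rather than pyiceberg's Snapshot and TableMetadata; the inline loop mirrors ancestors_between's exclusive lower bound, which BaseIncrementalScan's from_snapshot_id_exclusive relies on:

from dataclasses import dataclass
from typing import Dict, Iterable, Optional


@dataclass
class FakeSnapshot:
    snapshot_id: int
    parent_snapshot_id: Optional[int] = None


class FakeMetadata:
    """Hypothetical stand-in exposing only snapshot_by_id, the one lookup the helpers use."""

    def __init__(self, snapshots: Dict[int, FakeSnapshot]) -> None:
        self._snapshots = snapshots

    def snapshot_by_id(self, snapshot_id: int) -> Optional[FakeSnapshot]:
        return self._snapshots.get(snapshot_id)


def ancestors_of(current: Optional[FakeSnapshot], metadata: FakeMetadata) -> Iterable[FakeSnapshot]:
    # Same recursion as snapshots.py: yield newest first, then follow parent ids.
    if current:
        yield current
        if current.parent_snapshot_id is not None:
            if parent := metadata.snapshot_by_id(current.parent_snapshot_id):
                yield from ancestors_of(parent, metadata)


# Linear history 1 -> 2 -> 3, where 3 is the newest snapshot.
history = FakeMetadata({
    1: FakeSnapshot(1),
    2: FakeSnapshot(2, parent_snapshot_id=1),
    3: FakeSnapshot(3, parent_snapshot_id=2),
})

# ancestors_between(to_snapshot=3, from_snapshot=1, ...) stops before yielding
# snapshot 1, so an incremental scan over (1, 3] sees snapshots 3 and 2 only.
seen = []
for snapshot in ancestors_of(history.snapshot_by_id(3), history):
    if snapshot.snapshot_id == 1:
        break
    seen.append(snapshot.snapshot_id)
print(seen)  # [3, 2]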
