Skip to content

Commit 0066b3b

Browse files
committed
move to snapshot
1 parent 9ade073 commit 0066b3b

File tree

3 files changed

+43
-75
lines changed

3 files changed

+43
-75
lines changed

pyiceberg/manifest.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -553,10 +553,7 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
553553

554554
def __eq__(self, other: Any) -> bool:
555555
"""Return the equality of two instances of the ManifestFile class."""
556-
if not isinstance(other, ManifestFile):
557-
return False
558-
else:
559-
return self.manifest_path == other.manifest_path
556+
return self.manifest_path == other.manifest_path if isinstance(other, ManifestFile) else False
560557

561558
def __hash__(self) -> int:
562559
"""Return the hash of manifest_path."""

pyiceberg/table/__init__.py

Lines changed: 17 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,9 @@
113113
SnapshotLogEntry,
114114
SnapshotSummaryCollector,
115115
Summary,
116+
ancestors_between,
116117
ancestors_of,
118+
is_parent_ancestor_of,
117119
update_snapshot_summaries,
118120
)
119121
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -1807,10 +1809,6 @@ def snapshot(self) -> Optional[Snapshot]:
18071809
return self.table_metadata.snapshot_by_id(self.snapshot_id)
18081810
return self.table_metadata.current_snapshot()
18091811

1810-
def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
1811-
spec = self.table_metadata.specs()[spec_id]
1812-
return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)
1813-
18141812
def projection(self) -> Schema:
18151813
current_schema = self.table_metadata.schema()
18161814
if self.snapshot_id is not None:
@@ -1831,41 +1829,6 @@ def projection(self) -> Schema:
18311829

18321830
return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
18331831

1834-
def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
1835-
spec = self.table_metadata.specs()[spec_id]
1836-
partition_type = spec.partition_type(self.table_metadata.schema())
1837-
partition_schema = Schema(*partition_type.fields)
1838-
partition_expr = self.partition_filters[spec_id]
1839-
1840-
# The lambda created here is run in multiple threads.
1841-
# So we avoid creating _EvaluatorExpression methods bound to a single
1842-
# shared instance across multiple threads.
1843-
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
1844-
1845-
def _check_sequence_number(self, min_data_sequence_number: int, manifest: ManifestFile) -> bool:
1846-
"""Ensure that no manifests are loaded that contain deletes that are older than the data.
1847-
1848-
Args:
1849-
min_data_sequence_number (int): The minimal sequence number.
1850-
manifest (ManifestFile): A ManifestFile that can be either data or deletes.
1851-
1852-
Returns:
1853-
Boolean indicating if it is either a data file, or a relevant delete file.
1854-
"""
1855-
return manifest.content == ManifestContent.DATA or (
1856-
# Not interested in deletes that are older than the data
1857-
manifest.content == ManifestContent.DELETES
1858-
and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_data_sequence_number
1859-
)
1860-
1861-
def use_ref(self: S, name: str) -> S:
1862-
if self.snapshot_id: # type: ignore
1863-
raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}") # type: ignore
1864-
if snapshot := self.table_metadata.snapshot_by_name(name):
1865-
return self.update(snapshot_id=snapshot.snapshot_id)
1866-
1867-
raise ValueError(f"Cannot scan unknown ref={name}")
1868-
18691832
def plan_files(self) -> Iterable[FileScanTask]:
18701833
"""Plans the relevant files by filtering on the PartitionSpecs.
18711834
@@ -1971,6 +1934,14 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
19711934
def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
19721935
return self.to_arrow().to_pandas(**kwargs)
19731936

1937+
def use_ref(self: S, name: str) -> S:
1938+
if self.snapshot_id: # type: ignore
1939+
raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}") # type: ignore
1940+
if snapshot := self.table_metadata.snapshot_by_name(name):
1941+
return self.update(snapshot_id=snapshot.snapshot_id)
1942+
1943+
raise ValueError(f"Cannot scan unknown ref={name}")
1944+
19741945
def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
19751946
import duckdb
19761947

@@ -1986,6 +1957,13 @@ def to_ray(self) -> ray.data.dataset.Dataset:
19861957

19871958

19881959
class BaseIncrementalScan(TableScan):
1960+
"""Base class for incremental scans.
1961+
1962+
Args:
1963+
to_snapshot_id: The end snapshot ID (inclusive).
1964+
from_snapshot_id_exclusive: The start snapshot ID (exclusive).
1965+
"""
1966+
19891967
to_snapshot_id: Optional[int]
19901968
from_snapshot_id_exclusive: Optional[int]
19911969

@@ -4273,35 +4251,3 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
42734251
table_partitions: List[TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions)
42744252

42754253
return table_partitions
4276-
4277-
4278-
def ancestors_between(to_snapshot: int, from_snapshot: Optional[int], table_metadata: TableMetadata) -> Iterable[Snapshot]:
4279-
if from_snapshot is not None:
4280-
for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata): # type: ignore
4281-
if snapshot.snapshot_id == from_snapshot:
4282-
break
4283-
yield snapshot
4284-
else:
4285-
yield from ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata) # type: ignore
4286-
4287-
4288-
def is_parent_ancestor_of(snapshot_id: int, ancestor_parent_snapshot_id: int, table_metadata: TableMetadata) -> bool:
4289-
for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata): # type: ignore
4290-
if snapshot.parent_snapshot_id and snapshot.parent_snapshot_id == ancestor_parent_snapshot_id:
4291-
return True
4292-
return False
4293-
4294-
4295-
def oldest_ancestor_of(snapshot_id: int, table_metadata: TableMetadata) -> Optional[int]:
4296-
last_snapshot = None
4297-
for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata): # type: ignore
4298-
last_snapshot = snapshot.snapshot_id
4299-
return last_snapshot
4300-
4301-
4302-
def ancestors_of(latest_snapshot: Snapshot, table_metadata: TableMetadata) -> Iterable[Snapshot]:
4303-
if latest_snapshot:
4304-
yield latest_snapshot
4305-
if latest_snapshot.parent_snapshot_id:
4306-
if parent := table_metadata.snapshot_by_id(latest_snapshot.parent_snapshot_id):
4307-
yield from ancestors_of(parent, table_metadata)

pyiceberg/table/snapshots.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,3 +429,28 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta
429429
if snapshot.parent_snapshot_id is None:
430430
break
431431
snapshot = table_metadata.snapshot_by_id(snapshot.parent_snapshot_id)
432+
433+
434+
def ancestors_between(to_snapshot: int, from_snapshot: Optional[int], table_metadata: TableMetadata) -> Iterable[Snapshot]:
435+
if from_snapshot is not None:
436+
for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata): # type: ignore
437+
if snapshot.snapshot_id == from_snapshot:
438+
break
439+
yield snapshot
440+
else:
441+
yield from ancestors_of(table_metadata.snapshot_by_id(to_snapshot), table_metadata) # type: ignore
442+
443+
444+
def is_parent_ancestor_of(snapshot_id: int, ancestor_parent_snapshot_id: int, table_metadata: TableMetadata) -> bool:
445+
for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata): # type: ignore
446+
if snapshot.parent_snapshot_id and snapshot.parent_snapshot_id == ancestor_parent_snapshot_id:
447+
return True
448+
return False
449+
450+
451+
def oldest_ancestor_of(snapshot_id: int, table_metadata: TableMetadata) -> Optional[int]:
452+
last_snapshot = None
453+
for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata): # type: ignore
454+
last_snapshot = snapshot.snapshot_id
455+
return last_snapshot
456+

0 commit comments

Comments (0)