diff --git a/dev/provision.py b/dev/provision.py index 837189204e..92aa4d8256 100644 --- a/dev/provision.py +++ b/dev/provision.py @@ -413,3 +413,58 @@ ) spark.sql(f"ALTER TABLE {catalog_name}.default.test_empty_scan_ordered_str WRITE ORDERED BY id") spark.sql(f"INSERT INTO {catalog_name}.default.test_empty_scan_ordered_str VALUES 'a', 'c'") + + spark.sql( + f""" + CREATE OR REPLACE TABLE {catalog_name}.default.test_incremental_read ( + dt date, + number integer, + letter string + ) + USING iceberg + TBLPROPERTIES ( + 'format-version'='2' + ); + """ + ) + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_incremental_read + VALUES (CAST('2022-03-01' AS date), 1, 'a') + """ + ) + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_incremental_read + VALUES (CAST('2022-03-01' AS date), 2, 'b') + """ + ) + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_incremental_read + VALUES (CAST('2022-03-02' AS date), 3, 'c'), (CAST('2022-03-02' AS date), 4, 'b') + """ + ) + + spark.sql( + f""" + DELETE FROM {catalog_name}.default.test_incremental_read + WHERE number = 2 + """ + ) + + # https://github.com/apache/iceberg/issues/1092#issuecomment-638432848 / https://github.com/apache/iceberg/issues/3747#issuecomment-1145419407 + # Don't do replace for Hive catalog as REPLACE TABLE requires certain Hive server configuration + if catalog_name != "hive": + # Replace to break snapshot lineage: + spark.sql( + f""" + REPLACE TABLE {catalog_name}.default.test_incremental_read + USING iceberg + TBLPROPERTIES ('format-version'='2') + AS SELECT number, letter FROM {catalog_name}.default.test_incremental_read + """ + ) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 61cb87e3d8..8b6ba4091d 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -717,6 +717,14 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List if not discard_deleted or entry.status != ManifestEntryStatus.DELETED ] + def __eq__(self, other: Any) -> bool: + """Return the equality of two instances of the ManifestFile class.""" + return self.manifest_path == other.manifest_path if isinstance(other, ManifestFile) else False + + def __hash__(self) -> int: + """Return the hash of manifest_path.""" + return hash(self.manifest_path) + @cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list)) def _manifests(io: FileIO, manifest_list: str) -> Tuple[ManifestFile, ...]: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 78676a774a..9a2ee77e86 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -69,6 +69,7 @@ DataFileContent, ManifestContent, ManifestEntry, + ManifestEntryStatus, ManifestFile, ) from pyiceberg.partitioning import ( @@ -89,8 +90,11 @@ ) from pyiceberg.table.refs import SnapshotRef from pyiceberg.table.snapshots import ( + Operation, Snapshot, SnapshotLogEntry, + ancestors_between_ids, + is_ancestor_of, ) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.table.update import ( @@ -1092,6 +1096,60 @@ def scan( limit=limit, ) + def incremental_append_scan( + self, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] 
= ("*",), + case_sensitive: bool = True, + from_snapshot_id_exclusive: Optional[int] = None, + to_snapshot_id_inclusive: Optional[int] = None, + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + ) -> IncrementalAppendScan: + """Fetch an IncrementalAppendScan based on the table's current metadata. + + The incremental append scan can be used to project the table's data + from append snapshots within a snapshot range and that matches the + provided row_filter onto the table's current schema + + Args: + row_filter: + A string or BooleanExpression that describes the + desired rows + selected_fields: + A tuple of strings representing the column names + to return in the output dataframe. + case_sensitive: + If True column matching is case sensitive + from_snapshot_id_exclusive: + Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. This can be set + on the IncrementalAppendScan object returned, but ultimately must not be None. + to_snapshot_id_inclusive: + Optional ID of the "to" snapshot, to end the incremental scan at, inclusively. This can be set on the + IncrementalAppendScan object returned. Ultimately, it will default to the table's current snapshot. + options: + Additional Table properties as a dictionary of + string key value pairs to use for this scan. + limit: + An integer representing the number of rows to + return in the scan result. If None, fetches all + matching rows. + + Returns: + An IncrementalAppendScan based on the table's current metadata and provided parameters. + """ + return IncrementalAppendScan( + table_metadata=self.metadata, + io=self.io, + row_filter=row_filter, + selected_fields=selected_fields, + case_sensitive=case_sensitive, + from_snapshot_id_exclusive=from_snapshot_id_exclusive, + to_snapshot_id_inclusive=to_snapshot_id_inclusive, + options=options, + limit=limit, + ) + @property def format_version(self) -> TableVersion: return self.metadata.format_version @@ -1507,16 +1565,17 @@ def _parse_row_filter(expr: Union[str, BooleanExpression]) -> BooleanExpression: return parser.parse(expr) if isinstance(expr, str) else expr -S = TypeVar("S", bound="TableScan", covariant=True) +S = TypeVar("S", bound="AbstractTableScan", covariant=True) -class TableScan(ABC): +class AbstractTableScan(ABC): + """A base class for all table scans.""" + table_metadata: TableMetadata io: FileIO row_filter: BooleanExpression selected_fields: Tuple[str, ...] case_sensitive: bool - snapshot_id: Optional[int] options: Properties limit: Optional[int] @@ -1527,7 +1586,6 @@ def __init__( row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, selected_fields: Tuple[str, ...] = ("*",), case_sensitive: bool = True, - snapshot_id: Optional[int] = None, options: Properties = EMPTY_DICT, limit: Optional[int] = None, ): @@ -1536,10 +1594,181 @@ def __init__( self.row_filter = _parse_row_filter(row_filter) self.selected_fields = selected_fields self.case_sensitive = case_sensitive - self.snapshot_id = snapshot_id self.options = options self.limit = limit + @abstractmethod + def projection(self) -> Schema: ... + + @abstractmethod + def plan_files(self) -> Iterable[ScanTask]: ... + + @abstractmethod + def to_arrow(self) -> pa.Table: ... + + @abstractmethod + def count(self) -> int: ... 
+ + def select(self: S, *field_names: str) -> S: + if "*" in self.selected_fields: + return self.update(selected_fields=field_names) + return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names)))) + + def filter(self: S, expr: Union[str, BooleanExpression]) -> S: + return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr))) + + def with_case_sensitive(self: S, case_sensitive: bool = True) -> S: + return self.update(case_sensitive=case_sensitive) + + def update(self: S, **overrides: Any) -> S: + """Create a copy of this table scan with updated fields.""" + return type(self)(**{**self.__dict__, **overrides}) + + def to_pandas(self, **kwargs: Any) -> pd.DataFrame: + """Read a Pandas DataFrame eagerly from this Iceberg table scan. + + Returns: + pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table scan + """ + return self.to_arrow().to_pandas(**kwargs) + + def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: + """Shorthand for loading this table scan in DuckDB. + + Returns: + DuckDBPyConnection: In memory DuckDB connection with the Iceberg table scan. + """ + import duckdb + + con = connection or duckdb.connect(database=":memory:") + con.register(table_name, self.to_arrow()) + + return con + + def to_ray(self) -> ray.data.dataset.Dataset: + """Read a Ray Dataset eagerly from this Iceberg table scan. + + Returns: + ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table scan + """ + import ray + + return ray.data.from_arrow(self.to_arrow()) + + def to_polars(self) -> pl.DataFrame: + """Read a Polars DataFrame from this Iceberg table scan. + + Returns: + pl.DataFrame: Materialized Polars Dataframe from the Iceberg table scan + """ + import polars as pl + + result = pl.from_arrow(self.to_arrow()) + if isinstance(result, pl.Series): + result = result.to_frame() + + return result + + +class FileBasedScan(AbstractTableScan, ABC): + """A base class for table scans that plan FileScanTasks.""" + + @cached_property + def _manifest_group_planner(self) -> ManifestGroupPlanner: + return ManifestGroupPlanner(self) + + @abstractmethod + def plan_files(self) -> Iterable[FileScanTask]: ... + + def to_arrow(self) -> pa.Table: + """Read an Arrow table eagerly from this scan. + + All rows will be loaded into memory at once. + + Returns: + pa.Table: Materialized Arrow Table from the Iceberg table scan + """ + from pyiceberg.io.pyarrow import ArrowScan + + return ArrowScan( + self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit + ).to_table(self.plan_files()) + + def to_arrow_batch_reader(self) -> pa.RecordBatchReader: + """Return an Arrow RecordBatchReader from this scan. + + For large results, using a RecordBatchReader requires less memory than + loading an Arrow Table for the same DataScan, because a RecordBatch + is read one at a time. + + Returns: + pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table scan + which can be used to read a stream of record batches one by one. 
+ """ + import pyarrow as pa + + from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow + + target_schema = schema_to_pyarrow(self.projection()) + batches = ArrowScan( + self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit + ).to_record_batches(self.plan_files()) + + return pa.RecordBatchReader.from_batches( + target_schema, + batches, + ).cast(target_schema) + + def count(self) -> int: + from pyiceberg.io.pyarrow import ArrowScan + + # Usage: Calculates the total number of records in a Scan that haven't had positional deletes. + res = 0 + # every task is a FileScanTask + tasks = self.plan_files() + + for task in tasks: + # task.residual is a Boolean Expression if the filter condition is fully satisfied by the + # partition value and task.delete_files represents that positional delete haven't been merged yet + # hence those files have to read as a pyarrow table applying the filter and deletes + if task.residual == AlwaysTrue() and len(task.delete_files) == 0: + # Every File has a metadata stat that stores the file record count + res += task.file.record_count + else: + arrow_scan = ArrowScan( + table_metadata=self.table_metadata, + io=self.io, + projected_schema=self.projection(), + row_filter=self.row_filter, + case_sensitive=self.case_sensitive, + ) + tbl = arrow_scan.to_table([task]) + res += len(tbl) + return res + + +T = TypeVar("T", bound="TableScan", covariant=True) + + +class TableScan(AbstractTableScan, ABC): + """A base class for non-incremental table scans that target a single snapshot.""" + + snapshot_id: Optional[int] + + def __init__( + self, + table_metadata: TableMetadata, + io: FileIO, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] = ("*",), + case_sensitive: bool = True, + snapshot_id: Optional[int] = None, + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + ): + super().__init__(table_metadata, io, row_filter, selected_fields, case_sensitive, options, limit) + self.snapshot_id = snapshot_id + def snapshot(self) -> Optional[Snapshot]: if self.snapshot_id: return self.table_metadata.snapshot_by_id(self.snapshot_id) @@ -1565,23 +1794,7 @@ def projection(self) -> Schema: return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive) - @abstractmethod - def plan_files(self) -> Iterable[ScanTask]: ... - - @abstractmethod - def to_arrow(self) -> pa.Table: ... - - @abstractmethod - def to_pandas(self, **kwargs: Any) -> pd.DataFrame: ... - - @abstractmethod - def to_polars(self) -> pl.DataFrame: ... 
- - def update(self: S, **overrides: Any) -> S: - """Create a copy of this table scan with updated fields.""" - return type(self)(**{**self.__dict__, **overrides}) - - def use_ref(self: S, name: str) -> S: + def use_ref(self: T, name: str) -> T: if self.snapshot_id: raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}") if snapshot := self.table_metadata.snapshot_by_name(name): @@ -1589,20 +1802,6 @@ def use_ref(self: S, name: str) -> S: raise ValueError(f"Cannot scan unknown ref={name}") - def select(self: S, *field_names: str) -> S: - if "*" in self.selected_fields: - return self.update(selected_fields=field_names) - return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names)))) - - def filter(self: S, expr: Union[str, BooleanExpression]) -> S: - return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr))) - - def with_case_sensitive(self: S, case_sensitive: bool = True) -> S: - return self.update(case_sensitive=case_sensitive) - - @abstractmethod - def count(self) -> int: ... - class ScanTask(ABC): pass @@ -1688,102 +1887,250 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent return set() -class DataScan(TableScan): - def _build_partition_projection(self, spec_id: int) -> BooleanExpression: - project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive) - return project(self.row_filter) +class DataScan(FileBasedScan, TableScan): + """A scan of a table's data. - @cached_property + Args: + row_filter: + A string or BooleanExpression that describes the + desired rows + selected_fields: + A tuple of strings representing the column names + to return in the output dataframe. + case_sensitive: + If True column matching is case sensitive + snapshot_id: + Optional Snapshot ID to time travel to. If None, + scans the table as of the current snapshot ID. + options: + Additional Table properties as a dictionary of + string key value pairs to use for this scan. + limit: + An integer representing the number of rows to + return in the scan result. If None, fetches all + matching rows. + """ + + def plan_files(self) -> Iterable[FileScanTask]: + """Plans the relevant files by filtering on the PartitionSpecs. + + Returns: + List of FileScanTasks that contain both data and delete files. + """ + snapshot = self.snapshot() + if not snapshot: + return iter([]) + + return self._manifest_group_planner.plan_files(manifests=snapshot.manifests(self.io)) + + @property def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: - return KeyDefaultDict(self._build_partition_projection) + return self._manifest_group_planner.partition_filters - def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]: - spec = self.table_metadata.specs()[spec_id] - return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive) - def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]: - spec = self.table_metadata.specs()[spec_id] - partition_type = spec.partition_type(self.table_metadata.schema()) - partition_schema = Schema(*partition_type.fields) - partition_expr = self.partition_filters[spec_id] +A = TypeVar("A", bound="IncrementalScan", covariant=True) - # The lambda created here is run in multiple threads. - # So we avoid creating _EvaluatorExpression methods bound to a single - # shared instance across multiple threads. 
- return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition) - def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]: - schema = self.table_metadata.schema() - include_empty_files = strtobool(self.options.get("include_empty_files", "false")) +class IncrementalScan(AbstractTableScan, ABC): + """A base class for incremental scans.""" - # The lambda created here is run in multiple threads. - # So we avoid creating _InclusiveMetricsEvaluator methods bound to a single - # shared instance across multiple threads. - return lambda data_file: _InclusiveMetricsEvaluator( - schema, - self.row_filter, - self.case_sensitive, - include_empty_files, - ).eval(data_file) + from_snapshot_id_exclusive: Optional[int] + to_snapshot_id_inclusive: Optional[int] - def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]: - spec = self.table_metadata.specs()[spec_id] + def __init__( + self, + table_metadata: TableMetadata, + io: FileIO, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] = ("*",), + case_sensitive: bool = True, + from_snapshot_id_exclusive: Optional[int] = None, + to_snapshot_id_inclusive: Optional[int] = None, + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + ): + super().__init__(table_metadata, io, row_filter, selected_fields, case_sensitive, options, limit) + self.from_snapshot_id_exclusive = from_snapshot_id_exclusive + self.to_snapshot_id_inclusive = to_snapshot_id_inclusive - # The lambda created here is run in multiple threads. - # So we avoid creating _EvaluatorExpression methods bound to a single - # shared instance across multiple threads. - # return lambda data_file: (partition_schema, partition_expr, self.case_sensitive)(data_file.partition) - from pyiceberg.expressions.visitors import residual_evaluator_of + def from_snapshot_exclusive(self: A, from_snapshot_id_exclusive: Optional[int]) -> A: + """Instructs this scan to look for changes starting from a particular snapshot (exclusive). - # assert self.row_filter == False - return lambda datafile: ( - residual_evaluator_of( - spec=spec, - expr=self.row_filter, - case_sensitive=self.case_sensitive, - schema=self.table_metadata.schema(), - ) - ) + Args: + from_snapshot_id_exclusive: the start snapshot ID (exclusive) - def _check_sequence_number(self, min_sequence_number: int, manifest: ManifestFile) -> bool: - """Ensure that no manifests are loaded that contain deletes that are older than the data. + Returns: + this for method chaining + """ + return self.update(from_snapshot_id_exclusive=from_snapshot_id_exclusive) + + def to_snapshot_inclusive(self: A, to_snapshot_id_inclusive: Optional[int]) -> A: + """Instructs this scan to look for changes up to a particular snapshot (inclusive). Args: - min_sequence_number (int): The minimal sequence number. - manifest (ManifestFile): A ManifestFile that can be either data or deletes. + to_snapshot_id_inclusive: the end snapshot ID (inclusive) Returns: - Boolean indicating if it is either a data file, or a relevant delete file. 
+ this for method chaining """ - return manifest.content == ManifestContent.DATA or ( - # Not interested in deletes that are older than the data - manifest.content == ManifestContent.DELETES - and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number + return self.update(to_snapshot_id_inclusive=to_snapshot_id_inclusive) + + def projection(self) -> Schema: + current_schema = self.table_metadata.schema() + + if "*" in self.selected_fields: + return current_schema + + return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive) + + +class IncrementalAppendScan(IncrementalScan, FileBasedScan): + """An incremental scan of a table's data that accumulates appended data between two snapshots. + + Args: + row_filter: + A string or BooleanExpression that describes the + desired rows + selected_fields: + A tuple of strings representing the column names + to return in the output dataframe. + case_sensitive: + If True column matching is case sensitive + from_snapshot_id_exclusive: + Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. When the scan is + ultimately planned, this must not be None. + to_snapshot_id_inclusive: + Optional ID of the "to" snapshot, to end the incremental scan at, inclusively. + Omitting it will default to the table's current snapshot. + options: + Additional Table properties as a dictionary of + string key value pairs to use for this scan. + limit: + An integer representing the number of rows to + return in the scan result. If None, fetches all + matching rows. + """ + + def __init__( + self, + table_metadata: TableMetadata, + io: FileIO, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] = ("*",), + case_sensitive: bool = True, + from_snapshot_id_exclusive: Optional[int] = None, + to_snapshot_id_inclusive: Optional[int] = None, + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + ): + super().__init__( + table_metadata, + io, + row_filter, + selected_fields, + case_sensitive, + from_snapshot_id_exclusive, + to_snapshot_id_inclusive, + options, + limit, ) def plan_files(self) -> Iterable[FileScanTask]: - """Plans the relevant files by filtering on the PartitionSpecs. + from_snapshot_id, to_snapshot_id = self._validate_and_resolve_snapshots() - Returns: - List of FileScanTasks that contain both data and delete files. 
- """ - snapshot = self.snapshot() - if not snapshot: + append_snapshots: List[Snapshot] = self._appends_between(from_snapshot_id, to_snapshot_id, self.table_metadata) + if len(append_snapshots) == 0: return iter([]) + append_snapshot_ids: Set[int] = {snapshot.snapshot_id for snapshot in append_snapshots} + + manifests = { + manifest_file + for snapshot in append_snapshots + for manifest_file in snapshot.manifests(self.io) + if manifest_file.content == ManifestContent.DATA and manifest_file.added_snapshot_id in append_snapshot_ids + } + + return self._manifest_group_planner.plan_files( + manifests=list(manifests), + manifest_entry_filter=lambda manifest_entry: manifest_entry.snapshot_id in append_snapshot_ids + and manifest_entry.status == ManifestEntryStatus.ADDED, + ) + + def _validate_and_resolve_snapshots(self) -> tuple[int, int]: + current_snapshot = self.table_metadata.current_snapshot() + to_snapshot_id = self.to_snapshot_id_inclusive + + if self.from_snapshot_id_exclusive is None: + raise ValueError("Start snapshot of append scan unspecified, please set from_snapshot_id_exclusive") + + if to_snapshot_id is None: + if current_snapshot is None: + raise ValueError("End snapshot of append scan is not set and table has no current snapshot") + + to_snapshot_id = current_snapshot.snapshot_id + elif self._is_snapshot_missing(to_snapshot_id): + raise ValueError(f"End snapshot of append scan not found on table metadata: {to_snapshot_id}") + + if self._is_snapshot_missing(self.from_snapshot_id_exclusive): + raise ValueError(f"Start snapshot of append scan not found on table metadata: {self.from_snapshot_id_exclusive}") + + if not is_ancestor_of(to_snapshot_id, self.from_snapshot_id_exclusive, self.table_metadata): + raise ValueError( + f"Append scan's start snapshot {self.from_snapshot_id_exclusive} is not an ancestor of end snapshot {to_snapshot_id}" + ) + + return self.from_snapshot_id_exclusive, to_snapshot_id + + def _is_snapshot_missing(self, snapshot_id: int) -> bool: + """Return whether the snapshot ID is missing in the table metadata.""" + return self.table_metadata.snapshot_by_id(snapshot_id) is None + + @staticmethod + def _appends_between( + from_snapshot_id_exclusive: int, to_snapshot_id_inclusive: int, table_metadata: TableMetadata + ) -> List[Snapshot]: + """Return the list of snapshots that are appends between two snapshot IDs.""" + return [ + snapshot + for snapshot in ancestors_between_ids( + from_snapshot_id_exclusive=from_snapshot_id_exclusive, + to_snapshot_id_inclusive=to_snapshot_id_inclusive, + table_metadata=table_metadata, + ) + if snapshot.summary is not None and snapshot.summary.operation == Operation.APPEND + ] + + +class ManifestGroupPlanner: + io: FileIO + table_metadata: TableMetadata + row_filter: BooleanExpression + case_sensitive: bool + options: Properties + + def __init__(self, scan: AbstractTableScan): + self.io = scan.io + self.table_metadata = scan.table_metadata + self.row_filter = scan.row_filter + self.case_sensitive = scan.case_sensitive + self.options = scan.options + + def plan_files( + self, + manifests: List[ManifestFile], + manifest_entry_filter: Callable[[ManifestEntry], bool] = lambda _: True, + ) -> Iterable[FileScanTask]: # step 1: filter manifests using partition summaries # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) - - residual_evaluators: Dict[int, 
Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator) - manifests = [ - manifest_file - for manifest_file in snapshot.manifests(self.io) - if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) + manifest_file for manifest_file in manifests if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) ] + residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator) + # step 2: filter the data files in each manifest # this filter depends on the partition spec used to write the manifest file @@ -1810,6 +2157,9 @@ def plan_files(self) -> Iterable[FileScanTask]: ], ) ): + if not manifest_entry_filter(manifest_entry): + continue + data_file = manifest_entry.data_file if data_file.content == DataFileContent.DATA: data_entries.append(manifest_entry) @@ -1834,116 +2184,78 @@ def plan_files(self) -> Iterable[FileScanTask]: for data_entry in data_entries ] - def to_arrow(self) -> pa.Table: - """Read an Arrow table eagerly from this DataScan. - - All rows will be loaded into memory at once. - - Returns: - pa.Table: Materialized Arrow Table from the Iceberg table's DataScan - """ - from pyiceberg.io.pyarrow import ArrowScan - - return ArrowScan( - self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit - ).to_table(self.plan_files()) - - def to_arrow_batch_reader(self) -> pa.RecordBatchReader: - """Return an Arrow RecordBatchReader from this DataScan. - - For large results, using a RecordBatchReader requires less memory than - loading an Arrow Table for the same DataScan, because a RecordBatch - is read one at a time. - - Returns: - pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan - which can be used to read a stream of record batches one by one. - """ - import pyarrow as pa - - from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow - - target_schema = schema_to_pyarrow(self.projection()) - batches = ArrowScan( - self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit - ).to_record_batches(self.plan_files()) + @cached_property + def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: + return KeyDefaultDict(self._build_partition_projection) - return pa.RecordBatchReader.from_batches( - target_schema, - batches, - ).cast(target_schema) + def _build_partition_projection(self, spec_id: int) -> BooleanExpression: + project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive) + return project(self.row_filter) - def to_pandas(self, **kwargs: Any) -> pd.DataFrame: - """Read a Pandas DataFrame eagerly from this Iceberg table. 
+ def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]: + spec = self.table_metadata.specs()[spec_id] + return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive) - Returns: - pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table - """ - return self.to_arrow().to_pandas(**kwargs) + def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]: + spec = self.table_metadata.specs()[spec_id] + partition_type = spec.partition_type(self.table_metadata.schema()) + partition_schema = Schema(*partition_type.fields) + partition_expr = self.partition_filters[spec_id] - def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: - """Shorthand for loading the Iceberg Table in DuckDB. + # The lambda created here is run in multiple threads. + # So we avoid creating _EvaluatorExpression methods bound to a single + # shared instance across multiple threads. + return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition) - Returns: - DuckDBPyConnection: In memory DuckDB connection with the Iceberg table. - """ - import duckdb + def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]: + schema = self.table_metadata.schema() + include_empty_files = strtobool(self.options.get("include_empty_files", "false")) - con = connection or duckdb.connect(database=":memory:") - con.register(table_name, self.to_arrow()) + # The lambda created here is run in multiple threads. + # So we avoid creating _InclusiveMetricsEvaluator methods bound to a single + # shared instance across multiple threads. + return lambda data_file: _InclusiveMetricsEvaluator( + schema, + self.row_filter, + self.case_sensitive, + include_empty_files, + ).eval(data_file) - return con + def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]: + spec = self.table_metadata.specs()[spec_id] - def to_ray(self) -> ray.data.dataset.Dataset: - """Read a Ray Dataset eagerly from this Iceberg table. + # The lambda created here is run in multiple threads. + # So we avoid creating _EvaluatorExpression methods bound to a single + # shared instance across multiple threads. + # return lambda data_file: (partition_schema, partition_expr, self.case_sensitive)(data_file.partition) + from pyiceberg.expressions.visitors import residual_evaluator_of - Returns: - ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table - """ - import ray + # assert self.row_filter == False + return lambda datafile: ( + residual_evaluator_of( + spec=spec, + expr=self.row_filter, + case_sensitive=self.case_sensitive, + schema=self.table_metadata.schema(), + ) + ) - return ray.data.from_arrow(self.to_arrow()) + @staticmethod + def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool: + """Ensure that no manifests are loaded that contain deletes that are older than the data. - def to_polars(self) -> pl.DataFrame: - """Read a Polars DataFrame from this Iceberg table. + Args: + min_sequence_number (int): The minimal sequence number. + manifest (ManifestFile): A ManifestFile that can be either data or deletes. Returns: - pl.DataFrame: Materialized Polars Dataframe from the Iceberg table + Boolean indicating if it is either a data file, or a relevant delete file. 
""" - import polars as pl - - result = pl.from_arrow(self.to_arrow()) - if isinstance(result, pl.Series): - result = result.to_frame() - - return result - - def count(self) -> int: - from pyiceberg.io.pyarrow import ArrowScan - - # Usage: Calculates the total number of records in a Scan that haven't had positional deletes. - res = 0 - # every task is a FileScanTask - tasks = self.plan_files() - - for task in tasks: - # task.residual is a Boolean Expression if the filter condition is fully satisfied by the - # partition value and task.delete_files represents that positional delete haven't been merged yet - # hence those files have to read as a pyarrow table applying the filter and deletes - if task.residual == AlwaysTrue() and len(task.delete_files) == 0: - # Every File has a metadata stat that stores the file record count - res += task.file.record_count - else: - arrow_scan = ArrowScan( - table_metadata=self.table_metadata, - io=self.io, - projected_schema=self.projection(), - row_filter=self.row_filter, - case_sensitive=self.case_sensitive, - ) - tbl = arrow_scan.to_table([task]) - res += len(tbl) - return res + return manifest.content == ManifestContent.DATA or ( + # Not interested in deletes that are older than the data + manifest.content == ManifestContent.DELETES + and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number + ) @dataclass(frozen=True) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 8d1a24c420..f339f64fe6 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -440,7 +440,7 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta def ancestors_between( from_snapshot: Optional[Snapshot], to_snapshot: Snapshot, table_metadata: TableMetadata ) -> Iterable[Snapshot]: - """Get the ancestors of and including the given snapshot between the to and from snapshots.""" + """Get the ancestors of and including the given snapshot between the to and from snapshots, both inclusively.""" if from_snapshot is not None: for snapshot in ancestors_of(to_snapshot, table_metadata): yield snapshot @@ -448,3 +448,32 @@ def ancestors_between( break else: yield from ancestors_of(to_snapshot, table_metadata) + + +def ancestors_between_ids( + from_snapshot_id_exclusive: Optional[int], + to_snapshot_id_inclusive: int, + table_metadata: TableMetadata, +) -> Iterable[Snapshot]: + """Return the ancestors of and including the given "to" snapshot, up to but not including the "from" snapshot. + + If from_snapshot_id_exclusive is None or no ancestors of the "to" snapshot match it, all ancestors of the "to" + snapshot are returned. 
+ """ + if from_snapshot_id_exclusive is not None: + for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot_id_inclusive), table_metadata): + if snapshot.snapshot_id == from_snapshot_id_exclusive: + break + + yield snapshot + else: + yield from ancestors_of(table_metadata.snapshot_by_id(to_snapshot_id_inclusive), table_metadata) + + +def is_ancestor_of(snapshot_id: int, ancestor_snapshot_id: int, table_metadata: TableMetadata) -> bool: + """Return whether ancestor_snapshot_id is an ancestor of snapshot_id.""" + for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata): + if snapshot.snapshot_id == ancestor_snapshot_id: + return True + + return False diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 5ac5162f8e..e995b6bd48 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -1003,3 +1003,205 @@ def test_scan_with_datetime(catalog: Catalog) -> None: df = table.scan(row_filter=LessThan("datetime", yesterday)).to_pandas() assert len(df) == 0 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_append_only(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = ( + test_table.incremental_append_scan() + # Only "append"-operation snapshots occurred in this range + .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id) + .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id) + ) + + assert len(list(scan.plan_files())) == 2 + + # Check various read methods + assert len(scan.to_arrow()) == 3 + assert len(scan.to_arrow_batch_reader().read_all()) == 3 + assert len(scan.to_pandas()) == 3 + assert len(scan.to_polars()) == 3 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_ignores_non_append_snapshots(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = test_table.incremental_append_scan( + from_snapshot_id_exclusive=test_table.snapshots()[0].snapshot_id, + # This is a "delete"-operation snapshot, that should be ignored by the append scan + to_snapshot_id_inclusive=test_table.snapshots()[3].snapshot_id, + ) + + assert len(list(scan.plan_files())) == 2 + assert len(scan.to_arrow()) == 3 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_uses_current_schema(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = ( + test_table.incremental_append_scan() + .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id) + .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id) + ) + + # The schema within the snapshot range above included an extra date field, but the table was then replaced, + # removing it. An append scan always uses the current schema of the table. 
+ expected_schema = pa.schema( + [ + pa.field("number", pa.int32()), + pa.field("letter", pa.string()), + ] + ) + + result_table = scan.to_arrow() + assert result_table.schema.equals(expected_schema) + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_row_filter(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = ( + test_table.incremental_append_scan(row_filter=EqualTo("letter", "b")) + .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id) + .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id) + ) + + # This filter should match against the only row added in snapshots[1] and one of the two rows added in snapshots[2] + assert len(list(scan.plan_files())) == 2 + assert len(scan.to_arrow()) == 2 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_selected_fields(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = ( + test_table.incremental_append_scan(selected_fields=("number",)) + .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id) + .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id) + ) + + expected_schema = pa.schema( + [ + pa.field("number", pa.int32()), + ] + ) + + result_table = scan.to_arrow() + assert result_table.schema.equals(expected_schema) + assert sorted(result_table["number"].to_pylist()) == [2, 3, 4] + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_limit(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = ( + test_table.incremental_append_scan(limit=2) + .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id) + .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id) + ) + + # Although three rows were added in the range, the limit of 2 should be applied + assert len(scan.to_arrow()) == 2 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_count(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = ( + test_table.incremental_append_scan() + .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id) + .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id) + ) + + assert scan.count() == 3 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")]) +def test_incremental_append_scan_to_snapshot_defaults_to_current(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + assert ( + len(test_table.incremental_append_scan().from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id).to_arrow()) == 3 + ) + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_equal_from_and_to_snapshots(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + snapshot_id = test_table.snapshots()[0].snapshot_id + + # 
Exclusive-inclusive semantics mean an empty table should be returned if equal from and to snapshots are specified + assert ( + len( + test_table.incremental_append_scan() + .from_snapshot_exclusive(snapshot_id) + .to_snapshot_inclusive(snapshot_id) + .to_arrow() + ) + == 0 + ) + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_throws_on_disconnected_snapshots(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + from_id = test_table.snapshots()[0].snapshot_id + to_id = test_table.snapshots()[4].snapshot_id + + with pytest.raises(ValueError) as e: + test_table.incremental_append_scan( + from_snapshot_id_exclusive=from_id, + # A table replace occurred just before this snapshot, breaking snapshot lineage / incremental-ity + to_snapshot_id_inclusive=to_id, + ).plan_files() + + assert f"Append scan's start snapshot {from_id} is not an ancestor of end snapshot {to_id}" in str(e.value) + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_throws_on_missing_snapshot_ids(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + # from_snapshot_id not specified + with pytest.raises(ValueError) as e: + test_table.incremental_append_scan( + to_snapshot_id_inclusive=test_table.snapshots()[0].snapshot_id, + ).plan_files() + assert "Start snapshot of append scan unspecified, please set from_snapshot_id_exclusive" in str(e.value) + + # from_snapshot_id missing from metadata + with pytest.raises(ValueError) as e: + test_table.incremental_append_scan( + from_snapshot_id_exclusive=42, + to_snapshot_id_inclusive=test_table.snapshots()[0].snapshot_id, + ).plan_files() + assert "Start snapshot of append scan not found on table metadata: 42" in str(e.value) + + # to_snapshot_id missing from metadata + with pytest.raises(ValueError) as e: + test_table.incremental_append_scan( + from_snapshot_id_exclusive=test_table.snapshots()[0].snapshot_id, + to_snapshot_id_inclusive=42, + ).plan_files() + assert "End snapshot of append scan not found on table metadata: 42" in str(e.value) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index b71d92aa55..343efd9ae3 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -29,7 +29,9 @@ SnapshotSummaryCollector, Summary, ancestors_between, + ancestors_between_ids, ancestors_of, + is_ancestor_of, update_snapshot_summaries, ) from pyiceberg.transforms import IdentityTransform @@ -434,3 +436,43 @@ def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None: ) == 2000 ) + + +def test_is_ancestor_of(table_v2: Table) -> None: + snapshot_id, ancestor_snapshot_id = 3055729675574597004, 3051729675574597004 + + assert is_ancestor_of(snapshot_id, ancestor_snapshot_id, table_v2.metadata) + assert not is_ancestor_of(ancestor_snapshot_id, snapshot_id, table_v2.metadata) + + +def test_ancestors_between_ids(table_v2: Table) -> None: + snapshot_id, ancestor_snapshot_id = 3055729675574597004, 3051729675574597004 + + result = list(ancestors_between_ids(ancestor_snapshot_id, snapshot_id, table_v2.metadata)) + ids = [ancestor.snapshot_id for ancestor in result] + + # Exclusive-inclusive semantics means just 'snapshot_id' should be returned + assert ids == [snapshot_id] + + +def test_ancestors_between_equal_ids(table_v2: Table) 
-> None: + snapshot_id = 3055729675574597004 + + result = list(ancestors_between_ids(snapshot_id, snapshot_id, table_v2.metadata)) + + # Exclusive-inclusive semantics mean no ancestors should be returned + assert result == [] + + +def test_ancestors_between_ids_missing_from_snapshot(table_v2: Table) -> None: + snapshot_id, ancestor_snapshot_id = 3055729675574597004, 3051729675574597004 + + result = list( + ancestors_between_ids( + from_snapshot_id_exclusive=None, to_snapshot_id_inclusive=snapshot_id, table_metadata=table_v2.metadata + ) + ) + ids = [ancestor.snapshot_id for ancestor in result] + + # With a from snapshot missing, all ancestors should be returned + assert ids == [snapshot_id, ancestor_snapshot_id]
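
As a usage sketch of the API introduced by this patch (not part of the diff above): it assumes a catalog configured under the hypothetical name "default" for load_catalog, and the default.test_incremental_read table provisioned by dev/provision.py; the snapshot indices mirror those used in the integration tests. Only calls added in this patch or already present in PyIceberg (load_catalog, Catalog.load_table, Table.snapshots, EqualTo) are used.

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo

# Assumes a catalog named "default" is configured (e.g. via ~/.pyiceberg.yaml).
catalog = load_catalog("default")
table = catalog.load_table("default.test_incremental_read")
snapshots = table.snapshots()

# Fluent style: resolve the (exclusive, inclusive] snapshot range after constructing the scan.
scan = (
    table.incremental_append_scan(row_filter=EqualTo("letter", "b"), selected_fields=("number",))
    .from_snapshot_exclusive(snapshots[0].snapshot_id)
    .to_snapshot_inclusive(snapshots[2].snapshot_id)
)

# Plans only data files added by append snapshots in the range; rows are then
# filtered by row_filter and projected onto the table's current schema.
print(scan.to_arrow())

# Keyword style: to_snapshot_id_inclusive defaults to the current snapshot when omitted.
df = table.incremental_append_scan(
    from_snapshot_id_exclusive=snapshots[0].snapshot_id,
).to_pandas()

The fluent setters return updated copies of the scan (via update()), so the range can also be adjusted on an existing scan without mutating it.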