Incremental Append Scan #2031
Conversation
from_snapshot_id_exclusive: Optional[int],
to_snapshot_id_inclusive: int,
I realise these semantics are confusing given that ancestors_between above is inclusive-inclusive.
ancestors_between is only used in validation_history so far... do we need both, or can we consolidate to just use snapshot_id instead of Snapshot objects?
I probably should have just used table_metadata.snapshot_by_id now that I am seeing it, lol.
yield from ancestors_of(table_metadata.snapshot_by_id(to_snapshot_id_inclusive), table_metadata)


def is_ancestor_of(snapshot_id: int, ancestor_snapshot_id: int, table_metadata: TableMetadata) -> bool:
If from_snapshot_id_exclusive is None or no ancestors of the "to" snapshot match it, all ancestors of the "to"
snapshot are returned.
"""
if from_snapshot_id_exclusive is not None:
I'm following the structure of ancestors_between above here and case-splitting, though I don't think it's strictly needed.
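For reference, a minimal, self-contained sketch of the exclusive/inclusive semantics being discussed (a hypothetical helper over a parent-id mapping, not the PR's code):

```python
from typing import Dict, Iterator, Optional


def ancestors_between_sketch(
    parents: Dict[int, Optional[int]],  # snapshot_id -> parent snapshot_id
    from_snapshot_id_exclusive: Optional[int],
    to_snapshot_id_inclusive: int,
) -> Iterator[int]:
    # Walk ancestors of the "to" snapshot (inclusive) and stop before the "from"
    # snapshot (exclusive). If "from" is None or never matched, all ancestors of
    # "to" are yielded.
    current: Optional[int] = to_snapshot_id_inclusive
    while current is not None and current != from_snapshot_id_exclusive:
        yield current
        current = parents.get(current)


# Lineage 1 -> 2 -> 3: (from=1, to=3] yields 3, then 2.
assert list(ancestors_between_sketch({3: 2, 2: 1, 1: None}, 1, 3)) == [3, 2]
```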
# https://github.com/apache/iceberg/issues/1092#issuecomment-638432848 / https://github.com/apache/iceberg/issues/3747#issuecomment-1145419407
# REPLACE TABLE requires certain Hive server configuration
if catalog_name != "hive":
    # Replace to break snapshot lineage:
Strictly speaking, I don't need this case because I can test that broken lineage throws just by inverting the snapshot order. But this feels like a more realistic use case to me. And changing the schema in this way also lets me test that the table's current schema is always used 😄
@@ -717,6 +717,14 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List
    if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
]

def __eq__(self, other: Any) -> bool:
Changes in this file are from #533.
To elaborate on why they're needed:
- We maintain a set of manifest files (see Incremental Append Scan #2031 (comment)) when planning an append scan
- Equality and hash methods are therefore needed. This is all inspired by the Java implementation, see https://github.com/apache/iceberg/blob/1911c94ea605a3d3f10a1994b046f00a5e9fdceb/core/src/main/java/org/apache/iceberg/GenericManifestFile.java#L407-L421.
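To illustrate, a simplified sketch (not the actual ManifestFile class, and assuming - as in the linked Java code - that identity is keyed on the manifest path):

```python
from typing import Any


class ManifestFileSketch:
    """Simplified stand-in for ManifestFile; the real class has many more fields."""

    def __init__(self, manifest_path: str) -> None:
        self.manifest_path = manifest_path

    def __eq__(self, other: Any) -> bool:
        # Identity is keyed on the manifest path, mirroring the Java implementation linked above.
        return isinstance(other, ManifestFileSketch) and self.manifest_path == other.manifest_path

    def __hash__(self) -> int:
        return hash(self.manifest_path)


# Deduplicating manifests reachable from several append snapshots then works with a plain set:
manifests = {ManifestFileSketch("s3://bucket/m1.avro"), ManifestFileSketch("s3://bucket/m1.avro")}
assert len(manifests) == 1
```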
)

# https://github.com/apache/iceberg/issues/1092#issuecomment-638432848 / https://github.com/apache/iceberg/issues/3747#issuecomment-1145419407
# Don't do replace for Hive catalog as REPLACE TABLE requires certain Hive server configuration
This is probably fixable 🤔 (but I also made use of this here)
spark.sql(
    f"""
    CREATE OR REPLACE TABLE {catalog_name}.default.test_incremental_read (
(Same as in #533)
    return iter([])

append_snapshot_ids: Set[int] = {snapshot.snapshot_id for snapshot in append_snapshots}

manifests = {
Note that we maintain a set of manifest files, just like https://github.com/apache/iceberg/blob/1911c94ea605a3d3f10a1994b046f00a5e9fdceb/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java#L70-L74.
@@ -1092,6 +1096,61 @@ def scan(
        limit=limit,
    )

# TODO: Consider more concise name
def incremental_append_scan(
Thoughts on this method's name? It's a bit verbose, but can't think of a better alternative
row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
selected_fields: Tuple[str, ...] = ("*",),
case_sensitive: bool = True,
from_snapshot_id_exclusive: Optional[int] = None,
to_snapshot_id_inclusive: Optional[int] = None,
options: Properties = EMPTY_DICT,
limit: Optional[int] = None,
For folks reviewing: with the exception of the optional snapshot IDs, these are the same args as the default scan
method just above. I think they all make sense for an append scan too. (I also added some tests)
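For illustration, a rough usage sketch of the API proposed here (catalog/table names, column names, and snapshot IDs are hypothetical):

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")  # hypothetical catalog configuration
table = catalog.load_table("default.test_incremental_read")

# Scan only the data appended by snapshots after 123, up to and including 456.
scan = table.incremental_append_scan(
    row_filter="id > 0",
    selected_fields=("id",),
    from_snapshot_id_exclusive=123,
    to_snapshot_id_inclusive=456,
)
arrow_table = scan.to_arrow()
```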
from_snapshot_id_exclusive:
    Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. This can be set
    on the IncrementalAppendScan object returned, but ultimately must not be None.
This is a significant, user-facing change compared to #533. This PR throws if the "from" snapshot ID is not set, whereas that PR defaults to the oldest ancestor of the end snapshot, inclusively. Here is my argument - would love to hear what people think:
- Incremental Append Scan #533 is good in that it sticks very closely to the Java implementation of IncrementalScan - see the docstring here.
- However, Spark marks start-snapshot-id as non-optional and throws if it's not provided. See docs.
- IMO, the Java APIs are not user-facing, but may be written in a generalised way to allow flexibility for the engines that consume them, which are user-facing. The difference with PyIceberg is that it is itself user-facing, so I claim it should be more engine-inspired.
- Here, the Spark behaviour makes sense, IMO. It's required for an append scan, but not for a changelog scan.
- If you omit it for the changelog scan, which a user would do to "read from the start of the table", you just need to check for the oldest ancestor, because that root snapshot was the last replace, so the changelog would begin from there anyway. Looking at the docs, they indeed just say "If not provided, it reads from the table's first snapshot inclusively."
- But an append scan is documented as reading all data from append snapshots alone, ignoring all other snapshot types. Therefore, the intuitive meaning of reading from the first snapshot doesn't actually hold, because all appends since the first snapshot wouldn't necessarily be relayed (just the ones since the last replace). To avoid user confusion, I think it's best to throw as Spark does here.
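A tiny sketch of the behaviour argued for above (not the PR's exact code):

```python
from typing import Optional


def require_from_snapshot(from_snapshot_id_exclusive: Optional[int]) -> int:
    # Mirror Spark's required start-snapshot-id: fail fast instead of silently
    # scanning from the oldest ancestor, which wouldn't return every historical
    # append anyway (only those since the last replace).
    if from_snapshot_id_exclusive is None:
        raise ValueError("from_snapshot_id_exclusive must be set for an incremental append scan")
    return from_snapshot_id_exclusive
```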
class TableScan(ABC):
class AbstractTableScan(ABC):
GitHub's diff here is messed up.
A goal of this PR is minimising user-facing breaks (I believe), which I am guessing was the main concern with #533, which e.g. removed snapshot_id from TableScan. In this PR, I keep that field - this means it doesn't make sense for incremental scans to subclass TableScan because they have two snapshot IDs.
I therefore introduced this abstract class to be a base class for all table scans, non-incremental and incremental.
Do we need to do this rename? Changing the name would break any other library that relies on this class.
See #2031 (comment) - TableScan still exists with the same methods, so I'd have thought we'd be fine here, or do I misunderstand?
The purpose of this new class is to be a base class for all table scans, including incremental ones, because logic can be shared. But your point about reconsidering the class hierarchy still holds.
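To make the hierarchy being discussed concrete, here is a rough skeleton as I read it from this thread (bodies elided; the exact bases are inferred and not necessarily the PR's code):

```python
from abc import ABC


class AbstractTableScan(ABC):
    """Base class for all table scans, non-incremental and incremental."""


class FileBasedScan(AbstractTableScan, ABC):
    """Scans that plan FileScanTasks; provides default to_arrow/to_pandas/etc."""


class TableScan(AbstractTableScan, ABC):
    """Non-incremental scans targeting a single snapshot; keeps snapshot_id."""


class DataScan(TableScan, FileBasedScan):
    """The existing snapshot scan."""


class IncrementalScan(AbstractTableScan, ABC):
    """Scans between two snapshot IDs."""


class IncrementalAppendScan(IncrementalScan, FileBasedScan):
    """The append scan added in this PR."""
```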
"""Create a copy of this table scan with updated fields.""" | ||
return type(self)(**{**self.__dict__, **overrides}) | ||
|
||
def to_pandas(self, **kwargs: Any) -> pd.DataFrame: |
A minor user-facing change (IMHO) is that these methods on TableScan, which subclasses this, now have default implementations based on the to_arrow abstract method. This feels OK to me - subclasses can override if needed, but maybe it should be documented (if we want to go with this approach).
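For example, the default could look roughly like this (a sketch assuming to_arrow is the abstract method, as described above; ArrowBackedScan is an illustrative name, not the PR's class):

```python
from abc import ABC, abstractmethod
from typing import Any

import pandas as pd
import pyarrow as pa


class ArrowBackedScan(ABC):
    @abstractmethod
    def to_arrow(self) -> pa.Table: ...

    def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
        # Default implementation delegates to the abstract to_arrow().
        return self.to_arrow().to_pandas(**kwargs)
```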
    return result


class FileBasedScan(AbstractTableScan, ABC):
In light of #533 (comment), I think it makes sense to have some abstraction for scans that return FileScanTasks specifically.
I think we maybe should've been doing some handling before - on main, this line gives me a warning (iceberg-python/pyiceberg/table/__init__.py, line 1933 in f47513b):
if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
because (iceberg-python/pyiceberg/table/__init__.py, lines 1926 to 1927 in f47513b):
# every task is a FileScanTask
tasks = self.plan_files()
doesn't follow from the typing. But overriding this return type to Iterable[FileScanTask] fixes that, as here.
The FileScanTask-based scan abstraction also means we can provide default implementations of methods like to_arrow and the other ones here, based on FileScanTasks being returned by plan_files. This reduces duplication - we get them on both DataScan and IncrementalAppendScan.
"""A base class for table scans that plan FileScanTasks.""" | ||
|
||
@cached_property | ||
def _manifest_group_planner(self) -> ManifestGroupPlanner: |
The motivation for a manifest-based file scan task planner comes from the Java side: https://github.com/apache/iceberg/blob/1911c94ea605a3d3f10a1994b046f00a5e9fdceb/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java#L76-L97 (class here).
I cache this property on this class because ManifestGroupPlanner has a partition_filters property that is cached on that class. It's therefore cached for the FileBasedScan's lifetime, similar to what we had before (iceberg-python/pyiceberg/table/__init__.py, lines 1696 to 1698 in f47513b):
@cached_property
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
    return KeyDefaultDict(self._build_partition_projection)
To me, the abstraction of something that handles planning such tasks from manifests makes sense and naturally reduces duplication with the new append scan type. I think other design decisions are possible like introducing a member for it or rethinking the abstraction. This felt fine to me (I like keeping this class thin)
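A small sketch of the caching point above (PlannerStub stands in for ManifestGroupPlanner, which really carries the partition_filters cache and the planning logic):

```python
from functools import cached_property


class PlannerStub:
    """Stand-in for ManifestGroupPlanner."""

    def __init__(self) -> None:
        self.partition_filters: dict = {}  # the real planner caches per-spec filters here


class ScanSketch:
    @cached_property
    def _manifest_group_planner(self) -> PlannerStub:
        # Cached for the scan's lifetime, so the planner's own caches survive
        # across repeated plan_files() calls.
        return PlannerStub()


scan = ScanSketch()
assert scan._manifest_group_planner is scan._manifest_group_planner  # same instance
```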
class TableScan(AbstractTableScan, ABC):
    """A base class for non-incremental table scans that target a single snapshot."""
See https://github.com/apache/iceberg-python/pull/2031/files#r2102683614.
I don't love this - if compatibility weren't a concern, I'd refactor the hierarchy like #533 did. I can't see much nuance this offers as a base class over DataScan, although it doesn't extend from FileBasedScan but DataScan does.
    return self._manifest_group_planner.plan_files(manifests=snapshot.manifests(self.io))

# TODO: Document motivation and un-caching
@property
This was previously a cached_property, but now that _manifest_group_planner is cached, it can just be a property. (I don't think this is a user-facing change.)
This method isn't used by PyIceberg because it was moved into ManifestGroupPlanner, which concerns itself with those things. The only reasons I'm keeping it are compatibility, and that maybe it was public on DataScan for a reason before, i.e. maybe library users were interested in using it.
partition_type = spec.partition_type(self.table_metadata.schema())
partition_schema = Schema(*partition_type.fields)
partition_expr = self.partition_filters[spec_id]
A = TypeVar("A", bound="IncrementalScan", covariant=True)
Linter didn't like I 😢
def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
    schema = self.table_metadata.schema()
    include_empty_files = strtobool(self.options.get("include_empty_files", "false"))
class IncrementalScan(AbstractTableScan, ABC):
I'm differing from both #533 and the Java-side here in keeping this thin and not performing any snapshot defaults / validation in this abstract super-class.
This is because of my claim in #2031 (comment) (happy to discuss) - IMHO, both append and changelog scans should perform their own, probably different defaults / validation because we're designing APIs for user, not engine use.
cc @chinmay-bhat, would love to hear your thoughts on this and this PR!
)

def plan_files(self) -> Iterable[FileScanTask]:
    """Plans the relevant files by filtering on the PartitionSpecs.
from_snapshot_id, to_snapshot_id = self._validate_and_resolve_snapshots()
The implementation here is largely inspired by the Java implementation https://github.com/apache/iceberg/blob/1911c94ea605a3d3f10a1994b046f00a5e9fdceb/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java#L46.
manifest_entry_filter=lambda manifest_entry: manifest_entry.snapshot_id in append_snapshot_ids
and manifest_entry.status == ManifestEntryStatus.ADDED,
self.case_sensitive = scan.case_sensitive
self.options = scan.options

def plan_files(
Sorry, the diff here is messed up. This is the same as the relevant body of the previous DataScan method, but we filter on manifest_evaluators within this method itself, on the manifests provided. We also introduce this manifest_entry_filter, which is Java-inspired.
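A self-contained sketch of what that entry filter selects (types simplified; the real ManifestEntry has more fields):

```python
from dataclasses import dataclass
from enum import Enum
from typing import Set


class ManifestEntryStatus(Enum):
    EXISTING = 0
    ADDED = 1
    DELETED = 2


@dataclass
class Entry:
    """Simplified stand-in for a manifest entry."""

    snapshot_id: int
    status: ManifestEntryStatus


append_snapshot_ids: Set[int] = {101, 102}  # append snapshots in the (from, to] range


def manifest_entry_filter(entry: Entry) -> bool:
    # Keep only entries ADDED by one of the append snapshots being scanned.
    return entry.snapshot_id in append_snapshot_ids and entry.status == ManifestEntryStatus.ADDED


entries = [
    Entry(snapshot_id=101, status=ManifestEntryStatus.ADDED),     # kept
    Entry(snapshot_id=99, status=ManifestEntryStatus.ADDED),      # outside the range
    Entry(snapshot_id=102, status=ManifestEntryStatus.EXISTING),  # not newly added
]
assert [e.snapshot_id for e in entries if manifest_entry_filter(e)] == [101]
```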
if not manifest_entry_filter(manifest_entry):
    continue
return ray.data.from_arrow(self.to_arrow())
# TODO: Document that this method was made static
@staticmethod
I made this method static (it wasn't before - the one on DataScan).
]


class ManifestGroupPlanner:
This class effectively extracts the code relevant to planning based on manifest files previously in DataScan. The code is largely the same (differences pointed out in comments).
See https://github.com/apache/iceberg-python/pull/2031/files#r2102715753 regarding inspiration
@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
(Using just the REST catalog that has the replace here to get a different schema)
@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
(Using just the REST catalog that has the replace here)
Put up apache/iceberg#13179 regarding an append-only option for the Spark side. Would like to hear people's thoughts there - but I think we can proceed with this PR as it is now.
@Fokko, could you please review this?
Friendly ping, @Fokko @kevinjqliu
Thanks for picking it up. For various reasons, I've been away for quite a while. I'm sorry for not making progress on this work.
@smaheshwar-pltr First of all, sorry for the long wait, and thanks for picking this up. I'll have to look into this in more detail next week. It would be great to break this into smaller pieces to speed up the reviews. Going over the PR, I'm not sure if we want to copy the whole class hierarchy from Java, as this does not feel very Pythonic in my opinion.
Not sure why CI failed when tests pass for me locally - I did see that error on other PRs, so merged.
Thanks for taking a look, @Fokko - that makes sense. I think abstract classes, used the way this PR uses them, achieve the append scan functionality nicely and without duplication (see the added tests). Maybe this PR's logic of planning files for an append scan, its APIs, and its tests can still be reviewed?
Revival of #533 given inactivity there.
Co (/main 😄)-author: @hililiwei - apologies to them if they were still working on this. Significant credit goes to them - thank you, @hililiwei!
CC some folks I saw reviewing the previous PR / discussions: @Fokko @chinmay-bhat @kevinjqliu.
I think a bit more work is required (or more, depending on people's thoughts), but I'm happy for a review to start now.
Rationale for this change
PyIceberg lacks incremental read utilities. I think this has been asked for multiple times - a strength of PyIceberg is that small data (such as just the data appended between snapshots) can theoretically be processed much faster than with e.g. Spark, so IMHO incremental scans are one of the most needed features of PyIceberg right now.
Are these changes tested?
Yes, with unit tests and integration tests.
Are there any user-facing changes?