
Incremental Append Scan #2031


Open: wants to merge 13 commits into main

Conversation

@smaheshwar-pltr (Contributor) commented May 21, 2025

Revival of #533 given inactivity there.

Co (/main 😄)-author: @hililiwei, apologies to them if they were still working on this. Significant credit goes to them - thank you, @hililiwei!

CC some folks I saw reviewing the previous PR / discussions: @Fokko @chinmay-bhat @kevinjqliu.

I think a bit more work is required (possibly more, depending on people's thoughts), but I'm happy for review to start now.

Rationale for this change

PyIceberg lacks incremental read utilities, and I think this has been asked for multiple times. A strength of PyIceberg is that small data (such as just the data appended between two snapshots) can in theory be processed much faster than with e.g. Spark, so IMHO incremental scans are one of the most needed features of PyIceberg right now.

Are these changes tested?

Yes, with unit tests and integration tests.

Are there any user-facing changes?

Comment on lines +454 to +455
from_snapshot_id_exclusive: Optional[int],
to_snapshot_id_inclusive: int,
@smaheshwar-pltr (Contributor Author), May 21, 2025:

I realise these semantics are confusing given that ancestors_between above is inclusive-inclusive

Contributor:

ancestors_between is only used in validation_history so far... do we need both, or can we consolidate to just use snapshot_id instead of Snapshot objects?

Contributor:

I probably should have just used table_metadata.snapshot_by_id now that I am seeing it lol

yield from ancestors_of(table_metadata.snapshot_by_id(to_snapshot_id_inclusive), table_metadata)


def is_ancestor_of(snapshot_id: int, ancestor_snapshot_id: int, table_metadata: TableMetadata) -> bool:

If from_snapshot_id_exclusive is None or no ancestors of the "to" snapshot match it, all ancestors of the "to"
snapshot are returned.
"""
if from_snapshot_id_exclusive is not None:
@smaheshwar-pltr (Contributor Author):

I'm following the structure of ancestors_between above and case-working here, but I don't think it's strictly needed.
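For illustration, a case-free version could look roughly like this (a hedged sketch, not this PR's exact code, reusing ancestors_of and table_metadata.snapshot_by_id from the diff above; the helper name is hypothetical):

```python
from typing import Iterator, Optional

# Assumes the module's existing imports (Snapshot, TableMetadata, ancestors_of).
def ancestors_between_exclusive_inclusive(
    table_metadata: TableMetadata,
    from_snapshot_id_exclusive: Optional[int],
    to_snapshot_id_inclusive: int,
) -> Iterator[Snapshot]:
    # Walk ancestors of the "to" snapshot (inclusive) and stop when the exclusive
    # "from" snapshot is reached; if it is None or never reached, all ancestors
    # are yielded, matching the docstring above.
    for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot_id_inclusive), table_metadata):
        if snapshot.snapshot_id == from_snapshot_id_exclusive:
            return
        yield snapshot
```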

# https://github.com/apache/iceberg/issues/1092#issuecomment-638432848 / https://github.com/apache/iceberg/issues/3747#issuecomment-1145419407
# REPLACE TABLE requires certain Hive server configuration
if catalog_name != "hive":
# Replace to break snapshot lineage:
@smaheshwar-pltr (Contributor Author):

Strictly speaking, I don't need this case because I can test that broken lineage throws just by inverting the snapshot order. But this feels like a more realistic use case to me. And changing the schema in this way also lets me test that the table's current schema is always used 😄

@@ -717,6 +717,14 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List
if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
]

def __eq__(self, other: Any) -> bool:
@smaheshwar-pltr (Contributor Author), May 21, 2025:

Changes in this file are from #533.

To elaborate on why they're needed:
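Roughly: incremental planning collects ManifestFiles from several append snapshots into a set, and that deduplication relies on value-based equality/hashing rather than object identity. A hedged sketch (not the exact code in this PR):

```python
# Hedged sketch: the same manifest file can be reachable from more than one of
# the selected append snapshots, so gathering them into a set only deduplicates
# correctly if ManifestFile defines __eq__ (and __hash__) on its values.
append_snapshot_ids = {snapshot.snapshot_id for snapshot in append_snapshots}
manifests = {
    manifest
    for snapshot in append_snapshots
    for manifest in snapshot.manifests(io)
    if manifest.added_snapshot_id in append_snapshot_ids
}
```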

)

# https://github.com/apache/iceberg/issues/1092#issuecomment-638432848 / https://github.com/apache/iceberg/issues/3747#issuecomment-1145419407
# Don't do replace for Hive catalog as REPLACE TABLE requires certain Hive server configuration
@smaheshwar-pltr (Contributor Author), May 22, 2025:

This is probably fixable 🤔 (but I also made use of this here)


spark.sql(
f"""
CREATE OR REPLACE TABLE {catalog_name}.default.test_incremental_read (
@smaheshwar-pltr (Contributor Author):

(Same as in #533)

return iter([])

append_snapshot_ids: Set[int] = {snapshot.snapshot_id for snapshot in append_snapshots}

manifests = {

@@ -1092,6 +1096,61 @@ def scan(
limit=limit,
)

# TODO: Consider more concise name
def incremental_append_scan(
@smaheshwar-pltr (Contributor Author):

Thoughts on this method's name? It's a bit verbose, but I can't think of a better alternative.

Comment on lines +1102 to +1108
row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
selected_fields: Tuple[str, ...] = ("*",),
case_sensitive: bool = True,
from_snapshot_id_exclusive: Optional[int] = None,
to_snapshot_id_inclusive: Optional[int] = None,
options: Properties = EMPTY_DICT,
limit: Optional[int] = None,
@smaheshwar-pltr (Contributor Author), May 22, 2025:

For folks reviewing: with the exception of the optional snapshot IDs, these are the same args as the default scan method just above. I think they all make sense for an append scan too. (I also added some tests)
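To make that concrete, a hedged usage sketch (argument names as in this PR; the table and values are illustrative):

```python
# Same knobs as Table.scan, plus the two snapshot IDs.
scan = table.incremental_append_scan(
    from_snapshot_id_exclusive=123,           # illustrative snapshot IDs
    to_snapshot_id_inclusive=456,
    row_filter="category = 'books'",
    selected_fields=("id", "category"),
    case_sensitive=True,
    limit=1_000,
)
arrow_table = scan.to_arrow()
```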

Comment on lines +1125 to +1127
from_snapshot_id_exclusive:
Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. This can be set
on the IncrementalAppendScan object returned, but ultimately must not be None.
@smaheshwar-pltr (Contributor Author):

This is a significant, user-facing change compared to #533. This PR throws if the "from" snapshot ID is not set, whereas that PR defaults to the oldest ancestor of the end snapshot, inclusively. Here is my argument; I'd love to hear what people think:

  • Incremental Append Scan #533 is good in that it sticks very closely to the Java implementation of IncrementalScan - see the docstring here.
  • However, Spark marks start-snapshot-id as non-optional and throws if it's not provided. See the docs.
  • IMO, the Java APIs are not user-facing; they may be written in a generalised way to allow flexibility for the engines that consume them, and it is those engine APIs that are user-facing. The difference is that PyIceberg is itself user-facing, so I claim it should be more engine-inspired.
  • Here, the Spark behaviour makes sense, IMO. The start snapshot is required for an append scan, but not for a changelog scan.
    • If you omit it for a changelog scan, which a user would do to "read from the start of the table", you just need to find the oldest ancestor: that root snapshot was the last replace, so the changelog would begin from there anyway. Looking at the docs, it indeed just says "If not provided, it reads from the table's first snapshot inclusively."
    • But an append scan is documented as reading data from append snapshots alone, ignoring all other snapshot types. So the intuitive meaning of "reading from the first snapshot" doesn't actually hold, because not all appends since the first snapshot would necessarily be relayed (just the ones since the last replace). To avoid user confusion, I think it's best to throw as Spark does here (sketched below).
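A sketch of the proposed behaviour (hedged; the exact exception type and message are illustrative):

```python
# The "from" snapshot may be left unset when the scan is created (it can be set
# on the returned object later), but it must be set by the time files are
# planned; otherwise the scan raises instead of silently defaulting to the
# oldest ancestor, as #533 did.
scan = table.incremental_append_scan(to_snapshot_id_inclusive=456)
scan.to_arrow()  # raises, e.g. ValueError("from_snapshot_id_exclusive must be set")
```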



class TableScan(ABC):
class AbstractTableScan(ABC):
@smaheshwar-pltr (Contributor Author):

GitHub's diff here is messed up.

A goal of this PR is minimising user-facing breaks, which I am guessing was the main concern with #533 (which e.g. removed snapshot_id from TableScan). In this PR I keep that field, which means it doesn't make sense for incremental scans to subclass TableScan, because they have two snapshot IDs.

I therefore introduced this abstract class as a base class for all table scans, non-incremental and incremental.
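Roughly, the hierarchy this introduces (class names from this PR; bodies elided, and the exact bases of DataScan and IncrementalAppendScan are my reading of the diff):

```python
from abc import ABC

class AbstractTableScan(ABC): ...                  # shared base for all table scans

class FileBasedScan(AbstractTableScan, ABC): ...   # scans that plan FileScanTasks

class TableScan(AbstractTableScan, ABC): ...       # non-incremental, single snapshot_id

class DataScan(TableScan, FileBasedScan): ...      # the existing scan, API unchanged

class IncrementalScan(AbstractTableScan, ABC): ... # two snapshot IDs instead of one

class IncrementalAppendScan(IncrementalScan, FileBasedScan): ...  # new in this PR
```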

@Fokko (Contributor), Jun 8, 2025:

Do we need to do this rename? Changing the name would break any other library that relies on this class.

@smaheshwar-pltr (Contributor Author):

See #2031 (comment) - TableScan still exists with the same methods so I'd have thought we'd be fine here, or do I misunderstand?

The purpose of this new class is to be a base class for table scans, including incremental ones, so that logic can be shared. But your point about reconsidering the class hierarchy still holds.

"""Create a copy of this table scan with updated fields."""
return type(self)(**{**self.__dict__, **overrides})

def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
@smaheshwar-pltr (Contributor Author):

A minor user-facing change (IMHO) is that these methods on TableScan, which subclasses this, now have default implementations based on the to_arrow abstract method. This feels OK to me - subclasses can override them as needed - but it should maybe be documented (if we want to go with this approach).
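For instance, a default could be as thin as this (hedged sketch, assuming the module's existing imports of Any and pandas as pd):

```python
def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
    # Default implementation in terms of the abstract to_arrow();
    # subclasses can still override it if they have a more direct path.
    return self.to_arrow().to_pandas(**kwargs)
```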

return result


class FileBasedScan(AbstractTableScan, ABC):
@smaheshwar-pltr (Contributor Author), May 22, 2025:

In light of #533 (comment), I think it makes sense to have some abstraction for scans that return FileScanTasks specifically.

I think we maybe should've been doing some handling before - on main, this line gives me a warning

if task.residual == AlwaysTrue() and len(task.delete_files) == 0:

because

# every task is a FileScanTask
tasks = self.plan_files()

doesn't follow from the typing. But overriding this return type to Iterable[FileScanTask] fixes that, as here.
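i.e. something along these lines (hedged sketch, assuming the module's existing imports):

```python
class FileBasedScan(AbstractTableScan, ABC):
    @abstractmethod
    def plan_files(self) -> Iterable[FileScanTask]:
        # Narrowed return type: every planned task is statically a FileScanTask,
        # so downstream code reading task.residual or task.delete_files
        # type-checks without a cast.
        ...
```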

@smaheshwar-pltr (Contributor Author):

The FileScanTask-based scan abstraction also means we can provide default implementations of methods like to_arrow and the others here, based on FileScanTasks being returned by plan_files. This reduces duplication - we get them on both DataScan and IncrementalAppendScan.

"""A base class for table scans that plan FileScanTasks."""

@cached_property
def _manifest_group_planner(self) -> ManifestGroupPlanner:
@smaheshwar-pltr (Contributor Author):

The motivation for a manifest-based file scan task planner comes from the Java-side https://github.com/apache/iceberg/blob/1911c94ea605a3d3f10a1994b046f00a5e9fdceb/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java#L76-L97 (class here).

I cache this property on this class because ManifestGroupPlanner has a partition_filters property that is cached on it. The planner, and hence that cache, therefore lives for the FileBasedScan's lifetime, similar to what we had before:

@cached_property
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
return KeyDefaultDict(self._build_partition_projection)
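So the cached property is plausibly just this (a hedged sketch of the body, which isn't shown in this hunk):

```python
@cached_property
def _manifest_group_planner(self) -> ManifestGroupPlanner:
    # Cached for the scan's lifetime so the planner's own cached state
    # (e.g. its partition_filters) is reused across plan_files() calls.
    return ManifestGroupPlanner(self)
```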

@smaheshwar-pltr (Contributor Author), May 22, 2025:

To me, the abstraction of something that handles planning such tasks from manifests makes sense and naturally reduces duplication with the new append scan type. Other design decisions are possible, like introducing a member for it or rethinking the abstraction, but this felt fine to me (I like keeping this class thin).

Comment on lines +1754 to +1755
class TableScan(AbstractTableScan, ABC):
"""A base class for non-incremental table scans that target a single snapshot."""
@smaheshwar-pltr (Contributor Author):

See https://github.com/apache/iceberg-python/pull/2031/files#r2102683614.

I don't love this - if compatibility weren't a concern, I'd refactor the hierarchy like #533 did. I can't see much nuance this offers as a base class over DataScan, other than that it doesn't extend FileBasedScan while DataScan does.

return self._manifest_group_planner.plan_files(manifests=snapshot.manifests(self.io))

# TODO: Document motivation and un-caching
@property
@smaheshwar-pltr (Contributor Author), May 22, 2025:

This was previously a cached_property, but now that _manifest_group_planner is cached it can just be a property. (I don't think this is a user-facing change.)

@smaheshwar-pltr (Contributor Author), May 22, 2025:

This method isn't used by PyIceberg any more because the logic was moved into ManifestGroupPlanner, which concerns itself with those things. The only reasons I'm keeping it are compatibility, and that it may have been public on DataScan for a reason before, i.e. library users may have been interested in using it.

partition_type = spec.partition_type(self.table_metadata.schema())
partition_schema = Schema(*partition_type.fields)
partition_expr = self.partition_filters[spec_id]
A = TypeVar("A", bound="IncrementalScan", covariant=True)
@smaheshwar-pltr (Contributor Author):

The linter didn't like the name I 😢

def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
schema = self.table_metadata.schema()
include_empty_files = strtobool(self.options.get("include_empty_files", "false"))
class IncrementalScan(AbstractTableScan, ABC):
@smaheshwar-pltr (Contributor Author), May 22, 2025:

I'm differing from both #533 and the Java side here by keeping this thin and not performing any snapshot defaults / validation in this abstract super-class.

This is because of my claim in #2031 (comment) (happy to discuss) - IMHO, append and changelog scans should each perform their own, probably different, defaults / validation, because we're designing APIs for user use, not engine use.

cc @chinmay-bhat, would love to hear your thoughts on this and this PR!
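For the append scan, that per-subclass validation could look roughly like this (a hedged sketch; defaulting the "to" snapshot to the table's current snapshot mirrors Spark's end-snapshot-id behaviour and is illustrative rather than exactly what this PR does):

```python
def _validate_and_resolve_snapshots(self) -> Tuple[int, int]:
    # Append-scan-specific rules: "from" is required (see the discussion above);
    # "to" falls back to the table's current snapshot when not provided.
    if self.from_snapshot_id_exclusive is None:
        raise ValueError("from_snapshot_id_exclusive must be set for an incremental append scan")
    to_snapshot_id = (
        self.to_snapshot_id_inclusive
        if self.to_snapshot_id_inclusive is not None
        else self.table_metadata.current_snapshot_id
    )
    if to_snapshot_id is None:
        raise ValueError("Table has no snapshots to scan")
    return self.from_snapshot_id_exclusive, to_snapshot_id
```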

)

def plan_files(self) -> Iterable[FileScanTask]:
"""Plans the relevant files by filtering on the PartitionSpecs.
from_snapshot_id, to_snapshot_id = self._validate_and_resolve_snapshots()

Comment on lines +2057 to +2058
manifest_entry_filter=lambda manifest_entry: manifest_entry.snapshot_id in append_snapshot_ids
and manifest_entry.status == ManifestEntryStatus.ADDED,

self.case_sensitive = scan.case_sensitive
self.options = scan.options

def plan_files(
@smaheshwar-pltr (Contributor Author):

Sorry, the diff here is messed up. This is the same as the relevant body of the previous DataScan method, but we filter on manifest_evaluators within this method itself, on the manifests provided. We also introduce this manifest_entry_filter, which is Java-inspired.
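A hedged sketch of the resulting shape (names taken from the diff; the default value and exact parameter list are illustrative, and the module's existing imports are assumed):

```python
def plan_files(
    self,
    manifests: List[ManifestFile],
    manifest_entry_filter: Callable[[ManifestEntry], bool] = lambda _: True,
) -> Iterable[FileScanTask]:
    # Evaluate manifest_evaluators against the manifests passed in (previously
    # the caller pre-filtered them), then skip entries the per-scan filter
    # rejects, e.g. the append scan keeps only ADDED entries written by the
    # selected append snapshots.
    ...
```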

Comment on lines +2162 to +2163
if not manifest_entry_filter(manifest_entry):
continue


return ray.data.from_arrow(self.to_arrow())
# TODO: Document that this method was made static
@staticmethod
@smaheshwar-pltr (Contributor Author):

I made this method static (the one on DataScan wasn't before).

]


class ManifestGroupPlanner:
@smaheshwar-pltr (Contributor Author):

This class effectively extracts the code for planning based on manifest files that was previously in DataScan. The code is largely the same (differences are pointed out in comments).




@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
@smaheshwar-pltr (Contributor Author):

(Using just the REST catalog that has the replace here to get a different schema)



@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
@smaheshwar-pltr (Contributor Author), May 22, 2025:

(Using just the REST catalog that has the replace here)

@smaheshwar-pltr smaheshwar-pltr marked this pull request as ready for review May 22, 2025 15:04
@smaheshwar-pltr (Contributor Author) commented May 28, 2025:

Put up apache/iceberg#13179 regarding an append-only option for the Spark side. Would like to hear people's thoughts there - but I think we can proceed with this PR as it is now

@smaheshwar-pltr (Contributor Author):

@Fokko, could you please review this?

@smaheshwar-pltr (Contributor Author):

Friendly ping, @Fokko @kevinjqliu

@hililiwei (Contributor):

Thanks for picking it up. For various reasons I've been away for quite a while; I'm sorry for not making progress on this work.

@Fokko (Contributor) commented Jun 8, 2025:

@smaheshwar-pltr First of all, sorry for the long wait, and thanks for picking this up. I'll have to look into this in more detail next week. It would be great to break this up into smaller pieces to speed up the reviews. Going over the PR, I'm not sure if we want to copy the whole class hierarchy from Java, as this does not feel very Python in my opinion.

@smaheshwar-pltr (Contributor Author) commented Jun 11, 2025:

Not sure why CI failed when tests pass for me locally - I did see that error on other PRs, so I merged main just now to see if that fixes it. I don't think that failure was related to this PR.

@smaheshwar-pltr (Contributor Author):

Going over the PR, I'm not sure if we want to copy the whole class hierarchy from Java, as this does not feel very Python in my opinion.

Thanks for taking a look, @Fokko - that makes sense.

I think abstract classes, used the way this PR uses them, achieve the append-scan functionality nicely and without duplication (see the added tests in tests/integration/test_reads.py), and in line with existing code: the previous TableScan already has abstract methods. But the hierarchy may indeed be confusing, maybe even more than usual given I've tried not to introduce breaks, and it's true that the rearrangement bloats the diff 😄.

Maybe this PR's logic for planning files for an append scan, its APIs, and its tests can still be reviewed?
