Incremental Append Scan #2031
@@ -413,3 +413,58 @@
    )
    spark.sql(f"ALTER TABLE {catalog_name}.default.test_empty_scan_ordered_str WRITE ORDERED BY id")
    spark.sql(f"INSERT INTO {catalog_name}.default.test_empty_scan_ordered_str VALUES 'a', 'c'")

    spark.sql(
        f"""
        CREATE OR REPLACE TABLE {catalog_name}.default.test_incremental_read (
            dt date,
            number integer,
            letter string
        )
        USING iceberg
        TBLPROPERTIES (
            'format-version'='2'
        );
        """
    )

    spark.sql(
        f"""
        INSERT INTO {catalog_name}.default.test_incremental_read
        VALUES (CAST('2022-03-01' AS date), 1, 'a')
        """
    )

    spark.sql(
        f"""
        INSERT INTO {catalog_name}.default.test_incremental_read
        VALUES (CAST('2022-03-01' AS date), 2, 'b')
        """
    )

    spark.sql(
        f"""
        INSERT INTO {catalog_name}.default.test_incremental_read
        VALUES (CAST('2022-03-02' AS date), 3, 'c'), (CAST('2022-03-02' AS date), 4, 'b')
        """
    )

    spark.sql(
        f"""
        DELETE FROM {catalog_name}.default.test_incremental_read
        WHERE number = 2
        """
    )

    # https://github.com/apache/iceberg/issues/1092#issuecomment-638432848 / https://github.com/apache/iceberg/issues/3747#issuecomment-1145419407
    # Don't do replace for Hive catalog, as REPLACE TABLE requires certain Hive server configuration

[Review comment] This is probably fixable 🤔 (but I also made use of this here)

    if catalog_name != "hive":
        # Replace to break snapshot lineage:

[Review comment] Strictly speaking, I don't need this case, because I can test that broken lineage throws just by inverting the snapshot order. But this feels like a realer use case to me. And changing the schema in this way also lets me test that the table's current schema is always used 😄

        spark.sql(
            f"""
            REPLACE TABLE {catalog_name}.default.test_incremental_read
            USING iceberg
            TBLPROPERTIES ('format-version'='2')
            AS SELECT number, letter FROM {catalog_name}.default.test_incremental_read
            """
        )
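The fixture above builds a chain of snapshots (two single-row appends, a two-row append, a delete) and then uses `REPLACE TABLE` to break the snapshot lineage. To illustrate the semantics an incremental append scan relies on, here is a minimal, self-contained sketch: given a snapshot lineage, the scan yields only the files appended by snapshots after `from_id` up to and including `to_id`, and fails when the lineage is broken. The names `Snapshot` and `incremental_appended_files` are illustrative stand-ins, not pyiceberg's API.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    parent_id: Optional[int]            # None for the first snapshot in the table
    appended_files: Tuple[str, ...]     # data files added by this snapshot


def incremental_appended_files(
    snapshots: Dict[int, Snapshot], from_id: int, to_id: int
) -> List[str]:
    """Collect files appended by snapshots in (from_id, to_id], oldest first."""
    per_snapshot: List[Tuple[str, ...]] = []
    current: Optional[int] = to_id
    # Walk the parent chain back from to_id until we reach from_id.
    while current is not None and current != from_id:
        snap = snapshots[current]
        per_snapshot.append(snap.appended_files)
        current = snap.parent_id
    if current != from_id:
        # from_id was never reached: lineage is broken (e.g. after REPLACE TABLE)
        raise ValueError("from_snapshot is not an ancestor of to_snapshot")
    # Reverse to snapshot order (oldest append first), keeping file order within each.
    return [f for batch in reversed(per_snapshot) for f in batch]
```

Note that the delete snapshot in the fixture contributes no *appended* files, which is exactly why the test data includes one: an append-only incremental scan should skip it.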
@@ -717,6 +717,14 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List
            if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
        ]

    def __eq__(self, other: Any) -> bool:
        """Return the equality of two instances of the ManifestFile class."""
        return self.manifest_path == other.manifest_path if isinstance(other, ManifestFile) else False

    def __hash__(self) -> int:
        """Return the hash of manifest_path."""
        return hash(self.manifest_path)


[Review comment] Changes in this file are from #533. To elaborate on why they're needed:

@cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list))
def _manifests(io: FileIO, manifest_list: str) -> Tuple[ManifestFile, ...]:

[Review comment] (Same as in #533)