-
Notifications
You must be signed in to change notification settings - Fork 308
Added ExpireSnapshots Feature #1880
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+331
−5
Merged
Changes from 8 commits
Commits
Show all changes
36 commits
Select commit
Hold shift + click to select a range
0a94d96
Added initial units tests and Class for Removing a Snapshot
ForeverAngry 5f0b62b
Added methods needed to expire snapshots by id, and optionally cleanu…
ForeverAngry f995daa
Update test_expire_snapshots.py
ForeverAngry 65365e1
Added the builder method to __init__.py, updated the snapshot api wit…
ForeverAngry e28815f
Snapshots are not being transacted on, but need to re-assign refs
ForeverAngry 4628ede
Fixed the test case.
ForeverAngry e80c41c
adding print statements to help with debugging
ForeverAngry cb9f0c9
Draft ready
ForeverAngry ebcff2b
Applied suggestions to Fix CICD
ForeverAngry 97399bf
Merge branch 'main' into main
ForeverAngry 95e5af2
Rebuild the poetry lock file.
ForeverAngry 5ab5890
Merge branch 'main' into main
ForeverAngry 5acd690
Refactor implementation of `ExpireSnapshots`
ForeverAngry d30a08c
Fixed format and linting issues
ForeverAngry e62ab58
Merge branch 'main' into main
ForeverAngry 1af3258
Fixed format and linting issues
ForeverAngry 352b48f
Merge branch 'main' of https://github.com/ForeverAngry/iceberg-python
ForeverAngry 382e0ea
Merge branch 'main' into main
ForeverAngry 549c183
rebased: from main
ForeverAngry 386cb15
fixed: typo
ForeverAngry 12729fa
removed errant files
ForeverAngry ce3515c
Added: public method signature to the init table file.
ForeverAngry 28fce4b
Removed: `expire_snapshots_older_than` method, in favor of implementi…
ForeverAngry 2c3153e
Update tests/table/test_expire_snapshots.py
ForeverAngry 27c3ece
Removed: unrelated changes, Added: logic to expire snapshot method.
ForeverAngry 05793c0
Merge branch 'main' into 1880-add-expire-snapshots
ForeverAngry 8ec1889
Update test_partition_evolution.py
ForeverAngry b23ac6a
Update test_literals.py
ForeverAngry 5c458f2
Update snapshot.py
ForeverAngry a08eb6b
Update snapshot.py
ForeverAngry 689310d
Fixed: Linting
ForeverAngry 9031f06
Update test_expire_snapshots.py
ForeverAngry 3488314
Update test_expire_snapshots.py
ForeverAngry 4a1b9a3
Update pyiceberg/table/__init__.py
ForeverAngry 1cac992
Removed unused instance variable.
ForeverAngry 511ea91
Implement snapshot expiration with protection for branch/tag heads
ForeverAngry File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
from typing import Any, Dict, Tuple | ||
import pytest | ||
from pyiceberg.catalog.noop import NoopCatalog | ||
from pyiceberg.io import load_file_io | ||
from pyiceberg.table import Table | ||
|
||
import time | ||
from random import randint | ||
from typing import Any, Dict, Optional | ||
import pytest | ||
from pyiceberg.catalog.noop import NoopCatalog | ||
from pyiceberg.io import load_file_io | ||
from pyiceberg.table import Table | ||
from pyiceberg.table.metadata import TableMetadataV2 | ||
from pyiceberg.table import Table | ||
from pyiceberg.catalog.noop import NoopCatalog | ||
from pyiceberg.table.update import TableRequirement, TableUpdate | ||
# Mock definition for CommitTableResponse | ||
from pyiceberg.table.metadata import TableMetadataV2 | ||
from pyiceberg.schema import Schema | ||
from pyiceberg.types import NestedField, LongType | ||
from pyiceberg.partitioning import PartitionSpec, PartitionField | ||
from pyiceberg.transforms import BucketTransform, IdentityTransform | ||
from pyiceberg.table.sorting import SortOrder, SortField, SortDirection, NullOrder | ||
|
||
class CommitTableResponse: | ||
def __init__(self, metadata=None, metadata_location='s3://bucket/test/location'): | ||
if metadata is None: | ||
# Provide a default TableMetadata object to avoid NoneType errors | ||
metadata = TableMetadataV2( | ||
location=metadata_location, | ||
table_uuid='9c12d441-03fe-4693-9a96-a0705ddf69c1', | ||
last_updated_ms=1602638573590, | ||
last_column_id=3, | ||
schemas=[ | ||
Schema( | ||
NestedField(field_id=1, name="x", field_type=LongType(), required=True), | ||
NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"), | ||
NestedField(field_id=3, name="z", field_type=LongType(), required=True), | ||
identifier_field_ids=[1, 2], | ||
schema_id=1 | ||
) | ||
], | ||
current_schema_id=1, | ||
partition_specs=[ | ||
PartitionSpec( | ||
PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="x"), spec_id=0 | ||
) | ||
], | ||
default_spec_id=0, | ||
sort_orders=[ | ||
SortOrder( | ||
SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST), | ||
order_id=3 | ||
) | ||
], | ||
default_sort_order_id=3, | ||
properties={}, | ||
current_snapshot_id=None, | ||
snapshots=[], | ||
snapshot_log=[], | ||
metadata_log=[], | ||
refs={}, | ||
statistics=[], | ||
format_version=2, | ||
last_sequence_number=34 | ||
) | ||
self.metadata = metadata | ||
self.metadata_location = metadata_location | ||
|
||
class MockCatalog(NoopCatalog): | ||
def commit_table( | ||
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] | ||
) -> CommitTableResponse: | ||
# Mock implementation of commit_table | ||
return CommitTableResponse() | ||
|
||
@pytest.fixture | ||
def example_table_metadata_v2_with_extensive_snapshots() -> Dict[str, Any]: | ||
def generate_snapshot( | ||
snapshot_id: int, | ||
parent_snapshot_id: Optional[int] = None, | ||
timestamp_ms: Optional[int] = None, | ||
sequence_number: int = 0, | ||
) -> Dict[str, Any]: | ||
return { | ||
"snapshot-id": snapshot_id, | ||
"parent-snapshot-id": parent_snapshot_id, | ||
"timestamp-ms": timestamp_ms or int(time.time() * 1000), | ||
"sequence-number": sequence_number, | ||
"summary": {"operation": "append"}, | ||
"manifest-list": f"s3://a/b/{snapshot_id}.avro", | ||
} | ||
|
||
snapshots = [] | ||
snapshot_log = [] | ||
initial_snapshot_id = 3051729675574597004 | ||
|
||
for i in range(2000): | ||
snapshot_id = initial_snapshot_id + i | ||
parent_snapshot_id = snapshot_id - 1 if i > 0 else None | ||
timestamp_ms = int(time.time() * 1000) - randint(0, 1000000) | ||
snapshots.append(generate_snapshot(snapshot_id, parent_snapshot_id, timestamp_ms, i)) | ||
snapshot_log.append({"snapshot-id": snapshot_id, "timestamp-ms": timestamp_ms}) | ||
|
||
return { | ||
"format-version": 2, | ||
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", | ||
"location": "s3://bucket/test/location", | ||
"last-sequence-number": 34, | ||
"last-updated-ms": 1602638573590, | ||
"last-column-id": 3, | ||
"current-schema-id": 1, | ||
"schemas": [ | ||
{"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": True, "type": "long"}]}, | ||
{ | ||
"type": "struct", | ||
"schema-id": 1, | ||
"identifier-field-ids": [1, 2], | ||
"fields": [ | ||
{"id": 1, "name": "x", "required": True, "type": "long"}, | ||
{"id": 2, "name": "y", "required": True, "type": "long", "doc": "comment"}, | ||
{"id": 3, "name": "z", "required": True, "type": "long"}, | ||
], | ||
}, | ||
], | ||
"default-spec-id": 0, | ||
"partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}], | ||
"last-partition-id": 1000, | ||
"default-sort-order-id": 3, | ||
"sort-orders": [ | ||
{ | ||
"order-id": 3, | ||
"fields": [ | ||
{"transform": "identity", "source-id": 2, "direction": "asc", "null-order": "nulls-first"}, | ||
{"transform": "identity", "source-id": 3, "direction": "desc", "null-order": "nulls-last"}, # Adjusted field | ||
], | ||
} | ||
], | ||
"properties": {"read.split.target.size": "134217728"}, | ||
"current-snapshot-id": initial_snapshot_id + 1999, | ||
"snapshots": snapshots, | ||
"snapshot-log": snapshot_log, | ||
"metadata-log": [{"metadata-file": "s3://bucket/.../v1.json", "timestamp-ms": 1515100}], | ||
"refs": {"test": {"snapshot-id": initial_snapshot_id, "type": "tag", "max-ref-age-ms": 10000000}}, | ||
} | ||
|
||
@pytest.fixture | ||
def table_v2_with_extensive_snapshots(example_table_metadata_v2_with_extensive_snapshots: Dict[str, Any]) -> Table: | ||
table_metadata = TableMetadataV2(**example_table_metadata_v2_with_extensive_snapshots) | ||
return Table( | ||
identifier=("database", "table"), | ||
metadata=table_metadata, | ||
metadata_location=f"{table_metadata.location}/uuid.metadata.json", | ||
io=load_file_io(location=f"{table_metadata.location}/uuid.metadata.json"), | ||
catalog=NoopCatalog("NoopCatalog"), | ||
) | ||
|
||
def test_remove_snapshot(table_v2_with_extensive_snapshots: Table): | ||
table = table_v2_with_extensive_snapshots | ||
table.catalog = MockCatalog("MockCatalog") | ||
|
||
# Verify the table has metadata and a current snapshot before proceeding | ||
assert table.metadata is not None, "Table metadata is None" | ||
assert table.metadata.current_snapshot_id is not None, "Current snapshot ID is None" | ||
|
||
snapshot_to_expire = 3051729675574599003 | ||
|
||
# Ensure the table has snapshots | ||
assert table.metadata.snapshots is not None, "Snapshots list is None" | ||
assert len(table.metadata.snapshots) == 2000, f"Expected 2000 snapshots, got {len(table.metadata.snapshots)}" | ||
|
||
assert snapshot_to_expire is not None, "No valid snapshot found to expire" | ||
|
||
# Remove a snapshot using the expire_snapshots API | ||
table.expire_snapshots().expire_snapshot_id(snapshot_to_expire).commit() | ||
|
||
# Verify the snapshot was removed | ||
assert snapshot_to_expire not in [snapshot.snapshot_id for snapshot in table.metadata.snapshots], \ | ||
f"Snapshot ID {snapshot_to_expire} was not removed" | ||
|
||
# Use the built-in pytest capsys fixture to capture printed output | ||
print(f"Snapshot ID {snapshot_to_expire} expired successfully") | ||
print(f"Number of snapshots after expiry: {table.metadata}") |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.