Skip to content

Commit e8bdc87

Browse files
stevie9868Yingjian Wu
and
Yingjian Wu
authored
refactor partition_summary_limit into SnapshotSummaryCollector constr… (#1940)
<!-- Thanks for opening a pull request! --> <!-- In the case this PR will resolve an issue, please replace ${GITHUB_ISSUE_ID} below with the actual Github issue id. --> Closes #1779 # Rationale for this change See [issue](#1779 (comment)) # Are these changes tested? Tested locally # Are there any user-facing changes? No <!-- In the case of user-facing changes, please add the changelog label. --> --------- Co-authored-by: Yingjian Wu <yingjianw@netflix.com>
1 parent 07de7bc commit e8bdc87

File tree

3 files changed

+38
-4
lines changed

3 files changed

+38
-4
lines changed

pyiceberg/table/snapshots.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,10 +272,10 @@ class SnapshotSummaryCollector:
272272
partition_metrics: DefaultDict[str, UpdateMetrics]
273273
max_changed_partitions_for_summaries: int
274274

275-
def __init__(self) -> None:
275+
def __init__(self, partition_summary_limit: int = 0) -> None:
276276
self.metrics = UpdateMetrics()
277277
self.partition_metrics = defaultdict(UpdateMetrics)
278-
self.max_changed_partitions_for_summaries = 0
278+
self.max_changed_partitions_for_summaries = partition_summary_limit
279279

280280
def set_partition_summary_limit(self, limit: int) -> None:
281281
self.max_changed_partitions_for_summaries = limit

pyiceberg/table/update/snapshot.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,13 +203,12 @@ def _write_delete_manifest() -> List[ManifestFile]:
203203
def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
204204
from pyiceberg.table import TableProperties
205205

206-
ssc = SnapshotSummaryCollector()
207206
partition_summary_limit = int(
208207
self._transaction.table_metadata.properties.get(
209208
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
210209
)
211210
)
212-
ssc.set_partition_summary_limit(partition_summary_limit)
211+
ssc = SnapshotSummaryCollector(partition_summary_limit=partition_summary_limit)
213212

214213
for data_file in self._added_data_files:
215214
ssc.add_file(

tests/table/test_snapshots.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,41 @@ def test_snapshot_summary_collector_with_partition() -> None:
218218
}
219219

220220

221+
@pytest.mark.integration
222+
def test_snapshot_summary_collector_with_partition_limit_in_constructor() -> None:
223+
# Given
224+
partition_summary_limit = 10
225+
ssc = SnapshotSummaryCollector(partition_summary_limit=partition_summary_limit)
226+
227+
assert ssc.build() == {}
228+
schema = Schema(
229+
NestedField(field_id=1, name="bool_field", field_type=BooleanType(), required=False),
230+
NestedField(field_id=2, name="string_field", field_type=StringType(), required=False),
231+
NestedField(field_id=3, name="int_field", field_type=IntegerType(), required=False),
232+
)
233+
spec = PartitionSpec(PartitionField(source_id=3, field_id=1001, transform=IdentityTransform(), name="int_field"))
234+
data_file_1 = DataFile.from_args(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record(1))
235+
data_file_2 = DataFile.from_args(content=DataFileContent.DATA, record_count=200, file_size_in_bytes=4321, partition=Record(2))
236+
237+
# When
238+
ssc.add_file(data_file=data_file_1, schema=schema, partition_spec=spec)
239+
ssc.remove_file(data_file=data_file_1, schema=schema, partition_spec=spec)
240+
ssc.remove_file(data_file=data_file_2, schema=schema, partition_spec=spec)
241+
242+
# Then
243+
assert ssc.build() == {
244+
"added-files-size": "1234",
245+
"removed-files-size": "5555",
246+
"added-data-files": "1",
247+
"deleted-data-files": "2",
248+
"added-records": "100",
249+
"deleted-records": "300",
250+
"changed-partition-count": "2",
251+
"partitions.int_field=1": "added-files-size=1234,removed-files-size=1234,added-data-files=1,deleted-data-files=1,added-records=100,deleted-records=100",
252+
"partitions.int_field=2": "removed-files-size=4321,deleted-data-files=1,deleted-records=200",
253+
}
254+
255+
221256
def test_merge_snapshot_summaries_empty() -> None:
222257
assert update_snapshot_summaries(Summary(Operation.APPEND)) == Summary(
223258
operation=Operation.APPEND,

0 commit comments

Comments
 (0)