From 9154c50fa5a1d004dc6f71c92a55282e6f8951f7 Mon Sep 17 00:00:00 2001 From: Connor McArthur Date: Thu, 19 Jun 2025 13:34:45 -0400 Subject: [PATCH 1/5] feature: expire snapshots action --- crates/iceberg/src/lib.rs | 2 + .../src/maintenance/expire_snapshots.rs | 1144 +++++++++++++++++ crates/iceberg/src/maintenance/mod.rs | 26 + .../src/spec/table_metadata_builder.rs | 9 +- crates/iceberg/src/transaction/mod.rs | 2 +- 5 files changed, 1181 insertions(+), 2 deletions(-) create mode 100644 crates/iceberg/src/maintenance/expire_snapshots.rs create mode 100644 crates/iceberg/src/maintenance/mod.rs diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 06e39156f..3534b05a6 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -89,3 +89,5 @@ pub mod writer; mod delete_vector; pub mod puffin; + +pub mod maintenance; diff --git a/crates/iceberg/src/maintenance/expire_snapshots.rs b/crates/iceberg/src/maintenance/expire_snapshots.rs new file mode 100644 index 000000000..bf23e5880 --- /dev/null +++ b/crates/iceberg/src/maintenance/expire_snapshots.rs @@ -0,0 +1,1144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Expire snapshots maintenance operation +//! +//! This module implements the expire snapshots operation that removes old snapshots +//! and their associated metadata files from the table while keeping the table +//! in a consistent state. + +use std::collections::HashSet; + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +use crate::error::Result; +use crate::runtime::JoinHandle; +use crate::spec::SnapshotRef; +use crate::table::Table; +use crate::transaction::Transaction; +use crate::{Catalog, Error, ErrorKind, TableUpdate}; + +/// Result of the expire snapshots operation. Contains information about how many files were +/// deleted. +#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ExpireSnapshotsResult { + /// Number of data files deleted. Data file deletion is not supported by this action yet, this + /// will always be 0. + pub deleted_data_files_count: u64, + /// Number of position delete files deleted. Position delete file deletion is not supported by + /// this action yet, this will always be 0. + pub deleted_position_delete_files_count: u64, + /// Number of equality delete files deleted. Equality delete file deletion is not supported by + /// this action yet, this will always be 0. + pub deleted_equality_delete_files_count: u64, + /// Number of manifest files deleted + pub deleted_manifest_files_count: u64, + /// Number of manifest list files deleted + pub deleted_manifest_lists_count: u64, + /// Number of statistics files deleted. Statistics file deletion is not supported by this action + /// yet, this will always be 0. 
+ pub deleted_statistics_files_count: u64, +} + +/// Configuration for the expire snapshots operation +#[derive(Debug, Clone)] +pub struct ExpireSnapshotsConfig { + /// Timestamp in milliseconds. Snapshots older than this will be expired + pub older_than_ms: Option, + /// Minimum number of snapshots to retain + pub retain_last: Option, + /// Maximum number of concurrent file deletions + pub max_concurrent_deletes: Option, + /// Specific snapshot IDs to expire + pub snapshot_ids: Vec, + /// Whether to perform a dry run. If true, the operation will not delete any files, but will + /// still identify the files to delete and return the result. + pub dry_run: bool, +} + +impl Default for ExpireSnapshotsConfig { + fn default() -> Self { + Self { + older_than_ms: None, + retain_last: Some(1), // Default to retaining at least 1 snapshot + max_concurrent_deletes: None, + snapshot_ids: vec![], + dry_run: false, + } + } +} + +/// Trait for performing expire snapshots operations +/// +/// This trait provides a low-level API for expiring snapshots that can be +/// extended with different implementations for different environments. +#[async_trait] +pub trait ExpireSnapshots: Send + Sync { + /// Execute the expire snapshots operation + async fn execute(&self, catalog: &dyn Catalog) -> Result; +} + +/// Implementation of the expire snapshots operation +pub struct ExpireSnapshotsAction { + table: Table, + config: ExpireSnapshotsConfig, +} + +impl ExpireSnapshotsAction { + /// Create a new expire snapshots action + pub fn new(table: Table) -> Self { + Self { + table, + config: ExpireSnapshotsConfig::default(), + } + } + + /// Set the timestamp threshold for expiring snapshots + pub fn expire_older_than(mut self, timestamp_ms: i64) -> Self { + self.config.older_than_ms = Some(timestamp_ms); + self + } + + /// Set the dry run flag + pub fn dry_run(mut self, dry_run: bool) -> Self { + self.config.dry_run = dry_run; + self + } + + /// Set the minimum number of snapshots to retain. If the number of snapshots is less than 1, + /// it will be automatically adjusted to 1, following the behavior in Spark. + pub fn retain_last(mut self, num_snapshots: u32) -> Self { + if num_snapshots < 1 { + self.config.retain_last = Some(1); + } else { + self.config.retain_last = Some(num_snapshots); + } + self + } + + /// Set specific snapshot IDs to expire. An empty list is equivalent to the default behavior + /// of expiring all but `retain_last` snapshots! When only expiring specific snapshots, please + /// ensure that the list of snapshot IDs is non-empty before using this method. + pub fn expire_snapshot_ids(mut self, snapshot_ids: Vec) -> Self { + self.config.snapshot_ids = snapshot_ids; + self + } + + /// Set the maximum number of concurrent file deletions + pub fn max_concurrent_deletes(mut self, max_deletes: u32) -> Self { + if max_deletes > 0 { + self.config.max_concurrent_deletes = Some(max_deletes); + } + self + } + + /// Determine which snapshots should be expired based on the configuration. This will: + /// + /// - Sort snapshots by timestamp (oldest first) + /// - Apply filters if supplied. If multiple filters are supplied, the result will be the + /// intersection of the results of each filter. + /// - If specific snapshot IDs are provided, only expire those + /// - If `older_than_ms` is provided, expire snapshots older than this timestamp + /// - If `retain_last` is provided, retain the last `retain_last` snapshots + /// - Never expire the current snapshot! 
+ /// + /// Returns a Vec of SnapshotRefs that should be expired (references removed, and deleted). + fn identify_snapshots_to_expire(&self) -> Result> { + let metadata = self.table.metadata(); + let all_snapshots: Vec = metadata.snapshots().cloned().collect(); + + if all_snapshots.is_empty() { + return Ok(vec![]); + } + + if !self.config.snapshot_ids.is_empty() { + let snapshot_id_set: HashSet = self.config.snapshot_ids.iter().cloned().collect(); + let snapshots_to_expire: Vec = all_snapshots + .into_iter() + .filter(|snapshot| snapshot_id_set.contains(&snapshot.snapshot_id())) + .collect(); + + if let Some(current_snapshot_id) = metadata.current_snapshot_id() { + if snapshot_id_set.contains(¤t_snapshot_id) { + return Err(Error::new( + ErrorKind::DataInvalid, + "Cannot expire the current snapshot", + )); + } + } + + return Ok(snapshots_to_expire); + } + + let mut sorted_snapshots = all_snapshots; + sorted_snapshots.sort_by_key(|snapshot| snapshot.timestamp_ms()); + + let mut snapshots_to_expire = vec![]; + let retain_last = self.config.retain_last.unwrap_or(1) as usize; + + if sorted_snapshots.len() <= retain_last { + return Ok(vec![]); + } + + let mut candidates = sorted_snapshots; + + candidates.truncate(candidates.len().saturating_sub(retain_last)); + + if let Some(older_than_ms) = self.config.older_than_ms { + candidates.retain(|snapshot| snapshot.timestamp_ms() < older_than_ms); + } + + // NEVER expire the current snapshot! + if let Some(current_snapshot_id) = metadata.current_snapshot_id() { + candidates.retain(|snapshot| snapshot.snapshot_id() != current_snapshot_id); + } + + snapshots_to_expire.extend(candidates); + + Ok(snapshots_to_expire) + } + + /// Collect all files that should be deleted along with the expired snapshots + async fn collect_files_to_delete( + &self, + expired_snapshots: &[SnapshotRef], + ) -> Result> { + let mut files_to_delete = Vec::new(); + let file_io = self.table.file_io(); + let metadata = self.table.metadata(); + + // Collect files from snapshots that are being expired + let mut expired_manifest_lists = HashSet::new(); + let mut expired_manifests = HashSet::new(); + + for snapshot in expired_snapshots { + expired_manifest_lists.insert(snapshot.manifest_list().to_string()); + + match snapshot.load_manifest_list(file_io, metadata).await { + Ok(manifest_list) => { + for manifest_entry in manifest_list.entries() { + expired_manifests.insert(manifest_entry.manifest_path.clone()); + } + } + Err(e) => { + // Log warning but continue - the manifest list file might already be deleted + eprintln!( + "Warning: Failed to load manifest list {}: {}", + snapshot.manifest_list(), + e + ); + } + } + } + + // Collect files that are still referenced by remaining snapshots + let remaining_snapshots: Vec = metadata + .snapshots() + .filter(|snapshot| { + !expired_snapshots + .iter() + .any(|exp| exp.snapshot_id() == snapshot.snapshot_id()) + }) + .cloned() + .collect(); + + let mut still_referenced_manifest_lists = HashSet::new(); + let mut still_referenced_manifests = HashSet::new(); + + for snapshot in &remaining_snapshots { + still_referenced_manifest_lists.insert(snapshot.manifest_list().to_string()); + + match snapshot.load_manifest_list(file_io, metadata).await { + Ok(manifest_list) => { + for manifest_entry in manifest_list.entries() { + still_referenced_manifests.insert(manifest_entry.manifest_path.clone()); + } + } + Err(e) => { + // Log warning but continue + eprintln!( + "Warning: Failed to load manifest list {}: {}", + snapshot.manifest_list(), + e + ); + } + } 
+ } + + for manifest_list_path in expired_manifest_lists { + if !still_referenced_manifest_lists.contains(&manifest_list_path) { + files_to_delete.push(manifest_list_path); + } + } + + for manifest_path in expired_manifests { + if !still_referenced_manifests.contains(&manifest_path) { + files_to_delete.push(manifest_path); + } + } + + Ok(files_to_delete) + } + + async fn process_file_deletion(&self, result: &mut ExpireSnapshotsResult, file_path: String) { + if file_path.ends_with(".avro") && file_path.contains("snap-") { + result.deleted_manifest_lists_count += 1; + } else if file_path.ends_with(".avro") { + result.deleted_manifest_files_count += 1; + } + } + + /// Delete files concurrently with respect to max_concurrent_deletes setting + /// Should not be called if dry_run is true, but this is checked for extra safety. + async fn delete_files(&self, files_to_delete: Vec) -> Result { + let mut result = ExpireSnapshotsResult::default(); + + if self.config.dry_run { + for file_path in files_to_delete { + self.process_file_deletion(&mut result, file_path).await; + } + return Ok(result); + } + + let file_io = self.table.file_io(); + + if files_to_delete.is_empty() { + return Ok(result); + } + + let num_concurrent_deletes = self.config.max_concurrent_deletes.unwrap_or(1) as usize; + let mut delete_tasks: Vec>>> = + Vec::with_capacity(num_concurrent_deletes); + + eprintln!("Num concurrent deletes: {}", num_concurrent_deletes); + + for task_index in 0..num_concurrent_deletes { + // Ideally we'd use a semaphore here to allow each thread to delete as fast as possible. + // However, we can't assume that tokio::sync::Semaphore is available, and AsyncStd + // does not appear to have a usable Semaphore. Instead, we'll pre-sort the files into + // `num_concurrent_deletes` equal size chunks and spawn a task for each chunk. + let task_file_paths: Vec = files_to_delete + .iter() + .skip(task_index) + .step_by(num_concurrent_deletes) + .cloned() + .collect(); + let file_io_clone = file_io.clone(); + let task = crate::runtime::spawn(async move { + let mut results: Vec> = Vec::new(); + for file_path in task_file_paths { + match file_io_clone.delete(&file_path).await { + Ok(_) => { + eprintln!("Deleted file: {:?}", file_path); + results.push(Ok(file_path)); + } + Err(e) => { + eprintln!("Error deleting file: {:?}", e); + results.push(Err(e)); + } + } + } + results + }); + + delete_tasks.push(task); + } + + for task in delete_tasks { + let file_delete_results = task.await; + for file_delete_result in file_delete_results { + eprintln!("Deleted file: {:?}", file_delete_result); + match file_delete_result { + Ok(deleted_path) => { + self.process_file_deletion(&mut result, deleted_path).await; + } + Err(e) => { + eprintln!("Warning: File deletion task failed: {}", e); + } + } + } + } + + Ok(result) + } +} + +#[async_trait] +impl ExpireSnapshots for ExpireSnapshotsAction { + /// The main entrypoint for the expire snapshots action. 
This will: + /// + /// - Validate the table state + /// - Identify snapshots to expire + /// - Update the table metadata to remove expired snapshots + /// - Collect files to delete + /// - Delete the files + async fn execute(&self, catalog: &dyn Catalog) -> Result { + if self.table.readonly() { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Cannot expire snapshots on a readonly table", + )); + } + + let snapshots_to_expire = self.identify_snapshots_to_expire()?; + + if snapshots_to_expire.is_empty() { + return Ok(ExpireSnapshotsResult::default()); + } + + let files_to_delete = self.collect_files_to_delete(&snapshots_to_expire).await?; + + if self.config.dry_run { + let mut result = ExpireSnapshotsResult::default(); + for file_path in files_to_delete { + self.process_file_deletion(&mut result, file_path).await; + } + return Ok(result); + } + + // update the table metadata to remove the expired snapshots _before_ deleting anything! + // TODO: make this retry + let mut transaction = Transaction::new(&self.table); + let mut snapshot_ids: Vec = snapshots_to_expire + .iter() + .map(|s| s.snapshot_id()) + .collect(); + // sort for a deterministic output + snapshot_ids.sort(); + transaction.apply( + vec![TableUpdate::RemoveSnapshots { snapshot_ids }], + // no requirements here. if the table's main branch was rewound while this operation is + // running, this will potentially corrupt the table by deleting the wrong snapshots. + // but, if this fails because of a concurrent update, we have to repeat the logic. + // TODO: verify that this is running against the latest metadata version. if the commit + // fails, refresh the metadata and try again if and only if new snapshots were added. + vec![], + )?; + transaction.commit(catalog).await?; + + let result = self.delete_files(files_to_delete).await?; + + Ok(result) + } +} + +/// Builder for creating expire snapshots operations +pub struct ExpireSnapshotsBuilder { + table: Table, +} + +impl ExpireSnapshotsBuilder { + /// Create a new builder for the given table + pub fn new(table: Table) -> Self { + Self { table } + } + + /// Build an expire snapshots action with default configuration + pub fn build(self) -> ExpireSnapshotsAction { + ExpireSnapshotsAction::new(self.table) + } +} + +// Extension trait to add expire_snapshots method to Table +impl Table { + /// Create a new expire snapshots builder for this table + pub fn expire_snapshots(&self) -> ExpireSnapshotsBuilder { + ExpireSnapshotsBuilder::new(self.clone()) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::{Arc, Mutex}; + + use chrono::Utc; + use uuid::Uuid; + + use super::*; + use crate::io::{FileIOBuilder, OutputFile}; + use crate::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, ManifestEntry, + ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, Operation, + PrimitiveType, Schema, Snapshot, Struct, Summary, TableMetadataBuilder, Type, + }; + use crate::table::{Table, TableBuilder}; + use crate::{Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent}; + + #[derive(Debug)] + struct MockCatalog { + table: Table, + update_table_calls: Mutex>, + } + + impl MockCatalog { + fn new(table: Table) -> Self { + Self { + table, + update_table_calls: Mutex::new(vec![]), + } + } + } + + #[async_trait] + impl Catalog for MockCatalog { + async fn load_table(&self, _table_ident: &TableIdent) -> Result { + Ok(self.table.clone()) + } + + async fn drop_table(&self, _table_ident: &TableIdent) -> Result<()> { 
+            unimplemented!()
+        }
+
+        async fn table_exists(&self, _table_ident: &TableIdent) -> Result<bool> {
+            unimplemented!()
+        }
+
+        async fn rename_table(
+            &self,
+            _src_table_ident: &TableIdent,
+            _dst_table_ident: &TableIdent,
+        ) -> Result<()> {
+            unimplemented!()
+        }
+
+        async fn update_table(&self, commit: TableCommit) -> Result<Table> {
+            self.update_table_calls.lock().unwrap().push(commit);
+            Ok(self.table.clone())
+        }
+
+        async fn create_table(
+            &self,
+            _namespace: &NamespaceIdent,
+            _creation: TableCreation,
+        ) -> Result<Table>
{ + unimplemented!() + } + + async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result> { + unimplemented!() + } + + async fn create_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap, + ) -> Result { + unimplemented!() + } + + async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result { + unimplemented!() + } + + async fn list_namespaces( + &self, + _parent: Option<&NamespaceIdent>, + ) -> Result> { + unimplemented!() + } + + async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result { + unimplemented!() + } + + async fn update_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap, + ) -> Result<()> { + unimplemented!() + } + + async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> { + unimplemented!() + } + } + + struct TableTestFixture { + pub table: Table, + pub base_time: i64, + } + + impl TableTestFixture { + async fn new() -> Self { + let file_io = FileIOBuilder::new("memory").build().unwrap(); + + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + // rewind time to allow transaction to succeed, since it autogenerates a new timestamp + let base_time = Utc::now().timestamp_millis() - (24 * 60 * 60 * 1000); + + let table_metadata_builder = TableMetadataBuilder::new( + schema, + crate::spec::PartitionSpec::unpartition_spec(), + crate::spec::SortOrder::unsorted_order(), + "memory://test/table".to_string(), + FormatVersion::V2, + HashMap::new(), + ) + .unwrap(); + let table_metadata_builder = table_metadata_builder.set_last_updated_ms(base_time); + + let metadata = table_metadata_builder.build().unwrap().metadata; + + let table = TableBuilder::new() + .metadata(Arc::new(metadata)) + .identifier(TableIdent::from_strs(["test", "table"]).unwrap()) + .file_io(file_io) + .build() + .unwrap(); + + Self { table, base_time } + } + + fn next_manifest_file(&self) -> OutputFile { + self.table + .file_io() + .new_output(format!( + "memory://test/table/metadata/manifest_{}.avro", + Uuid::new_v4() + )) + .unwrap() + } + + fn next_manifest_list_file(&self) -> OutputFile { + self.table + .file_io() + .new_output(format!( + "memory://test/table/metadata/snap-{}-manifest-list.avro", + Uuid::new_v4() + )) + .unwrap() + } + + fn next_data_file_path(&self) -> String { + format!("memory://test/table/data/data_{}.parquet", Uuid::new_v4()) + } + + async fn add_snapshot(&mut self, snapshot_id: i64) { + eprintln!("Adding snapshot: {}", snapshot_id); + let parent_id = if snapshot_id == 1000 { + None + } else { + Some(snapshot_id - 1) + }; + eprintln!("Parent id: {:?}", parent_id); + + let manifest_file = self.next_manifest_file(); + let manifest_list_file = self.next_manifest_list_file(); + let manifest_list_file_path = manifest_list_file.location().to_string(); + + let mut writer = ManifestWriterBuilder::new( + manifest_file, + Some(snapshot_id), + None, + self.table.metadata().current_schema().clone(), + self.table + .metadata() + .default_partition_spec() + .as_ref() + .clone(), + ) + .build_v2_data(); + + writer + .add_entry( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(self.next_data_file_path()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + 
.partition(Struct::from_iter([])) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + let data_file_manifest = writer.write_manifest_file().await.unwrap(); + + // Write to manifest list + let mut manifest_list_write = + ManifestListWriter::v2(manifest_list_file, snapshot_id, parent_id, snapshot_id + 1); + manifest_list_write + .add_manifests(vec![data_file_manifest].into_iter()) + .unwrap(); + manifest_list_write.close().await.unwrap(); + + let snapshot = Snapshot::builder() + .with_snapshot_id(snapshot_id) + .with_parent_snapshot_id(parent_id) + .with_sequence_number(snapshot_id + 1) + .with_timestamp_ms(self.base_time + (snapshot_id * 60000)) + .with_manifest_list(manifest_list_file_path) + .with_summary(Summary { + operation: Operation::Append, + additional_properties: HashMap::new(), + }) + .with_schema_id(0) + .build(); + + let mut table_metadata_builder = + TableMetadataBuilder::new_from_metadata(self.table.metadata().clone(), None); + + table_metadata_builder = table_metadata_builder.add_snapshot(snapshot).unwrap(); + + table_metadata_builder = table_metadata_builder.assign_uuid(Uuid::new_v4()); + table_metadata_builder = table_metadata_builder + .set_ref(crate::spec::MAIN_BRANCH, crate::spec::SnapshotReference { + snapshot_id, + retention: crate::spec::SnapshotRetention::Branch { + min_snapshots_to_keep: Some(10), + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }) + .unwrap(); + + let metadata = table_metadata_builder.build().unwrap().metadata; + + let mut table = self.table.clone(); + table.with_metadata(Arc::new(metadata)); + + self.table = table; + } + } + + /// Helper function to create a table with snapshots + async fn create_table_with_snapshots(snapshot_count: usize) -> Table { + let mut table_fixture = TableTestFixture::new().await; + + for i in 0..snapshot_count { + table_fixture.add_snapshot((i + 1000) as i64).await; + } + + table_fixture.table + } + + #[tokio::test] + async fn test_expire_snapshots_zero_snapshots() { + let table = create_table_with_snapshots(0).await; + let catalog = MockCatalog::new(table.clone()); + + let action = table.expire_snapshots().build(); + let result = action.execute(&catalog).await.unwrap(); + + // Should complete successfully with no deletions + assert_eq!(result.deleted_data_files_count, 0); + assert_eq!(result.deleted_manifest_files_count, 0); + assert_eq!(result.deleted_manifest_lists_count, 0); + assert_eq!(result.deleted_position_delete_files_count, 0); + assert_eq!(result.deleted_equality_delete_files_count, 0); + assert_eq!(result.deleted_statistics_files_count, 0); + + assert_eq!(catalog.update_table_calls.lock().unwrap().len(), 0); + } + + #[tokio::test] + async fn test_expire_snapshots_one_snapshot() { + let table = create_table_with_snapshots(1).await; + let catalog = MockCatalog::new(table.clone()); + + let action = table.expire_snapshots().build(); + let result = action.execute(&catalog).await.unwrap(); + + assert_eq!(result.deleted_data_files_count, 0); + assert_eq!(result.deleted_manifest_files_count, 0); + assert_eq!(result.deleted_manifest_lists_count, 0); + assert_eq!(result.deleted_position_delete_files_count, 0); + assert_eq!(result.deleted_equality_delete_files_count, 0); + assert_eq!(result.deleted_statistics_files_count, 0); + + assert_eq!(catalog.update_table_calls.lock().unwrap().len(), 0); + } + + #[tokio::test] + async fn test_expire_snapshots_many_snapshots_retain_last() { + let table = create_table_with_snapshots(5).await; + let catalog = MockCatalog::new(table.clone()); 
+ + let action = table.expire_snapshots().build().retain_last(2); + + let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + + assert_eq!(snapshots_to_expire.len(), 3); + + let mut ids_to_expire: Vec = snapshots_to_expire + .iter() + .map(|s| s.snapshot_id()) + .collect(); + + ids_to_expire.sort(); + + assert_eq!(ids_to_expire, vec![1000, 1001, 1002]); + + let result = action.execute(&catalog).await.unwrap(); + + assert_eq!(result.deleted_manifest_lists_count, 3); + assert_eq!(result.deleted_manifest_files_count, 3); + assert_eq!(result.deleted_data_files_count, 0); + assert_eq!(result.deleted_position_delete_files_count, 0); + assert_eq!(result.deleted_equality_delete_files_count, 0); + + assert_eq!(catalog.update_table_calls.lock().unwrap().len(), 1); + + let mut commit = catalog.update_table_calls.lock().unwrap().pop().unwrap(); + let updates = commit.take_updates(); + assert_eq!(updates.len(), 1); + assert_eq!(updates[0], TableUpdate::RemoveSnapshots { + snapshot_ids: vec![1000, 1001, 1002] + }); + } + + #[tokio::test] + async fn test_expire_snapshots_older_than_timestamp() { + // Test case 3b: Table with many snapshots, expire based on timestamp + let table = create_table_with_snapshots(5).await; + let catalog = MockCatalog::new(table.clone()); + + // Get the timestamp of the middle snapshot and use it as threshold + let middle_timestamp = table + .metadata() + .snapshots() + .find(|s| s.snapshot_id() == 1002) + .unwrap() + .timestamp_ms(); + + let action = table + .expire_snapshots() + .build() + .expire_older_than(middle_timestamp) + .retain_last(1); // Keep at least 1 + + let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + + // Should expire snapshots older than the middle one, but keep at least 1 + // So we expect snapshots 1000 and 1001 to be expired + assert_eq!(snapshots_to_expire.len(), 2); + + let mut ids_to_expire: Vec = snapshots_to_expire + .iter() + .map(|s| s.snapshot_id()) + .collect(); + ids_to_expire.sort(); + + assert_eq!(ids_to_expire, vec![1000, 1001]); + + let result = action.execute(&catalog).await.unwrap(); + + assert_eq!(result.deleted_manifest_lists_count, 2); + assert_eq!(result.deleted_manifest_files_count, 2); + assert_eq!(result.deleted_data_files_count, 0); + assert_eq!(result.deleted_position_delete_files_count, 0); + assert_eq!(result.deleted_equality_delete_files_count, 0); + assert_eq!(result.deleted_statistics_files_count, 0); + + assert_eq!(catalog.update_table_calls.lock().unwrap().len(), 1); + + let mut commit = catalog.update_table_calls.lock().unwrap().pop().unwrap(); + let updates = commit.take_updates(); + assert_eq!(updates.len(), 1); + assert_eq!(updates[0], TableUpdate::RemoveSnapshots { + snapshot_ids: vec![1000, 1001] + }); + } + + #[tokio::test] + async fn test_expire_specific_snapshot_ids() { + let table = create_table_with_snapshots(5).await; + let catalog = MockCatalog::new(table.clone()); + + let action = table + .expire_snapshots() + .build() + .expire_snapshot_ids(vec![1001, 1003]); + + let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + + assert_eq!(snapshots_to_expire.len(), 2); + + let mut ids_to_expire: Vec = snapshots_to_expire + .iter() + .map(|s| s.snapshot_id()) + .collect(); + + ids_to_expire.sort(); + + assert_eq!(ids_to_expire, vec![1001, 1003]); + + let result = action.execute(&catalog).await.unwrap(); + + assert_eq!(result.deleted_manifest_lists_count, 2); + assert_eq!(result.deleted_manifest_files_count, 2); + 
assert_eq!(result.deleted_data_files_count, 0); + assert_eq!(result.deleted_position_delete_files_count, 0); + assert_eq!(result.deleted_equality_delete_files_count, 0); + assert_eq!(result.deleted_statistics_files_count, 0); + + assert_eq!(catalog.update_table_calls.lock().unwrap().len(), 1); + + let mut commit = catalog.update_table_calls.lock().unwrap().pop().unwrap(); + let updates = commit.take_updates(); + assert_eq!(updates.len(), 1); + assert_eq!(updates[0], TableUpdate::RemoveSnapshots { + snapshot_ids: vec![1001, 1003] + }); + } + + #[tokio::test] + async fn test_expire_current_snapshot_error() { + let table = create_table_with_snapshots(3).await; + let current_snapshot_id = table.metadata().current_snapshot_id().unwrap(); + + let action = table + .expire_snapshots() + .build() + .expire_snapshot_ids(vec![current_snapshot_id]); + + let result = action.identify_snapshots_to_expire(); + + assert!(result.is_err()); + let error = result.unwrap_err(); + assert_eq!(error.kind(), ErrorKind::DataInvalid); + assert!( + error + .message() + .contains("Cannot expire the current snapshot") + ); + } + + #[tokio::test] + async fn test_expire_readonly_table_error() { + let table = create_table_with_snapshots(3).await; + let catalog = MockCatalog::new(table.clone()); + + let readonly_table = TableBuilder::new() + .metadata(table.metadata_ref()) + .identifier(table.identifier().clone()) + .file_io(table.file_io().clone()) + .readonly(true) + .build() + .unwrap(); + + let action = readonly_table.expire_snapshots().build(); + let result = action.execute(&catalog).await; + + assert!(result.is_err()); + let error = result.unwrap_err(); + assert_eq!(error.kind(), ErrorKind::FeatureUnsupported); + assert!( + error + .message() + .contains("Cannot expire snapshots on a readonly table") + ); + } + + #[tokio::test] + async fn test_retain_last_minimum_validation() { + let table = create_table_with_snapshots(3).await; + let catalog = MockCatalog::new(table.clone()); + + let action = table.expire_snapshots().build().retain_last(0); + + assert_eq!(action.config.retain_last, Some(1)); + + let result = action.execute(&catalog).await.unwrap(); + + assert_eq!(result.deleted_manifest_lists_count, 2); + assert_eq!(result.deleted_manifest_files_count, 2); + assert_eq!(result.deleted_data_files_count, 0); + assert_eq!(result.deleted_position_delete_files_count, 0); + assert_eq!(result.deleted_equality_delete_files_count, 0); + assert_eq!(result.deleted_statistics_files_count, 0); + + assert_eq!(catalog.update_table_calls.lock().unwrap().len(), 1); + + let mut commit = catalog.update_table_calls.lock().unwrap().pop().unwrap(); + let updates = commit.take_updates(); + assert_eq!(updates.len(), 1); + assert_eq!(updates[0], TableUpdate::RemoveSnapshots { + snapshot_ids: vec![1000, 1001] + }); + } + + #[tokio::test] + async fn test_max_concurrent_deletes_configuration() { + let table = create_table_with_snapshots(3).await; + let catalog = MockCatalog::new(table.clone()); + + let action = table.expire_snapshots().build().max_concurrent_deletes(5); + + assert_eq!(action.config.max_concurrent_deletes, Some(5)); + + let action2 = table.expire_snapshots().build().max_concurrent_deletes(0); + + assert_eq!(action2.config.max_concurrent_deletes, None); + + let result = action.execute(&catalog).await.unwrap(); + + assert_eq!(result.deleted_manifest_lists_count, 2); + assert_eq!(result.deleted_manifest_files_count, 2); + assert_eq!(result.deleted_data_files_count, 0); + + assert_eq!(catalog.update_table_calls.lock().unwrap().len(), 
1); + + let mut commit = catalog.update_table_calls.lock().unwrap().pop().unwrap(); + let updates = commit.take_updates(); + assert_eq!(updates.len(), 1); + assert_eq!(updates[0], TableUpdate::RemoveSnapshots { + snapshot_ids: vec![1000, 1001] + }); + } + + #[tokio::test] + async fn test_empty_snapshot_ids_list() { + let table = create_table_with_snapshots(3).await; + let catalog = MockCatalog::new(table.clone()); + + let action = table.expire_snapshots().build().expire_snapshot_ids(vec![]); + + let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + + assert_eq!(snapshots_to_expire.len(), 2); + + let result = action.execute(&catalog).await.unwrap(); + + assert_eq!(result.deleted_manifest_lists_count, 2); + assert_eq!(result.deleted_manifest_files_count, 2); + assert_eq!(result.deleted_data_files_count, 0); + + assert_eq!(catalog.update_table_calls.lock().unwrap().len(), 1); + + let mut commit = catalog.update_table_calls.lock().unwrap().pop().unwrap(); + let updates = commit.take_updates(); + assert_eq!(updates.len(), 1); + assert_eq!(updates[0], TableUpdate::RemoveSnapshots { + snapshot_ids: vec![1000, 1001] + }); + } + + #[tokio::test] + async fn test_nonexistent_snapshot_ids() { + let table = create_table_with_snapshots(3).await; + let catalog = MockCatalog::new(table.clone()); + + let action = table + .expire_snapshots() + .build() + .expire_snapshot_ids(vec![9999, 8888]); + + let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + + assert_eq!(snapshots_to_expire.len(), 0); + + let result = action.execute(&catalog).await.unwrap(); + + assert_eq!(result.deleted_manifest_lists_count, 0); + assert_eq!(result.deleted_manifest_files_count, 0); + assert_eq!(result.deleted_data_files_count, 0); + + assert_eq!(catalog.update_table_calls.lock().unwrap().len(), 0); + } + + #[tokio::test] + async fn test_builder_pattern() { + let table = create_table_with_snapshots(5).await; + + let action = table + .expire_snapshots() + .build() + .expire_older_than(Utc::now().timestamp_millis()) + .retain_last(3) + .max_concurrent_deletes(10) + .expire_snapshot_ids(vec![1001, 1002]); + + assert!(action.config.older_than_ms.is_some()); + assert_eq!(action.config.retain_last, Some(3)); + assert_eq!(action.config.max_concurrent_deletes, Some(10)); + assert_eq!(action.config.snapshot_ids, vec![1001, 1002]); + } + + #[tokio::test] + async fn test_collect_files_to_delete_logic() { + let table = create_table_with_snapshots(4).await; + + let action = table.expire_snapshots().build().retain_last(2); + + let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + assert_eq!(snapshots_to_expire.len(), 2); + + let files_to_delete = action + .collect_files_to_delete(&snapshots_to_expire) + .await + .unwrap(); + + assert_eq!(files_to_delete.len(), 4); + + // should be two of each type of file + let manifest_files = files_to_delete.iter().filter(|f| f.contains("manifest_")); + assert_eq!(manifest_files.count(), 2); + + let manifest_list_files = files_to_delete.iter().filter(|f| f.contains("snap-")); + assert_eq!(manifest_list_files.count(), 2); + + let data_files = files_to_delete.iter().filter(|f| f.contains("data_")); + assert_eq!(data_files.count(), 0); + } + + #[tokio::test] + async fn test_default_configuration() { + let table = create_table_with_snapshots(3).await; + let action = table.expire_snapshots().build(); + + assert_eq!(action.config.older_than_ms, None); + assert_eq!(action.config.retain_last, Some(1)); + assert_eq!(action.config.max_concurrent_deletes, 
None); + assert!(action.config.snapshot_ids.is_empty()); + } + + #[tokio::test] + async fn test_dry_run() { + let table = create_table_with_snapshots(3).await; + let catalog = MockCatalog::new(table.clone()); + let action = table.expire_snapshots().build().dry_run(true); + + let result = action.execute(&catalog).await.unwrap(); + + assert_eq!(result.deleted_manifest_lists_count, 2); + assert_eq!(result.deleted_manifest_files_count, 2); + assert_eq!(result.deleted_data_files_count, 0); + assert_eq!(result.deleted_position_delete_files_count, 0); + assert_eq!(result.deleted_equality_delete_files_count, 0); + assert_eq!(result.deleted_statistics_files_count, 0); + + assert_eq!(catalog.update_table_calls.lock().unwrap().len(), 0); + } +} diff --git a/crates/iceberg/src/maintenance/mod.rs b/crates/iceberg/src/maintenance/mod.rs new file mode 100644 index 000000000..40114ec3e --- /dev/null +++ b/crates/iceberg/src/maintenance/mod.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Table maintenance operations for Apache Iceberg +//! +//! This module provides low-level table maintenance operations that can be +//! executed on a single node without requiring Spark or other distributed +//! computing frameworks. + +mod expire_snapshots; + +pub use expire_snapshots::*; diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs index 1f3f89533..4605c8a81 100644 --- a/crates/iceberg/src/spec/table_metadata_builder.rs +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -204,6 +204,13 @@ impl TableMetadataBuilder { self } + /// Set the last updated ms. Intended for use in tests. + #[cfg(test)] + pub(crate) fn set_last_updated_ms(mut self, last_updated_ms: i64) -> Self { + self.last_updated_ms = Some(last_updated_ms); + self + } + /// Upgrade `FormatVersion`. Downgrades are not allowed. /// /// # Errors @@ -1028,7 +1035,7 @@ impl TableMetadataBuilder { /// intermediate snapshot is added to table metadata, it is added to the snapshot log, assuming /// that it will be the current snapshot. when there are multiple snapshot updates, the log must /// be corrected by suppressing the intermediate snapshot entries. - /// + /// /// A snapshot is an intermediate snapshot if it was added but is not the current snapshot. 
fn get_intermediate_snapshots(&self) -> HashSet { let added_snapshot_ids = self diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index a2af9aaa3..cddab3cfd 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -76,7 +76,7 @@ impl Transaction { Ok(()) } - fn apply( + pub(crate) fn apply( &mut self, updates: Vec, requirements: Vec, From 91e51ff65aceea8b6b9d94e06304605676b495c8 Mon Sep 17 00:00:00 2001 From: Connor McArthur Date: Fri, 20 Jun 2025 09:58:43 -0400 Subject: [PATCH 2/5] add tests covering more realistic table structures, cleanup eprintlns --- .../src/maintenance/expire_snapshots.rs | 309 +++++++++++++++--- 1 file changed, 258 insertions(+), 51 deletions(-) diff --git a/crates/iceberg/src/maintenance/expire_snapshots.rs b/crates/iceberg/src/maintenance/expire_snapshots.rs index bf23e5880..3a7183d07 100644 --- a/crates/iceberg/src/maintenance/expire_snapshots.rs +++ b/crates/iceberg/src/maintenance/expire_snapshots.rs @@ -236,12 +236,10 @@ impl ExpireSnapshotsAction { } } Err(e) => { - // Log warning but continue - the manifest list file might already be deleted - eprintln!( - "Warning: Failed to load manifest list {}: {}", - snapshot.manifest_list(), - e - ); + // If we end up here, the failure mode is that we will skip some snapshots. + // This could happen if the manifest list was already deleted. This is not a + // critical error, so we will just skip this snapshot. It can be cleaned up + // later by a delete orphan files action. } } } @@ -270,12 +268,10 @@ impl ExpireSnapshotsAction { } } Err(e) => { - // Log warning but continue - eprintln!( - "Warning: Failed to load manifest list {}: {}", - snapshot.manifest_list(), - e - ); + // If we end up here, the failure mode is the opposite of the above! We could + // accidentally delete a manifest list that is still referenced by this snapshot. + // We have to return error here, otherwise we risk deleting the wrong files. + return Err(e); } } } @@ -325,8 +321,6 @@ impl ExpireSnapshotsAction { let mut delete_tasks: Vec>>> = Vec::with_capacity(num_concurrent_deletes); - eprintln!("Num concurrent deletes: {}", num_concurrent_deletes); - for task_index in 0..num_concurrent_deletes { // Ideally we'd use a semaphore here to allow each thread to delete as fast as possible. // However, we can't assume that tokio::sync::Semaphore is available, and AsyncStd @@ -344,11 +338,9 @@ impl ExpireSnapshotsAction { for file_path in task_file_paths { match file_io_clone.delete(&file_path).await { Ok(_) => { - eprintln!("Deleted file: {:?}", file_path); results.push(Ok(file_path)); } Err(e) => { - eprintln!("Error deleting file: {:?}", e); results.push(Err(e)); } } @@ -362,13 +354,13 @@ impl ExpireSnapshotsAction { for task in delete_tasks { let file_delete_results = task.await; for file_delete_result in file_delete_results { - eprintln!("Deleted file: {:?}", file_delete_result); match file_delete_result { Ok(deleted_path) => { self.process_file_deletion(&mut result, deleted_path).await; } Err(e) => { - eprintln!("Warning: File deletion task failed: {}", e); + // This is not a critical error, so we will just continue. The result is + // that an orphaned file will be left behind. 
} } } @@ -474,8 +466,8 @@ mod tests { use crate::io::{FileIOBuilder, OutputFile}; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, ManifestEntry, - ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, Operation, - PrimitiveType, Schema, Snapshot, Struct, Summary, TableMetadataBuilder, Type, + ManifestFile, ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, + Operation, PrimitiveType, Schema, Snapshot, Struct, Summary, TableMetadataBuilder, Type, }; use crate::table::{Table, TableBuilder}; use crate::{Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent}; @@ -637,19 +629,22 @@ mod tests { format!("memory://test/table/data/data_{}.parquet", Uuid::new_v4()) } - async fn add_snapshot(&mut self, snapshot_id: i64) { - eprintln!("Adding snapshot: {}", snapshot_id); + async fn add_snapshot(&mut self, snapshot_id: i64, include_existing_manifests: bool) { let parent_id = if snapshot_id == 1000 { None } else { Some(snapshot_id - 1) }; - eprintln!("Parent id: {:?}", parent_id); let manifest_file = self.next_manifest_file(); let manifest_list_file = self.next_manifest_list_file(); let manifest_list_file_path = manifest_list_file.location().to_string(); + // Prep manifest list + let mut manifest_list_write = + ManifestListWriter::v2(manifest_list_file, snapshot_id, parent_id, snapshot_id + 1); + let mut manifest_list_entries: Vec = Vec::new(); + let mut writer = ManifestWriterBuilder::new( manifest_file, Some(snapshot_id), @@ -683,13 +678,32 @@ mod tests { .build(), ) .unwrap(); - let data_file_manifest = writer.write_manifest_file().await.unwrap(); - // Write to manifest list - let mut manifest_list_write = - ManifestListWriter::v2(manifest_list_file, snapshot_id, parent_id, snapshot_id + 1); + if include_existing_manifests { + let previous_snapshots = self + .table + .metadata() + .snapshots() + .filter(|s| s.snapshot_id() < snapshot_id) + .collect::>(); + + for snapshot in previous_snapshots { + let manifest_list = snapshot + .load_manifest_list(self.table.file_io(), self.table.metadata()) + .await + .unwrap(); + for manifest in manifest_list.entries() { + if !manifest_list_entries.contains(manifest) { + manifest_list_entries.push(manifest.clone()); + } + } + } + } + + let data_file_manifest = writer.write_manifest_file().await.unwrap(); + manifest_list_entries.push(data_file_manifest.clone()); manifest_list_write - .add_manifests(vec![data_file_manifest].into_iter()) + .add_manifests(manifest_list_entries.into_iter()) .unwrap(); manifest_list_write.close().await.unwrap(); @@ -732,12 +746,27 @@ mod tests { } } - /// Helper function to create a table with snapshots - async fn create_table_with_snapshots(snapshot_count: usize) -> Table { + /// Helper function to create a table with snapshots where each snapshot has its own separate + /// manifest. When running expire snapshots on this table, both manifest files and manifest + /// lists should be deleted. + async fn create_table_with_isolated_manifests(snapshot_count: usize) -> Table { + let mut table_fixture = TableTestFixture::new().await; + + for i in 0..snapshot_count { + table_fixture.add_snapshot((i + 1000) as i64, false).await; + } + + table_fixture.table + } + + /// Helper function to create a table with snapshots where some snapshots share manifest files. + /// When running expire snapshots on this table, only the manifest files that are not shared + /// with remaining snapshots should be deleted. 
+ async fn create_table_with_shared_manifests(snapshot_count: usize) -> Table { let mut table_fixture = TableTestFixture::new().await; for i in 0..snapshot_count { - table_fixture.add_snapshot((i + 1000) as i64).await; + table_fixture.add_snapshot((i + 1000) as i64, true).await; } table_fixture.table @@ -745,7 +774,7 @@ mod tests { #[tokio::test] async fn test_expire_snapshots_zero_snapshots() { - let table = create_table_with_snapshots(0).await; + let table = create_table_with_isolated_manifests(0).await; let catalog = MockCatalog::new(table.clone()); let action = table.expire_snapshots().build(); @@ -763,8 +792,26 @@ mod tests { } #[tokio::test] - async fn test_expire_snapshots_one_snapshot() { - let table = create_table_with_snapshots(1).await; + async fn test_expire_snapshots_one_snapshot_isolated_manifests() { + let table = create_table_with_isolated_manifests(1).await; + let catalog = MockCatalog::new(table.clone()); + + let action = table.expire_snapshots().build(); + let result = action.execute(&catalog).await.unwrap(); + + assert_eq!(result.deleted_data_files_count, 0); + assert_eq!(result.deleted_manifest_files_count, 0); + assert_eq!(result.deleted_manifest_lists_count, 0); + assert_eq!(result.deleted_position_delete_files_count, 0); + assert_eq!(result.deleted_equality_delete_files_count, 0); + assert_eq!(result.deleted_statistics_files_count, 0); + + assert_eq!(catalog.update_table_calls.lock().unwrap().len(), 0); + } + + #[tokio::test] + async fn test_expire_snapshots_one_snapshot_shared_manifests() { + let table = create_table_with_shared_manifests(1).await; let catalog = MockCatalog::new(table.clone()); let action = table.expire_snapshots().build(); @@ -781,8 +828,8 @@ mod tests { } #[tokio::test] - async fn test_expire_snapshots_many_snapshots_retain_last() { - let table = create_table_with_snapshots(5).await; + async fn test_expire_snapshots_many_snapshots_isolated_manifests_retain_last() { + let table = create_table_with_isolated_manifests(5).await; let catalog = MockCatalog::new(table.clone()); let action = table.expire_snapshots().build().retain_last(2); @@ -819,9 +866,47 @@ mod tests { } #[tokio::test] - async fn test_expire_snapshots_older_than_timestamp() { + async fn test_expire_snapshots_many_snapshots_shared_manifests_retain_last() { + let table = create_table_with_shared_manifests(5).await; + let catalog = MockCatalog::new(table.clone()); + + let action = table.expire_snapshots().build().retain_last(2); + + let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + + assert_eq!(snapshots_to_expire.len(), 3); + + let mut ids_to_expire: Vec = snapshots_to_expire + .iter() + .map(|s| s.snapshot_id()) + .collect(); + + ids_to_expire.sort(); + + assert_eq!(ids_to_expire, vec![1000, 1001, 1002]); + + let result = action.execute(&catalog).await.unwrap(); + + assert_eq!(result.deleted_manifest_lists_count, 3); + assert_eq!(result.deleted_manifest_files_count, 0); + assert_eq!(result.deleted_data_files_count, 0); + assert_eq!(result.deleted_position_delete_files_count, 0); + assert_eq!(result.deleted_equality_delete_files_count, 0); + + assert_eq!(catalog.update_table_calls.lock().unwrap().len(), 1); + + let mut commit = catalog.update_table_calls.lock().unwrap().pop().unwrap(); + let updates = commit.take_updates(); + assert_eq!(updates.len(), 1); + assert_eq!(updates[0], TableUpdate::RemoveSnapshots { + snapshot_ids: vec![1000, 1001, 1002] + }); + } + + #[tokio::test] + async fn test_expire_snapshots_older_than_timestamp_isolated_manifests() { // Test 
case 3b: Table with many snapshots, expire based on timestamp - let table = create_table_with_snapshots(5).await; + let table = create_table_with_isolated_manifests(5).await; let catalog = MockCatalog::new(table.clone()); // Get the timestamp of the middle snapshot and use it as threshold @@ -872,8 +957,61 @@ mod tests { } #[tokio::test] - async fn test_expire_specific_snapshot_ids() { - let table = create_table_with_snapshots(5).await; + async fn test_expire_snapshots_older_than_timestamp_shared_manifests() { + // Test case 3b: Table with many snapshots, expire based on timestamp + let table = create_table_with_shared_manifests(5).await; + let catalog = MockCatalog::new(table.clone()); + + // Get the timestamp of the middle snapshot and use it as threshold + let middle_timestamp = table + .metadata() + .snapshots() + .find(|s| s.snapshot_id() == 1002) + .unwrap() + .timestamp_ms(); + + let action = table + .expire_snapshots() + .build() + .expire_older_than(middle_timestamp) + .retain_last(1); // Keep at least 1 + + let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + + // Should expire snapshots older than the middle one, but keep at least 1 + // So we expect snapshots 1000 and 1001 to be expired + assert_eq!(snapshots_to_expire.len(), 2); + + let mut ids_to_expire: Vec = snapshots_to_expire + .iter() + .map(|s| s.snapshot_id()) + .collect(); + ids_to_expire.sort(); + + assert_eq!(ids_to_expire, vec![1000, 1001]); + + let result = action.execute(&catalog).await.unwrap(); + + assert_eq!(result.deleted_manifest_lists_count, 2); + assert_eq!(result.deleted_manifest_files_count, 0); + assert_eq!(result.deleted_data_files_count, 0); + assert_eq!(result.deleted_position_delete_files_count, 0); + assert_eq!(result.deleted_equality_delete_files_count, 0); + assert_eq!(result.deleted_statistics_files_count, 0); + + assert_eq!(catalog.update_table_calls.lock().unwrap().len(), 1); + + let mut commit = catalog.update_table_calls.lock().unwrap().pop().unwrap(); + let updates = commit.take_updates(); + assert_eq!(updates.len(), 1); + assert_eq!(updates[0], TableUpdate::RemoveSnapshots { + snapshot_ids: vec![1000, 1001] + }); + } + + #[tokio::test] + async fn test_expire_specific_snapshot_ids_isolated_manifests() { + let table = create_table_with_isolated_manifests(5).await; let catalog = MockCatalog::new(table.clone()); let action = table @@ -913,9 +1051,51 @@ mod tests { }); } + #[tokio::test] + async fn test_expire_specific_snapshot_ids_shared_manifests() { + let table = create_table_with_shared_manifests(5).await; + let catalog = MockCatalog::new(table.clone()); + + let action = table + .expire_snapshots() + .build() + .expire_snapshot_ids(vec![1001, 1003]); + + let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + + assert_eq!(snapshots_to_expire.len(), 2); + + let mut ids_to_expire: Vec = snapshots_to_expire + .iter() + .map(|s| s.snapshot_id()) + .collect(); + + ids_to_expire.sort(); + + assert_eq!(ids_to_expire, vec![1001, 1003]); + + let result = action.execute(&catalog).await.unwrap(); + + assert_eq!(result.deleted_manifest_lists_count, 2); + assert_eq!(result.deleted_manifest_files_count, 0); + assert_eq!(result.deleted_data_files_count, 0); + assert_eq!(result.deleted_position_delete_files_count, 0); + assert_eq!(result.deleted_equality_delete_files_count, 0); + assert_eq!(result.deleted_statistics_files_count, 0); + + assert_eq!(catalog.update_table_calls.lock().unwrap().len(), 1); + + let mut commit = 
catalog.update_table_calls.lock().unwrap().pop().unwrap(); + let updates = commit.take_updates(); + assert_eq!(updates.len(), 1); + assert_eq!(updates[0], TableUpdate::RemoveSnapshots { + snapshot_ids: vec![1001, 1003] + }); + } + #[tokio::test] async fn test_expire_current_snapshot_error() { - let table = create_table_with_snapshots(3).await; + let table = create_table_with_isolated_manifests(3).await; let current_snapshot_id = table.metadata().current_snapshot_id().unwrap(); let action = table @@ -937,7 +1117,7 @@ mod tests { #[tokio::test] async fn test_expire_readonly_table_error() { - let table = create_table_with_snapshots(3).await; + let table = create_table_with_isolated_manifests(3).await; let catalog = MockCatalog::new(table.clone()); let readonly_table = TableBuilder::new() @@ -963,7 +1143,7 @@ mod tests { #[tokio::test] async fn test_retain_last_minimum_validation() { - let table = create_table_with_snapshots(3).await; + let table = create_table_with_isolated_manifests(3).await; let catalog = MockCatalog::new(table.clone()); let action = table.expire_snapshots().build().retain_last(0); @@ -991,7 +1171,7 @@ mod tests { #[tokio::test] async fn test_max_concurrent_deletes_configuration() { - let table = create_table_with_snapshots(3).await; + let table = create_table_with_isolated_manifests(3).await; let catalog = MockCatalog::new(table.clone()); let action = table.expire_snapshots().build().max_concurrent_deletes(5); @@ -1019,8 +1199,8 @@ mod tests { } #[tokio::test] - async fn test_empty_snapshot_ids_list() { - let table = create_table_with_snapshots(3).await; + async fn test_empty_snapshot_ids_list_isolated_manifests() { + let table = create_table_with_isolated_manifests(3).await; let catalog = MockCatalog::new(table.clone()); let action = table.expire_snapshots().build().expire_snapshot_ids(vec![]); @@ -1045,9 +1225,36 @@ mod tests { }); } + #[tokio::test] + async fn test_empty_snapshot_ids_list_shared_manifests() { + let table = create_table_with_shared_manifests(3).await; + let catalog = MockCatalog::new(table.clone()); + + let action = table.expire_snapshots().build().expire_snapshot_ids(vec![]); + + let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + + assert_eq!(snapshots_to_expire.len(), 2); + + let result = action.execute(&catalog).await.unwrap(); + + assert_eq!(result.deleted_manifest_lists_count, 2); + assert_eq!(result.deleted_manifest_files_count, 0); + assert_eq!(result.deleted_data_files_count, 0); + + assert_eq!(catalog.update_table_calls.lock().unwrap().len(), 1); + + let mut commit = catalog.update_table_calls.lock().unwrap().pop().unwrap(); + let updates = commit.take_updates(); + assert_eq!(updates.len(), 1); + assert_eq!(updates[0], TableUpdate::RemoveSnapshots { + snapshot_ids: vec![1000, 1001] + }); + } + #[tokio::test] async fn test_nonexistent_snapshot_ids() { - let table = create_table_with_snapshots(3).await; + let table = create_table_with_isolated_manifests(3).await; let catalog = MockCatalog::new(table.clone()); let action = table @@ -1070,7 +1277,7 @@ mod tests { #[tokio::test] async fn test_builder_pattern() { - let table = create_table_with_snapshots(5).await; + let table = create_table_with_isolated_manifests(5).await; let action = table .expire_snapshots() @@ -1088,7 +1295,7 @@ mod tests { #[tokio::test] async fn test_collect_files_to_delete_logic() { - let table = create_table_with_snapshots(4).await; + let table = create_table_with_isolated_manifests(4).await; let action = 
table.expire_snapshots().build().retain_last(2); @@ -1115,7 +1322,7 @@ mod tests { #[tokio::test] async fn test_default_configuration() { - let table = create_table_with_snapshots(3).await; + let table = create_table_with_isolated_manifests(3).await; let action = table.expire_snapshots().build(); assert_eq!(action.config.older_than_ms, None); @@ -1126,7 +1333,7 @@ mod tests { #[tokio::test] async fn test_dry_run() { - let table = create_table_with_snapshots(3).await; + let table = create_table_with_isolated_manifests(3).await; let catalog = MockCatalog::new(table.clone()); let action = table.expire_snapshots().build().dry_run(true); From c86c6023d2ba8a2104e2df7e8b6be13bf0d17618 Mon Sep 17 00:00:00 2001 From: Connor McArthur Date: Fri, 20 Jun 2025 09:59:00 -0400 Subject: [PATCH 3/5] clean up warnings --- crates/iceberg/src/maintenance/expire_snapshots.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/maintenance/expire_snapshots.rs b/crates/iceberg/src/maintenance/expire_snapshots.rs index 3a7183d07..9556f00f9 100644 --- a/crates/iceberg/src/maintenance/expire_snapshots.rs +++ b/crates/iceberg/src/maintenance/expire_snapshots.rs @@ -235,7 +235,7 @@ impl ExpireSnapshotsAction { expired_manifests.insert(manifest_entry.manifest_path.clone()); } } - Err(e) => { + Err(_) => { // If we end up here, the failure mode is that we will skip some snapshots. // This could happen if the manifest list was already deleted. This is not a // critical error, so we will just skip this snapshot. It can be cleaned up @@ -358,7 +358,7 @@ impl ExpireSnapshotsAction { Ok(deleted_path) => { self.process_file_deletion(&mut result, deleted_path).await; } - Err(e) => { + Err(_) => { // This is not a critical error, so we will just continue. The result is // that an orphaned file will be left behind. } From 8790433164969f950f0afdecada5cbde47db8cf4 Mon Sep 17 00:00:00 2001 From: Connor McArthur Date: Fri, 20 Jun 2025 14:12:01 -0400 Subject: [PATCH 4/5] PR feedback: reorganize and separate procedure, action --- .../src/maintenance/expire_snapshots.rs | 358 ++++++++++-------- 1 file changed, 194 insertions(+), 164 deletions(-) diff --git a/crates/iceberg/src/maintenance/expire_snapshots.rs b/crates/iceberg/src/maintenance/expire_snapshots.rs index 9556f00f9..9fc010995 100644 --- a/crates/iceberg/src/maintenance/expire_snapshots.rs +++ b/crates/iceberg/src/maintenance/expire_snapshots.rs @@ -22,6 +22,7 @@ //! in a consistent state. use std::collections::HashSet; +use std::sync::Arc; use async_trait::async_trait; use serde::{Deserialize, Serialize}; @@ -30,9 +31,21 @@ use crate::error::Result; use crate::runtime::JoinHandle; use crate::spec::SnapshotRef; use crate::table::Table; -use crate::transaction::Transaction; +use crate::transaction::{ActionCommit, ApplyTransactionAction, Transaction, TransactionAction}; use crate::{Catalog, Error, ErrorKind, TableUpdate}; +/// Trait for performing the broader expire snapshots operation +/// +/// Given a configuration for the procedure, you can then call `execute` on tables to perform the +/// configured operation. +/// +/// This is the primary entrypoint for code in this module. +#[async_trait] +pub trait ExpireSnapshotsProcedure: Send + Sync { + /// Execute the expire snapshots operation + async fn execute(&self, table: &Table, catalog: &dyn Catalog) -> Result; +} + /// Result of the expire snapshots operation. Contains information about how many files were /// deleted. 
#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -83,70 +96,38 @@ impl Default for ExpireSnapshotsConfig { } } -/// Trait for performing expire snapshots operations -/// -/// This trait provides a low-level API for expiring snapshots that can be -/// extended with different implementations for different environments. -#[async_trait] -pub trait ExpireSnapshots: Send + Sync { - /// Execute the expire snapshots operation - async fn execute(&self, catalog: &dyn Catalog) -> Result; -} - -/// Implementation of the expire snapshots operation +/// Action for expiring snapshots, this can be constructed and applied to a transaction to directly +/// remove snapshots from the table. pub struct ExpireSnapshotsAction { - table: Table, - config: ExpireSnapshotsConfig, + snapshot_ids_to_expire: Vec, } impl ExpireSnapshotsAction { /// Create a new expire snapshots action - pub fn new(table: Table) -> Self { + pub fn new(snapshot_ids_to_expire: Vec) -> Self { Self { - table, - config: ExpireSnapshotsConfig::default(), + snapshot_ids_to_expire, } } +} - /// Set the timestamp threshold for expiring snapshots - pub fn expire_older_than(mut self, timestamp_ms: i64) -> Self { - self.config.older_than_ms = Some(timestamp_ms); - self - } - - /// Set the dry run flag - pub fn dry_run(mut self, dry_run: bool) -> Self { - self.config.dry_run = dry_run; - self - } - - /// Set the minimum number of snapshots to retain. If the number of snapshots is less than 1, - /// it will be automatically adjusted to 1, following the behavior in Spark. - pub fn retain_last(mut self, num_snapshots: u32) -> Self { - if num_snapshots < 1 { - self.config.retain_last = Some(1); - } else { - self.config.retain_last = Some(num_snapshots); - } - self - } - - /// Set specific snapshot IDs to expire. An empty list is equivalent to the default behavior - /// of expiring all but `retain_last` snapshots! When only expiring specific snapshots, please - /// ensure that the list of snapshot IDs is non-empty before using this method. - pub fn expire_snapshot_ids(mut self, snapshot_ids: Vec) -> Self { - self.config.snapshot_ids = snapshot_ids; - self +#[async_trait] +impl TransactionAction for ExpireSnapshotsAction { + async fn commit(self: Arc, table: &Table) -> Result { + Ok(ActionCommit::new( + vec![TableUpdate::RemoveSnapshots { + snapshot_ids: self.snapshot_ids_to_expire.clone(), + }], + vec![], + )) } +} - /// Set the maximum number of concurrent file deletions - pub fn max_concurrent_deletes(mut self, max_deletes: u32) -> Self { - if max_deletes > 0 { - self.config.max_concurrent_deletes = Some(max_deletes); - } - self - } +struct ExpireSnapshotsProcedureImpl { + config: ExpireSnapshotsConfig, +} +impl ExpireSnapshotsProcedureImpl { /// Determine which snapshots should be expired based on the configuration. This will: /// /// - Sort snapshots by timestamp (oldest first) @@ -158,8 +139,8 @@ impl ExpireSnapshotsAction { /// - Never expire the current snapshot! /// /// Returns a Vec of SnapshotRefs that should be expired (references removed, and deleted). 
- fn identify_snapshots_to_expire(&self) -> Result> { - let metadata = self.table.metadata(); + fn identify_snapshots_to_expire(&self, table: &Table) -> Result> { + let metadata = table.metadata(); let all_snapshots: Vec = metadata.snapshots().cloned().collect(); if all_snapshots.is_empty() { @@ -216,11 +197,12 @@ impl ExpireSnapshotsAction { /// Collect all files that should be deleted along with the expired snapshots async fn collect_files_to_delete( &self, + table: &Table, expired_snapshots: &[SnapshotRef], ) -> Result> { let mut files_to_delete = Vec::new(); - let file_io = self.table.file_io(); - let metadata = self.table.metadata(); + let file_io = table.file_io(); + let metadata = table.metadata(); // Collect files from snapshots that are being expired let mut expired_manifest_lists = HashSet::new(); @@ -301,7 +283,11 @@ impl ExpireSnapshotsAction { /// Delete files concurrently with respect to max_concurrent_deletes setting /// Should not be called if dry_run is true, but this is checked for extra safety. - async fn delete_files(&self, files_to_delete: Vec) -> Result { + async fn delete_files( + &self, + table: &Table, + files_to_delete: Vec, + ) -> Result { let mut result = ExpireSnapshotsResult::default(); if self.config.dry_run { @@ -311,7 +297,7 @@ impl ExpireSnapshotsAction { return Ok(result); } - let file_io = self.table.file_io(); + let file_io = table.file_io(); if files_to_delete.is_empty() { return Ok(result); @@ -371,7 +357,7 @@ impl ExpireSnapshotsAction { } #[async_trait] -impl ExpireSnapshots for ExpireSnapshotsAction { +impl ExpireSnapshotsProcedure for ExpireSnapshotsProcedureImpl { /// The main entrypoint for the expire snapshots action. This will: /// /// - Validate the table state @@ -379,21 +365,23 @@ impl ExpireSnapshots for ExpireSnapshotsAction { /// - Update the table metadata to remove expired snapshots /// - Collect files to delete /// - Delete the files - async fn execute(&self, catalog: &dyn Catalog) -> Result { - if self.table.readonly() { + async fn execute(&self, table: &Table, catalog: &dyn Catalog) -> Result { + if table.readonly() { return Err(Error::new( ErrorKind::FeatureUnsupported, "Cannot expire snapshots on a readonly table", )); } - let snapshots_to_expire = self.identify_snapshots_to_expire()?; + let snapshots_to_expire = self.identify_snapshots_to_expire(table)?; if snapshots_to_expire.is_empty() { return Ok(ExpireSnapshotsResult::default()); } - let files_to_delete = self.collect_files_to_delete(&snapshots_to_expire).await?; + let files_to_delete = self + .collect_files_to_delete(table, &snapshots_to_expire) + .await?; if self.config.dry_run { let mut result = ExpireSnapshotsResult::default(); @@ -403,54 +391,96 @@ impl ExpireSnapshots for ExpireSnapshotsAction { return Ok(result); } - // update the table metadata to remove the expired snapshots _before_ deleting anything! - // TODO: make this retry - let mut transaction = Transaction::new(&self.table); - let mut snapshot_ids: Vec = snapshots_to_expire + let mut snapshot_ids_to_expire: Vec = snapshots_to_expire .iter() .map(|s| s.snapshot_id()) .collect(); + // sort for a deterministic output - snapshot_ids.sort(); - transaction.apply( - vec![TableUpdate::RemoveSnapshots { snapshot_ids }], - // no requirements here. if the table's main branch was rewound while this operation is - // running, this will potentially corrupt the table by deleting the wrong snapshots. - // but, if this fails because of a concurrent update, we have to repeat the logic. 
- // TODO: verify that this is running against the latest metadata version. if the commit - // fails, refresh the metadata and try again if and only if new snapshots were added. - vec![], - )?; + snapshot_ids_to_expire.sort(); + + let mut transaction = Transaction::new(table); + let action = ExpireSnapshotsAction::new(snapshot_ids_to_expire); + transaction = action.apply(transaction)?; transaction.commit(catalog).await?; - let result = self.delete_files(files_to_delete).await?; + let result = self.delete_files(table, files_to_delete).await?; Ok(result) } } /// Builder for creating expire snapshots operations -pub struct ExpireSnapshotsBuilder { - table: Table, +pub struct ExpireSnapshotsProcedureBuilder { + config: ExpireSnapshotsConfig, } -impl ExpireSnapshotsBuilder { - /// Create a new builder for the given table - pub fn new(table: Table) -> Self { - Self { table } +impl Default for ExpireSnapshotsProcedureBuilder { + fn default() -> Self { + Self::new() + } +} + +impl ExpireSnapshotsProcedureBuilder { + /// Create a new builder for the expire snapshots procedure + pub fn new() -> Self { + Self { + config: ExpireSnapshotsConfig::default(), + } } /// Build an expire snapshots action with default configuration - pub fn build(self) -> ExpireSnapshotsAction { - ExpireSnapshotsAction::new(self.table) + pub fn build(self) -> ExpireSnapshotsProcedureImpl { + ExpireSnapshotsProcedureImpl { + config: self.config, + } + } + + /// Set the timestamp threshold for expiring snapshots + pub fn expire_older_than(mut self, timestamp_ms: i64) -> Self { + self.config.older_than_ms = Some(timestamp_ms); + self + } + + /// Set the dry run flag + pub fn dry_run(mut self, dry_run: bool) -> Self { + self.config.dry_run = dry_run; + self + } + + /// Set the minimum number of snapshots to retain. If the number of snapshots is less than 1, + /// it will be automatically adjusted to 1, following the behavior in Spark. + pub fn retain_last(mut self, num_snapshots: u32) -> Self { + if num_snapshots < 1 { + self.config.retain_last = Some(1); + } else { + self.config.retain_last = Some(num_snapshots); + } + self + } + + /// Set specific snapshot IDs to expire. An empty list is equivalent to the default behavior + /// of expiring all but `retain_last` snapshots! When only expiring specific snapshots, please + /// ensure that the list of snapshot IDs is non-empty before using this method. 
+ pub fn expire_snapshot_ids(mut self, snapshot_ids: Vec) -> Self { + self.config.snapshot_ids = snapshot_ids; + self + } + + /// Set the maximum number of concurrent file deletions + pub fn max_concurrent_deletes(mut self, max_deletes: u32) -> Self { + if max_deletes > 0 { + self.config.max_concurrent_deletes = Some(max_deletes); + } + self } } // Extension trait to add expire_snapshots method to Table impl Table { /// Create a new expire snapshots builder for this table - pub fn expire_snapshots(&self) -> ExpireSnapshotsBuilder { - ExpireSnapshotsBuilder::new(self.clone()) + pub fn expire_snapshots(&self) -> ExpireSnapshotsProcedureBuilder { + ExpireSnapshotsProcedureBuilder::new() } } @@ -777,8 +807,8 @@ mod tests { let table = create_table_with_isolated_manifests(0).await; let catalog = MockCatalog::new(table.clone()); - let action = table.expire_snapshots().build(); - let result = action.execute(&catalog).await.unwrap(); + let procedure = table.expire_snapshots().build(); + let result = procedure.execute(&table, &catalog).await.unwrap(); // Should complete successfully with no deletions assert_eq!(result.deleted_data_files_count, 0); @@ -796,8 +826,8 @@ mod tests { let table = create_table_with_isolated_manifests(1).await; let catalog = MockCatalog::new(table.clone()); - let action = table.expire_snapshots().build(); - let result = action.execute(&catalog).await.unwrap(); + let procedure = table.expire_snapshots().build(); + let result = procedure.execute(&table, &catalog).await.unwrap(); assert_eq!(result.deleted_data_files_count, 0); assert_eq!(result.deleted_manifest_files_count, 0); @@ -814,8 +844,8 @@ mod tests { let table = create_table_with_shared_manifests(1).await; let catalog = MockCatalog::new(table.clone()); - let action = table.expire_snapshots().build(); - let result = action.execute(&catalog).await.unwrap(); + let procedure = table.expire_snapshots().build(); + let result = procedure.execute(&table, &catalog).await.unwrap(); assert_eq!(result.deleted_data_files_count, 0); assert_eq!(result.deleted_manifest_files_count, 0); @@ -832,9 +862,9 @@ mod tests { let table = create_table_with_isolated_manifests(5).await; let catalog = MockCatalog::new(table.clone()); - let action = table.expire_snapshots().build().retain_last(2); + let procedure = table.expire_snapshots().retain_last(2).build(); - let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + let snapshots_to_expire = procedure.identify_snapshots_to_expire(&table).unwrap(); assert_eq!(snapshots_to_expire.len(), 3); @@ -847,7 +877,7 @@ mod tests { assert_eq!(ids_to_expire, vec![1000, 1001, 1002]); - let result = action.execute(&catalog).await.unwrap(); + let result = procedure.execute(&table, &catalog).await.unwrap(); assert_eq!(result.deleted_manifest_lists_count, 3); assert_eq!(result.deleted_manifest_files_count, 3); @@ -870,9 +900,9 @@ mod tests { let table = create_table_with_shared_manifests(5).await; let catalog = MockCatalog::new(table.clone()); - let action = table.expire_snapshots().build().retain_last(2); + let procedure = table.expire_snapshots().retain_last(2).build(); - let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + let snapshots_to_expire = procedure.identify_snapshots_to_expire(&table).unwrap(); assert_eq!(snapshots_to_expire.len(), 3); @@ -885,7 +915,7 @@ mod tests { assert_eq!(ids_to_expire, vec![1000, 1001, 1002]); - let result = action.execute(&catalog).await.unwrap(); + let result = procedure.execute(&table, &catalog).await.unwrap(); 
assert_eq!(result.deleted_manifest_lists_count, 3); assert_eq!(result.deleted_manifest_files_count, 0); @@ -917,13 +947,13 @@ mod tests { .unwrap() .timestamp_ms(); - let action = table + let procedure = table .expire_snapshots() - .build() .expire_older_than(middle_timestamp) - .retain_last(1); // Keep at least 1 + .retain_last(1) + .build(); // Keep at least 1 - let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + let snapshots_to_expire = procedure.identify_snapshots_to_expire(&table).unwrap(); // Should expire snapshots older than the middle one, but keep at least 1 // So we expect snapshots 1000 and 1001 to be expired @@ -937,7 +967,7 @@ mod tests { assert_eq!(ids_to_expire, vec![1000, 1001]); - let result = action.execute(&catalog).await.unwrap(); + let result = procedure.execute(&table, &catalog).await.unwrap(); assert_eq!(result.deleted_manifest_lists_count, 2); assert_eq!(result.deleted_manifest_files_count, 2); @@ -970,13 +1000,13 @@ mod tests { .unwrap() .timestamp_ms(); - let action = table + let procedure = table .expire_snapshots() - .build() .expire_older_than(middle_timestamp) - .retain_last(1); // Keep at least 1 + .retain_last(1) + .build(); // Keep at least 1 - let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + let snapshots_to_expire = procedure.identify_snapshots_to_expire(&table).unwrap(); // Should expire snapshots older than the middle one, but keep at least 1 // So we expect snapshots 1000 and 1001 to be expired @@ -990,7 +1020,7 @@ mod tests { assert_eq!(ids_to_expire, vec![1000, 1001]); - let result = action.execute(&catalog).await.unwrap(); + let result = procedure.execute(&table, &catalog).await.unwrap(); assert_eq!(result.deleted_manifest_lists_count, 2); assert_eq!(result.deleted_manifest_files_count, 0); @@ -1014,12 +1044,12 @@ mod tests { let table = create_table_with_isolated_manifests(5).await; let catalog = MockCatalog::new(table.clone()); - let action = table + let procedure = table .expire_snapshots() - .build() - .expire_snapshot_ids(vec![1001, 1003]); + .expire_snapshot_ids(vec![1001, 1003]) + .build(); - let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + let snapshots_to_expire = procedure.identify_snapshots_to_expire(&table).unwrap(); assert_eq!(snapshots_to_expire.len(), 2); @@ -1032,7 +1062,7 @@ mod tests { assert_eq!(ids_to_expire, vec![1001, 1003]); - let result = action.execute(&catalog).await.unwrap(); + let result = procedure.execute(&table, &catalog).await.unwrap(); assert_eq!(result.deleted_manifest_lists_count, 2); assert_eq!(result.deleted_manifest_files_count, 2); @@ -1056,12 +1086,12 @@ mod tests { let table = create_table_with_shared_manifests(5).await; let catalog = MockCatalog::new(table.clone()); - let action = table + let procedure = table .expire_snapshots() - .build() - .expire_snapshot_ids(vec![1001, 1003]); + .expire_snapshot_ids(vec![1001, 1003]) + .build(); - let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + let snapshots_to_expire = procedure.identify_snapshots_to_expire(&table).unwrap(); assert_eq!(snapshots_to_expire.len(), 2); @@ -1074,7 +1104,7 @@ mod tests { assert_eq!(ids_to_expire, vec![1001, 1003]); - let result = action.execute(&catalog).await.unwrap(); + let result = procedure.execute(&table, &catalog).await.unwrap(); assert_eq!(result.deleted_manifest_lists_count, 2); assert_eq!(result.deleted_manifest_files_count, 0); @@ -1098,12 +1128,12 @@ mod tests { let table = create_table_with_isolated_manifests(3).await; let 
current_snapshot_id = table.metadata().current_snapshot_id().unwrap(); - let action = table + let procedure = table .expire_snapshots() - .build() - .expire_snapshot_ids(vec![current_snapshot_id]); + .expire_snapshot_ids(vec![current_snapshot_id]) + .build(); - let result = action.identify_snapshots_to_expire(); + let result = procedure.identify_snapshots_to_expire(&table); assert!(result.is_err()); let error = result.unwrap_err(); @@ -1128,8 +1158,8 @@ mod tests { .build() .unwrap(); - let action = readonly_table.expire_snapshots().build(); - let result = action.execute(&catalog).await; + let procedure = readonly_table.expire_snapshots().build(); + let result = procedure.execute(&readonly_table, &catalog).await; assert!(result.is_err()); let error = result.unwrap_err(); @@ -1146,11 +1176,11 @@ mod tests { let table = create_table_with_isolated_manifests(3).await; let catalog = MockCatalog::new(table.clone()); - let action = table.expire_snapshots().build().retain_last(0); + let procedure = table.expire_snapshots().retain_last(0).build(); - assert_eq!(action.config.retain_last, Some(1)); + assert_eq!(procedure.config.retain_last, Some(1)); - let result = action.execute(&catalog).await.unwrap(); + let result = procedure.execute(&table, &catalog).await.unwrap(); assert_eq!(result.deleted_manifest_lists_count, 2); assert_eq!(result.deleted_manifest_files_count, 2); @@ -1174,15 +1204,15 @@ mod tests { let table = create_table_with_isolated_manifests(3).await; let catalog = MockCatalog::new(table.clone()); - let action = table.expire_snapshots().build().max_concurrent_deletes(5); + let procedure = table.expire_snapshots().max_concurrent_deletes(5).build(); - assert_eq!(action.config.max_concurrent_deletes, Some(5)); + assert_eq!(procedure.config.max_concurrent_deletes, Some(5)); - let action2 = table.expire_snapshots().build().max_concurrent_deletes(0); + let procedure2 = table.expire_snapshots().max_concurrent_deletes(0).build(); - assert_eq!(action2.config.max_concurrent_deletes, None); + assert_eq!(procedure2.config.max_concurrent_deletes, None); - let result = action.execute(&catalog).await.unwrap(); + let result = procedure.execute(&table, &catalog).await.unwrap(); assert_eq!(result.deleted_manifest_lists_count, 2); assert_eq!(result.deleted_manifest_files_count, 2); @@ -1203,13 +1233,13 @@ mod tests { let table = create_table_with_isolated_manifests(3).await; let catalog = MockCatalog::new(table.clone()); - let action = table.expire_snapshots().build().expire_snapshot_ids(vec![]); + let procedure = table.expire_snapshots().expire_snapshot_ids(vec![]).build(); - let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + let snapshots_to_expire = procedure.identify_snapshots_to_expire(&table).unwrap(); assert_eq!(snapshots_to_expire.len(), 2); - let result = action.execute(&catalog).await.unwrap(); + let result = procedure.execute(&table, &catalog).await.unwrap(); assert_eq!(result.deleted_manifest_lists_count, 2); assert_eq!(result.deleted_manifest_files_count, 2); @@ -1230,13 +1260,13 @@ mod tests { let table = create_table_with_shared_manifests(3).await; let catalog = MockCatalog::new(table.clone()); - let action = table.expire_snapshots().build().expire_snapshot_ids(vec![]); + let procedure = table.expire_snapshots().expire_snapshot_ids(vec![]).build(); - let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + let snapshots_to_expire = procedure.identify_snapshots_to_expire(&table).unwrap(); assert_eq!(snapshots_to_expire.len(), 2); - let result = 
action.execute(&catalog).await.unwrap(); + let result = procedure.execute(&table, &catalog).await.unwrap(); assert_eq!(result.deleted_manifest_lists_count, 2); assert_eq!(result.deleted_manifest_files_count, 0); @@ -1257,16 +1287,16 @@ mod tests { let table = create_table_with_isolated_manifests(3).await; let catalog = MockCatalog::new(table.clone()); - let action = table + let procedure = table .expire_snapshots() - .build() - .expire_snapshot_ids(vec![9999, 8888]); + .expire_snapshot_ids(vec![9999, 8888]) + .build(); - let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + let snapshots_to_expire = procedure.identify_snapshots_to_expire(&table).unwrap(); assert_eq!(snapshots_to_expire.len(), 0); - let result = action.execute(&catalog).await.unwrap(); + let result = procedure.execute(&table, &catalog).await.unwrap(); assert_eq!(result.deleted_manifest_lists_count, 0); assert_eq!(result.deleted_manifest_files_count, 0); @@ -1279,31 +1309,31 @@ mod tests { async fn test_builder_pattern() { let table = create_table_with_isolated_manifests(5).await; - let action = table + let procedure = table .expire_snapshots() - .build() .expire_older_than(Utc::now().timestamp_millis()) .retain_last(3) .max_concurrent_deletes(10) - .expire_snapshot_ids(vec![1001, 1002]); + .expire_snapshot_ids(vec![1001, 1002]) + .build(); - assert!(action.config.older_than_ms.is_some()); - assert_eq!(action.config.retain_last, Some(3)); - assert_eq!(action.config.max_concurrent_deletes, Some(10)); - assert_eq!(action.config.snapshot_ids, vec![1001, 1002]); + assert!(procedure.config.older_than_ms.is_some()); + assert_eq!(procedure.config.retain_last, Some(3)); + assert_eq!(procedure.config.max_concurrent_deletes, Some(10)); + assert_eq!(procedure.config.snapshot_ids, vec![1001, 1002]); } #[tokio::test] async fn test_collect_files_to_delete_logic() { let table = create_table_with_isolated_manifests(4).await; - let action = table.expire_snapshots().build().retain_last(2); + let procedure = table.expire_snapshots().retain_last(2).build(); - let snapshots_to_expire = action.identify_snapshots_to_expire().unwrap(); + let snapshots_to_expire = procedure.identify_snapshots_to_expire(&table).unwrap(); assert_eq!(snapshots_to_expire.len(), 2); - let files_to_delete = action - .collect_files_to_delete(&snapshots_to_expire) + let files_to_delete = procedure + .collect_files_to_delete(&table, &snapshots_to_expire) .await .unwrap(); @@ -1323,21 +1353,21 @@ mod tests { #[tokio::test] async fn test_default_configuration() { let table = create_table_with_isolated_manifests(3).await; - let action = table.expire_snapshots().build(); + let procedure = table.expire_snapshots().build(); - assert_eq!(action.config.older_than_ms, None); - assert_eq!(action.config.retain_last, Some(1)); - assert_eq!(action.config.max_concurrent_deletes, None); - assert!(action.config.snapshot_ids.is_empty()); + assert_eq!(procedure.config.older_than_ms, None); + assert_eq!(procedure.config.retain_last, Some(1)); + assert_eq!(procedure.config.max_concurrent_deletes, None); + assert!(procedure.config.snapshot_ids.is_empty()); } #[tokio::test] async fn test_dry_run() { let table = create_table_with_isolated_manifests(3).await; let catalog = MockCatalog::new(table.clone()); - let action = table.expire_snapshots().build().dry_run(true); + let procedure = table.expire_snapshots().dry_run(true).build(); - let result = action.execute(&catalog).await.unwrap(); + let result = procedure.execute(&table, &catalog).await.unwrap(); 
assert_eq!(result.deleted_manifest_lists_count, 2); assert_eq!(result.deleted_manifest_files_count, 2); From 226d4c13881ddc2db3aa4053afbeb7987d441c4e Mon Sep 17 00:00:00 2001 From: Connor McArthur Date: Fri, 20 Jun 2025 14:16:08 -0400 Subject: [PATCH 5/5] PR feedback: fix visibility issues, warnings --- .../src/maintenance/expire_snapshots.rs | 37 ++++++++----------- crates/iceberg/src/transaction/mod.rs | 2 +- 2 files changed, 16 insertions(+), 23 deletions(-) diff --git a/crates/iceberg/src/maintenance/expire_snapshots.rs b/crates/iceberg/src/maintenance/expire_snapshots.rs index 9fc010995..c5043c2eb 100644 --- a/crates/iceberg/src/maintenance/expire_snapshots.rs +++ b/crates/iceberg/src/maintenance/expire_snapshots.rs @@ -34,18 +34,6 @@ use crate::table::Table; use crate::transaction::{ActionCommit, ApplyTransactionAction, Transaction, TransactionAction}; use crate::{Catalog, Error, ErrorKind, TableUpdate}; -/// Trait for performing the broader expire snapshots operation -/// -/// Given a configuration for the procedure, you can then call `execute` on tables to perform the -/// configured operation. -/// -/// This is the primary entrypoint for code in this module. -#[async_trait] -pub trait ExpireSnapshotsProcedure: Send + Sync { - /// Execute the expire snapshots operation - async fn execute(&self, table: &Table, catalog: &dyn Catalog) -> Result; -} - /// Result of the expire snapshots operation. Contains information about how many files were /// deleted. #[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -113,7 +101,7 @@ impl ExpireSnapshotsAction { #[async_trait] impl TransactionAction for ExpireSnapshotsAction { - async fn commit(self: Arc, table: &Table) -> Result { + async fn commit(self: Arc, _table: &Table) -> Result { Ok(ActionCommit::new( vec![TableUpdate::RemoveSnapshots { snapshot_ids: self.snapshot_ids_to_expire.clone(), @@ -123,11 +111,15 @@ impl TransactionAction for ExpireSnapshotsAction { } } -struct ExpireSnapshotsProcedureImpl { +/// Procedure for expiring snapshots. Should be constructed via the `ExpireSnapshotsProcedureBuilder`. +/// +/// Once constructed, `execute` can be called on a catalog + table to perform the expire snapshots +/// operation as configured. +pub struct ExpireSnapshotsProcedure { config: ExpireSnapshotsConfig, } -impl ExpireSnapshotsProcedureImpl { +impl ExpireSnapshotsProcedure { /// Determine which snapshots should be expired based on the configuration. This will: /// /// - Sort snapshots by timestamp (oldest first) @@ -354,18 +346,19 @@ impl ExpireSnapshotsProcedureImpl { Ok(result) } -} -#[async_trait] -impl ExpireSnapshotsProcedure for ExpireSnapshotsProcedureImpl { - /// The main entrypoint for the expire snapshots action. This will: + /// The main entrypoint for the expire snapshots procedure. 
This will: /// /// - Validate the table state /// - Identify snapshots to expire /// - Update the table metadata to remove expired snapshots /// - Collect files to delete /// - Delete the files - async fn execute(&self, table: &Table, catalog: &dyn Catalog) -> Result { + pub async fn execute( + &self, + table: &Table, + catalog: &dyn Catalog, + ) -> Result { if table.readonly() { return Err(Error::new( ErrorKind::FeatureUnsupported, @@ -430,8 +423,8 @@ impl ExpireSnapshotsProcedureBuilder { } /// Build an expire snapshots action with default configuration - pub fn build(self) -> ExpireSnapshotsProcedureImpl { - ExpireSnapshotsProcedureImpl { + pub fn build(self) -> ExpireSnapshotsProcedure { + ExpireSnapshotsProcedure { config: self.config, } } diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index cddab3cfd..a2af9aaa3 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -76,7 +76,7 @@ impl Transaction { Ok(()) } - pub(crate) fn apply( + fn apply( &mut self, updates: Vec, requirements: Vec,
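
A minimal end-to-end usage sketch of the procedure API introduced in this series, assuming a `Table` handle and a `Catalog` implementation are already in scope inside an async function returning `Result<()>`; `cutoff_ms` and `catalog` are placeholder names, not identifiers from the patch:

    // Configure the procedure: keep the 3 newest snapshots, expire anything
    // older than `cutoff_ms`, and delete at most 8 files concurrently.
    let procedure = table
        .expire_snapshots()
        .expire_older_than(cutoff_ms)
        .retain_last(3)
        .max_concurrent_deletes(8)
        .build();

    // Commits the RemoveSnapshots update through the catalog first, then
    // deletes the newly unreferenced manifest lists and manifest files.
    let result = procedure.execute(&table, &catalog).await?;
    println!(
        "removed {} manifest lists, {} manifest files",
        result.deleted_manifest_lists_count, result.deleted_manifest_files_count
    );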