From 1e8232707f265b4b692e89dccdbaf199b089dbd6 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 19 May 2025 15:23:54 -0700 Subject: [PATCH 1/6] add snapshot validation logic --- crates/iceberg/src/lib.rs | 2 + crates/iceberg/src/spec/snapshot.rs | 1 + crates/iceberg/src/transaction/append.rs | 7 +- crates/iceberg/src/transaction/mod.rs | 1 + crates/iceberg/src/transaction/snapshot.rs | 8 +- crates/iceberg/src/transaction/validate.rs | 225 +++++++++++++++++++++ 6 files changed, 242 insertions(+), 2 deletions(-) create mode 100644 crates/iceberg/src/transaction/validate.rs diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 556ff3e02..8e72ed07d 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -1,4 +1,6 @@ +#![feature(let_chains)] // Licensed to the Apache Software Foundation (ASF) under one + // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index a2716ad97..d6105273a 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -42,6 +42,7 @@ pub type SnapshotRef = Arc; #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(rename_all = "lowercase")] /// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots. +#[derive(Hash)] pub enum Operation { /// Only data files were added and no files were removed. Append, diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 574904b28..504bcb464 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -16,17 +16,20 @@ // under the License. 
use std::collections::{HashMap, HashSet}; +use std::sync::Arc; use arrow_array::StringArray; use futures::TryStreamExt; use uuid::Uuid; use crate::error::Result; -use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation}; +use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation, Snapshot, SnapshotRef}; +use crate::table::Table; use crate::transaction::Transaction; use crate::transaction::snapshot::{ DefaultManifestProcess, SnapshotProduceAction, SnapshotProduceOperation, }; +use crate::transaction::validate::SnapshotValidator; use crate::writer::file_writer::ParquetWriter; use crate::{Error, ErrorKind}; @@ -209,6 +212,8 @@ impl SnapshotProduceOperation for FastAppendOperation { } } +impl SnapshotValidator for FastAppendOperation {} + #[cfg(test)] mod tests { use crate::scan::tests::TableTestFixture; diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index ba79d60bb..43460bdc8 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -20,6 +20,7 @@ mod append; mod snapshot; mod sort_order; +mod validate; use std::cmp::Ordering; use std::collections::HashMap; diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index a15e17f1d..96b0ffa75 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -29,12 +29,13 @@ use crate::spec::{ PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary, update_snapshot_summaries, }; +use crate::transaction::validate::SnapshotValidator; use crate::transaction::Transaction; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; const META_ROOT_PATH: &str = "metadata"; -pub(crate) trait SnapshotProduceOperation: Send + Sync { +pub(crate) trait SnapshotProduceOperation: Send + SnapshotValidator + Sync { fn operation(&self) -> Operation; 
#[allow(unused)] fn delete_entries( @@ -307,6 +308,11 @@ impl<'a> SnapshotProduceAction<'a> { .await?; let next_seq_num = self.tx.current_table.metadata().next_sequence_number(); + snapshot_produce_operation.validate( + &self.tx.current_table, + self.tx.current_table.metadata().current_snapshot(), + ); + let summary = self .summary(&snapshot_produce_operation) .map_err(|err| { diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs new file mode 100644 index 000000000..7416e4183 --- /dev/null +++ b/crates/iceberg/src/transaction/validate.rs @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::HashSet; +use std::sync::Arc; + +use crate::spec::{ManifestContentType, ManifestFile, Operation, SnapshotRef, TableMetadata}; +use crate::table::Table; + +pub(crate) trait SnapshotValidator { + fn validate(&self, table: &Table, snapshot: Option<&SnapshotRef>) -> () {} + + #[allow(dead_code)] + async fn validation_history( + &self, + base: &Table, + to_snapshot: Option<&SnapshotRef>, + from_snapshot: Option<&SnapshotRef>, + matching_operations: HashSet<Operation>, + manifest_content_type: ManifestContentType, + ) -> (Vec<ManifestFile>, HashSet<i64>) { + let mut manifests = vec![]; + let mut new_snapshots = HashSet::new(); + let mut last_snapshot: Option<&SnapshotRef> = None; + + let snapshots = Self::ancestors_between(to_snapshot, from_snapshot, base.metadata()); + for current_snapshot in &snapshots { + last_snapshot = Some(current_snapshot); + + if matching_operations.contains(&current_snapshot.summary().operation) { + new_snapshots.insert(current_snapshot.snapshot_id().clone()); + current_snapshot + .load_manifest_list(base.file_io(), base.metadata()) + .await + .expect("Failed to load manifest list!") + .entries() + .into_iter() + .for_each(|manifest| { + if manifest.content == manifest_content_type + && manifest.added_snapshot_id == current_snapshot.snapshot_id() + { + manifests.push(manifest.clone()); + } + }); + } + } + + if last_snapshot.is_some() + && last_snapshot.unwrap().parent_snapshot_id() + != from_snapshot.map(|snapshot| snapshot.snapshot_id()) + { + panic!("Cannot determine history between starting snapshot {} and the last known ancestor {}", + from_snapshot.map_or_else( + || "None".to_string(), + |snapshot| snapshot.snapshot_id().to_string()), + last_snapshot.map_or_else( + || "None".to_string(), + |snapshot| snapshot.parent_snapshot_id().unwrap().to_string())); + } + + (manifests, new_snapshots) + } + + fn ancestors_between( + to_snapshot: Option<&SnapshotRef>, + from_snapshot: Option<&SnapshotRef>, + table_metadata: &TableMetadata, + ) -> Vec<SnapshotRef> { + let mut 
snapshots = Vec::new(); + let mut current_snapshot = to_snapshot; + while let Some(snapshot) = current_snapshot { + snapshots.push(Arc::clone(&snapshot)); + match snapshot.parent_snapshot_id() { + Some(parent_snapshot_id) + if from_snapshot.is_some() + && parent_snapshot_id == from_snapshot.unwrap().snapshot_id() => + { + break + } + Some(parent_snapshot_id) => { + current_snapshot = table_metadata.snapshot_by_id(parent_snapshot_id) + } + None => break, + } + } + + snapshots + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use crate::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestContentType, Operation, + SnapshotRef, Struct, + }; + use crate::transaction::tests::{make_v2_minimal_table, make_v2_table}; + use crate::transaction::validate::SnapshotValidator; + use crate::transaction::{Table, Transaction}; + use crate::TableUpdate; + + struct TestValidator {} + + impl SnapshotValidator for TestValidator {} + + async fn make_v2_table_with_updates() -> (Table, Vec<TableUpdate>) { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + let mut action = tx.fast_append(None, vec![]).unwrap(); + + let data_file_1 = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/1.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap(); + + let data_file_2 = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/2.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap(); + + action.add_data_files(vec![data_file_1.clone()]).unwrap(); + let tx = action.apply().await.unwrap(); + 
let mut action = tx.fast_append(None, vec![]).unwrap(); + action.add_data_files(vec![data_file_2.clone()]).unwrap(); + let tx = action.apply().await.unwrap(); + + (table.clone(), tx.updates) + } + + #[tokio::test] + async fn test_validation_history() { + let (table, updates) = make_v2_table_with_updates().await; + let parent_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + SnapshotRef::new(snapshot.clone()) + } else { + unreachable!() + }; + let current_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[2] { + SnapshotRef::new(snapshot.clone()) + } else { + unreachable!() + }; + + let test_validator = TestValidator {}; + + // specifying from_snapshot, validating up to the from_snapshot + let (manifests, snapshots) = test_validator + .validation_history( + &table, + Some(&current_snapshot), + Some(&parent_snapshot), + HashSet::from([Operation::Append]), + ManifestContentType::Data, + ) + .await; + + manifests + .iter() + .for_each(|manifest| assert_eq!(manifest.content, ManifestContentType::Data)); + assert_eq!(snapshots.into_iter().collect::<Vec<_>>(), vec![ + current_snapshot.snapshot_id() + ]); + } + + #[test] + fn test_ancestor_between() { + let table = make_v2_table(); + let current_snapshot = table.metadata().current_snapshot(); + let parent_snapshot_id = current_snapshot.unwrap().parent_snapshot_id().unwrap(); + let parent_snapshot = table.metadata().snapshot_by_id(parent_snapshot_id); + + // not specifying from_snapshot, listing all ancestors + let all_ancestors = + TestValidator::ancestors_between(current_snapshot, None, table.metadata()); + assert_eq!( + vec![ + current_snapshot.unwrap().snapshot_id(), + current_snapshot.unwrap().parent_snapshot_id().unwrap() + ], + all_ancestors + .iter() + .map(|snapshot| snapshot.snapshot_id()) + .collect::<Vec<_>>() + ); + + // specifying from_snapshot, listing only 1 snapshot + let ancestors = + TestValidator::ancestors_between(current_snapshot, parent_snapshot, table.metadata()); + assert_eq!( + 
vec![current_snapshot.unwrap().snapshot_id()], + ancestors + .iter() + .map(|snapshot| snapshot.snapshot_id()) + .collect::<Vec<_>>() + ); + } +} From 26b434a50a18e29970aaa5bebf5975f7bd4598f5 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 19 May 2025 15:42:58 -0700 Subject: [PATCH 2/6] fmt --- crates/iceberg/src/transaction/append.rs | 4 +--- crates/iceberg/src/transaction/snapshot.rs | 2 +- crates/iceberg/src/transaction/validate.rs | 24 +++++++++++++--------- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 504bcb464..fdfc15417 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -16,15 +16,13 @@ // under the License. use std::collections::{HashMap, HashSet}; -use std::sync::Arc; use arrow_array::StringArray; use futures::TryStreamExt; use uuid::Uuid; use crate::error::Result; -use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation, Snapshot, SnapshotRef}; -use crate::table::Table; +use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation}; use crate::transaction::Transaction; use crate::transaction::snapshot::{ DefaultManifestProcess, SnapshotProduceAction, SnapshotProduceOperation, diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 96b0ffa75..d10f43293 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -29,8 +29,8 @@ use crate::spec::{ PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary, update_snapshot_summaries, }; -use crate::transaction::validate::SnapshotValidator; use crate::transaction::Transaction; +use crate::transaction::validate::SnapshotValidator; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; const META_ROOT_PATH: &str = "metadata"; diff --git 
a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs index 7416e4183..7db8ba562 100644 --- a/crates/iceberg/src/transaction/validate.rs +++ b/crates/iceberg/src/transaction/validate.rs @@ -22,7 +22,7 @@ use crate::spec::{ManifestContentType, ManifestFile, Operation, SnapshotRef, Tab use crate::table::Table; pub(crate) trait SnapshotValidator { - fn validate(&self, table: &Table, snapshot: Option<&SnapshotRef>) -> () {} + fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) -> () {} #[allow(dead_code)] async fn validation_history( @@ -63,13 +63,17 @@ pub(crate) trait SnapshotValidator { && last_snapshot.unwrap().parent_snapshot_id() != from_snapshot.map(|snapshot| snapshot.snapshot_id()) { - panic!("Cannot determine history between starting snapshot {} and the last known ancestor {}", - from_snapshot.map_or_else( - || "None".to_string(), - |snapshot| snapshot.snapshot_id().to_string()), - last_snapshot.map_or_else( - || "None".to_string(), - |snapshot| snapshot.parent_snapshot_id().unwrap().to_string())); + panic!( + "Cannot determine history between starting snapshot {} and the last known ancestor {}", + from_snapshot.map_or_else( + || "None".to_string(), + |snapshot| snapshot.snapshot_id().to_string() + ), + last_snapshot.map_or_else( + || "None".to_string(), + |snapshot| snapshot.parent_snapshot_id().unwrap().to_string() + ) + ); } (manifests, new_snapshots) @@ -89,7 +93,7 @@ pub(crate) trait SnapshotValidator { if from_snapshot.is_some() && parent_snapshot_id == from_snapshot.unwrap().snapshot_id() => { - break + break; } Some(parent_snapshot_id) => { current_snapshot = table_metadata.snapshot_by_id(parent_snapshot_id) @@ -106,6 +110,7 @@ pub(crate) trait SnapshotValidator { mod tests { use std::collections::HashSet; + use crate::TableUpdate; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestContentType, Operation, SnapshotRef, Struct, @@ -113,7 +118,6 @@ mod tests { use 
crate::transaction::tests::{make_v2_minimal_table, make_v2_table}; use crate::transaction::validate::SnapshotValidator; use crate::transaction::{Table, Transaction}; - use crate::TableUpdate; struct TestValidator {} From 6a08af8bd0af7523146bd542c057057ef870d545 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 19 May 2025 15:45:25 -0700 Subject: [PATCH 3/6] minor --- crates/iceberg/src/lib.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 8e72ed07d..556ff3e02 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -1,6 +1,4 @@ -#![feature(let_chains)] // Licensed to the Apache Software Foundation (ASF) under one - // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file From fa109bc0f633ea0ea2da31aeeb53b2347293a34a Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 19 May 2025 15:53:11 -0700 Subject: [PATCH 4/6] minor --- crates/iceberg/src/transaction/validate.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs index 7db8ba562..51afee5d7 100644 --- a/crates/iceberg/src/transaction/validate.rs +++ b/crates/iceberg/src/transaction/validate.rs @@ -22,7 +22,7 @@ use crate::spec::{ManifestContentType, ManifestFile, Operation, SnapshotRef, Tab use crate::table::Table; pub(crate) trait SnapshotValidator { - fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) -> () {} + fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) {} #[allow(dead_code)] async fn validation_history( @@ -42,13 +42,13 @@ pub(crate) trait SnapshotValidator { last_snapshot = Some(current_snapshot); if matching_operations.contains(&current_snapshot.summary().operation) { - new_snapshots.insert(current_snapshot.snapshot_id().clone()); + 
new_snapshots.insert(current_snapshot.snapshot_id()); current_snapshot .load_manifest_list(base.file_io(), base.metadata()) .await .expect("Failed to load manifest list!") .entries() - .into_iter() + .iter() .for_each(|manifest| { if manifest.content == manifest_content_type && manifest.added_snapshot_id == current_snapshot.snapshot_id() @@ -87,7 +87,7 @@ pub(crate) trait SnapshotValidator { let mut snapshots = Vec::new(); let mut current_snapshot = to_snapshot; while let Some(snapshot) = current_snapshot { - snapshots.push(Arc::clone(&snapshot)); + snapshots.push(Arc::clone(snapshot)); match snapshot.parent_snapshot_id() { Some(parent_snapshot_id) if from_snapshot.is_some() From 29a50fc17281b9450fa5c56203e5d13a1a3cea09 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 21 May 2025 16:48:39 -0700 Subject: [PATCH 5/6] remove validate usage --- crates/iceberg/src/transaction/snapshot.rs | 5 ----- crates/iceberg/src/transaction/validate.rs | 1 + 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index d10f43293..b9dbaf9f8 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -308,11 +308,6 @@ impl<'a> SnapshotProduceAction<'a> { .await?; let next_seq_num = self.tx.current_table.metadata().next_sequence_number(); - snapshot_produce_operation.validate( - &self.tx.current_table, - self.tx.current_table.metadata().current_snapshot(), - ); - let summary = self .summary(&snapshot_produce_operation) .map_err(|err| { diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs index 51afee5d7..f36db3c8f 100644 --- a/crates/iceberg/src/transaction/validate.rs +++ b/crates/iceberg/src/transaction/validate.rs @@ -22,6 +22,7 @@ use crate::spec::{ManifestContentType, ManifestFile, Operation, SnapshotRef, Tab use crate::table::Table; pub(crate) trait SnapshotValidator { + 
#[allow(dead_code)] fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) {} #[allow(dead_code)] From e329b10370325522e5124b5d6182dac426e2e02f Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 23 May 2025 14:39:12 -0700 Subject: [PATCH 6/6] remove Option for to_snapshot --- crates/iceberg/src/transaction/validate.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs index f36db3c8f..935f93af6 100644 --- a/crates/iceberg/src/transaction/validate.rs +++ b/crates/iceberg/src/transaction/validate.rs @@ -29,7 +29,7 @@ pub(crate) trait SnapshotValidator { async fn validation_history( &self, base: &Table, - to_snapshot: Option<&SnapshotRef>, + to_snapshot: &SnapshotRef, from_snapshot: Option<&SnapshotRef>, matching_operations: HashSet<Operation>, manifest_content_type: ManifestContentType, @@ -80,13 +80,15 @@ pub(crate) trait SnapshotValidator { (manifests, new_snapshots) } + /// find ancestors in (from_snapshot, to_snapshot] + /// TODO: Return an iterator instead of a vector fn ancestors_between( - to_snapshot: Option<&SnapshotRef>, + to_snapshot: &SnapshotRef, from_snapshot: Option<&SnapshotRef>, table_metadata: &TableMetadata, ) -> Vec<SnapshotRef> { let mut snapshots = Vec::new(); - let mut current_snapshot = to_snapshot; + let mut current_snapshot = Some(to_snapshot); while let Some(snapshot) = current_snapshot { snapshots.push(Arc::clone(snapshot)); match snapshot.parent_snapshot_id() { @@ -180,7 +182,7 @@ mod tests { let (manifests, snapshots) = test_validator .validation_history( &table, - Some(&current_snapshot), + &current_snapshot, Some(&parent_snapshot), HashSet::from([Operation::Append]), ManifestContentType::Data, @@ -204,7 +206,7 @@ mod tests { // not specifying from_snapshot, listing all ancestors let all_ancestors = - TestValidator::ancestors_between(current_snapshot, None, table.metadata()); + TestValidator::ancestors_between(current_snapshot.unwrap(), 
None, table.metadata()); assert_eq!( vec![ current_snapshot.unwrap().snapshot_id(), @@ -218,7 +220,7 @@ mod tests { // specifying from_snapshot, listing only 1 snapshot let ancestors = - TestValidator::ancestors_between(current_snapshot, parent_snapshot, table.metadata()); + TestValidator::ancestors_between(current_snapshot.unwrap(), parent_snapshot, table.metadata()); assert_eq!( vec![current_snapshot.unwrap().snapshot_id()], ancestors