From 2ae7484f779ea7a1cfb58e04d91ade27abb0516f Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 17 Jun 2025 16:44:55 -0700 Subject: [PATCH 1/5] Remove current_table, updates, and requirements from Transaction --- crates/iceberg/src/transaction/mod.rs | 67 ++++++++++++++------------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index a2af9aaa3..6bc5e8fc6 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -45,60 +45,55 @@ use crate::{Catalog, TableCommit, TableRequirement, TableUpdate}; /// Table transaction. pub struct Transaction { - base_table: Table, - current_table: Table, + table: Table, actions: Vec, - updates: Vec, - requirements: Vec, } impl Transaction { /// Creates a new transaction. pub fn new(table: &Table) -> Self { Self { - base_table: table.clone(), - current_table: table.clone(), + table: table.clone(), actions: vec![], - updates: vec![], - requirements: vec![], } } - fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> { - let mut metadata_builder = self.current_table.metadata().clone().into_builder(None); + fn update_table_metadata(&self, table: &mut Table, updates: &[TableUpdate]) -> Result<()> { + let mut metadata_builder = table.metadata().clone().into_builder(None); for update in updates { metadata_builder = update.clone().apply(metadata_builder)?; } - self.current_table - .with_metadata(Arc::new(metadata_builder.build()?.metadata)); + table.with_metadata(Arc::new(metadata_builder.build()?.metadata)); Ok(()) } fn apply( - &mut self, + &self, + table: &mut Table, updates: Vec, requirements: Vec, + existing_updates: &mut Vec, + existing_requirements: &mut Vec, ) -> Result<()> { for requirement in &requirements { - requirement.check(Some(self.current_table.metadata()))?; + requirement.check(Some(table.metadata()))?; } - self.update_table_metadata(&updates)?; + self.update_table_metadata(table, &updates)?; - self.updates.extend(updates); + existing_updates.extend(updates); // For the requirements, it does not make sense to add a requirement more than once // For example, you cannot assert that the current schema has two different IDs for new_requirement in requirements { - if self - .requirements + if existing_requirements .iter() .map(discriminant) .all(|d| d != discriminant(&new_requirement)) { - self.requirements.push(new_requirement); + existing_requirements.push(new_requirement); } } @@ -129,8 +124,10 @@ impl Transaction { } }; let mut snapshot_id = generate_random_id(); + + // todo revisit this, this check won't be 100% valid because it's not checking the staging table while self - .current_table + .table .metadata() .snapshots() .any(|s| s.snapshot_id() == snapshot_id) @@ -157,41 +154,47 @@ impl Transaction { /// Commit transaction. pub async fn commit(mut self, catalog: &dyn Catalog) -> Result { - if self.actions.is_empty() && self.updates.is_empty() { + if self.actions.is_empty() { // nothing to commit - return Ok(self.base_table.clone()); + return Ok(self.table.clone()); } self.do_commit(catalog).await } async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result
{ - let base_table_identifier = self.base_table.identifier().to_owned(); + let base_table_identifier = self.table.identifier().to_owned(); let refreshed = catalog.load_table(&base_table_identifier.clone()).await?; - if self.base_table.metadata() != refreshed.metadata() - || self.base_table.metadata_location() != refreshed.metadata_location() + let mut existing_updates: Vec = vec![]; + let mut existing_requirements: Vec = vec![]; + + if self.table.metadata() != refreshed.metadata() + || self.table.metadata_location() != refreshed.metadata_location() { // current base is stale, use refreshed as base and re-apply transaction actions - self.base_table = refreshed.clone(); + self.table = refreshed.clone(); } - let current_table = self.base_table.clone(); + let mut current_table = self.table.clone(); - for action in self.actions.clone() { - let mut action_commit = action.commit(¤t_table).await?; - // apply changes to current_table + for action in &self.actions { + let mut action_commit = Arc::clone(action).commit(¤t_table).await?; + // apply action commit to current_table self.apply( + &mut current_table, action_commit.take_updates(), action_commit.take_requirements(), + &mut existing_updates, + &mut existing_requirements, )?; } let table_commit = TableCommit::builder() .ident(base_table_identifier) - .updates(self.updates.clone()) - .requirements(self.requirements.clone()) + .updates(existing_updates) + .requirements(existing_requirements) .build(); catalog.update_table(table_commit).await From c4833f45c7de3729ccfa3e7c798ea71d5fc500c6 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 20 Jun 2025 14:49:56 -0700 Subject: [PATCH 2/5] Move snapshot_id generation to SnapshotProducer, fix tx.apply --- crates/iceberg/src/transaction/append.rs | 6 +-- crates/iceberg/src/transaction/mod.rs | 63 ++++------------------ crates/iceberg/src/transaction/snapshot.rs | 26 ++++++++- 3 files changed, 37 insertions(+), 58 deletions(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 5195b99a3..9ecbb54d9 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -33,7 +33,6 @@ use crate::transaction::{ActionCommit, TransactionAction}; pub struct FastAppendAction { check_duplicate: bool, // below are properties used to create SnapshotProducer when commit - snapshot_id: i64, commit_uuid: Option, key_metadata: Option>, snapshot_properties: HashMap, @@ -41,10 +40,9 @@ pub struct FastAppendAction { } impl FastAppendAction { - pub(crate) fn new(snapshot_id: i64) -> Self { + pub(crate) fn new() -> Self { Self { check_duplicate: true, - snapshot_id, commit_uuid: None, key_metadata: None, snapshot_properties: HashMap::default(), @@ -95,7 +93,7 @@ impl TransactionAction for FastAppendAction { } let snapshot_producer = SnapshotProducer::new( - self.snapshot_id, + table, self.commit_uuid.unwrap_or_else(Uuid::now_v7), self.key_metadata.clone(), self.snapshot_properties.clone(), diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 6bc5e8fc6..8c4c3e1cb 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -28,11 +28,8 @@ mod update_location; mod update_properties; mod upgrade_format_version; -use std::mem::discriminant; use std::sync::Arc; -use uuid::Uuid; - use crate::error::Result; use crate::table::Table; use crate::transaction::action::BoxedTransactionAction; @@ -58,7 +55,7 @@ impl Transaction { } } - fn update_table_metadata(&self, table: &mut Table, updates: &[TableUpdate]) -> Result<()> { + fn update_table_metadata(table: &mut Table, updates: &[TableUpdate]) -> Result<()> { let mut metadata_builder = table.metadata().clone().into_builder(None); for update in updates { metadata_builder = update.clone().apply(metadata_builder)?; @@ -70,35 +67,22 @@ impl Transaction { } fn apply( - &self, table: &mut Table, - updates: Vec, - requirements: Vec, + mut action_commit: ActionCommit, existing_updates: &mut Vec, existing_requirements: &mut Vec, ) -> Result<()> { + let updates = action_commit.take_updates(); + let requirements = action_commit.take_requirements(); + for requirement in &requirements { requirement.check(Some(table.metadata()))?; } - self.update_table_metadata(table, &updates)?; + Self::update_table_metadata(table, &updates)?; existing_updates.extend(updates); - - // For the requirements, it does not make sense to add a requirement more than once - // For example, you cannot assert that the current schema has two different IDs - for new_requirement in requirements { - if existing_requirements - .iter() - .map(discriminant) - .all(|d| d != discriminant(&new_requirement)) - { - existing_requirements.push(new_requirement); - } - } - - // # TODO - // Support auto commit later. + existing_requirements.extend(requirements); Ok(()) } @@ -113,33 +97,9 @@ impl Transaction { UpdatePropertiesAction::new() } - fn generate_unique_snapshot_id(&self) -> i64 { - let generate_random_id = || -> i64 { - let (lhs, rhs) = Uuid::new_v4().as_u64_pair(); - let snapshot_id = (lhs ^ rhs) as i64; - if snapshot_id < 0 { - -snapshot_id - } else { - snapshot_id - } - }; - let mut snapshot_id = generate_random_id(); - - // todo revisit this, this check won't be 100% valid because it's not checking the staging table - while self - .table - .metadata() - .snapshots() - .any(|s| s.snapshot_id() == snapshot_id) - { - snapshot_id = generate_random_id(); - } - snapshot_id - } - /// Creates a fast append action. pub fn fast_append(&self) -> FastAppendAction { - FastAppendAction::new(self.generate_unique_snapshot_id()) + FastAppendAction::new() } /// Creates replace sort order action. @@ -180,12 +140,11 @@ impl Transaction { let mut current_table = self.table.clone(); for action in &self.actions { - let mut action_commit = Arc::clone(action).commit(¤t_table).await?; + let action_commit = Arc::clone(action).commit(¤t_table).await?; // apply action commit to current_table - self.apply( + Self::apply( &mut current_table, - action_commit.take_updates(), - action_commit.take_requirements(), + action_commit, &mut existing_updates, &mut existing_requirements, )?; diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 80a9f68a1..16b94b088 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -74,14 +74,14 @@ pub(crate) struct SnapshotProducer { impl SnapshotProducer { pub(crate) fn new( - snapshot_id: i64, + table: &Table, commit_uuid: Uuid, key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, ) -> Self { Self { - snapshot_id, + snapshot_id: Self::generate_unique_snapshot_id(table), commit_uuid, key_metadata, snapshot_properties, @@ -155,6 +155,28 @@ impl SnapshotProducer { Ok(()) } + fn generate_unique_snapshot_id(table: &Table) -> i64 { + let generate_random_id = || -> i64 { + let (lhs, rhs) = Uuid::new_v4().as_u64_pair(); + let snapshot_id = (lhs ^ rhs) as i64; + if snapshot_id < 0 { + -snapshot_id + } else { + snapshot_id + } + }; + let mut snapshot_id = generate_random_id(); + + while table + .metadata() + .snapshots() + .any(|s| s.snapshot_id() == snapshot_id) + { + snapshot_id = generate_random_id(); + } + snapshot_id + } + fn new_manifest_output(&mut self, table: &Table) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", From b2c49aee15bb2304d96c01d0b8e2faa8649c6e43 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 20 Jun 2025 14:59:16 -0700 Subject: [PATCH 3/5] fmt --- crates/iceberg/src/transaction/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 8c4c3e1cb..952b9f162 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -74,7 +74,7 @@ impl Transaction { ) -> Result<()> { let updates = action_commit.take_updates(); let requirements = action_commit.take_requirements(); - + for requirement in &requirements { requirement.check(Some(table.metadata()))?; } From b12f39b65a5fe6d15245bebc74465c928fbf1b88 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 20 Jun 2025 15:16:03 -0700 Subject: [PATCH 4/5] make tx.apply great again --- crates/iceberg/src/table.rs | 4 +++- crates/iceberg/src/transaction/mod.rs | 20 ++++++++++---------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index d94c2b1a7..f601dacbc 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -162,8 +162,10 @@ pub struct Table { } impl Table { - pub(crate) fn with_metadata(&mut self, metadata: TableMetadataRef) { + /// Sets the [`Table`] metadata and returns an updated instance with the new metadata applied. + pub(crate) fn with_metadata(mut self, metadata: TableMetadataRef) -> Self { self.metadata = metadata; + self } /// Returns a TableBuilder to build a table diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 952b9f162..5487d77d7 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -55,23 +55,23 @@ impl Transaction { } } - fn update_table_metadata(table: &mut Table, updates: &[TableUpdate]) -> Result<()> { + fn update_table_metadata(table: Table, updates: &[TableUpdate]) -> Result
{ let mut metadata_builder = table.metadata().clone().into_builder(None); for update in updates { metadata_builder = update.clone().apply(metadata_builder)?; } - table.with_metadata(Arc::new(metadata_builder.build()?.metadata)); - - Ok(()) + Ok(table.with_metadata(Arc::new(metadata_builder.build()?.metadata))) } + /// Applies an [`ActionCommit`] to the given [`Table`], returning a new [`Table`] with updated metadata. + /// Also appends any derived [`TableUpdate`]s and [`TableRequirement`]s to the provided vectors. fn apply( - table: &mut Table, + table: Table, mut action_commit: ActionCommit, existing_updates: &mut Vec, existing_requirements: &mut Vec, - ) -> Result<()> { + ) -> Result
{ let updates = action_commit.take_updates(); let requirements = action_commit.take_requirements(); @@ -79,12 +79,12 @@ impl Transaction { requirement.check(Some(table.metadata()))?; } - Self::update_table_metadata(table, &updates)?; + let updated_table = Self::update_table_metadata(table, &updates)?; existing_updates.extend(updates); existing_requirements.extend(requirements); - Ok(()) + Ok(updated_table) } /// Sets table to a new version. @@ -142,8 +142,8 @@ impl Transaction { for action in &self.actions { let action_commit = Arc::clone(action).commit(¤t_table).await?; // apply action commit to current_table - Self::apply( - &mut current_table, + current_table = Self::apply( + current_table, action_commit, &mut existing_updates, &mut existing_requirements, From 5f51d831c0a4fde0de759ce73403c585a4c7d657 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 20 Jun 2025 15:19:43 -0700 Subject: [PATCH 5/5] minor --- crates/iceberg/src/transaction/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 5487d77d7..5979ef365 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -127,9 +127,6 @@ impl Transaction { let refreshed = catalog.load_table(&base_table_identifier.clone()).await?; - let mut existing_updates: Vec = vec![]; - let mut existing_requirements: Vec = vec![]; - if self.table.metadata() != refreshed.metadata() || self.table.metadata_location() != refreshed.metadata_location() { @@ -138,6 +135,8 @@ impl Transaction { } let mut current_table = self.table.clone(); + let mut existing_updates: Vec = vec![]; + let mut existing_requirements: Vec = vec![]; for action in &self.actions { let action_commit = Arc::clone(action).commit(¤t_table).await?;