feat(transaction): Remove current_table, updates, and requirements from Transaction #1451

Open: wants to merge 1 commit into base: main
67 changes: 35 additions & 32 deletions crates/iceberg/src/transaction/mod.rs
@@ -45,60 +45,55 @@ use crate::{Catalog, TableCommit, TableRequirement, TableUpdate};

 /// Table transaction.
 pub struct Transaction {
-    base_table: Table,
-    current_table: Table,
+    table: Table,
     actions: Vec<BoxedTransactionAction>,
-    updates: Vec<TableUpdate>,
-    requirements: Vec<TableRequirement>,
 }

 impl Transaction {
     /// Creates a new transaction.
     pub fn new(table: &Table) -> Self {
         Self {
-            base_table: table.clone(),
-            current_table: table.clone(),
+            table: table.clone(),
             actions: vec![],
-            updates: vec![],
-            requirements: vec![],
         }
     }

-    fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> {
-        let mut metadata_builder = self.current_table.metadata().clone().into_builder(None);
+    fn update_table_metadata(&self, table: &mut Table, updates: &[TableUpdate]) -> Result<()> {
Review comment (Contributor):
We don't need self here?

+        let mut metadata_builder = table.metadata().clone().into_builder(None);
         for update in updates {
             metadata_builder = update.clone().apply(metadata_builder)?;
         }

-        self.current_table
-            .with_metadata(Arc::new(metadata_builder.build()?.metadata));
+        table.with_metadata(Arc::new(metadata_builder.build()?.metadata));
Review comment (Contributor):
Not related to this PR, and not mandatory, but I think it would be better to change with_metadata to take mut self, so that it consumes the table rather than modifying it in place.


         Ok(())
     }

     fn apply(
-        &mut self,
+        &self,
Review comment (Contributor):
Ditto.

+        table: &mut Table,
         updates: Vec<TableUpdate>,
         requirements: Vec<TableRequirement>,
+        existing_updates: &mut Vec<TableUpdate>,
+        existing_requirements: &mut Vec<TableRequirement>,
     ) -> Result<()> {
         for requirement in &requirements {
-            requirement.check(Some(self.current_table.metadata()))?;
+            requirement.check(Some(table.metadata()))?;
         }

-        self.update_table_metadata(&updates)?;
+        self.update_table_metadata(table, &updates)?;

-        self.updates.extend(updates);
+        existing_updates.extend(updates);

         // For the requirements, it does not make sense to add a requirement more than once
         // For example, you cannot assert that the current schema has two different IDs
         for new_requirement in requirements {
-            if self
-                .requirements
+            if existing_requirements
                 .iter()
                 .map(discriminant)
                 .all(|d| d != discriminant(&new_requirement))
             {
-                self.requirements.push(new_requirement);
+                existing_requirements.push(new_requirement);
Review comment (Contributor):
We should move this check to the start of this method. Also, I don't think we should filter here; we should return an error.

Review comment (Contributor):
In fact, I'm not sure we actually need this check; maybe we could remove it for now?

             }
         }
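
To make the two options from the review thread on the duplicate-requirement check concrete, here is a minimal sketch of de-duplicating by enum variant: once by silently filtering (what the diff does) and once by returning an error (what the reviewer suggests). `Requirement`, `push_filtered`, and `push_strict` are hypothetical stand-ins for illustration, not the crate's `TableRequirement` API.

```rust
use std::mem::discriminant;

// Hypothetical stand-in for the crate's `TableRequirement`; the variants and
// their payloads are illustrative only.
#[derive(Debug, Clone, PartialEq)]
enum Requirement {
    CurrentSchemaIdMatch(i32),
    LastAssignedFieldIdMatch(i32),
}

/// Filtering behaviour: a requirement whose variant is already present is
/// silently skipped, whatever its payload.
fn push_filtered(existing: &mut Vec<Requirement>, new: Requirement) {
    if existing.iter().all(|r| discriminant(r) != discriminant(&new)) {
        existing.push(new);
    }
}

/// Strict behaviour: a requirement whose variant is already present is
/// rejected, so the caller learns about the conflict.
fn push_strict(existing: &mut Vec<Requirement>, new: Requirement) -> Result<(), String> {
    if existing.iter().any(|r| discriminant(r) == discriminant(&new)) {
        return Err(format!("duplicate requirement kind: {:?}", new));
    }
    existing.push(new);
    Ok(())
}

fn main() {
    let mut filtered = vec![Requirement::CurrentSchemaIdMatch(1)];
    push_filtered(&mut filtered, Requirement::CurrentSchemaIdMatch(2)); // same variant: dropped
    push_filtered(&mut filtered, Requirement::LastAssignedFieldIdMatch(7)); // new variant: kept
    println!("filtered: {:?}", filtered);

    let mut strict = vec![Requirement::CurrentSchemaIdMatch(1)];
    let result = push_strict(&mut strict, Requirement::CurrentSchemaIdMatch(2));
    println!("strict: {:?}", result);
}
```

Note that the filtering variant discards the second `CurrentSchemaIdMatch` even though its payload differs from the first, which is the kind of conflict the strict variant would surface.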

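A sketch of the two smaller review suggestions on `update_table_metadata`: dropping the `self` parameter and having `with_metadata` consume the table instead of mutating it. `Metadata`, `Table`, and `Update` are simplified hypothetical types, not the crate's definitions, and the builder-based apply logic from the diff is reduced to a single field assignment.

```rust
use std::sync::Arc;

// Hypothetical, heavily simplified stand-ins for the crate's types.
#[derive(Debug, Clone, PartialEq)]
struct Metadata {
    schema_id: i32,
}

#[derive(Debug, Clone)]
struct Table {
    metadata: Arc<Metadata>,
}

enum Update {
    SetSchemaId(i32),
}

impl Table {
    /// Consuming variant: takes the table by value and returns the updated table.
    fn with_metadata(mut self, metadata: Arc<Metadata>) -> Self {
        self.metadata = metadata;
        self
    }
}

/// No `self` parameter: the function only needs the table and the updates.
fn update_table_metadata(table: Table, updates: &[Update]) -> Table {
    let mut metadata = (*table.metadata).clone();
    for update in updates {
        match update {
            Update::SetSchemaId(id) => metadata.schema_id = *id,
        }
    }
    table.with_metadata(Arc::new(metadata))
}

fn main() {
    let table = Table {
        metadata: Arc::new(Metadata { schema_id: 0 }),
    };
    let updated = update_table_metadata(table, &[Update::SetSchemaId(3)]);
    println!("{:?}", updated);
}
```

With the consuming signature the caller has to rebind the result, so a stale table value cannot be used by accident after the update.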
@@ -129,8 +124,10 @@ impl Transaction {
             }
         };
         let mut snapshot_id = generate_random_id();
+
+        // todo revisit this, this check won't be 100% valid because it's not checking the staging table
         while self
-            .current_table
+            .table
             .metadata()
             .snapshots()
             .any(|s| s.snapshot_id() == snapshot_id)
Review comment (Contributor Author, @CTTY, Jun 18, 2025):
Maybe we should move the generation of snapshot_id to SnapshotProducer::new. The catch: if a retry happens, every attempt will get a different snapshot_id. I have not seen any side effects yet.

Review comment (Contributor):
+1, I also think we should move this to SnapshotProducer.
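
For reference, the collision loop under discussion can be sketched as a standalone function that takes the set of existing ids as a parameter, so it can be run against whichever table (base, refreshed, or staging) is in hand at the time. `unique_snapshot_id` and the closure-based id source are assumptions made for this sketch; the actual code draws ids from `generate_random_id` and reads them from the table's snapshots.

```rust
use std::collections::HashSet;

/// Draws candidate ids from `next_id` until one is not present in `existing`.
fn unique_snapshot_id(existing: &HashSet<i64>, mut next_id: impl FnMut() -> i64) -> i64 {
    let mut candidate = next_id();
    while existing.contains(&candidate) {
        candidate = next_id();
    }
    candidate
}

fn main() {
    // Ids already used by the table's snapshots (illustrative values).
    let existing: HashSet<i64> = vec![10, 11].into_iter().collect();

    // Deterministic stand-in for a random generator, so the sketch is reproducible.
    let mut counter = 9;
    let id = unique_snapshot_id(&existing, || {
        counter += 1;
        counter
    });
    println!("{}", id); // prints 12: candidates 10 and 11 collide and are skipped
}
```

If a function like this were called from inside each commit attempt, a retry would produce a fresh id per attempt, which is the trade-off raised in the comment above.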

@@ -157,41 +154,47 @@ impl Transaction {

     /// Commit transaction.
     pub async fn commit(mut self, catalog: &dyn Catalog) -> Result<Table> {
-        if self.actions.is_empty() && self.updates.is_empty() {
+        if self.actions.is_empty() {
             // nothing to commit
-            return Ok(self.base_table.clone());
+            return Ok(self.table.clone());
         }

         self.do_commit(catalog).await
     }

     async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
-        let base_table_identifier = self.base_table.identifier().to_owned();
+        let base_table_identifier = self.table.identifier().to_owned();

         let refreshed = catalog.load_table(&base_table_identifier.clone()).await?;

-        if self.base_table.metadata() != refreshed.metadata()
-            || self.base_table.metadata_location() != refreshed.metadata_location()
+        let mut existing_updates: Vec<TableUpdate> = vec![];
+        let mut existing_requirements: Vec<TableRequirement> = vec![];
+
+        if self.table.metadata() != refreshed.metadata()
+            || self.table.metadata_location() != refreshed.metadata_location()
         {
             // current base is stale, use refreshed as base and re-apply transaction actions
-            self.base_table = refreshed.clone();
+            self.table = refreshed.clone();
         }

-        let current_table = self.base_table.clone();
+        let mut current_table = self.table.clone();

-        for action in self.actions.clone() {
-            let mut action_commit = action.commit(&current_table).await?;
-            // apply changes to current_table
+        for action in &self.actions {
+            let mut action_commit = Arc::clone(action).commit(&current_table).await?;
+            // apply action commit to current_table
             self.apply(
+                &mut current_table,
                 action_commit.take_updates(),
                 action_commit.take_requirements(),
+                &mut existing_updates,
+                &mut existing_requirements,
             )?;
         }

         let table_commit = TableCommit::builder()
             .ident(base_table_identifier)
-            .updates(self.updates.clone())
-            .requirements(self.requirements.clone())
+            .updates(existing_updates)
+            .requirements(existing_requirements)
             .build();

         catalog.update_table(table_commit).await
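
The shape of the refactored `do_commit` can be sketched in a simplified, synchronous form: updates and requirements live in locals that are rebuilt on every attempt instead of in `Transaction` fields. All types below (`Table`, `Update`, `Requirement`, `Action`, `Append`) and the `build_commit` method are hypothetical stand-ins; the real code is async, talks to a `Catalog`, and de-duplicates requirements.

```rust
// Hypothetical, simplified stand-ins for the crate's types.
#[derive(Debug, Clone, PartialEq)]
struct Table {
    version: u32,
    rows: u64,
}

#[derive(Debug, PartialEq)]
enum Update {
    AddRows(u64),
}

#[derive(Debug, PartialEq)]
enum Requirement {
    VersionIs(u32),
}

trait Action {
    /// Computes this action's changes against the table as it currently stands.
    fn commit(&self, table: &Table) -> (Vec<Update>, Vec<Requirement>);
}

struct Append(u64);

impl Action for Append {
    fn commit(&self, table: &Table) -> (Vec<Update>, Vec<Requirement>) {
        (
            vec![Update::AddRows(self.0)],
            vec![Requirement::VersionIs(table.version)],
        )
    }
}

struct Transaction {
    table: Table,
    actions: Vec<Box<dyn Action>>,
}

impl Transaction {
    /// Rebuilds updates and requirements from scratch on every call, so a
    /// retry never sees leftovers from an earlier attempt.
    fn build_commit(&mut self, refreshed: Table) -> (Table, Vec<Update>, Vec<Requirement>) {
        if self.table != refreshed {
            // The base is stale: adopt the refreshed table and re-apply all actions.
            self.table = refreshed;
        }
        let mut current = self.table.clone();
        let mut updates = Vec::new();
        let mut requirements = Vec::new();
        for action in &self.actions {
            let (new_updates, new_requirements) = action.commit(&current);
            // Apply this action's updates to the staged table before the next action runs.
            for update in &new_updates {
                match update {
                    Update::AddRows(n) => current.rows += *n,
                }
            }
            updates.extend(new_updates);
            requirements.extend(new_requirements);
        }
        (current, updates, requirements)
    }
}

fn main() {
    let actions: Vec<Box<dyn Action>> = vec![Box::new(Append(5)), Box::new(Append(2))];
    let mut tx = Transaction {
        table: Table { version: 1, rows: 0 },
        actions,
    };
    let refreshed = Table { version: 2, rows: 100 };

    let (staged, updates, requirements) = tx.build_commit(refreshed.clone());
    println!("{:?} {:?} {:?}", staged, updates, requirements);

    // A second attempt yields the same two updates rather than four.
    let (_, updates_again, _) = tx.build_commit(refreshed);
    println!("{}", updates_again.len());
}
```

Because nothing accumulates on the transaction itself, the only state that survives between attempts in this sketch is the (possibly refreshed) base table and the list of actions.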