-
Notifications
You must be signed in to change notification settings - Fork 276
feat(transaction): Remove current_table, updates, and requirements from Transaction #1451
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,60 +45,55 @@ use crate::{Catalog, TableCommit, TableRequirement, TableUpdate}; | |
|
||
/// Table transaction. | ||
pub struct Transaction { | ||
base_table: Table, | ||
current_table: Table, | ||
table: Table, | ||
actions: Vec<BoxedTransactionAction>, | ||
updates: Vec<TableUpdate>, | ||
requirements: Vec<TableRequirement>, | ||
} | ||
|
||
impl Transaction { | ||
/// Creates a new transaction. | ||
pub fn new(table: &Table) -> Self { | ||
Self { | ||
base_table: table.clone(), | ||
current_table: table.clone(), | ||
table: table.clone(), | ||
actions: vec![], | ||
updates: vec![], | ||
requirements: vec![], | ||
} | ||
} | ||
|
||
fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> { | ||
let mut metadata_builder = self.current_table.metadata().clone().into_builder(None); | ||
fn update_table_metadata(&self, table: &mut Table, updates: &[TableUpdate]) -> Result<()> { | ||
let mut metadata_builder = table.metadata().clone().into_builder(None); | ||
for update in updates { | ||
metadata_builder = update.clone().apply(metadata_builder)?; | ||
} | ||
|
||
self.current_table | ||
.with_metadata(Arc::new(metadata_builder.build()?.metadata)); | ||
table.with_metadata(Arc::new(metadata_builder.build()?.metadata)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not related to this pr, and it's not mandatory. But I think it's better to change the |
||
|
||
Ok(()) | ||
} | ||
|
||
fn apply( | ||
&mut self, | ||
&self, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto. |
||
table: &mut Table, | ||
updates: Vec<TableUpdate>, | ||
requirements: Vec<TableRequirement>, | ||
existing_updates: &mut Vec<TableUpdate>, | ||
existing_requirements: &mut Vec<TableRequirement>, | ||
) -> Result<()> { | ||
for requirement in &requirements { | ||
requirement.check(Some(self.current_table.metadata()))?; | ||
requirement.check(Some(table.metadata()))?; | ||
} | ||
|
||
self.update_table_metadata(&updates)?; | ||
self.update_table_metadata(table, &updates)?; | ||
|
||
self.updates.extend(updates); | ||
existing_updates.extend(updates); | ||
|
||
// For the requirements, it does not make sense to add a requirement more than once | ||
// For example, you cannot assert that the current schema has two different IDs | ||
for new_requirement in requirements { | ||
if self | ||
.requirements | ||
if existing_requirements | ||
.iter() | ||
.map(discriminant) | ||
.all(|d| d != discriminant(&new_requirement)) | ||
{ | ||
self.requirements.push(new_requirement); | ||
existing_requirements.push(new_requirement); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should move this check to start of this method. Also I don't think we should filter here, we should return error. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In fact, I'm not sure if we actually need this check, maybe we could remove this for now? |
||
} | ||
} | ||
|
||
|
@@ -129,8 +124,10 @@ impl Transaction { | |
} | ||
}; | ||
let mut snapshot_id = generate_random_id(); | ||
|
||
// todo revisit this, this check won't be 100% valid because it's not checking the staging table | ||
while self | ||
.current_table | ||
.table | ||
.metadata() | ||
.snapshots() | ||
.any(|s| s.snapshot_id() == snapshot_id) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we should move the generation of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1, I also think we should move this to |
||
|
@@ -157,41 +154,47 @@ impl Transaction { | |
|
||
/// Commit transaction. | ||
pub async fn commit(mut self, catalog: &dyn Catalog) -> Result<Table> { | ||
if self.actions.is_empty() && self.updates.is_empty() { | ||
if self.actions.is_empty() { | ||
// nothing to commit | ||
return Ok(self.base_table.clone()); | ||
return Ok(self.table.clone()); | ||
} | ||
|
||
self.do_commit(catalog).await | ||
} | ||
|
||
async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> { | ||
let base_table_identifier = self.base_table.identifier().to_owned(); | ||
let base_table_identifier = self.table.identifier().to_owned(); | ||
|
||
let refreshed = catalog.load_table(&base_table_identifier.clone()).await?; | ||
|
||
if self.base_table.metadata() != refreshed.metadata() | ||
|| self.base_table.metadata_location() != refreshed.metadata_location() | ||
let mut existing_updates: Vec<TableUpdate> = vec![]; | ||
let mut existing_requirements: Vec<TableRequirement> = vec![]; | ||
|
||
if self.table.metadata() != refreshed.metadata() | ||
|| self.table.metadata_location() != refreshed.metadata_location() | ||
{ | ||
// current base is stale, use refreshed as base and re-apply transaction actions | ||
self.base_table = refreshed.clone(); | ||
self.table = refreshed.clone(); | ||
} | ||
|
||
let current_table = self.base_table.clone(); | ||
let mut current_table = self.table.clone(); | ||
|
||
for action in self.actions.clone() { | ||
let mut action_commit = action.commit(¤t_table).await?; | ||
// apply changes to current_table | ||
for action in &self.actions { | ||
let mut action_commit = Arc::clone(action).commit(¤t_table).await?; | ||
// apply action commit to current_table | ||
self.apply( | ||
&mut current_table, | ||
action_commit.take_updates(), | ||
action_commit.take_requirements(), | ||
&mut existing_updates, | ||
&mut existing_requirements, | ||
)?; | ||
} | ||
|
||
let table_commit = TableCommit::builder() | ||
.ident(base_table_identifier) | ||
.updates(self.updates.clone()) | ||
.requirements(self.requirements.clone()) | ||
.updates(existing_updates) | ||
.requirements(existing_requirements) | ||
.build(); | ||
|
||
catalog.update_table(table_commit).await | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need
self
here?