Add support for update_table() function in Sql Catalog #1356

Draft: wants to merge 3 commits into main
2 changes: 2 additions & 0 deletions crates/catalog/sql/Cargo.toml
@@ -37,8 +37,10 @@ typed-builder = { workspace = true }
uuid = { workspace = true, features = ["v4"] }

[dev-dependencies]
arrow-array = { version = "55" }
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
itertools = { workspace = true }
parquet = "55"
regex = "1.10.5"
sqlx = { version = "0.8.1", features = [
"tls-rustls",
179 changes: 172 additions & 7 deletions crates/catalog/sql/src/catalog.rs
@@ -23,8 +23,7 @@ use iceberg::io::FileIO;
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
TableIdent,
Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent,
};
use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
use sqlx::{Any, AnyPool, Row, Transaction};
@@ -769,27 +768,118 @@ impl Catalog for SqlCatalog {
Ok(())
}

async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
Err(Error::new(
ErrorKind::FeatureUnsupported,
"Updating a table is not supported yet",
))
async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
let table_ident = commit.identifier().clone();
if !self.table_exists(&table_ident).await? {
return no_such_table_err(&table_ident);
}
let current = self.load_table(&table_ident).await?;

// Extract requirements and updates
let requirements = commit.take_requirements();
let updates = commit.take_updates();

// Take each requirement and check against metadata.
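// A failed check (e.g. a branch ref moved since this commit was staged)
// aborts the commit here, before any new metadata file is written.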
for requirement in requirements {
requirement.check(Some(current.metadata()))?;
}

// Take existing metadata and apply each update.
let mut updater = TableMetadataBuilder::new_from_metadata(
current.metadata().clone(),
current.metadata_location().map(|s| s.to_string()),
);
for update in updates {
updater = update.apply(updater)?;
}
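// build() validates the accumulated updates and produces the new TableMetadata.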
let updated_metadata_build = updater.build()?;
let updated_metadata = updated_metadata_build.metadata;

// No-op commit: the updates left the metadata unchanged, so skip the write.
if updated_metadata == *current.metadata() {
return Ok(current);
}

// Generate a new metadata file location
let location = current.metadata().location();
let timestamp_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis();
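// Produces e.g. "<table-location>/metadata/1718000000000-<uuid>.metadata.json"
// (timestamp and UUID shown are illustrative).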
let new_metadata_location = format!(
"{}/metadata/{}-{}.metadata.json",
location,
timestamp_ms,
Uuid::new_v4()
);

// Write the new metadata to the file
let file = self.fileio.new_output(&new_metadata_location)?;
file.write(serde_json::to_vec(&updated_metadata)?.into())
.await?;

// Update the catalog table with the new metadata location
let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
self.execute(
&format!(
"UPDATE {CATALOG_TABLE_NAME}
SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?,
{CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
AND (
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
)"
),
vec![
Some(&new_metadata_location),
current.metadata_location(),
Some(&self.name),
Some(table_ident.name()),
Some(&table_ident.namespace().join(".")),
],
Some(&mut tx),
)
.await?;

tx.commit().await.map_err(from_sqlx_error)?;

// Return the updated table
Ok(Table::builder()
.file_io(self.fileio.clone())
.identifier(table_ident.clone())
.metadata_location(new_metadata_location)
.metadata(updated_metadata)
.build()?)
}
}
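
Note on concurrency: the UPDATE above matches only on catalog name, table name, and namespace, so two writers that loaded the same base metadata could both commit, and the later one would silently overwrite the earlier metadata pointer. SQL-backed Iceberg catalogs commonly guard against this with a compare-and-swap on the previous metadata location. The following is a minimal sketch of that variant, not part of this diff; it assumes execute() returns the AnyQueryResult imported above and that ErrorKind stays in the use list:

// Hypothetical CAS variant of the UPDATE in update_table(): succeed only if
// the row still points at the metadata this commit was based on. The
// record-type filter from the diff is elided for brevity, and we assume the
// table already has a metadata location (it does after create_table()).
let result = self
    .execute(
        &format!(
            "UPDATE {CATALOG_TABLE_NAME}
             SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?,
                 {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
             WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
               AND {CATALOG_FIELD_TABLE_NAME} = ?
               AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
               AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
        ),
        vec![
            Some(&new_metadata_location),
            current.metadata_location(),
            Some(&self.name),
            Some(table_ident.name()),
            Some(&table_ident.namespace().join(".")),
            current.metadata_location(),
        ],
        Some(&mut tx),
    )
    .await?;

// Zero affected rows means another writer won the race; reject the commit
// instead of clobbering its metadata pointer.
if result.rows_affected() == 0 {
    return Err(Error::new(
        ErrorKind::Unexpected,
        "concurrent table update detected; commit rejected",
    ));
}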

#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use iceberg::io::FileIOBuilder;
use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
use iceberg::table::Table;
use iceberg::transaction::Transaction;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use parquet::file::properties::WriterProperties;
use regex::Regex;
use sqlx::migrate::MigrateDatabase;
use tempfile::TempDir;
use uuid::Uuid;

use crate::catalog::NAMESPACE_LOCATION_PROPERTY_KEY;
use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig};
@@ -866,6 +956,47 @@ mod tests {
}
}

async fn add_rows(
table: &Table,
num_rows: u32,
) -> Result<Vec<iceberg::spec::DataFile>, Box<dyn std::error::Error>> {
let writer_properties = WriterProperties::builder().build();
let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
let file_name_generator = DefaultFileNameGenerator::new(
"datafile".to_string(),
None,
iceberg::spec::DataFileFormat::Parquet,
);
let parquet_writer_builder = ParquetWriterBuilder::new(
writer_properties,
table.metadata().current_schema().clone(),
table.file_io().clone(),
location_generator,
file_name_generator,
);
let file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
let mut data_file_writer = file_writer_builder.build().await.unwrap();

// Generate num_rows sequential values for the table's int column.
let schema = simple_table_schema();
let foo_array: Vec<i32> = (0..num_rows as i32).collect();

// Package into a single batch and write to Parquet file.
let batch = RecordBatch::try_new(
Arc::new(iceberg::arrow::schema_to_arrow_schema(&schema).unwrap()),
vec![Arc::new(Int32Array::from(foo_array))],
)
.unwrap();
data_file_writer.write(batch).await?;

let data_files = data_file_writer.close().await?;
assert_eq!(data_files.len(), 1);
Ok(data_files)
}

fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
assert_eq!(table.identifier(), expected_table_ident);

@@ -1779,4 +1910,38 @@
"Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
);
}

#[tokio::test]
async fn test_update_table_append() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
let namespace_ident = NamespaceIdent::new("a".into());
create_namespaces(&catalog, &vec![&namespace_ident]).await;
let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
create_table(&catalog, &table_ident).await;

assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
table_ident.clone()
],);

let table = &catalog.load_table(&table_ident).await.unwrap();

// Generate some rows to add to table.
let data_files = add_rows(table, 10).await.unwrap();

// Append to the table in a transaction.
let mut transaction = Transaction::new(table);
let mut fast_append = transaction
.fast_append(Some(Uuid::new_v4()), vec![])
.expect("Created");
fast_append.add_data_files(data_files).expect("Added data files");
transaction = fast_append.apply().await.expect("Appended");
transaction.commit(&catalog).await.expect("Committed");

let table_v2 = &catalog.load_table(&table_ident).await.unwrap();
assert_eq!(
table.metadata().snapshots().len() + 1,
table_v2.metadata().snapshots().len()
);
}
}