From 333c65f6650cf94cb548bcadbf4d9c705899bdd0 Mon Sep 17 00:00:00 2001 From: DerGut Date: Sat, 3 May 2025 22:47:12 +0200 Subject: [PATCH 1/9] Split out load_table util to make it reusable with locked namespace states Signed-off-by: DerGut --- crates/catalog/memory/src/catalog.rs | 32 +++++++++++++++++----------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index cf4ad7216..17dcba9d0 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -20,7 +20,7 @@ use std::collections::HashMap; use async_trait::async_trait; -use futures::lock::Mutex; +use futures::lock::{Mutex, MutexGuard}; use iceberg::io::FileIO; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; @@ -53,6 +53,23 @@ impl MemoryCatalog { warehouse_location, } } + + /// Loads a table from the locked namespace state. + async fn load_table_from_locked_namespace_state( + &self, + table_ident: &TableIdent, + root_namespace_state: &MutexGuard<'_, NamespaceState>, + ) -> Result { + let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?; + let metadata = self.read_metadata(&metadata_location).await?; + + Table::builder() + .identifier(table_ident.clone()) + .metadata(metadata) + .metadata_location(metadata_location.to_string()) + .file_io(self.file_io.clone()) + .build() + } } #[async_trait] @@ -223,17 +240,8 @@ impl Catalog for MemoryCatalog { async fn load_table(&self, table_ident: &TableIdent) -> Result
{ let root_namespace_state = self.root_namespace_state.lock().await; - let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?; - let input_file = self.file_io.new_input(metadata_location)?; - let metadata_content = input_file.read().await?; - let metadata = serde_json::from_slice::(&metadata_content)?; - - Table::builder() - .file_io(self.file_io.clone()) - .metadata_location(metadata_location.clone()) - .metadata(metadata) - .identifier(table_ident.clone()) - .build() + self.load_table_from_locked_namespace_state(table_ident, &root_namespace_state) + .await } /// Drop a table from the catalog. From 7cc0d6005afb7762848e7e1d67769a3ebd176f10 Mon Sep 17 00:00:00 2001 From: DerGut Date: Sat, 3 May 2025 22:49:21 +0200 Subject: [PATCH 2/9] Factor metadata writing out into util function Signed-off-by: DerGut --- crates/catalog/memory/src/catalog.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 17dcba9d0..86b67763c 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -70,6 +70,17 @@ impl MemoryCatalog { .file_io(self.file_io.clone()) .build() } + + async fn write_metadata( + &self, + metadata: &TableMetadata, + metadata_location: &str, + ) -> Result<()> { + self.file_io + .new_output(metadata_location)? + .write(serde_json::to_vec(metadata)?.into()) + .await + } } #[async_trait] @@ -221,10 +232,7 @@ impl Catalog for MemoryCatalog { Uuid::new_v4() ); - self.file_io - .new_output(&metadata_location)? - .write(serde_json::to_vec(&metadata)?.into()) - .await?; + self.write_metadata(&metadata, &metadata_location).await?; root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?; From cd5dd5e73f8a92610ae52fda02ab81a2dc3ffad9 Mon Sep 17 00:00:00 2001 From: DerGut Date: Sat, 3 May 2025 23:05:15 +0200 Subject: [PATCH 3/9] Implement MemoryCatalog::update_table Signed-off-by: DerGut --- crates/catalog/memory/src/catalog.rs | 159 +++++++++++++++++-- crates/catalog/memory/src/namespace_state.rs | 19 +++ crates/iceberg/src/error.rs | 6 + 3 files changed, 173 insertions(+), 11 deletions(-) diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 86b67763c..753c83ccb 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -26,7 +26,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, - TableIdent, + TableIdent, TableUpdate, }; use itertools::Itertools; use uuid::Uuid; @@ -61,7 +61,7 @@ impl MemoryCatalog { root_namespace_state: &MutexGuard<'_, NamespaceState>, ) -> Result
{ let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?; - let metadata = self.read_metadata(&metadata_location).await?; + let metadata = self.read_metadata(metadata_location).await?; Table::builder() .identifier(table_ident.clone()) @@ -71,6 +71,30 @@ impl MemoryCatalog { .build() } + async fn update_table(&self, table: &Table, updates: Vec) -> Result
{ + let (new_metadata, new_metadata_location) = apply_table_updates(table, updates)?; + + self.write_metadata(&new_metadata, &new_metadata_location) + .await?; + + let new_table = Table::builder() + .identifier(table.identifier().clone()) + .metadata(new_metadata) + .metadata_location(new_metadata_location.to_string()) + .file_io(self.file_io.clone()) + .build()?; + + Ok(new_table) + } + + async fn read_metadata(&self, location: &str) -> Result { + let input_file = self.file_io.new_input(location)?; + let metadata_content = input_file.read().await?; + let metadata = serde_json::from_slice::(&metadata_content)?; + + Ok(metadata) + } + async fn write_metadata( &self, metadata: &TableMetadata, @@ -286,23 +310,110 @@ impl Catalog for MemoryCatalog { Ok(()) } - /// Update a table to the catalog. - async fn update_table(&self, _commit: TableCommit) -> Result
{ - Err(Error::new( - ErrorKind::FeatureUnsupported, - "MemoryCatalog does not currently support updating tables.", - )) + /// Update a table in the catalog. + async fn update_table(&self, mut commit: TableCommit) -> Result
{ + let mut root_namespace_state = self.root_namespace_state.lock().await; + + let current_table = self + .load_table_from_locked_namespace_state(commit.identifier(), &root_namespace_state) + .await?; + + for requirement in commit.take_requirements() { + requirement.check(Some(current_table.metadata()))?; + } + + let updated_table = self + .update_table(¤t_table, commit.take_updates()) + .await?; + + root_namespace_state.update_table( + updated_table.identifier(), + updated_table + .metadata_location() + .ok_or(empty_metadata_location_err(&updated_table))? + .to_string(), + )?; + + Ok(updated_table) } } +fn apply_table_updates( + table: &Table, + updates: Vec, +) -> Result<(TableMetadata, String)> { + let metadata_location = table + .metadata_location() + .ok_or(empty_metadata_location_err(table))?; + + let mut builder = TableMetadataBuilder::new_from_metadata( + table.metadata().clone(), + Some(metadata_location.to_string()), + ); + + for update in updates { + builder = update.apply(builder)?; + } + + let new_metadata_location = bump_metadata_version(metadata_location)?; + + Ok((builder.build()?.metadata, new_metadata_location)) +} + +fn empty_metadata_location_err(table: &Table) -> Error { + Error::new( + ErrorKind::DataInvalid, + format!("Table metadata location is not set: {}", table.identifier()), + ) +} +/// Parses a metadata location of format `/metadata/-.metadata.json`, +/// increments the version and generates a new UUID. +/// It returns an error if the format is invalid. +fn bump_metadata_version(metadata_location: &str) -> Result { + let (path, file_name) = metadata_location.rsplit_once('/').ok_or(Error::new( + ErrorKind::Unexpected, + format!("Invalid metadata location: {}", metadata_location), + ))?; + + let prefix = path.strip_suffix("/metadata").ok_or(Error::new( + ErrorKind::Unexpected, + format!( + "Metadata location not under /metadata/ subdirectory: {}", + metadata_location + ), + ))?; + + let (version, _id) = file_name + .strip_suffix(".metadata.json") + .ok_or(Error::new( + ErrorKind::Unexpected, + format!("Invalid metadata file ending: {}", file_name), + ))? + .split_once('-') + .ok_or(Error::new( + ErrorKind::Unexpected, + format!("Invalid metadata file name: {}", file_name), + ))?; + + let new_version = version.parse::()? + 1; + let new_id = Uuid::new_v4(); + + Ok(format!( + "{}/metadata/{}-{}.metadata.json", + prefix, new_version, new_id + )) +} + #[cfg(test)] mod tests { use std::collections::HashSet; use std::hash::Hash; use std::iter::FromIterator; + use std::vec; use iceberg::io::FileIOBuilder; use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; + use iceberg::transaction::Transaction; use regex::Regex; use tempfile::TempDir; @@ -348,8 +459,8 @@ mod tests { .unwrap() } - async fn create_table(catalog: &C, table_ident: &TableIdent) { - let _ = catalog + async fn create_table(catalog: &C, table_ident: &TableIdent) -> Table { + catalog .create_table( &table_ident.namespace, TableCreation::builder() @@ -358,7 +469,7 @@ mod tests { .build(), ) .await - .unwrap(); + .unwrap() } async fn create_tables(catalog: &C, table_idents: Vec<&TableIdent>) { @@ -1694,4 +1805,30 @@ mod tests { ), ); } + + #[tokio::test] + async fn test_update_table() { + let catalog = new_memory_catalog(); + + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + let table_ident = TableIdent::new(namespace_ident, "test".to_string()); + let table = create_table(&catalog, &table_ident).await; + + // Assert the table doesn't contain the update yet + assert!(!table.metadata().properties().contains_key("key")); + + // Update table metadata + let updated_table = Transaction::new(&table) + .set_properties(HashMap::from([("key".to_string(), "value".to_string())])) + .unwrap() + .commit(&catalog) + .await + .unwrap(); + + assert_eq!( + updated_table.metadata().properties().get("key").unwrap(), + "value" + ); + } } diff --git a/crates/catalog/memory/src/namespace_state.rs b/crates/catalog/memory/src/namespace_state.rs index de1532203..a90aa850a 100644 --- a/crates/catalog/memory/src/namespace_state.rs +++ b/crates/catalog/memory/src/namespace_state.rs @@ -295,4 +295,23 @@ impl NamespaceState { Some(metadata_location) => Ok(metadata_location), } } + + /// Updates the metadata location of the given table or returns an error if doesn't exist + pub(crate) fn update_table( + &mut self, + table_ident: &TableIdent, + new_metadata_location: String, + ) -> Result<()> { + let namespace = self.get_mut_namespace(table_ident.namespace())?; + + let _ = namespace + .table_metadata_locations + .insert(table_ident.name().to_string(), new_metadata_location) + .ok_or(Error::new( + ErrorKind::Unexpected, + format!("No such table: {:?}", table_ident), + ))?; + + Ok(()) + } } diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index 1eef1bcc4..dd07bea57 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -282,6 +282,12 @@ define_from_err!( "handling invalid utf-8 characters" ); +define_from_err!( + core::num::ParseIntError, + ErrorKind::Unexpected, + "parsing integer from string" +); + define_from_err!( std::array::TryFromSliceError, ErrorKind::DataInvalid, From 13b4aa43b8faf702fef53fa51598da286ff3b6f6 Mon Sep 17 00:00:00 2001 From: DerGut Date: Sun, 4 May 2025 00:25:52 +0200 Subject: [PATCH 4/9] Add MetadataLocation struct Signed-off-by: DerGut --- crates/catalog/memory/src/catalog.rs | 96 +++------ crates/catalog/memory/src/namespace_state.rs | 201 ++++++++++++++++++- 2 files changed, 217 insertions(+), 80 deletions(-) diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 753c83ccb..d5342a2d7 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -18,6 +18,7 @@ //! This module contains memory catalog implementation. use std::collections::HashMap; +use std::str::FromStr; use async_trait::async_trait; use futures::lock::{Mutex, MutexGuard}; @@ -29,9 +30,8 @@ use iceberg::{ TableIdent, TableUpdate, }; use itertools::Itertools; -use uuid::Uuid; -use crate::namespace_state::NamespaceState; +use crate::namespace_state::{MetadataLocation, NamespaceState}; /// namespace `location` property const LOCATION: &str = "location"; @@ -71,7 +71,11 @@ impl MemoryCatalog { .build() } - async fn update_table(&self, table: &Table, updates: Vec) -> Result
{ + async fn update_table( + &self, + table: &Table, + updates: Vec, + ) -> Result<(Table, MetadataLocation)> { let (new_metadata, new_metadata_location) = apply_table_updates(table, updates)?; self.write_metadata(&new_metadata, &new_metadata_location) @@ -84,11 +88,11 @@ impl MemoryCatalog { .file_io(self.file_io.clone()) .build()?; - Ok(new_table) + Ok((new_table, new_metadata_location)) } - async fn read_metadata(&self, location: &str) -> Result { - let input_file = self.file_io.new_input(location)?; + async fn read_metadata(&self, location: &MetadataLocation) -> Result { + let input_file = self.file_io.new_input(location.to_string())?; let metadata_content = input_file.read().await?; let metadata = serde_json::from_slice::(&metadata_content)?; @@ -98,10 +102,10 @@ impl MemoryCatalog { async fn write_metadata( &self, metadata: &TableMetadata, - metadata_location: &str, + metadata_location: &MetadataLocation, ) -> Result<()> { self.file_io - .new_output(metadata_location)? + .new_output(metadata_location.to_string())? .write(serde_json::to_vec(metadata)?.into()) .await } @@ -249,12 +253,7 @@ impl Catalog for MemoryCatalog { let metadata = TableMetadataBuilder::from_table_creation(table_creation)? .build()? .metadata; - let metadata_location = format!( - "{}/metadata/{}-{}.metadata.json", - &location, - 0, - Uuid::new_v4() - ); + let metadata_location = MetadataLocation::new(&location); self.write_metadata(&metadata, &metadata_location).await?; @@ -262,7 +261,7 @@ impl Catalog for MemoryCatalog { Table::builder() .file_io(self.file_io.clone()) - .metadata_location(metadata_location) + .metadata_location(metadata_location.to_string()) .metadata(metadata) .identifier(table_ident) .build() @@ -281,7 +280,7 @@ impl Catalog for MemoryCatalog { let mut root_namespace_state = self.root_namespace_state.lock().await; let metadata_location = root_namespace_state.remove_existing_table(table_ident)?; - self.file_io.delete(&metadata_location).await + self.file_io.delete(metadata_location.to_string()).await } /// Check if a table exists in the catalog. @@ -322,17 +321,11 @@ impl Catalog for MemoryCatalog { requirement.check(Some(current_table.metadata()))?; } - let updated_table = self + let (updated_table, new_metadata_location) = self .update_table(¤t_table, commit.take_updates()) .await?; - root_namespace_state.update_table( - updated_table.identifier(), - updated_table - .metadata_location() - .ok_or(empty_metadata_location_err(&updated_table))? - .to_string(), - )?; + root_namespace_state.update_table(updated_table.identifier(), new_metadata_location)?; Ok(updated_table) } @@ -341,10 +334,11 @@ impl Catalog for MemoryCatalog { fn apply_table_updates( table: &Table, updates: Vec, -) -> Result<(TableMetadata, String)> { - let metadata_location = table - .metadata_location() - .ok_or(empty_metadata_location_err(table))?; +) -> Result<(TableMetadata, MetadataLocation)> { + let metadata_location = table.metadata_location().ok_or(Error::new( + ErrorKind::DataInvalid, + format!("Table metadata location is not set: {}", table.identifier()), + ))?; let mut builder = TableMetadataBuilder::new_from_metadata( table.metadata().clone(), @@ -355,55 +349,11 @@ fn apply_table_updates( builder = update.apply(builder)?; } - let new_metadata_location = bump_metadata_version(metadata_location)?; + let new_metadata_location = MetadataLocation::from_str(metadata_location)?.with_next_version(); Ok((builder.build()?.metadata, new_metadata_location)) } -fn empty_metadata_location_err(table: &Table) -> Error { - Error::new( - ErrorKind::DataInvalid, - format!("Table metadata location is not set: {}", table.identifier()), - ) -} -/// Parses a metadata location of format `/metadata/-.metadata.json`, -/// increments the version and generates a new UUID. -/// It returns an error if the format is invalid. -fn bump_metadata_version(metadata_location: &str) -> Result { - let (path, file_name) = metadata_location.rsplit_once('/').ok_or(Error::new( - ErrorKind::Unexpected, - format!("Invalid metadata location: {}", metadata_location), - ))?; - - let prefix = path.strip_suffix("/metadata").ok_or(Error::new( - ErrorKind::Unexpected, - format!( - "Metadata location not under /metadata/ subdirectory: {}", - metadata_location - ), - ))?; - - let (version, _id) = file_name - .strip_suffix(".metadata.json") - .ok_or(Error::new( - ErrorKind::Unexpected, - format!("Invalid metadata file ending: {}", file_name), - ))? - .split_once('-') - .ok_or(Error::new( - ErrorKind::Unexpected, - format!("Invalid metadata file name: {}", file_name), - ))?; - - let new_version = version.parse::()? + 1; - let new_id = Uuid::new_v4(); - - Ok(format!( - "{}/metadata/{}-{}.metadata.json", - prefix, new_version, new_id - )) -} - #[cfg(test)] mod tests { use std::collections::HashSet; diff --git a/crates/catalog/memory/src/namespace_state.rs b/crates/catalog/memory/src/namespace_state.rs index a90aa850a..eaa0b852f 100644 --- a/crates/catalog/memory/src/namespace_state.rs +++ b/crates/catalog/memory/src/namespace_state.rs @@ -16,9 +16,12 @@ // under the License. use std::collections::{hash_map, HashMap}; +use std::fmt::Display; +use std::str::FromStr; use iceberg::{Error, ErrorKind, NamespaceIdent, Result, TableIdent}; use itertools::Itertools; +use uuid::Uuid; // Represents the state of a namespace #[derive(Debug, Clone, Default)] @@ -28,7 +31,7 @@ pub(crate) struct NamespaceState { // Namespaces nested inside this namespace namespaces: HashMap, // Mapping of tables to metadata locations in this namespace - table_metadata_locations: HashMap, + table_metadata_locations: HashMap, } fn no_such_namespace_err(namespace_ident: &NamespaceIdent) -> Result { @@ -253,7 +256,10 @@ impl NamespaceState { } // Returns the metadata location of the given table or an error if doesn't exist - pub(crate) fn get_existing_table_location(&self, table_ident: &TableIdent) -> Result<&String> { + pub(crate) fn get_existing_table_location( + &self, + table_ident: &TableIdent, + ) -> Result<&MetadataLocation> { let namespace = self.get_namespace(table_ident.namespace())?; match namespace.table_metadata_locations.get(table_ident.name()) { @@ -266,7 +272,7 @@ impl NamespaceState { pub(crate) fn insert_new_table( &mut self, table_ident: &TableIdent, - metadata_location: String, + location: MetadataLocation, ) -> Result<()> { let namespace = self.get_mut_namespace(table_ident.namespace())?; @@ -276,7 +282,7 @@ impl NamespaceState { { hash_map::Entry::Occupied(_) => table_already_exists_err(table_ident), hash_map::Entry::Vacant(entry) => { - let _ = entry.insert(metadata_location); + let _ = entry.insert(location); Ok(()) } @@ -284,7 +290,10 @@ impl NamespaceState { } // Removes the given table or returns an error if doesn't exist - pub(crate) fn remove_existing_table(&mut self, table_ident: &TableIdent) -> Result { + pub(crate) fn remove_existing_table( + &mut self, + table_ident: &TableIdent, + ) -> Result { let namespace = self.get_mut_namespace(table_ident.namespace())?; match namespace @@ -300,13 +309,13 @@ impl NamespaceState { pub(crate) fn update_table( &mut self, table_ident: &TableIdent, - new_metadata_location: String, + new_location: MetadataLocation, ) -> Result<()> { let namespace = self.get_mut_namespace(table_ident.namespace())?; let _ = namespace .table_metadata_locations - .insert(table_ident.name().to_string(), new_metadata_location) + .insert(table_ident.name().to_string(), new_location) .ok_or(Error::new( ErrorKind::Unexpected, format!("No such table: {:?}", table_ident), @@ -315,3 +324,181 @@ impl NamespaceState { Ok(()) } } + +/// Represents a location of the format: `/metadata/-.metadata.json` +#[derive(Clone, Debug, PartialEq)] +pub(crate) struct MetadataLocation { + prefix: String, + version: i32, + id: Uuid, +} + +impl MetadataLocation { + /// Creates a completely new metadata location starting at version 0. + /// Only used for creating a new table. For updates, see `with_next_version`. + pub(crate) fn new(prefix: &str) -> Self { + Self { + prefix: prefix.to_string(), + version: 0, + id: Uuid::new_v4(), + } + } + + /// Creates a new metadata location for an updated metadata file. + pub(crate) fn with_next_version(&self) -> Self { + Self { + prefix: self.prefix.clone(), + version: self.version + 1, + id: Uuid::new_v4(), + } + } + + fn parse_metadata_path_prefix(path: &str) -> Result { + let prefix = path.strip_suffix("/metadata").ok_or(Error::new( + ErrorKind::Unexpected, + format!( + "Metadata location not under \"/metadata\" subdirectory: {}", + path + ), + ))?; + + Ok(prefix.to_string()) + } + + /// Parses a file name of the format `-.metadata.json`. + fn parse_file_name(file_name: &str) -> Result<(i32, Uuid)> { + let (version, id) = file_name + .strip_suffix(".metadata.json") + .ok_or(Error::new( + ErrorKind::Unexpected, + format!("Invalid metadata file ending: {}", file_name), + ))? + .split_once('-') + .ok_or(Error::new( + ErrorKind::Unexpected, + format!("Invalid metadata file name format: {}", file_name), + ))?; + + Ok((version.parse::()?, Uuid::parse_str(id)?)) + } +} + +impl Display for MetadataLocation { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}/metadata/{}-{}.metadata.json", + self.prefix, self.version, self.id + ) + } +} + +impl FromStr for MetadataLocation { + type Err = Error; + + fn from_str(s: &str) -> Result { + let (path, file_name) = s.rsplit_once('/').ok_or(Error::new( + ErrorKind::Unexpected, + format!("Invalid metadata location: {}", s), + ))?; + + let prefix = Self::parse_metadata_path_prefix(path)?; + let (version, id) = Self::parse_file_name(file_name)?; + + Ok(MetadataLocation { + prefix, + version, + id, + }) + } +} + +#[cfg(test)] +mod test { + use std::str::FromStr; + + use uuid::Uuid; + + use super::MetadataLocation; + + #[test] + fn test_metadata_location_from_string() { + let test_cases = vec![ + // No prefix + ("/metadata/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", Ok(MetadataLocation{ + prefix: "".to_string(), + version: 1234567, + id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(), + })), + // Some prefix + ("/abc/metadata/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", Ok(MetadataLocation{ + prefix: "/abc".to_string(), + version: 1234567, + id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(), + })), + // Longer prefix + ("/abc/def/metadata/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", Ok(MetadataLocation{ + prefix: "/abc/def".to_string(), + version: 1234567, + id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(), + })), + // Prefix with special characters + ("https://127.0.0.1/metadata/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", Ok(MetadataLocation{ + prefix: "https://127.0.0.1".to_string(), + version: 1234567, + id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(), + })), + // Another id + ("/abc/metadata/1234567-81056704-ce5b-41c4-bb83-eb6408081af6.metadata.json", Ok(MetadataLocation{ + prefix: "/abc".to_string(), + version: 1234567, + id: Uuid::from_str("81056704-ce5b-41c4-bb83-eb6408081af6").unwrap(), + })), + // Version 0 + ("/abc/metadata/0-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", Ok(MetadataLocation{ + prefix: "/abc".to_string(), + version: 0, + id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(), + })), + // Negative version + ("/metadata/-123-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", Err("".to_string())), + // Invalid uuid + ("/metadata/1234567-no-valid-id.metadata.json", Err("".to_string())), + // Non-numeric version + ("/metadata/noversion-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", Err("".to_string())), + // No /metadata subdirectory + ("/wrongsubdir/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", Err("".to_string())), + // No .metadata.json suffix + ("/metadata/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata", Err("".to_string())), + ("/metadata/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.wrong.file", Err("".to_string())), + ]; + + for (input, expected) in test_cases { + match MetadataLocation::from_str(input) { + Ok(metadata_location) => { + assert!(expected.is_ok()); + assert_eq!(metadata_location, expected.unwrap()); + } + Err(_) => assert!(expected.is_err()), + } + } + } + + #[test] + fn test_metadata_location_with_next_version() { + let test_cases = vec![ + MetadataLocation::new("/abc"), + MetadataLocation::from_str( + "/abc/def/metadata/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", + ) + .unwrap(), + ]; + + for input in test_cases { + let next = input.with_next_version(); + assert_eq!(next.prefix, input.prefix); + assert_eq!(next.version, input.version + 1); + assert_ne!(next.id, input.id); + } + } +} From e43954a8d4eaa557362afc81ddd2885ce4d0b6da Mon Sep 17 00:00:00 2001 From: DerGut Date: Sun, 4 May 2025 13:07:40 +0200 Subject: [PATCH 5/9] Refactor update functions Signed-off-by: DerGut --- crates/catalog/memory/src/catalog.rs | 117 +++++++++++++------ crates/catalog/memory/src/namespace_state.rs | 19 ++- 2 files changed, 99 insertions(+), 37 deletions(-) diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index d5342a2d7..273cfa455 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -27,7 +27,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, - TableIdent, TableUpdate, + TableIdent, TableRequirement, TableUpdate, }; use itertools::Itertools; @@ -55,7 +55,7 @@ impl MemoryCatalog { } /// Loads a table from the locked namespace state. - async fn load_table_from_locked_namespace_state( + async fn load_table_from_locked_state( &self, table_ident: &TableIdent, root_namespace_state: &MutexGuard<'_, NamespaceState>, @@ -71,24 +71,51 @@ impl MemoryCatalog { .build() } - async fn update_table( + async fn update_table_in_locked_state( &self, - table: &Table, + mut commit: TableCommit, + locked_state: &MutexGuard<'_, NamespaceState>, + ) -> Result<(Table, MetadataLocation)> { + let current_table = self + .load_table_from_locked_state(commit.identifier(), locked_state) + .await?; + + // Checks whether the commit's expectations are met by the current table state. + let location = + check_current_table_state(¤t_table, commit.take_requirements()).await?; + + self.apply_table_updates_and_write_metadata( + ¤t_table, + &location, + commit.take_updates(), + ) + .await + } + + async fn apply_table_updates_and_write_metadata( + &self, + current_table: &Table, + current_location: &MetadataLocation, updates: Vec, ) -> Result<(Table, MetadataLocation)> { - let (new_metadata, new_metadata_location) = apply_table_updates(table, updates)?; + let new_location = current_location.with_next_version(); - self.write_metadata(&new_metadata, &new_metadata_location) - .await?; + // Build the new table metadata. + let new_metadata = + apply_table_updates(current_table, current_location, &new_location, updates)?; + // Write the updated metadata to it's new location. + self.write_metadata(&new_metadata, &new_location).await?; + + // Return a table representing the updated version. let new_table = Table::builder() - .identifier(table.identifier().clone()) + .identifier(current_table.identifier().clone()) .metadata(new_metadata) - .metadata_location(new_metadata_location.to_string()) + .metadata_location(new_location.to_string()) .file_io(self.file_io.clone()) .build()?; - Ok((new_table, new_metadata_location)) + Ok((new_table, new_location)) } async fn read_metadata(&self, location: &MetadataLocation) -> Result { @@ -271,7 +298,7 @@ impl Catalog for MemoryCatalog { async fn load_table(&self, table_ident: &TableIdent) -> Result
{ let root_namespace_state = self.root_namespace_state.lock().await; - self.load_table_from_locked_namespace_state(table_ident, &root_namespace_state) + self.load_table_from_locked_state(table_ident, &root_namespace_state) .await } @@ -310,48 +337,70 @@ impl Catalog for MemoryCatalog { } /// Update a table in the catalog. - async fn update_table(&self, mut commit: TableCommit) -> Result
{ - let mut root_namespace_state = self.root_namespace_state.lock().await; - - let current_table = self - .load_table_from_locked_namespace_state(commit.identifier(), &root_namespace_state) - .await?; - - for requirement in commit.take_requirements() { - requirement.check(Some(current_table.metadata()))?; - } + async fn update_table(&self, commit: TableCommit) -> Result
{ + let mut locked_namespace_state = self.root_namespace_state.lock().await; + // Updates the current table version and writes a new metadata file. let (updated_table, new_metadata_location) = self - .update_table(¤t_table, commit.take_updates()) + .update_table_in_locked_state(commit, &locked_namespace_state) .await?; - root_namespace_state.update_table(updated_table.identifier(), new_metadata_location)?; + // Flip the pointer to reference the new metadata file. + locked_namespace_state + .commit_table_update(updated_table.identifier(), new_metadata_location)?; Ok(updated_table) } } +/// Verifies that the a TableCommit's requirements are met by the current table state. +/// If not, there's a conflict and the client should retry the commit. +async fn check_current_table_state( + current_table: &Table, + requirements: Vec, +) -> Result { + let location = + MetadataLocation::from_str(current_table.metadata_location().ok_or(Error::new( + ErrorKind::DataInvalid, + format!( + "Table metadata location is not set: {}", + current_table.identifier() + ), + ))?)?; + + // Check that the commit's point of view is still reflected by the current state of the table. + for requirement in requirements { + requirement + .check(Some(current_table.metadata())) + .map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Conflict: One or more requirements failed, the client my retry", + ) + .with_source(e) + })?; + } + + Ok(location) +} + fn apply_table_updates( table: &Table, + current_location: &MetadataLocation, + new_location: &MetadataLocation, updates: Vec, -) -> Result<(TableMetadata, MetadataLocation)> { - let metadata_location = table.metadata_location().ok_or(Error::new( - ErrorKind::DataInvalid, - format!("Table metadata location is not set: {}", table.identifier()), - ))?; - +) -> Result { let mut builder = TableMetadataBuilder::new_from_metadata( table.metadata().clone(), - Some(metadata_location.to_string()), - ); + Some(current_location.to_string()), + ) + .set_location(new_location.to_string()); for update in updates { builder = update.apply(builder)?; } - let new_metadata_location = MetadataLocation::from_str(metadata_location)?.with_next_version(); - - Ok((builder.build()?.metadata, new_metadata_location)) + Ok(builder.build()?.metadata) } #[cfg(test)] diff --git a/crates/catalog/memory/src/namespace_state.rs b/crates/catalog/memory/src/namespace_state.rs index eaa0b852f..de008e9f5 100644 --- a/crates/catalog/memory/src/namespace_state.rs +++ b/crates/catalog/memory/src/namespace_state.rs @@ -306,7 +306,7 @@ impl NamespaceState { } /// Updates the metadata location of the given table or returns an error if doesn't exist - pub(crate) fn update_table( + pub(crate) fn commit_table_update( &mut self, table_ident: &TableIdent, new_location: MetadataLocation, @@ -317,7 +317,7 @@ impl NamespaceState { .table_metadata_locations .insert(table_ident.name().to_string(), new_location) .ok_or(Error::new( - ErrorKind::Unexpected, + ErrorKind::TableNotFound, format!("No such table: {:?}", table_ident), ))?; @@ -379,7 +379,20 @@ impl MetadataLocation { format!("Invalid metadata file name format: {}", file_name), ))?; - Ok((version.parse::()?, Uuid::parse_str(id)?)) + let version = version.parse::().map_err(|_| { + Error::new( + ErrorKind::Unexpected, + format!("Metadata version not a number: {}", version), + ) + })?; + if version < 0 { + return Err(Error::new( + ErrorKind::Unexpected, + format!("Negative metadata version: {}", version), + )); + } + + Ok((version, Uuid::parse_str(id)?)) } } From b06d8880fa2d3dc45b310de5cd59d974c685b63f Mon Sep 17 00:00:00 2001 From: DerGut Date: Sun, 4 May 2025 13:09:28 +0200 Subject: [PATCH 6/9] Add missing table test and some assertions Signed-off-by: DerGut --- crates/catalog/memory/src/catalog.rs | 57 +++++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 273cfa455..92bf35498 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -411,7 +411,9 @@ mod tests { use std::vec; use iceberg::io::FileIOBuilder; - use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; + use iceberg::spec::{ + NestedField, NullOrder, PartitionSpec, PrimitiveType, Schema, SortOrder, Type, + }; use iceberg::transaction::Transaction; use regex::Regex; use tempfile::TempDir; @@ -1829,5 +1831,58 @@ mod tests { updated_table.metadata().properties().get("key").unwrap(), "value" ); + + assert_eq!(table.identifier(), updated_table.identifier()); + assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid()); + assert!(table.metadata().last_updated_ms() < updated_table.metadata().last_updated_ms()); + assert_ne!(table.metadata_location(), updated_table.metadata_location()); + assert!( + table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len() + ); + } + + #[tokio::test] + async fn test_update_table_fails_if_table_doesnt_exist() { + let catalog = new_memory_catalog(); + + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + let table_ident = TableIdent::new(namespace_ident, "test".to_string()); + + // This table is not known to the catalog. + let table = build_table(table_ident); + + let err = Transaction::new(&table) + .set_properties(HashMap::from([("key".to_string(), "value".to_string())])) + .unwrap() + .commit(&catalog) + .await + .unwrap_err(); + assert_eq!(err.kind(), ErrorKind::TableNotFound); + } + + fn build_table(ident: TableIdent) -> Table { + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + + let temp_dir = TempDir::new().unwrap(); + let location = temp_dir.path().to_str().unwrap().to_string(); + + let table_creation = TableCreation::builder() + .name(ident.name().to_string()) + .schema(simple_table_schema()) + .location(location) + .build(); + let metadata = TableMetadataBuilder::from_table_creation(table_creation) + .unwrap() + .build() + .unwrap() + .metadata; + + Table::builder() + .identifier(ident) + .metadata(metadata) + .file_io(file_io) + .build() + .unwrap() } } From 68f7c712faab58edaede0a81839bd37b4cf1f6df Mon Sep 17 00:00:00 2001 From: DerGut Date: Sun, 4 May 2025 13:29:16 +0200 Subject: [PATCH 7/9] Refactor version parsing and update functions Signed-off-by: DerGut --- crates/catalog/memory/src/catalog.rs | 115 ++++++++++--------- crates/catalog/memory/src/namespace_state.rs | 27 +++-- 2 files changed, 73 insertions(+), 69 deletions(-) diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 92bf35498..8f5a75b0f 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -82,7 +82,7 @@ impl MemoryCatalog { // Checks whether the commit's expectations are met by the current table state. let location = - check_current_table_state(¤t_table, commit.take_requirements()).await?; + Self::check_current_table_state(¤t_table, commit.take_requirements()).await?; self.apply_table_updates_and_write_metadata( ¤t_table, @@ -102,7 +102,7 @@ impl MemoryCatalog { // Build the new table metadata. let new_metadata = - apply_table_updates(current_table, current_location, &new_location, updates)?; + Self::apply_table_updates(current_table, current_location, &new_location, updates)?; // Write the updated metadata to it's new location. self.write_metadata(&new_metadata, &new_location).await?; @@ -136,6 +136,56 @@ impl MemoryCatalog { .write(serde_json::to_vec(metadata)?.into()) .await } + + /// Verifies that the a TableCommit's requirements are met by the current table state. + /// If not, there's a conflict and the client should retry the commit. + async fn check_current_table_state( + current_table: &Table, + requirements: Vec, + ) -> Result { + let location = + MetadataLocation::from_str(current_table.metadata_location().ok_or(Error::new( + ErrorKind::DataInvalid, + format!( + "Table metadata location is not set: {}", + current_table.identifier() + ), + ))?)?; + + // Check that the commit's point of view is still reflected by the current state of the table. + for requirement in requirements { + requirement + .check(Some(current_table.metadata())) + .map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Conflict: One or more requirements failed, the client my retry", + ) + .with_source(e) + })?; + } + + Ok(location) + } + + fn apply_table_updates( + table: &Table, + current_location: &MetadataLocation, + new_location: &MetadataLocation, + updates: Vec, + ) -> Result { + let mut builder = TableMetadataBuilder::new_from_metadata( + table.metadata().clone(), + Some(current_location.to_string()), + ) + .set_location(new_location.to_string()); + + for update in updates { + builder = update.apply(builder)?; + } + + Ok(builder.build()?.metadata) + } } #[async_trait] @@ -341,66 +391,19 @@ impl Catalog for MemoryCatalog { let mut locked_namespace_state = self.root_namespace_state.lock().await; // Updates the current table version and writes a new metadata file. - let (updated_table, new_metadata_location) = self + let (staged_updated_table, new_metadata_location) = self .update_table_in_locked_state(commit, &locked_namespace_state) .await?; // Flip the pointer to reference the new metadata file. locked_namespace_state - .commit_table_update(updated_table.identifier(), new_metadata_location)?; + .commit_table_update(staged_updated_table.identifier(), new_metadata_location)?; - Ok(updated_table) - } -} + // After the update is committed, the table is now the current version. + let updated_table = staged_updated_table; -/// Verifies that the a TableCommit's requirements are met by the current table state. -/// If not, there's a conflict and the client should retry the commit. -async fn check_current_table_state( - current_table: &Table, - requirements: Vec, -) -> Result { - let location = - MetadataLocation::from_str(current_table.metadata_location().ok_or(Error::new( - ErrorKind::DataInvalid, - format!( - "Table metadata location is not set: {}", - current_table.identifier() - ), - ))?)?; - - // Check that the commit's point of view is still reflected by the current state of the table. - for requirement in requirements { - requirement - .check(Some(current_table.metadata())) - .map_err(|e| { - Error::new( - ErrorKind::Unexpected, - "Conflict: One or more requirements failed, the client my retry", - ) - .with_source(e) - })?; - } - - Ok(location) -} - -fn apply_table_updates( - table: &Table, - current_location: &MetadataLocation, - new_location: &MetadataLocation, - updates: Vec, -) -> Result { - let mut builder = TableMetadataBuilder::new_from_metadata( - table.metadata().clone(), - Some(current_location.to_string()), - ) - .set_location(new_location.to_string()); - - for update in updates { - builder = update.apply(builder)?; + Ok(updated_table) } - - Ok(builder.build()?.metadata) } #[cfg(test)] @@ -411,9 +414,7 @@ mod tests { use std::vec; use iceberg::io::FileIOBuilder; - use iceberg::spec::{ - NestedField, NullOrder, PartitionSpec, PrimitiveType, Schema, SortOrder, Type, - }; + use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; use iceberg::transaction::Transaction; use regex::Regex; use tempfile::TempDir; diff --git a/crates/catalog/memory/src/namespace_state.rs b/crates/catalog/memory/src/namespace_state.rs index de008e9f5..c5eab995a 100644 --- a/crates/catalog/memory/src/namespace_state.rs +++ b/crates/catalog/memory/src/namespace_state.rs @@ -379,18 +379,21 @@ impl MetadataLocation { format!("Invalid metadata file name format: {}", file_name), ))?; - let version = version.parse::().map_err(|_| { - Error::new( - ErrorKind::Unexpected, - format!("Metadata version not a number: {}", version), - ) - })?; - if version < 0 { - return Err(Error::new( - ErrorKind::Unexpected, - format!("Negative metadata version: {}", version), - )); - } + let version = version + .parse::() + .map_err(|e| { + Error::new(ErrorKind::Unexpected, "Metadata version not a number").with_source(e) + }) + .and_then(|v| { + if v < 0 { + Err(Error::new( + ErrorKind::Unexpected, + format!("Negative metadata version: {}", version), + )) + } else { + Ok(v) + } + })?; Ok((version, Uuid::parse_str(id)?)) } From 0cd81c119319a87567a5de8241e295c73700ede1 Mon Sep 17 00:00:00 2001 From: DerGut Date: Sun, 4 May 2025 15:06:47 +0200 Subject: [PATCH 8/9] Add test for conflicting commits Signed-off-by: DerGut --- crates/catalog/memory/src/catalog.rs | 52 ++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 6 deletions(-) diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 8f5a75b0f..f014e9002 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -414,7 +414,9 @@ mod tests { use std::vec; use iceberg::io::FileIOBuilder; - use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; + use iceberg::spec::{ + NestedField, NullOrder, PartitionSpec, PrimitiveType, Schema, SortOrder, Type, + }; use iceberg::transaction::Transaction; use regex::Regex; use tempfile::TempDir; @@ -480,6 +482,14 @@ mod tests { } } + async fn create_table_with_namespace(catalog: &C) -> Table { + let namespace_ident = NamespaceIdent::new("abc".into()); + create_namespace(catalog, &namespace_ident).await; + + let table_ident = TableIdent::new(namespace_ident, "test".to_string()); + create_table(catalog, &table_ident).await + } + fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) { assert_eq!(table.identifier(), expected_table_ident); @@ -1812,10 +1822,7 @@ mod tests { async fn test_update_table() { let catalog = new_memory_catalog(); - let namespace_ident = NamespaceIdent::new("a".into()); - create_namespace(&catalog, &namespace_ident).await; - let table_ident = TableIdent::new(namespace_ident, "test".to_string()); - let table = create_table(&catalog, &table_ident).await; + let table = create_table_with_namespace(&catalog).await; // Assert the table doesn't contain the update yet assert!(!table.metadata().properties().contains_key("key")); @@ -1842,15 +1849,48 @@ mod tests { ); } + #[tokio::test] + async fn test_update_table_fails_if_commit_conflicts() { + let catalog = new_memory_catalog(); + let base_table = create_table_with_namespace(&catalog).await; + + // Update the table by adding a new sort order. + let _sorted_table = Transaction::new(&base_table) + .replace_sort_order() + .asc("foo", NullOrder::First) + .unwrap() + .apply() + .unwrap() + .commit(&catalog) + .await + .unwrap(); + + // Try to update the -now old- table again with a different sort order. + let err = Transaction::new(&base_table) + .replace_sort_order() + .desc("foo", NullOrder::Last) + .unwrap() + .apply() + .unwrap() + .commit(&catalog) + .await + .unwrap_err(); + + // The second transaction should fail because it didn't take the new update + // into account. + assert_eq!(err.kind(), ErrorKind::Unexpected); + assert!(err.message().to_lowercase().contains("conflict")); + } + #[tokio::test] async fn test_update_table_fails_if_table_doesnt_exist() { let catalog = new_memory_catalog(); let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; - let table_ident = TableIdent::new(namespace_ident, "test".to_string()); // This table is not known to the catalog. + let table_ident = TableIdent::new(namespace_ident, "test".to_string()); let table = build_table(table_ident); let err = Transaction::new(&table) From d20bdc1476a849d5273572f326904ea84db2586e Mon Sep 17 00:00:00 2001 From: DerGut Date: Sun, 4 May 2025 15:07:15 +0200 Subject: [PATCH 9/9] Fix formatting of test assert Signed-off-by: DerGut --- crates/catalog/memory/src/catalog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index f014e9002..4a45c13ba 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -1812,7 +1812,7 @@ mod tests { .unwrap_err() .to_string(), format!( - "TableAlreadyExists => Cannot create table {:? }. Table already exists.", + "TableAlreadyExists => Cannot create table {:?}. Table already exists.", &dst_table_ident ), );