From e2b7a6d43fdac7751ee669ed50d9f51e84b2d0b7 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Wed, 30 Apr 2025 16:54:54 +0800 Subject: [PATCH 01/24] feat:Add hadoop catalog mode --- Cargo.toml | 1 + crates/catalog/hadoop/Cargo.toml | 43 ++ crates/catalog/hadoop/src/catalog.rs | 736 +++++++++++++++++++++++++++ crates/catalog/hadoop/src/lib.rs | 25 + crates/catalog/hadoop/src/utils.rs | 140 +++++ 5 files changed, 945 insertions(+) create mode 100644 crates/catalog/hadoop/Cargo.toml create mode 100644 crates/catalog/hadoop/src/catalog.rs create mode 100644 crates/catalog/hadoop/src/lib.rs create mode 100644 crates/catalog/hadoop/src/utils.rs diff --git a/Cargo.toml b/Cargo.toml index 185b5fc01..b39e5a408 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,6 +75,7 @@ http = "1.1" iceberg = { version = "0.4.0", path = "./crates/iceberg" } iceberg-catalog-memory = { version = "0.4.0", path = "./crates/catalog/memory" } iceberg-catalog-rest = { version = "0.4.0", path = "./crates/catalog/rest" } +iceberg-catalog-hadoop = { version = "0.4.0", path = "./crates/catalog/hadoop" } iceberg-datafusion = { version = "0.4.0", path = "./crates/integrations/datafusion" } indicatif = "0.17" itertools = "0.13" diff --git a/crates/catalog/hadoop/Cargo.toml b/crates/catalog/hadoop/Cargo.toml new file mode 100644 index 000000000..5ed671fa2 --- /dev/null +++ b/crates/catalog/hadoop/Cargo.toml @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +edition = { workspace = true } +homepage = { workspace = true } +name = "iceberg-catalog-hadoop" +rust-version = { workspace = true } +version = { workspace = true } + +categories = ["database"] +description = "Apache Iceberg Rust Hadoop Catalog" +keywords = ["iceberg", "sql", "catalog","s3","hdfs"] +license = { workspace = true } +repository = { workspace = true } + +[dependencies] +anyhow = { workspace = true } +async-trait = { workspace = true } +aws-config = { workspace = true } +iceberg = { workspace = true } +serde_json = { workspace = true } +typed-builder = { workspace = true } +uuid = { workspace = true, features = ["v4"] } +aws-sdk-s3 = "1.16.0" +[dev-dependencies] +iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } +itertools = { workspace = true } +tokio = { workspace = true } diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs new file mode 100644 index 000000000..d31f7b425 --- /dev/null +++ b/crates/catalog/hadoop/src/catalog.rs @@ -0,0 +1,736 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use anyhow::anyhow; +use async_trait::async_trait; +use iceberg::io::{FileIO, FileIOBuilder}; +use iceberg::spec::{TableMetadata, TableMetadataBuilder}; +use iceberg::table::Table; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, + TableIdent, +}; +use typed_builder::TypedBuilder; + +use crate::utils::{ + create_metadata_location, create_sdk_config, get_default_table_location, valid_s3_namespaces, +}; + +/// Hadoop catalog configuration. +#[derive(Debug, TypedBuilder)] +pub struct HadoopCatalogConfig { + /// Properties for the catalog. The available properties are: + /// when using s3 filesystem + /// - `profile_name`: The name of the AWS profile to use. + /// - `region_name`: The AWS region to use. + /// - `aws_access_key_id`: The AWS access key ID to use. + /// - `aws_secret_access_key`: The AWS secret access key to use. + /// - `aws_session_token`: The AWS session token to use. + /// when using hdfs filesystem (like the properties in hdfs-site.xml) + #[builder(default)] + properties: HashMap, + /// Endpoint URL for the catalog. + #[builder(default, setter(strip_option(fallback = endpoint_url_opt)))] + endpoint_url: Option, + /// warehouse for the catalog. + /// This is the root directory for the catalog. + /// when using s3 filesystem like s3:///. + /// when using hdfs filesystem like hdfs://:/. + #[builder(default, setter(strip_option(fallback = warehouse_opt)))] + warehouse: Option, +} + +/// Hadoop catalog implementation. +#[derive(Debug)] +pub struct HadoopCatalog { + config: HadoopCatalogConfig, + file_io: FileIO, + s3_client: Option, +} + +impl HadoopCatalog { + /// Creates a new Hadoop catalog. + pub async fn new(config: HadoopCatalogConfig) -> Result { + let mut s3_client: Option = None; + if let Some(warehouse_url) = &config.warehouse { + let file_io = FileIO::from_path(&warehouse_url)? + .with_props(&config.properties) + .build()?; + if warehouse_url.starts_with("s3://") { + let aws_config = + create_sdk_config(&config.properties, config.endpoint_url.clone()).await; + s3_client = Some(aws_sdk_s3::Client::new(&aws_config)); + } else if warehouse_url.starts_with("hdfs://") { + } + + Ok(Self { + config: config, + file_io, + s3_client: s3_client, + }) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "warehouse_url is required", + )); + } + } +} + +#[async_trait] +impl Catalog for HadoopCatalog { + /// List namespaces from s3tables catalog. + /// + /// S3Tables doesn't support nested namespaces. If parent is provided, it will + /// return an empty list. 
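As a usage sketch (not part of the patch; the bucket, endpoint, and credential values below are placeholders), the configuration and catalog defined above might be constructed like this:

use std::collections::HashMap;

use iceberg_catalog_hadoop::{HadoopCatalog, HadoopCatalogConfig};

// Sketch only: point the catalog at an S3 warehouse root; an hdfs:// warehouse
// would take the HDFS branch of HadoopCatalog::new instead.
async fn build_catalog() -> iceberg::Result<HadoopCatalog> {
    let config = HadoopCatalogConfig::builder()
        .warehouse("s3://example-bucket/warehouse".to_string())
        .endpoint_url("http://127.0.0.1:9000".to_string())
        .properties(HashMap::from([
            ("aws_access_key_id".to_string(), "admin".to_string()),
            ("aws_secret_access_key".to_string(), "password".to_string()),
            ("region_name".to_string(), "us-east-1".to_string()),
        ]))
        .build();
    HadoopCatalog::new(config).await
}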
+ async fn list_namespaces( + &self, + parent: Option<&NamespaceIdent>, + ) -> Result> { + if parent.is_some() { + return Ok(vec![]); + } + let mut result = Vec::new(); + if self.s3_client.is_some() { + let s3_client = self.s3_client.as_ref().unwrap(); + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); + let prefix = warehouse_url + .split("/") + .skip(3) + .collect::>() + .join("/"); + let list = s3_client + .list_objects_v2() + .bucket(bucket) + .prefix(prefix) + .send() + .await + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to list objects: {}", e), + ) + })?; + for object in list.contents.unwrap_or_default() { + let key = object.key.unwrap_or_default(); + if key.ends_with("/") { + let namespace = key.split("/").nth(3).unwrap_or(""); + result.push(NamespaceIdent::new(namespace.to_string())); + } + } + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "s3 client is not initialized", + )); + } + + return Ok(result); + } + + /// Creates a new namespace with the given identifier and properties. + /// + /// Attempts to create a namespace defined by the `namespace`. The `properties` + /// parameter is ignored. + /// + /// The following naming rules apply to namespaces: + /// + /// - Names must be between 3 (min) and 63 (max) characters long. + /// - Names can consist only of lowercase letters, numbers, and underscores (_). + /// - Names must begin and end with a letter or number. + /// - Names must not contain hyphens (-) or periods (.). + /// + /// This function can return an error in the following situations: + /// + /// - Errors from the underlying database creation process, converted using + /// `from_aws_sdk_error`. + async fn create_namespace( + &self, + namespace: &NamespaceIdent, + _properties: HashMap, + ) -> Result { + if self.s3_client.is_some() { + if valid_s3_namespaces(&namespace).is_err() { + return Err(Error::new(ErrorKind::DataInvalid, "Invalid namespace name")); + } + let s3_client = self.s3_client.as_ref().unwrap(); + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); + let prefix = format!( + "{}/{}", + &warehouse_url + .split("/") + .skip(3) + .collect::>() + .join("/"), + namespace.join("/") + ); + s3_client + .put_object() + .bucket(bucket) + .key(prefix) + .send() + .await + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to create namespace: {}", e), + ) + })?; + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "s3 client is not initialized", + )); + } + + Ok(Namespace::new(namespace.clone())) + } + + /// Retrieves a namespace by its identifier. + /// + /// Validates the given namespace identifier and then queries the + /// underlying database client to fetch the corresponding namespace data. + /// Constructs a `Namespace` object with the retrieved data and returns it. + /// + /// This function can return an error in any of the following situations: + /// - If there is an error querying the database, returned by + /// `from_aws_sdk_error`. 
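As a usage sketch (the namespace name is a placeholder, not from the patch), the namespace listing and creation paths above might be exercised like this:

use std::collections::HashMap;

use iceberg::{Catalog, NamespaceIdent};
use iceberg_catalog_hadoop::HadoopCatalog;

// Sketch only: create a namespace (a "directory" key under the warehouse
// prefix) and then list what the catalog can see.
async fn namespace_demo(catalog: &HadoopCatalog) -> iceberg::Result<()> {
    let ns = NamespaceIdent::new("sales_db".to_string());
    catalog.create_namespace(&ns, HashMap::new()).await?;
    for ns in catalog.list_namespaces(None).await? {
        println!("namespace: {}", ns.join("."));
    }
    Ok(())
}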
+ async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result { + if self.s3_client.is_some() { + if valid_s3_namespaces(&namespace).is_err() { + return Err(Error::new(ErrorKind::DataInvalid, "Invalid namespace name")); + } + let s3_client = self.s3_client.as_ref().unwrap(); + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); + let prefix = format!( + "{}/{}", + &warehouse_url + .split("/") + .skip(3) + .collect::>() + .join("/"), + namespace.join("/") + ); + s3_client + .get_object() + .bucket(bucket) + .key(prefix) + .send() + .await + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to get namespace: {}", e), + ) + })?; + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "s3 client is not initialized", + )); + } + + Ok(Namespace::new(namespace.clone())) + } + + /// Checks if a namespace exists within the s3tables catalog. + /// + /// Validates the namespace identifier by querying the s3tables catalog + /// to determine if the specified namespace exists. + /// + /// # Returns + /// A `Result` indicating the outcome of the check: + /// - `Ok(true)` if the namespace exists. + /// - `Ok(false)` if the namespace does not exist, identified by a specific + /// `IsNotFoundException` variant. + /// - `Err(...)` if an error occurs during validation or the s3tables catalog + /// query, with the error encapsulating the issue. + async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result { + if self.s3_client.is_some() { + if valid_s3_namespaces(&namespace).is_err() { + return Err(Error::new(ErrorKind::DataInvalid, "Invalid namespace name")); + } + let s3_client = self.s3_client.as_ref().unwrap(); + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); + let prefix = format!( + "{}/{}", + &warehouse_url + .split("/") + .skip(3) + .collect::>() + .join("/"), + namespace.join("/") + ); + s3_client + .head_object() + .bucket(bucket) + .key(prefix) + .send() + .await + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to check namespace: {}", e), + ) + })?; + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "s3 client is not initialized", + )); + } + + Ok(true) + } + + /// Updates the properties of an existing namespace. + /// + /// S3Tables doesn't support updating namespace properties, so this function + /// will always return an error. + async fn update_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap, + ) -> Result<()> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Update namespace is not supported for hadoop catalog", + )) + } + + /// Drops an existing namespace from the s3tables catalog. + /// + /// Validates the namespace identifier and then deletes the corresponding + /// namespace from the s3tables catalog. + /// + /// This function can return an error in the following situations: + /// - Errors from the underlying database deletion process, converted using + /// `from_aws_sdk_error`. 
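A short sketch of driving the existence check above together with the drop path that follows (the name is a placeholder):

use iceberg::{Catalog, NamespaceIdent};
use iceberg_catalog_hadoop::HadoopCatalog;

// Sketch only: probe for a namespace and drop it if present.
async fn drop_if_exists(catalog: &HadoopCatalog, name: &str) -> iceberg::Result<()> {
    let ns = NamespaceIdent::new(name.to_string());
    if catalog.namespace_exists(&ns).await? {
        catalog.drop_namespace(&ns).await?;
    }
    Ok(())
}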
+ async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { + if self.s3_client.is_some() { + let s3_client = self.s3_client.as_ref().unwrap(); + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); + let prefix = format!( + "{}/{}", + &warehouse_url + .split("/") + .skip(3) + .collect::>() + .join("/"), + namespace.join("/") + ); + s3_client + .delete_object() + .bucket(bucket) + .key(prefix) + .send() + .await + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to delete namespace: {}", e), + ) + })?; + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "s3 client is not initialized", + )); + } + + Ok(()) + } + + /// Lists all tables within a given namespace. + /// + /// Retrieves all tables associated with the specified namespace and returns + /// their identifiers. + /// + /// This function can return an error in the following situations: + /// - Errors from the underlying database query process, converted using + /// `from_aws_sdk_error`. + async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { + if self.s3_client.is_some() { + let s3_client = self.s3_client.as_ref().unwrap(); + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); + let prefix = format!( + "{}/{}", + &warehouse_url + .split("/") + .skip(3) + .collect::>() + .join("/"), + namespace.join("/") + ); + let list = s3_client + .list_objects_v2() + .bucket(bucket) + .prefix(prefix) + .send() + .await + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to list tables: {}", e), + ) + })?; + let mut result = Vec::new(); + for object in list.contents.unwrap_or_default() { + let key = object.key.unwrap_or_default(); + if key.ends_with("/") { + let table_name = key.split("/").nth(4).unwrap_or(""); + + let table_version_hint_path = format!( + "{}/{}/{}/metadata/version-hint.text", + &warehouse_url + .split("/") + .skip(3) + .collect::>() + .join("/"), + namespace.join("/"), + &table_name + ); + + let table_version_hint = s3_client + .get_object() + .bucket(bucket) + .key(table_version_hint_path) + .send() + .await + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to get table version hint: {}", e), + ) + }); + if table_version_hint.is_ok() { + result.push(TableIdent::new( + namespace.clone(), + table_name.to_string(), + )); + } + } + } + return Ok(result); + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "s3 client is not initialized", + )); + } + + Ok(vec![]) + } + + /// Creates a new table within a specified namespace. + /// + /// Attempts to create a table defined by the `creation` parameter. The metadata + /// location is generated by the s3tables catalog, looks like: + /// + /// s3://{RANDOM WAREHOUSE LOCATION}/metadata/{VERSION}-{UUID}.metadata.json + /// + /// We have to get this random warehouse location after the table is created. + /// + /// This function can return an error in the following situations: + /// - If the location of the table is set by user, identified by a specific + /// `DataInvalid` variant. + /// - Errors from the underlying database creation process, converted using + /// `from_aws_sdk_error`. 
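As a sketch of driving the table-creation path documented above (the field ids, column names, and namespace are placeholders):

use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::{Catalog, NamespaceIdent, TableCreation};
use iceberg_catalog_hadoop::HadoopCatalog;

// Sketch only: create a two-column table under an existing namespace and
// print the metadata location the catalog wrote for it.
async fn create_orders(catalog: &HadoopCatalog) -> iceberg::Result<()> {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::optional(2, "amount", Type::Primitive(PrimitiveType::Double)).into(),
        ])
        .build()?;
    let creation = TableCreation::builder()
        .name("orders".to_string())
        .schema(schema)
        .build();
    let table = catalog
        .create_table(&NamespaceIdent::new("sales_db".to_string()), creation)
        .await?;
    println!("metadata at {:?}", table.metadata_location());
    Ok(())
}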
+ async fn create_table( + &self, + namespace: &NamespaceIdent, + creation: TableCreation, + ) -> Result { + let table_name = creation.name.clone(); + let location = match self.config.warehouse.clone() { + Some(warehouse_url) => { + get_default_table_location(&namespace, &table_name, &warehouse_url) + } + None => { + return Err(Error::new( + ErrorKind::DataInvalid, + "warehouse_url is required", + )); + } + }; + let metadata = TableMetadataBuilder::from_table_creation(creation)? + .build()? + .metadata; + + let metadata_location = create_metadata_location(&location, 0)?; + + self.file_io + .new_output(&metadata_location)? + .write(serde_json::to_vec(&metadata)?.into()) + .await?; + Table::builder() + .file_io(self.file_io.clone()) + .metadata_location(metadata_location) + .metadata(metadata) + .identifier(TableIdent::new(namespace.clone(), table_name)) + .build() + } + + /// Loads an existing table from the s3tables catalog. + /// + /// Retrieves the metadata location of the specified table and constructs a + /// `Table` object with the retrieved metadata. + /// + /// This function can return an error in the following situations: + /// - If the table does not have a metadata location, identified by a specific + /// `Unexpected` variant. + /// - Errors from the underlying database query process, converted using + /// `from_aws_sdk_error`. + async fn load_table(&self, table_ident: &TableIdent) -> Result
{ + if self.s3_client.is_some() { + let s3_client = self.s3_client.as_ref().unwrap(); + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); + let table_name = table_ident.name.clone(); + let table_version_hint_path = format!( + "{}/{}/{}/metadata/version-hint.text", + &warehouse_url + .split("/") + .skip(3) + .collect::>() + .join("/"), + table_ident.namespace.join("/"), + &table_name + ); + let table_version_hint_result = s3_client + .get_object() + .bucket(bucket) + .key(table_version_hint_path) + .send() + .await + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to get table version hint: {}", e), + ) + }); + let location = match self.config.warehouse.clone() { + Some(warehouse_url) => { + get_default_table_location(&table_ident.namespace, &table_name, &warehouse_url) + } + None => { + return Err(Error::new( + ErrorKind::DataInvalid, + "warehouse_url is required", + )); + } + }; + + + if table_version_hint_result.is_ok() { + let table_version_hint =String::from_utf8_lossy( table_version_hint_result.unwrap().body().bytes().unwrap_or_default()).to_string(); + let metadata_location = create_metadata_location(&location, table_version_hint.parse().unwrap_or(0))?; + return Ok(Table::builder() + .file_io(self.file_io.clone()) + .metadata_location(metadata_location) + .identifier(table_ident.clone()) + .build()?); + } + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "s3 client is not initialized", + )); + } + + Err(Error::new( + ErrorKind::Unexpected, + "Table does not have a metadata location", + )) + } + + + /// Asynchronously drops a table from the database. + /// + /// # Errors + /// Returns an error if: + /// - The namespace provided in `table` cannot be validated + /// or does not exist. + /// - The underlying database client encounters an error while + /// attempting to drop the table. This includes scenarios where + /// the table does not exist. + /// - Any network or communication error occurs with the database backend. + async fn drop_table(&self, table: &TableIdent) -> Result<()> { + if self.s3_client.is_some() { + let s3_client = self.s3_client.as_ref().unwrap(); + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); + let table_name = table.name.clone(); + let table_path = format!( + "{}/{}/{}", + &warehouse_url + .split("/") + .skip(3) + .collect::>() + .join("/"), + table.namespace.join("/"), + &table_name + ); + s3_client + .delete_object() + .bucket(bucket) + .key(table_path) + .send() + .await + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to delete table: {}", e), + ) + })?; + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "s3 client is not initialized", + )); + } + + Ok(()) + } + + /// Asynchronously checks the existence of a specified table + /// in the database. + /// + /// # Returns + /// - `Ok(true)` if the table exists in the database. + /// - `Ok(false)` if the table does not exist in the database. 
+ /// - `Err(...)` if an error occurs during the process + async fn table_exists(&self, table: &TableIdent) -> Result { + if self.s3_client.is_some() { + let s3_client = self.s3_client.as_ref().unwrap(); + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); + let table_name = table.name.clone(); + let table_version_hint_path = format!( + "{}/{}/{}/metadata/version-hint.text", + &warehouse_url + .split("/") + .skip(3) + .collect::>() + .join("/"), + table.namespace.join("/"), + &table_name + ); + s3_client + .head_object() + .bucket(bucket) + .key(table_version_hint_path) + .send() + .await + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to check table: {}", e), + ) + })?; + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "s3 client is not initialized", + )); + } + + Ok(true) + } + + + async fn update_table(&self, _commit: TableCommit) -> Result
{ + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Updating a table is not supported yet", + )) + } + /// Asynchronously renames a table within the database + /// or moves it between namespaces (databases). + /// + /// # Returns + /// - `Ok(())` on successful rename or move of the table. + /// - `Err(...)` if an error occurs during the process. + async fn rename_table(&self, _src: &TableIdent,_destt: &TableIdent) -> Result<()> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Updating a table is not supported yet", + )) + } +} diff --git a/crates/catalog/hadoop/src/lib.rs b/crates/catalog/hadoop/src/lib.rs new file mode 100644 index 000000000..a140769d2 --- /dev/null +++ b/crates/catalog/hadoop/src/lib.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg Hadoop catalog implementation. + +#![deny(missing_docs)] + +mod catalog; +mod utils; + +pub use catalog::*; \ No newline at end of file diff --git a/crates/catalog/hadoop/src/utils.rs b/crates/catalog/hadoop/src/utils.rs new file mode 100644 index 000000000..66b808ea7 --- /dev/null +++ b/crates/catalog/hadoop/src/utils.rs @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use aws_config::{BehaviorVersion, Region, SdkConfig}; +use iceberg::NamespaceIdent; +use iceberg::{Error, ErrorKind, Namespace, Result}; +use uuid::Uuid; + +/// Property aws profile name +pub const AWS_PROFILE_NAME: &str = "profile_name"; +/// Property aws region +pub const AWS_REGION_NAME: &str = "region_name"; +/// Property aws access key +pub const AWS_ACCESS_KEY_ID: &str = "aws_access_key_id"; +/// Property aws secret access key +pub const AWS_SECRET_ACCESS_KEY: &str = "aws_secret_access_key"; +/// Property aws session token +pub const AWS_SESSION_TOKEN: &str = "aws_session_token"; + +/// Creates an aws sdk configuration based on +/// provided properties and an optional endpoint URL. 
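For illustration (a crate-internal call; the profile, region, and endpoint values are placeholders), the helper documented above is driven with the property keys declared earlier:

use std::collections::HashMap;

// Sketch only: build an AWS SDK config from catalog properties plus an
// explicit endpoint, using the async helper defined below.
async fn example_sdk_config() -> aws_config::SdkConfig {
    let props = HashMap::from([
        ("profile_name".to_string(), "default".to_string()),
        ("region_name".to_string(), "us-east-1".to_string()),
    ]);
    create_sdk_config(&props, Some("http://127.0.0.1:9000".to_string())).await
}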
+pub(crate) async fn create_sdk_config( + properties: &HashMap, + endpoint_url: Option, +) -> SdkConfig { + let mut config = aws_config::defaults(BehaviorVersion::latest()); + + if properties.is_empty() { + return config.load().await; + } + + if let Some(endpoint_url) = endpoint_url { + config = config.endpoint_url(endpoint_url); + } + + if let Some(profile_name) = properties.get(AWS_PROFILE_NAME) { + config = config.profile_name(profile_name); + } + + if let Some(region_name) = properties.get(AWS_REGION_NAME) { + let region = Region::new(region_name.clone()); + config = config.region(region); + } + + config.load().await +} + +/// Create metadata location from `location` and `version` +pub(crate) fn create_metadata_location( + warehouse_location: impl AsRef, + version: i32, +) -> Result { + if version < 0 { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Table metadata version: '{}' must be a non-negative integer", + version + ), + )); + }; + + let version = format!("{:0>5}", version); + let id = Uuid::new_v4(); + let metadata_location = format!( + "{}/metadata/{}-{}.metadata.json", + warehouse_location.as_ref(), + version, + id + ); + + Ok(metadata_location) +} + +pub(crate) fn valid_s3_namespaces(namespace: &NamespaceIdent) -> Result { + for name in namespace.iter() { + if name.len() < 3 || name.len() > 63 { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Namespace name must be between 3 and 63 characters long, but got {}", + name.len() + ), + )); + } + if !name + .chars() + .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Namespace name can only contain lowercase letters, numbers, and underscores, but got {}", + &name + ), + )); + } + if name.starts_with('-') || name.ends_with('-') { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Namespace name cannot start or end with a hyphen, but got {}", + &name + ), + )); + } + } + + Ok(true) +} + +/// Get default table location from `Namespace` properties +pub(crate) fn get_default_table_location( + namespace: &NamespaceIdent, + table_name: impl AsRef, + warehouse: impl AsRef, +) -> String { + return format!( + "{}/{}/{}", + warehouse.as_ref(), + namespace.join("/"), + table_name.as_ref() + ); +} From 6a08fd33a217b3f1d253a594e5be388013febaf9 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Tue, 6 May 2025 16:44:29 +0800 Subject: [PATCH 02/24] feat:Add hadoop catalog mode(load s3 table meta) --- crates/catalog/hadoop/Cargo.toml | 5 +- crates/catalog/hadoop/src/catalog.rs | 139 ++++++++++++++++++--------- crates/catalog/hadoop/src/utils.rs | 41 +++++--- crates/iceberg/src/io/storage_s3.rs | 19 ++++ 4 files changed, 145 insertions(+), 59 deletions(-) diff --git a/crates/catalog/hadoop/Cargo.toml b/crates/catalog/hadoop/Cargo.toml index 5ed671fa2..e2b36cfce 100644 --- a/crates/catalog/hadoop/Cargo.toml +++ b/crates/catalog/hadoop/Cargo.toml @@ -36,8 +36,9 @@ iceberg = { workspace = true } serde_json = { workspace = true } typed-builder = { workspace = true } uuid = { workspace = true, features = ["v4"] } -aws-sdk-s3 = "1.16.0" +aws-sdk-s3 = {version="1.84.0",features = ["behavior-version-latest"]} +tokio = { workspace = true } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } itertools = { workspace = true } -tokio = { workspace = true } + diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index d31f7b425..a7372f053 100644 --- 
a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -19,13 +19,14 @@ use std::collections::HashMap; use anyhow::anyhow; use async_trait::async_trait; -use iceberg::io::{FileIO, FileIOBuilder}; +use iceberg::io::{FileIO, FileIOBuilder, S3_ENDPOINT}; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; +use tokio::io::AsyncReadExt; use typed_builder::TypedBuilder; use crate::utils::{ @@ -69,21 +70,40 @@ impl HadoopCatalog { pub async fn new(config: HadoopCatalogConfig) -> Result { let mut s3_client: Option = None; if let Some(warehouse_url) = &config.warehouse { - let file_io = FileIO::from_path(&warehouse_url)? - .with_props(&config.properties) - .build()?; - if warehouse_url.starts_with("s3://") { - let aws_config = - create_sdk_config(&config.properties, config.endpoint_url.clone()).await; - s3_client = Some(aws_sdk_s3::Client::new(&aws_config)); + if warehouse_url.starts_with("s3://") || warehouse_url.starts_with("s3a://") { + let mut io_props = config.properties.clone(); + if config.endpoint_url.is_some() { + io_props.insert( + S3_ENDPOINT.to_string(), + config.endpoint_url.clone().unwrap_or_default(), + ); + } + let file_io = FileIO::from_path(&warehouse_url)? + .with_props(&io_props) + .build()?; + let aws_config = create_sdk_config(&config.properties, config.endpoint_url.clone()); + s3_client = Some(aws_sdk_s3::Client::from_conf(aws_config)); + return Ok(Self { + config: config, + file_io, + s3_client: s3_client, + }); } else if warehouse_url.starts_with("hdfs://") { + //todo hdfs native client + let file_io = FileIO::from_path(&warehouse_url)? + .with_props(&config.properties) + .build()?; + return Ok(Self { + config: config, + file_io: file_io, + s3_client: None, + }); } - Ok(Self { - config: config, - file_io, - s3_client: s3_client, - }) + return Err(Error::new( + ErrorKind::DataInvalid, + "warehouse_url is not supported", + )); } else { return Err(Error::new( ErrorKind::DataInvalid, @@ -544,6 +564,12 @@ impl Catalog for HadoopCatalog { let s3_client = self.s3_client.as_ref().unwrap(); match self.config.warehouse.clone() { Some(warehouse_url) => { + println!( + "Loading table: {}.{} from warehouse: {}", + table_ident.namespace.join("."), + table_ident.name, + warehouse_url + ); let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); let table_name = table_ident.name.clone(); let table_version_hint_path = format!( @@ -556,39 +582,68 @@ impl Catalog for HadoopCatalog { table_ident.namespace.join("/"), &table_name ); + println!("table_version_hint_path: {}", &table_version_hint_path); let table_version_hint_result = s3_client .get_object() .bucket(bucket) .key(table_version_hint_path) .send() - .await - .map_err(|e| { - Error::new( + .await; + let location = match self.config.warehouse.clone() { + Some(warehouse_url) => get_default_table_location( + &table_ident.namespace, + &table_name, + &warehouse_url, + ), + None => { + return Err(Error::new( + ErrorKind::DataInvalid, + "warehouse_url is required", + )); + } + }; + + println!("location: {}", &location); + + match table_version_hint_result { + Ok(table_version_hint_result_output) => { + let mut buf = Vec::new(); + table_version_hint_result_output + .body + .into_async_read() + .read_to_end(&mut buf) + .await?; + println!("buf: {:?}", &buf); + let table_version_hint = String::from_utf8_lossy(&buf); + println!("table_version_hint: {}", 
&table_version_hint); + let metadata_location = format!( + "{}/{}/{}/metadata/v{}.metadata.json", + &warehouse_url, + table_ident.namespace.join("/"), + &table_name, + &table_version_hint + ); + println!("metadata_location: {}", &metadata_location); + let metadata_content = + self.file_io.new_input(&metadata_location)?.read().await?; + println!("metadata_content: {:?}", &metadata_content); + let metadata = + serde_json::from_slice::(&metadata_content)?; + + return Ok(Table::builder() + .file_io(self.file_io.clone()) + .metadata_location(metadata_location) + .metadata(metadata) + .identifier(table_ident.clone()) + .build()?); + } + + Err(e) => { + return Err(Error::new( ErrorKind::DataInvalid, format!("Failed to get table version hint: {}", e), - ) - }); - let location = match self.config.warehouse.clone() { - Some(warehouse_url) => { - get_default_table_location(&table_ident.namespace, &table_name, &warehouse_url) - } - None => { - return Err(Error::new( - ErrorKind::DataInvalid, - "warehouse_url is required", - )); - } - }; - - - if table_version_hint_result.is_ok() { - let table_version_hint =String::from_utf8_lossy( table_version_hint_result.unwrap().body().bytes().unwrap_or_default()).to_string(); - let metadata_location = create_metadata_location(&location, table_version_hint.parse().unwrap_or(0))?; - return Ok(Table::builder() - .file_io(self.file_io.clone()) - .metadata_location(metadata_location) - .identifier(table_ident.clone()) - .build()?); + )); + } } } None => { @@ -608,7 +663,6 @@ impl Catalog for HadoopCatalog { )) } - /// Asynchronously drops a table from the database. /// /// # Errors @@ -684,7 +738,7 @@ impl Catalog for HadoopCatalog { .skip(3) .collect::>() .join("/"), - table.namespace.join("/"), + table.namespace.join("/"), &table_name ); s3_client @@ -714,7 +768,6 @@ impl Catalog for HadoopCatalog { Ok(true) } - async fn update_table(&self, _commit: TableCommit) -> Result
{ Err(Error::new( ErrorKind::FeatureUnsupported, @@ -727,7 +780,7 @@ impl Catalog for HadoopCatalog { /// # Returns /// - `Ok(())` on successful rename or move of the table. /// - `Err(...)` if an error occurs during the process. - async fn rename_table(&self, _src: &TableIdent,_destt: &TableIdent) -> Result<()> { + async fn rename_table(&self, _src: &TableIdent, _destt: &TableIdent) -> Result<()> { Err(Error::new( ErrorKind::FeatureUnsupported, "Updating a table is not supported yet", diff --git a/crates/catalog/hadoop/src/utils.rs b/crates/catalog/hadoop/src/utils.rs index 66b808ea7..0e1516276 100644 --- a/crates/catalog/hadoop/src/utils.rs +++ b/crates/catalog/hadoop/src/utils.rs @@ -15,11 +15,10 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; - -use aws_config::{BehaviorVersion, Region, SdkConfig}; +use aws_sdk_s3::{Config, config::{Credentials,BehaviorVersion}}; use iceberg::NamespaceIdent; use iceberg::{Error, ErrorKind, Namespace, Result}; +use std::collections::HashMap; use uuid::Uuid; /// Property aws profile name @@ -32,33 +31,47 @@ pub const AWS_ACCESS_KEY_ID: &str = "aws_access_key_id"; pub const AWS_SECRET_ACCESS_KEY: &str = "aws_secret_access_key"; /// Property aws session token pub const AWS_SESSION_TOKEN: &str = "aws_session_token"; - +/// S3 Path Style Access. +pub const S3_PATH_STYLE_ACCESS: &str = "s3.path-style-access"; /// Creates an aws sdk configuration based on /// provided properties and an optional endpoint URL. -pub(crate) async fn create_sdk_config( +pub(crate) fn create_sdk_config( properties: &HashMap, endpoint_url: Option, -) -> SdkConfig { - let mut config = aws_config::defaults(BehaviorVersion::latest()); - +) -> Config { + let mut config =Config::builder() + .behavior_version(BehaviorVersion::latest()); + if properties.is_empty() { - return config.load().await; + return config.build(); } if let Some(endpoint_url) = endpoint_url { config = config.endpoint_url(endpoint_url); } - if let Some(profile_name) = properties.get(AWS_PROFILE_NAME) { - config = config.profile_name(profile_name); + + if let Some(path_style_access) = properties.get(S3_PATH_STYLE_ACCESS) { + config = config.force_path_style(path_style_access.parse::().unwrap_or(false)); } + if let (Some(access_key), Some(secret_key)) = ( + properties.get(AWS_ACCESS_KEY_ID), + properties.get(AWS_SECRET_ACCESS_KEY), + ) { + let session_token = properties.get(AWS_SESSION_TOKEN).cloned(); + let credentials_provider = + Credentials::new(access_key, secret_key, session_token, None, "properties"); + + config = config.credentials_provider(credentials_provider) + }; + if let Some(region_name) = properties.get(AWS_REGION_NAME) { - let region = Region::new(region_name.clone()); + let region = aws_sdk_s3::config::Region::new(region_name.clone()); config = config.region(region); } - config.load().await + config.build() } /// Create metadata location from `location` and `version` @@ -137,4 +150,4 @@ pub(crate) fn get_default_table_location( namespace.join("/"), table_name.as_ref() ); -} +} \ No newline at end of file diff --git a/crates/iceberg/src/io/storage_s3.rs b/crates/iceberg/src/io/storage_s3.rs index 8396888c4..c849fc3fc 100644 --- a/crates/iceberg/src/io/storage_s3.rs +++ b/crates/iceberg/src/io/storage_s3.rs @@ -66,6 +66,14 @@ pub const S3_DISABLE_EC2_METADATA: &str = "s3.disable-ec2-metadata"; /// Option to skip loading configuration from config file and the env. 
pub const S3_DISABLE_CONFIG_LOAD: &str = "s3.disable-config-load"; + +/// Property aws region +pub const AWS_REGION_NAME: &str = "region_name"; +/// Property aws access key +pub const AWS_ACCESS_KEY_ID: &str = "aws_access_key_id"; +/// Property aws secret access key +pub const AWS_SECRET_ACCESS_KEY: &str = "aws_secret_access_key"; + /// Parse iceberg props to s3 config. pub(crate) fn s3_config_parse(mut m: HashMap) -> Result { let mut cfg = S3Config::default(); @@ -75,15 +83,24 @@ pub(crate) fn s3_config_parse(mut m: HashMap) -> Result) -> Result Date: Thu, 8 May 2025 11:21:24 +0800 Subject: [PATCH 03/24] feat:Add hadoop catalog mode(s3 list_namepaces) --- crates/catalog/hadoop/src/catalog.rs | 100 +++++++++++++++++++-------- 1 file changed, 72 insertions(+), 28 deletions(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index a7372f053..a75235f3e 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -17,9 +17,9 @@ use std::collections::HashMap; -use anyhow::anyhow; + use async_trait::async_trait; -use iceberg::io::{FileIO, FileIOBuilder, S3_ENDPOINT}; +use iceberg::io::{FileIO, S3_ENDPOINT}; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ @@ -123,24 +123,35 @@ impl Catalog for HadoopCatalog { &self, parent: Option<&NamespaceIdent>, ) -> Result> { - if parent.is_some() { - return Ok(vec![]); - } let mut result = Vec::new(); if self.s3_client.is_some() { let s3_client = self.s3_client.as_ref().unwrap(); match self.config.warehouse.clone() { Some(warehouse_url) => { let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); - let prefix = warehouse_url - .split("/") - .skip(3) - .collect::>() - .join("/"); + let warehouse_prefix= warehouse_url + .split("/") + .skip(3) + .collect::>() + .join("/"); + let mut prefix = format!( + "{}/", + &warehouse_prefix + ); + + if let Some(parent) = parent { + prefix = format!( + "{}/{}/", + &warehouse_prefix, + parent.join("/") + ); + } + let list = s3_client .list_objects_v2() .bucket(bucket) - .prefix(prefix) + .prefix(&prefix) + .delimiter("/") .send() .await .map_err(|e| { @@ -149,11 +160,55 @@ impl Catalog for HadoopCatalog { format!("Failed to list objects: {}", e), ) })?; - for object in list.contents.unwrap_or_default() { - let key = object.key.unwrap_or_default(); - if key.ends_with("/") { - let namespace = key.split("/").nth(3).unwrap_or(""); - result.push(NamespaceIdent::new(namespace.to_string())); + for object in list.common_prefixes.unwrap_or_default() { + let key = object.prefix.unwrap_or_default(); + // Skip the prefix part and check if it's a first level directory + let relative_path = if key.starts_with(&prefix) { + &key[prefix.len()..] + } else { + &key + }; + // Only add namespace if it's a first level directory (has only one /) + + if relative_path.ends_with("/") { + let warehouse_relative_path = if key.starts_with(&warehouse_prefix) { + &key[warehouse_prefix.len()..] 
+ } else { + &key + }; + + let namespaces = warehouse_relative_path.split("/").collect::>(); + + + let table_version_hint_path = format!( + "{}/{}/metadata/version-hint.text", + &warehouse_url + .split("/") + .skip(3) + .collect::>() + .join("/"), + namespaces.join("/"), + ); + + //check It's not table + let table_version_hint = s3_client + .get_object() + .bucket(bucket) + .key(table_version_hint_path) + .send() + .await + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to get table version hint: {}", e), + ) + }); + + + + if !table_version_hint.is_ok() && !namespaces.is_empty() { + result.push(NamespaceIdent::from_vec(namespaces.iter().filter(|e|!e.is_empty()).map(|e|e.to_string()).collect())?); + } } } } @@ -564,12 +619,6 @@ impl Catalog for HadoopCatalog { let s3_client = self.s3_client.as_ref().unwrap(); match self.config.warehouse.clone() { Some(warehouse_url) => { - println!( - "Loading table: {}.{} from warehouse: {}", - table_ident.namespace.join("."), - table_ident.name, - warehouse_url - ); let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); let table_name = table_ident.name.clone(); let table_version_hint_path = format!( @@ -582,7 +631,6 @@ impl Catalog for HadoopCatalog { table_ident.namespace.join("/"), &table_name ); - println!("table_version_hint_path: {}", &table_version_hint_path); let table_version_hint_result = s3_client .get_object() .bucket(bucket) @@ -603,7 +651,6 @@ impl Catalog for HadoopCatalog { } }; - println!("location: {}", &location); match table_version_hint_result { Ok(table_version_hint_result_output) => { @@ -613,9 +660,8 @@ impl Catalog for HadoopCatalog { .into_async_read() .read_to_end(&mut buf) .await?; - println!("buf: {:?}", &buf); let table_version_hint = String::from_utf8_lossy(&buf); - println!("table_version_hint: {}", &table_version_hint); + let metadata_location = format!( "{}/{}/{}/metadata/v{}.metadata.json", &warehouse_url, @@ -623,10 +669,8 @@ impl Catalog for HadoopCatalog { &table_name, &table_version_hint ); - println!("metadata_location: {}", &metadata_location); let metadata_content = self.file_io.new_input(&metadata_location)?.read().await?; - println!("metadata_content: {:?}", &metadata_content); let metadata = serde_json::from_slice::(&metadata_content)?; From c38a229e92e62a85b0e4e8045344a205e3947778 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Thu, 8 May 2025 11:21:47 +0800 Subject: [PATCH 04/24] feat:Add hadoop catalog mode(s3 list_namepaces) --- crates/catalog/hadoop/src/catalog.rs | 31 ++++++++++++++-------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index a75235f3e..d47fd3387 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -129,24 +129,31 @@ impl Catalog for HadoopCatalog { match self.config.warehouse.clone() { Some(warehouse_url) => { let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); - let warehouse_prefix= warehouse_url + let warehouse_prefix_origin= warehouse_url .split("/") .skip(3) .collect::>() .join("/"); + let mut warehouse_prefix= warehouse_prefix_origin.clone(); let mut prefix = format!( "{}/", &warehouse_prefix ); if let Some(parent) = parent { - prefix = format!( - "{}/{}/", + warehouse_prefix = format!( + "{}/{}", &warehouse_prefix, parent.join("/") ); + prefix = format!( + "{}/", + &warehouse_prefix + ); } + println!("prefix={}",&prefix); + let list = s3_client .list_objects_v2() .bucket(bucket) @@ -162,26 +169,17 @@ 
impl Catalog for HadoopCatalog { })?; for object in list.common_prefixes.unwrap_or_default() { let key = object.prefix.unwrap_or_default(); - // Skip the prefix part and check if it's a first level directory - let relative_path = if key.starts_with(&prefix) { - &key[prefix.len()..] - } else { - &key - }; // Only add namespace if it's a first level directory (has only one /) - if relative_path.ends_with("/") { - let warehouse_relative_path = if key.starts_with(&warehouse_prefix) { - &key[warehouse_prefix.len()..] - } else { - &key - }; + if key.ends_with("/") && key.starts_with(&warehouse_prefix) { + let warehouse_relative_path =&key[warehouse_prefix_origin.len()..]; + println!("warehouse_relative_path={}",&warehouse_relative_path); let namespaces = warehouse_relative_path.split("/").collect::>(); let table_version_hint_path = format!( - "{}/{}/metadata/version-hint.text", + "{}{}metadata/version-hint.text", &warehouse_url .split("/") .skip(3) @@ -189,6 +187,7 @@ impl Catalog for HadoopCatalog { .join("/"), namespaces.join("/"), ); + println!("table_version_hint_path={}",&table_version_hint_path); //check It's not table let table_version_hint = s3_client From 2ee80797ea499759646c432954a32e02ecbe85a8 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Thu, 8 May 2025 11:59:06 +0800 Subject: [PATCH 05/24] feat:Add hadoop catalog mode(s3 create_namepace) --- crates/catalog/hadoop/src/catalog.rs | 58 +++++++++++----------------- 1 file changed, 23 insertions(+), 35 deletions(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index d47fd3387..fc4001cae 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -17,7 +17,6 @@ use std::collections::HashMap; - use async_trait::async_trait; use iceberg::io::{FileIO, S3_ENDPOINT}; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; @@ -129,31 +128,19 @@ impl Catalog for HadoopCatalog { match self.config.warehouse.clone() { Some(warehouse_url) => { let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); - let warehouse_prefix_origin= warehouse_url - .split("/") - .skip(3) - .collect::>() - .join("/"); - let mut warehouse_prefix= warehouse_prefix_origin.clone(); - let mut prefix = format!( - "{}/", - &warehouse_prefix - ); + let warehouse_prefix_origin = warehouse_url + .split("/") + .skip(3) + .collect::>() + .join("/"); + let mut warehouse_prefix = warehouse_prefix_origin.clone(); + let mut prefix = format!("{}/", &warehouse_prefix); if let Some(parent) = parent { - warehouse_prefix = format!( - "{}/{}", - &warehouse_prefix, - parent.join("/") - ); - prefix = format!( - "{}/", - &warehouse_prefix - ); + warehouse_prefix = format!("{}/{}", &warehouse_prefix, parent.join("/")); + prefix = format!("{}/", &warehouse_prefix); } - println!("prefix={}",&prefix); - let list = s3_client .list_objects_v2() .bucket(bucket) @@ -170,13 +157,11 @@ impl Catalog for HadoopCatalog { for object in list.common_prefixes.unwrap_or_default() { let key = object.prefix.unwrap_or_default(); // Only add namespace if it's a first level directory (has only one /) - + if key.ends_with("/") && key.starts_with(&warehouse_prefix) { - let warehouse_relative_path =&key[warehouse_prefix_origin.len()..]; - println!("warehouse_relative_path={}",&warehouse_relative_path); - + let warehouse_relative_path = &key[warehouse_prefix_origin.len()..]; + let namespaces = warehouse_relative_path.split("/").collect::>(); - let table_version_hint_path = format!( "{}{}metadata/version-hint.text", @@ -187,7 +172,6 @@ 
impl Catalog for HadoopCatalog { .join("/"), namespaces.join("/"), ); - println!("table_version_hint_path={}",&table_version_hint_path); //check It's not table let table_version_hint = s3_client @@ -202,11 +186,15 @@ impl Catalog for HadoopCatalog { format!("Failed to get table version hint: {}", e), ) }); - - - + if !table_version_hint.is_ok() && !namespaces.is_empty() { - result.push(NamespaceIdent::from_vec(namespaces.iter().filter(|e|!e.is_empty()).map(|e|e.to_string()).collect())?); + result.push(NamespaceIdent::from_vec( + namespaces + .iter() + .filter(|e| !e.is_empty()) + .map(|e| e.to_string()) + .collect(), + )?); } } } @@ -255,7 +243,7 @@ impl Catalog for HadoopCatalog { Some(warehouse_url) => { let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); let prefix = format!( - "{}/{}", + "{}/{}/", &warehouse_url .split("/") .skip(3) @@ -267,6 +255,7 @@ impl Catalog for HadoopCatalog { .put_object() .bucket(bucket) .key(prefix) + .content_length(0) .send() .await .map_err(|e| { @@ -650,7 +639,6 @@ impl Catalog for HadoopCatalog { } }; - match table_version_hint_result { Ok(table_version_hint_result_output) => { let mut buf = Vec::new(); @@ -660,7 +648,7 @@ impl Catalog for HadoopCatalog { .read_to_end(&mut buf) .await?; let table_version_hint = String::from_utf8_lossy(&buf); - + let metadata_location = format!( "{}/{}/{}/metadata/v{}.metadata.json", &warehouse_url, From 8942623e8ef0c0573e5556d2741c1a0b54c4c9be Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Thu, 8 May 2025 13:14:03 +0800 Subject: [PATCH 06/24] feat:Add hadoop catalog mode(s3 get_namepace) --- crates/catalog/hadoop/src/catalog.rs | 29 ++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index fc4001cae..a7960870a 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -298,7 +298,7 @@ impl Catalog for HadoopCatalog { Some(warehouse_url) => { let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); let prefix = format!( - "{}/{}", + "{}/{}/", &warehouse_url .split("/") .skip(3) @@ -309,7 +309,7 @@ impl Catalog for HadoopCatalog { s3_client .get_object() .bucket(bucket) - .key(prefix) + .key(&prefix) .send() .await .map_err(|e| { @@ -318,6 +318,31 @@ impl Catalog for HadoopCatalog { format!("Failed to get namespace: {}", e), ) })?; + let table_version_hint_path = format!( + "{}metadata/version-hint.text", + &prefix, + ); + + //check It's not table + let table_version_hint = s3_client + .get_object() + .bucket(bucket) + .key(table_version_hint_path) + .send() + .await + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to get table version hint: {}", e), + ) + }); + if table_version_hint.is_ok() { + return Err(Error::new( + ErrorKind::DataInvalid, + "It's a table not namespace", + )); + } + } None => { return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); From 4e91545a6d5ed984b9d8214f80c255dbf1282905 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Thu, 8 May 2025 13:28:36 +0800 Subject: [PATCH 07/24] feat:Add hadoop catalog mode(s3 namespace_exists) --- crates/catalog/hadoop/src/catalog.rs | 75 +++++++++++++++++----------- 1 file changed, 46 insertions(+), 29 deletions(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index a7960870a..d00ba83b5 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -318,31 +318,27 @@ impl Catalog for 
HadoopCatalog { format!("Failed to get namespace: {}", e), ) })?; - let table_version_hint_path = format!( - "{}metadata/version-hint.text", - &prefix, - ); - - //check It's not table - let table_version_hint = s3_client - .get_object() - .bucket(bucket) - .key(table_version_hint_path) - .send() - .await - .map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - format!("Failed to get table version hint: {}", e), - ) - }); - if table_version_hint.is_ok() { - return Err(Error::new( - ErrorKind::DataInvalid, - "It's a table not namespace", - )); - } + let table_version_hint_path = format!("{}metadata/version-hint.text", &prefix,); + //check It's not table + let table_version_hint = s3_client + .get_object() + .bucket(bucket) + .key(table_version_hint_path) + .send() + .await + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to get table version hint: {}", e), + ) + }); + if table_version_hint.is_ok() { + return Err(Error::new( + ErrorKind::DataInvalid, + "It's a table not namespace", + )); + } } None => { return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); @@ -380,7 +376,7 @@ impl Catalog for HadoopCatalog { Some(warehouse_url) => { let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); let prefix = format!( - "{}/{}", + "{}/{}/", &warehouse_url .split("/") .skip(3) @@ -388,18 +384,39 @@ impl Catalog for HadoopCatalog { .join("/"), namespace.join("/") ); - s3_client + match s3_client .head_object() .bucket(bucket) - .key(prefix) + .key(&prefix) + .send() + .await + { + Ok(_) => (), + Err(e) => { + return Ok(false); + } + }; + let table_version_hint_path = format!("{}metadata/version-hint.text", &prefix,); + + //check It's not table + let table_version_hint = s3_client + .get_object() + .bucket(bucket) + .key(table_version_hint_path) .send() .await .map_err(|e| { Error::new( ErrorKind::DataInvalid, - format!("Failed to check namespace: {}", e), + format!("Failed to get table version hint: {}", e), ) - })?; + }); + if table_version_hint.is_ok() { + return Err(Error::new( + ErrorKind::DataInvalid, + "It's a table not namespace", + )); + } } None => { return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); From 4f9cb2a1f65e66135567529003665ee11eb0914c Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Thu, 8 May 2025 14:37:50 +0800 Subject: [PATCH 08/24] feat:Add hadoop catalog mode(s3 drop_namepace) --- crates/catalog/hadoop/src/catalog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index d00ba83b5..7958c3b66 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -462,7 +462,7 @@ impl Catalog for HadoopCatalog { Some(warehouse_url) => { let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); let prefix = format!( - "{}/{}", + "{}/{}/", &warehouse_url .split("/") .skip(3) From a5b0bad9c19184af34d5fd85eac955a3c296f37c Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Thu, 8 May 2025 16:27:03 +0800 Subject: [PATCH 09/24] feat:Add hadoop catalog mode(s3 list_tables) --- crates/catalog/hadoop/src/catalog.rs | 31 +++++++++++++--------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index 7958c3b66..082bb601e 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -512,7 +512,7 @@ impl Catalog for HadoopCatalog { Some(warehouse_url) => { let bucket = 
warehouse_url.split("/").nth(2).unwrap_or(""); let prefix = format!( - "{}/{}", + "{}/{}/", &warehouse_url .split("/") .skip(3) @@ -523,7 +523,8 @@ impl Catalog for HadoopCatalog { let list = s3_client .list_objects_v2() .bucket(bucket) - .prefix(prefix) + .prefix(&prefix) + .delimiter("/") .send() .await .map_err(|e| { @@ -533,22 +534,18 @@ impl Catalog for HadoopCatalog { ) })?; let mut result = Vec::new(); - for object in list.contents.unwrap_or_default() { - let key = object.key.unwrap_or_default(); + for object in list.common_prefixes.unwrap_or_default() { + let key = object.prefix.unwrap_or_default(); if key.ends_with("/") { - let table_name = key.split("/").nth(4).unwrap_or(""); - - let table_version_hint_path = format!( - "{}/{}/{}/metadata/version-hint.text", - &warehouse_url - .split("/") - .skip(3) - .collect::>() - .join("/"), - namespace.join("/"), - &table_name - ); - + let mut table_name = if key.ends_with("/") { + key[..key.len() - 1].to_string() + } else { + key.clone() + }; + table_name = table_name.split("/").last().unwrap_or(&"").to_string(); + + let table_version_hint_path = + format!("{}metadata/version-hint.text", &key); let table_version_hint = s3_client .get_object() .bucket(bucket) From 45ab09705fef5f6f9dcf877faf37bec29fe932c7 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Fri, 9 May 2025 10:13:00 +0800 Subject: [PATCH 10/24] feat:Add hadoop catalog mode(s3 create table ,drop table) --- crates/catalog/hadoop/src/catalog.rs | 107 ++++++++++++++++++++++++--- crates/catalog/hadoop/src/utils.rs | 8 +- 2 files changed, 99 insertions(+), 16 deletions(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index 082bb601e..0f09998ab 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -470,18 +470,33 @@ impl Catalog for HadoopCatalog { .join("/"), namespace.join("/") ); - s3_client - .delete_object() + let list = s3_client + .list_objects_v2() .bucket(bucket) - .key(prefix) + .prefix(&prefix) .send() .await .map_err(|e| { Error::new( ErrorKind::DataInvalid, - format!("Failed to delete namespace: {}", e), + format!("Failed to list objects: {}", e), ) })?; + + for object in list.contents.unwrap_or_default() { + s3_client + .delete_object() + .bucket(bucket) + .key(object.key.unwrap_or_default()) + .send() + .await + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to delete object: {}", e) + ) + })?; + } } None => { return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); @@ -613,16 +628,72 @@ impl Catalog for HadoopCatalog { )); } }; + + let table_version_hint_path = format!("{}/metadata/version-hint.text", &location,); + + if self.s3_client.is_some() { + let s3_client = self.s3_client.as_ref().unwrap(); + let table_version_hint_relative_path= + format!("{}/metadata/version-hint.text", &location + .split("/") + .skip(3) + .collect::>() + .join("/")); + + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); + println!( + "bucket: {}", + bucket.clone() + ); + let table_version_hint = s3_client + .get_object() + .bucket(bucket) + .key(&table_version_hint_relative_path) + .send() + .await + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to get table version hint: {}", e), + ) + }); + if table_version_hint.is_ok() { + return Err(Error::new(ErrorKind::DataInvalid, "Table already exists")); + } + + + if valid_s3_namespaces(&namespace).is_err() { + return 
Err(Error::new(ErrorKind::DataInvalid, "Invalid namespace name")); + } + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "s3 client is not initialized", + )); + } + let metadata = TableMetadataBuilder::from_table_creation(creation)? .build()? .metadata; - let metadata_location = create_metadata_location(&location, 0)?; + let metadata_location = create_metadata_location(&location, 1)?; self.file_io .new_output(&metadata_location)? .write(serde_json::to_vec(&metadata)?.into()) .await?; + + self.file_io + .new_output(&table_version_hint_path)? + .write("1".into()) + .await?; Table::builder() .file_io(self.file_io.clone()) .metadata_location(metadata_location) @@ -658,6 +729,7 @@ impl Catalog for HadoopCatalog { table_ident.namespace.join("/"), &table_name ); + let table_version_hint_result = s3_client .get_object() .bucket(bucket) @@ -751,7 +823,7 @@ impl Catalog for HadoopCatalog { let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); let table_name = table.name.clone(); let table_path = format!( - "{}/{}/{}", + "{}/{}/{}/", &warehouse_url .split("/") .skip(3) @@ -760,18 +832,33 @@ impl Catalog for HadoopCatalog { table.namespace.join("/"), &table_name ); - s3_client - .delete_object() + let list = s3_client + .list_objects_v2() .bucket(bucket) - .key(table_path) + .prefix(&table_path) .send() .await .map_err(|e| { Error::new( ErrorKind::DataInvalid, - format!("Failed to delete table: {}", e), + format!("Failed to list objects: {}", e), ) })?; + + for object in list.contents.unwrap_or_default() { + s3_client + .delete_object() + .bucket(bucket) + .key(object.key.unwrap_or_default()) + .send() + .await + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to delete object: {}", e) + ) + })?; + } } None => { return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); diff --git a/crates/catalog/hadoop/src/utils.rs b/crates/catalog/hadoop/src/utils.rs index 0e1516276..8350306f3 100644 --- a/crates/catalog/hadoop/src/utils.rs +++ b/crates/catalog/hadoop/src/utils.rs @@ -19,7 +19,6 @@ use aws_sdk_s3::{Config, config::{Credentials,BehaviorVersion}}; use iceberg::NamespaceIdent; use iceberg::{Error, ErrorKind, Namespace, Result}; use std::collections::HashMap; -use uuid::Uuid; /// Property aws profile name pub const AWS_PROFILE_NAME: &str = "profile_name"; @@ -89,13 +88,10 @@ pub(crate) fn create_metadata_location( )); }; - let version = format!("{:0>5}", version); - let id = Uuid::new_v4(); let metadata_location = format!( - "{}/metadata/{}-{}.metadata.json", + "{}/metadata/v{}.metadata.json", warehouse_location.as_ref(), - version, - id + &version ); Ok(metadata_location) From e633e9ab1a744fcc889d7b7bce0fa244c5c7d481 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Fri, 9 May 2025 10:26:42 +0800 Subject: [PATCH 11/24] feat:Add hadoop catalog mode(s3 table_exists) --- crates/catalog/hadoop/src/catalog.rs | 42 ++++++++-------------------- crates/catalog/hadoop/src/utils.rs | 17 ++++++----- 2 files changed, 19 insertions(+), 40 deletions(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index 0f09998ab..f6a74cee0 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -492,8 +492,8 @@ impl Catalog for HadoopCatalog { .await .map_err(|e| { Error::new( - ErrorKind::DataInvalid, - format!("Failed to delete object: {}", e) + ErrorKind::DataInvalid, + format!("Failed to 
delete object: {}", e), ) })?; } @@ -630,23 +630,17 @@ impl Catalog for HadoopCatalog { }; let table_version_hint_path = format!("{}/metadata/version-hint.text", &location,); - + if self.s3_client.is_some() { let s3_client = self.s3_client.as_ref().unwrap(); - let table_version_hint_relative_path= - format!("{}/metadata/version-hint.text", &location - .split("/") - .skip(3) - .collect::>() - .join("/")); - + let table_version_hint_relative_path = format!( + "{}/metadata/version-hint.text", + &location.split("/").skip(3).collect::>().join("/") + ); + match self.config.warehouse.clone() { Some(warehouse_url) => { let bucket = warehouse_url.split("/").nth(2).unwrap_or(""); - println!( - "bucket: {}", - bucket.clone() - ); let table_version_hint = s3_client .get_object() .bucket(bucket) @@ -663,7 +657,6 @@ impl Catalog for HadoopCatalog { return Err(Error::new(ErrorKind::DataInvalid, "Table already exists")); } - if valid_s3_namespaces(&namespace).is_err() { return Err(Error::new(ErrorKind::DataInvalid, "Invalid namespace name")); } @@ -729,26 +722,13 @@ impl Catalog for HadoopCatalog { table_ident.namespace.join("/"), &table_name ); - + let table_version_hint_result = s3_client .get_object() .bucket(bucket) .key(table_version_hint_path) .send() .await; - let location = match self.config.warehouse.clone() { - Some(warehouse_url) => get_default_table_location( - &table_ident.namespace, - &table_name, - &warehouse_url, - ), - None => { - return Err(Error::new( - ErrorKind::DataInvalid, - "warehouse_url is required", - )); - } - }; match table_version_hint_result { Ok(table_version_hint_result_output) => { @@ -854,8 +834,8 @@ impl Catalog for HadoopCatalog { .await .map_err(|e| { Error::new( - ErrorKind::DataInvalid, - format!("Failed to delete object: {}", e) + ErrorKind::DataInvalid, + format!("Failed to delete object: {}", e), ) })?; } diff --git a/crates/catalog/hadoop/src/utils.rs b/crates/catalog/hadoop/src/utils.rs index 8350306f3..4fbe25b66 100644 --- a/crates/catalog/hadoop/src/utils.rs +++ b/crates/catalog/hadoop/src/utils.rs @@ -15,13 +15,14 @@ // specific language governing permissions and limitations // under the License. -use aws_sdk_s3::{Config, config::{Credentials,BehaviorVersion}}; +use aws_sdk_s3::{ + config::{BehaviorVersion, Credentials}, + Config, +}; use iceberg::NamespaceIdent; use iceberg::{Error, ErrorKind, Namespace, Result}; use std::collections::HashMap; -/// Property aws profile name -pub const AWS_PROFILE_NAME: &str = "profile_name"; /// Property aws region pub const AWS_REGION_NAME: &str = "region_name"; /// Property aws access key @@ -34,13 +35,12 @@ pub const AWS_SESSION_TOKEN: &str = "aws_session_token"; pub const S3_PATH_STYLE_ACCESS: &str = "s3.path-style-access"; /// Creates an aws sdk configuration based on /// provided properties and an optional endpoint URL. 
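
For reference, the property map that create_sdk_config is written against would look roughly like the sketch below. The key names are the constants declared in this utils.rs (region_name, aws_access_key_id, aws_secret_access_key, s3.path-style-access); the concrete values, and the assumption of an S3-compatible endpoint such as MinIO, are illustrative only.

    // Illustrative only: key names come from utils.rs; the values are placeholders
    // for a local S3-compatible setup and are not part of the patch.
    use std::collections::HashMap;

    fn example_s3_properties() -> HashMap<String, String> {
        HashMap::from([
            ("region_name".to_string(), "us-east-1".to_string()),
            ("aws_access_key_id".to_string(), "minioadmin".to_string()),
            ("aws_secret_access_key".to_string(), "minioadmin".to_string()),
            // Most single-host object stores need path-style access enabled.
            ("s3.path-style-access".to_string(), "true".to_string()),
        ])
    }
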
-pub(crate) fn create_sdk_config( +pub(crate) fn create_sdk_config( properties: &HashMap, endpoint_url: Option, ) -> Config { - let mut config =Config::builder() - .behavior_version(BehaviorVersion::latest()); - + let mut config = Config::builder().behavior_version(BehaviorVersion::latest()); + if properties.is_empty() { return config.build(); } @@ -49,7 +49,6 @@ pub(crate) fn create_sdk_config( config = config.endpoint_url(endpoint_url); } - if let Some(path_style_access) = properties.get(S3_PATH_STYLE_ACCESS) { config = config.force_path_style(path_style_access.parse::().unwrap_or(false)); } @@ -146,4 +145,4 @@ pub(crate) fn get_default_table_location( namespace.join("/"), table_name.as_ref() ); -} \ No newline at end of file +} From 5c1309c61784e9b76c43441ac4eb8ac5000a1abd Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Fri, 9 May 2025 14:22:45 +0800 Subject: [PATCH 12/24] feat:Add hadoop catalog mode(add hdfs-native io) --- crates/catalog/hadoop/src/catalog.rs | 12 +----- crates/catalog/hadoop/src/utils.rs | 2 +- crates/iceberg/Cargo.toml | 6 +-- crates/iceberg/src/io/file_io.rs | 4 ++ crates/iceberg/src/io/mod.rs | 4 ++ crates/iceberg/src/io/storage.rs | 27 +++++++++++- crates/iceberg/src/io/storage_hdfs_native.rs | 45 ++++++++++++++++++++ 7 files changed, 85 insertions(+), 15 deletions(-) create mode 100644 crates/iceberg/src/io/storage_hdfs_native.rs diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index f6a74cee0..580e45b43 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -67,7 +67,6 @@ pub struct HadoopCatalog { impl HadoopCatalog { /// Creates a new Hadoop catalog. pub async fn new(config: HadoopCatalogConfig) -> Result { - let mut s3_client: Option = None; if let Some(warehouse_url) = &config.warehouse { if warehouse_url.starts_with("s3://") || warehouse_url.starts_with("s3a://") { let mut io_props = config.properties.clone(); @@ -81,7 +80,7 @@ impl HadoopCatalog { .with_props(&io_props) .build()?; let aws_config = create_sdk_config(&config.properties, config.endpoint_url.clone()); - s3_client = Some(aws_sdk_s3::Client::from_conf(aws_config)); + let s3_client = Some(aws_sdk_s3::Client::from_conf(aws_config)); return Ok(Self { config: config, file_io, @@ -392,7 +391,7 @@ impl Catalog for HadoopCatalog { .await { Ok(_) => (), - Err(e) => { + Err(_e) => { return Ok(false); } }; @@ -593,8 +592,6 @@ impl Catalog for HadoopCatalog { "s3 client is not initialized", )); } - - Ok(vec![]) } /// Creates a new table within a specified namespace. @@ -778,11 +775,6 @@ impl Catalog for HadoopCatalog { "s3 client is not initialized", )); } - - Err(Error::new( - ErrorKind::Unexpected, - "Table does not have a metadata location", - )) } /// Asynchronously drops a table from the database. 
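
Taken together, the S3 create/load/list paths above settle on a plain filesystem layout: each table directory carries metadata/version-hint.text holding the current version number, next to the metadata/v<N>.metadata.json file produced by create_metadata_location. A rough sketch of how those paths compose follows; the warehouse, namespace and table names are made up for illustration.

    // Illustrative path composition only; none of these names appear in the patch.
    fn example_table_paths() {
        let warehouse = "s3://my-bucket/warehouse";
        let namespace = ["db", "sales"].join("/");
        let table = "orders";
        let location = format!("{}/{}/{}", warehouse, namespace, table);
        // Written with "1" by create_table, read back by load_table.
        let version_hint = format!("{}/metadata/version-hint.text", location);
        // Matches create_metadata_location above for version 1.
        let metadata = format!("{}/metadata/v{}.metadata.json", location, 1);
        println!("{version_hint}\n{metadata}");
    }
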
diff --git a/crates/catalog/hadoop/src/utils.rs b/crates/catalog/hadoop/src/utils.rs index 4fbe25b66..0b3bbd5e0 100644 --- a/crates/catalog/hadoop/src/utils.rs +++ b/crates/catalog/hadoop/src/utils.rs @@ -20,7 +20,7 @@ use aws_sdk_s3::{ Config, }; use iceberg::NamespaceIdent; -use iceberg::{Error, ErrorKind, Namespace, Result}; +use iceberg::{Error, ErrorKind, Result}; use std::collections::HashMap; /// Property aws region diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 8795edc74..8c2fb46be 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -29,15 +29,15 @@ license = { workspace = true } repository = { workspace = true } [features] -default = ["storage-memory", "storage-fs", "storage-s3", "tokio"] -storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"] +default = ["storage-memory", "storage-fs", "storage-s3","storage-hdfs-native", "tokio"] +storage-all = ["storage-memory", "storage-fs", "storage-s3","storage-hdfs-native", "storage-gcs"] storage-fs = ["opendal/services-fs"] storage-gcs = ["opendal/services-gcs"] storage-memory = ["opendal/services-memory"] storage-oss = ["opendal/services-oss"] storage-s3 = ["opendal/services-s3"] - +storage-hdfs-native = ["opendal/services-hdfs-native"] async-std = ["dep:async-std"] tokio = ["tokio/rt-multi-thread"] diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index cadef7d54..cfafcb309 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -41,6 +41,7 @@ use crate::{Error, ErrorKind, Result}; /// | Memory | `storage-memory` | `memory` | /// | S3 | `storage-s3` | `s3`, `s3a`| /// | GCS | `storage-gcs` | `gs`, `gcs`| +/// | Hdfs | `storage-hdfs-native` | `hdfs`| #[derive(Clone, Debug)] pub struct FileIO { builder: FileIOBuilder, @@ -501,6 +502,9 @@ mod tests { let io = FileIO::from_path("s3://bucket/a").unwrap(); assert_eq!("s3", io.scheme_str.unwrap().as_str()); + let io = FileIO::from_path("hdfs://tmp/a").unwrap(); + assert_eq!("hdfs", io.scheme_str.unwrap().as_str()); + let io = FileIO::from_path("tmp/||c"); assert!(io.is_err()); } diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index 8e0638257..fe047304f 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -78,6 +78,10 @@ use storage_memory::*; mod storage_s3; #[cfg(feature = "storage-s3")] pub use storage_s3::*; +#[cfg(feature = "storage-hdfs-native")] +mod storage_hdfs_native; +#[cfg(feature = "storage-hdfs-native")] +pub use storage_hdfs_native::*; pub(crate) mod object_cache; #[cfg(feature = "storage-fs")] mod storage_fs; diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index 638a2efc3..e1933b321 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -20,6 +20,8 @@ use std::sync::Arc; use opendal::layers::RetryLayer; #[cfg(feature = "storage-gcs")] use opendal::services::GcsConfig; +#[cfg(feature = "storage-hdfs-native")] +use opendal::services::HdfsNativeConfig; #[cfg(feature = "storage-oss")] use opendal::services::OssConfig; #[cfg(feature = "storage-s3")] @@ -47,6 +49,8 @@ pub(crate) enum Storage { Oss { config: Arc }, #[cfg(feature = "storage-gcs")] Gcs { config: Arc }, + #[cfg(feature = "storage-hdfs-native")] + HdfsNative { config: Arc }, } impl Storage { @@ -73,6 +77,10 @@ impl Storage { Scheme::Oss => Ok(Self::Oss { config: super::oss_config_parse(props)?.into(), }), + #[cfg(feature = "storage-hdfs-native")] + Scheme::HdfsNative => 
Ok(Self::HdfsNative { + config: super::hdfs_native_config_parse(props)?.into(), + }), // Update doc on [`FileIO`] when adding new schemes. _ => Err(Error::new( ErrorKind::FeatureUnsupported, @@ -162,11 +170,27 @@ impl Storage { )) } } + #[cfg(feature = "storage-hdfs-native")] + Storage::HdfsNative { config } => { + let op = super::hdfs_native_config_build(config)?; + + // Check prefix of oss path. + let prefix = format!("hdfs://{}/", op.info().name()); + if path.starts_with(&prefix) { + Ok((op, &path[prefix.len()..])) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid hdfs url: {}, should start with {}", path, prefix), + )) + } + } #[cfg(all( not(feature = "storage-s3"), not(feature = "storage-fs"), not(feature = "storage-gcs"), - not(feature = "storage-oss") + not(feature = "storage-oss"), + not(feature = "storage-hdfs-native") ))] _ => Err(Error::new( ErrorKind::FeatureUnsupported, @@ -189,6 +213,7 @@ impl Storage { "s3" | "s3a" => Ok(Scheme::S3), "gs" | "gcs" => Ok(Scheme::Gcs), "oss" => Ok(Scheme::Oss), + "hdfs" => Ok(Scheme::HdfsNative), s => Ok(s.parse::()?), } } diff --git a/crates/iceberg/src/io/storage_hdfs_native.rs b/crates/iceberg/src/io/storage_hdfs_native.rs new file mode 100644 index 000000000..e0e4b0f2d --- /dev/null +++ b/crates/iceberg/src/io/storage_hdfs_native.rs @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use opendal::services::HdfsNativeConfig; +use opendal::{Configurator, Operator}; + +use crate::Result; + +/// The configuration key for the default filesystem in core-site.xml. +/// This key is typically used to specify the HDFS namenode address. +pub const FS_DEFAULTFS: &str = "fs.defaultFS"; + +pub(crate) fn hdfs_native_config_parse(mut m: HashMap) -> Result { + let mut cfg = HdfsNativeConfig::default(); + cfg.root = Some("/".to_string()); + + if let Some(default_fs) = m.remove(FS_DEFAULTFS) { + cfg.name_node = Some(default_fs); + } + + Ok(cfg) +} + +/// Build new opendal operator from give path. 
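
With the scheme mapping and config parser above in place, an hdfs:// path can be handed to FileIO like any other supported scheme. A minimal sketch, assuming the storage-hdfs-native feature introduced by this patch is enabled; the namenode address is a placeholder.

    // Sketch only: the namenode URI is illustrative, and this relies on the
    // "hdfs" scheme being routed to the HdfsNative backend as added above.
    use iceberg::io::FileIO;
    use iceberg::Result;

    fn example_hdfs_file_io() -> Result<FileIO> {
        FileIO::from_path("hdfs://namenode:8020/warehouse")?
            .with_props([(
                "fs.defaultFS".to_string(),
                "hdfs://namenode:8020".to_string(),
            )])
            .build()
    }
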
+pub(crate) fn hdfs_native_config_build(cfg: &HdfsNativeConfig) -> Result { + let builder = cfg.clone().into_builder(); + + Ok(Operator::new(builder)?.finish()) +} From 660ec6ec9b9fcedfce67567e8031bfbc3052b033 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Fri, 9 May 2025 15:26:09 +0800 Subject: [PATCH 13/24] feat:Add hadoop catalog mode(hdfs_native list_namespaces) --- Cargo.toml | 1 + crates/catalog/hadoop/Cargo.toml | 1 + crates/catalog/hadoop/src/catalog.rs | 61 +++++++++++++++++++++++++++- crates/catalog/hadoop/src/utils.rs | 6 +++ 4 files changed, 68 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index b39e5a408..d72d58eea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -118,3 +118,4 @@ uuid = { version = "1.14", features = ["v7"] } volo = "0.10.6" volo-thrift = "0.10.6" zstd = "0.13.2" +hdfs-native = { version = "0.10" } diff --git a/crates/catalog/hadoop/Cargo.toml b/crates/catalog/hadoop/Cargo.toml index e2b36cfce..c9122d4a9 100644 --- a/crates/catalog/hadoop/Cargo.toml +++ b/crates/catalog/hadoop/Cargo.toml @@ -38,6 +38,7 @@ typed-builder = { workspace = true } uuid = { workspace = true, features = ["v4"] } aws-sdk-s3 = {version="1.84.0",features = ["behavior-version-latest"]} tokio = { workspace = true } +hdfs-native = { workspace = true } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } itertools = { workspace = true } diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index 580e45b43..a69fbb370 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -30,6 +30,7 @@ use typed_builder::TypedBuilder; use crate::utils::{ create_metadata_location, create_sdk_config, get_default_table_location, valid_s3_namespaces, + FS_DEFAULTFS, }; /// Hadoop catalog configuration. @@ -62,6 +63,7 @@ pub struct HadoopCatalog { config: HadoopCatalogConfig, file_io: FileIO, s3_client: Option, + hdfs_native_client: Option, } impl HadoopCatalog { @@ -85,16 +87,28 @@ impl HadoopCatalog { config: config, file_io, s3_client: s3_client, + hdfs_native_client: None, }); } else if warehouse_url.starts_with("hdfs://") { //todo hdfs native client let file_io = FileIO::from_path(&warehouse_url)? 
.with_props(&config.properties) .build()?; + let default_fs = config + .properties + .get(FS_DEFAULTFS) + .ok_or(iceberg::Error::new( + ErrorKind::DataInvalid, + " fs.defaultFS is null", + ))?; + let hdfs_native_client = + hdfs_native::Client::new_with_config(&default_fs, config.properties.clone()) + .map_err(|e| iceberg::Error::new(ErrorKind::Unexpected, e.to_string()))?; return Ok(Self { config: config, file_io: file_io, s3_client: None, + hdfs_native_client: Some(hdfs_native_client), }); } @@ -202,10 +216,55 @@ impl Catalog for HadoopCatalog { return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); } } + } else if self.hdfs_native_client.is_some() { + let hdfs_native_client = self.hdfs_native_client.as_ref().unwrap(); + let default_fs = + self.config + .properties + .get(FS_DEFAULTFS) + .ok_or(iceberg::Error::new( + ErrorKind::DataInvalid, + " fs.defaultFS is null", + ))?; + + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let mut prefix = warehouse_url[default_fs.len()..].to_string(); + if let Some(parent) = parent { + prefix = format!("{}/{}", &prefix, parent.join("/")); + } + let list = hdfs_native_client + .list_status(&prefix, false) + .await + .map_err(|e| iceberg::Error::new(ErrorKind::Unexpected, e.to_string()))?; + for f in list { + let table_version_hint_path = + format!("{}/metadata/version-hint.text", f.path.clone(),); + if hdfs_native_client + .get_file_info(&table_version_hint_path) + .await + .is_err() + { + let file_relative_path = f.path[prefix.len()..].to_string(); + let namespaces = file_relative_path.split("/").collect::>(); + result.push(NamespaceIdent::from_vec( + namespaces + .iter() + .filter(|e| !e.is_empty()) + .map(|e| e.to_string()) + .collect(), + )?); + } + } + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } } else { return Err(Error::new( ErrorKind::DataInvalid, - "s3 client is not initialized", + "s3 client or hdfs native client is not initialized", )); } diff --git a/crates/catalog/hadoop/src/utils.rs b/crates/catalog/hadoop/src/utils.rs index 0b3bbd5e0..5c959f396 100644 --- a/crates/catalog/hadoop/src/utils.rs +++ b/crates/catalog/hadoop/src/utils.rs @@ -33,6 +33,12 @@ pub const AWS_SECRET_ACCESS_KEY: &str = "aws_secret_access_key"; pub const AWS_SESSION_TOKEN: &str = "aws_session_token"; /// S3 Path Style Access. pub const S3_PATH_STYLE_ACCESS: &str = "s3.path-style-access"; + + +/// The configuration key for the default filesystem in core-site.xml. +/// This key is typically used to specify the HDFS namenode address. +pub const FS_DEFAULTFS: &str = "fs.defaultFS"; + /// Creates an aws sdk configuration based on /// provided properties and an optional endpoint URL. 
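
On the catalog side, the same fs.defaultFS property has to be present in HadoopCatalogConfig.properties, since both the hdfs_native::Client construction and the warehouse prefix stripping above rely on it. A minimal sketch, assuming the builder generated for HadoopCatalogConfig and a crate-root re-export of the two types; the namenode URI is a placeholder.

    // Sketch only: the URI is illustrative and the re-export path is assumed.
    use std::collections::HashMap;

    use iceberg_catalog_hadoop::{HadoopCatalog, HadoopCatalogConfig};

    async fn example_hdfs_catalog() -> iceberg::Result<HadoopCatalog> {
        let config = HadoopCatalogConfig::builder()
            .properties(HashMap::from([(
                "fs.defaultFS".to_string(),
                "hdfs://namenode:8020".to_string(),
            )]))
            .warehouse("hdfs://namenode:8020/warehouse".to_string())
            .build();
        HadoopCatalog::new(config).await
    }
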
pub(crate) fn create_sdk_config( From 813947fae9acac84c1075be6ce4ef75191d76e1e Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Fri, 9 May 2025 15:44:56 +0800 Subject: [PATCH 14/24] feat:Add hadoop catalog mode(hdfs_native create_namespace) --- crates/catalog/hadoop/src/catalog.rs | 30 +++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index a69fbb370..73225512c 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -327,10 +327,38 @@ impl Catalog for HadoopCatalog { return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); } } + } else if self.hdfs_native_client.is_some() { + let hdfs_native_client = self.hdfs_native_client.as_ref().unwrap(); + let default_fs = + self.config + .properties + .get(FS_DEFAULTFS) + .ok_or(iceberg::Error::new( + ErrorKind::DataInvalid, + " fs.defaultFS is null", + ))?; + + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let prefix = format!( + "{}/{}", + &warehouse_url[default_fs.len()..].to_string(), + namespace.join("/") + ); + + hdfs_native_client + .mkdirs(&prefix, 0o755, true) + .await + .map_err(|e| iceberg::Error::new(ErrorKind::Unexpected, e.to_string()))?; + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } } else { return Err(Error::new( ErrorKind::DataInvalid, - "s3 client is not initialized", + "s3 client or hdfs native client is not initialized", )); } From 45e1bb7dbafb92168cf8ba063dd804aabf8000ac Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Fri, 9 May 2025 15:49:21 +0800 Subject: [PATCH 15/24] feat:Add hadoop catalog mode(hdfs_native get_namespace) --- crates/catalog/hadoop/src/catalog.rs | 44 +++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index 73225512c..6e4cc60b4 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -430,10 +430,52 @@ impl Catalog for HadoopCatalog { return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); } } + } else if self.hdfs_native_client.is_some() { + let hdfs_native_client = self.hdfs_native_client.as_ref().unwrap(); + let default_fs = + self.config + .properties + .get(FS_DEFAULTFS) + .ok_or(iceberg::Error::new( + ErrorKind::DataInvalid, + " fs.defaultFS is null", + ))?; + + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let prefix = format!( + "{}/{}", + &warehouse_url[default_fs.len()..].to_string(), + namespace.join("/") + ); + + hdfs_native_client + .get_file_info(&prefix) + .await + .map_err(|e| iceberg::Error::new(ErrorKind::Unexpected, e.to_string()))?; + + let table_version_hint_path = + format!("{}/metadata/version-hint.text", &prefix,); + //check It's not table + let table_version_hint = hdfs_native_client + .get_file_info(&table_version_hint_path) + .await + .map_err(|e| iceberg::Error::new(ErrorKind::Unexpected, e.to_string())); + if table_version_hint.is_ok() { + return Err(Error::new( + ErrorKind::DataInvalid, + "It's a table not namespace", + )); + } + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } } else { return Err(Error::new( ErrorKind::DataInvalid, - "s3 client is not initialized", + "s3 client or hdfs native client is not initialized", )); } From 1d4cda4950c2b5983c15c378b942fa2f146e4ce6 Mon Sep 17 00:00:00 2001 From: 
awol2005ex Date: Fri, 9 May 2025 15:53:30 +0800 Subject: [PATCH 16/24] feat:Add hadoop catalog mode(hdfs_native namespace_exists) --- crates/catalog/hadoop/src/catalog.rs | 49 +++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index 6e4cc60b4..48ffa8abc 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -550,10 +550,57 @@ impl Catalog for HadoopCatalog { return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); } } + } else if self.hdfs_native_client.is_some() { + let hdfs_native_client = self.hdfs_native_client.as_ref().unwrap(); + let default_fs = + self.config + .properties + .get(FS_DEFAULTFS) + .ok_or(iceberg::Error::new( + ErrorKind::DataInvalid, + " fs.defaultFS is null", + ))?; + + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let prefix = format!( + "{}/{}", + &warehouse_url[default_fs.len()..].to_string(), + namespace.join("/") + ); + + match hdfs_native_client + .get_file_info(&prefix) + .await + .map_err(|e| iceberg::Error::new(ErrorKind::Unexpected, e.to_string())) + { + Ok(_) => (), + Err(_e) => { + return Ok(false); + } + } + let table_version_hint_path = + format!("{}/metadata/version-hint.text", &prefix,); + //check It's not table + let table_version_hint = hdfs_native_client + .get_file_info(&table_version_hint_path) + .await + .map_err(|e| iceberg::Error::new(ErrorKind::Unexpected, e.to_string())); + if table_version_hint.is_ok() { + return Err(Error::new( + ErrorKind::DataInvalid, + "It's a table not namespace", + )); + } + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } } else { return Err(Error::new( ErrorKind::DataInvalid, - "s3 client is not initialized", + "s3 client or hdfs native client is not initialized", )); } From b13a01dcadeaa95d1a9f21085467c05e337bf163 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Fri, 9 May 2025 15:56:56 +0800 Subject: [PATCH 17/24] feat:Add hadoop catalog mode(hdfs_native drop_namespace) --- crates/catalog/hadoop/src/catalog.rs | 30 +++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index 48ffa8abc..7c8961311 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -677,10 +677,38 @@ impl Catalog for HadoopCatalog { return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); } } + } else if self.hdfs_native_client.is_some() { + let hdfs_native_client = self.hdfs_native_client.as_ref().unwrap(); + let default_fs = + self.config + .properties + .get(FS_DEFAULTFS) + .ok_or(iceberg::Error::new( + ErrorKind::DataInvalid, + " fs.defaultFS is null", + ))?; + + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let prefix = format!( + "{}/{}", + &warehouse_url[default_fs.len()..].to_string(), + namespace.join("/") + ); + + hdfs_native_client + .delete(&prefix, true) + .await + .map_err(|e| iceberg::Error::new(ErrorKind::Unexpected, e.to_string()))?; + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } } else { return Err(Error::new( ErrorKind::DataInvalid, - "s3 client is not initialized", + "s3 client or hdfs native client is not initialized", )); } From 899228c40dc6e8893f37e1ad55e18211f4bffbb4 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Fri, 9 May 2025 16:01:19 +0800 
Subject: [PATCH 18/24] feat:Add hadoop catalog mode(hdfs_native list_tables) --- crates/catalog/hadoop/src/catalog.rs | 45 +++++++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index 7c8961311..b41243d81 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -790,10 +790,53 @@ impl Catalog for HadoopCatalog { return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); } } + } else if self.hdfs_native_client.is_some() { + let hdfs_native_client = self.hdfs_native_client.as_ref().unwrap(); + let default_fs = + self.config + .properties + .get(FS_DEFAULTFS) + .ok_or(iceberg::Error::new( + ErrorKind::DataInvalid, + " fs.defaultFS is null", + ))?; + + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let prefix = format!( + "{}/{}", + &warehouse_url[default_fs.len()..].to_string(), + namespace.join("/") + ); + + let list = hdfs_native_client + .list_status(&prefix, true) + .await + .map_err(|e| iceberg::Error::new(ErrorKind::Unexpected, e.to_string()))?; + let mut result = Vec::new(); + for f in list { + let table_version_hint_path = + format!("{}/metadata/version-hint.text", f.path.clone(),); + if hdfs_native_client + .get_file_info(&table_version_hint_path) + .await + .is_ok() + { + let file_relative_path = f.path[prefix.len()..].to_string(); + let table_name = file_relative_path.split("/").last().unwrap_or(&""); + result.push(TableIdent::new(namespace.clone(), table_name.to_string())); + } + } + return Ok(result); + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } } else { return Err(Error::new( ErrorKind::DataInvalid, - "s3 client is not initialized", + "s3 client or hdfs native client is not initialized", )); } } From 20540b94ce31322e6fa38b9b77ca293cfc746be1 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Fri, 9 May 2025 16:08:11 +0800 Subject: [PATCH 19/24] feat:Add hadoop catalog mode(hdfs_native create_table) --- crates/catalog/hadoop/src/catalog.rs | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index b41243d81..dc9d69ccd 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -909,10 +909,32 @@ impl Catalog for HadoopCatalog { return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); } } + } else if self.hdfs_native_client.is_some() { + let hdfs_native_client = self.hdfs_native_client.as_ref().unwrap(); + let default_fs = + self.config + .properties + .get(FS_DEFAULTFS) + .ok_or(iceberg::Error::new( + ErrorKind::DataInvalid, + " fs.defaultFS is null", + ))?; + + let table_version_hint_relative_path = format!( + "{}/metadata/version-hint.text", + &location[default_fs.len()..] 
+ ); + let table_version_hint = hdfs_native_client + .get_file_info(&table_version_hint_relative_path) + .await + .map_err(|e| iceberg::Error::new(ErrorKind::Unexpected, e.to_string())); + if table_version_hint.is_ok() { + return Err(Error::new(ErrorKind::DataInvalid, "Table already exists")); + } } else { return Err(Error::new( ErrorKind::DataInvalid, - "s3 client is not initialized", + "s3 client or hdfs native client is not initialized", )); } From 794216fa681427e2941937682266c6e837716499 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Fri, 9 May 2025 16:23:41 +0800 Subject: [PATCH 20/24] feat:Add hadoop catalog mode(hdfs_native load_table) --- crates/catalog/hadoop/src/catalog.rs | 75 +++++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index dc9d69ccd..70e8ccdab 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -1038,10 +1038,83 @@ impl Catalog for HadoopCatalog { return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); } } + } else if self.hdfs_native_client.is_some() { + let hdfs_native_client = self.hdfs_native_client.as_ref().unwrap(); + let default_fs = + self.config + .properties + .get(FS_DEFAULTFS) + .ok_or(iceberg::Error::new( + ErrorKind::DataInvalid, + " fs.defaultFS is null", + ))?; + + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let table_name = table_ident.name.clone(); + let table_version_hint_path = format!( + "{}/{}/{}/metadata/version-hint.text", + &warehouse_url[default_fs.len()..].to_string(), + table_ident.namespace.join("/"), + &table_name + ); + + let table_version_hint_result = hdfs_native_client + .get_file_info(&table_version_hint_path) + .await + .map_err(|e| iceberg::Error::new(ErrorKind::Unexpected, e.to_string())); + + match table_version_hint_result { + Ok(table_version_hint_file_status) => { + let mut table_version_hint_reader = hdfs_native_client + .read(&table_version_hint_path) + .await + .map_err(|e| { + iceberg::Error::new(ErrorKind::Unexpected, e.to_string()) + })?; + let buf = table_version_hint_reader + .read(table_version_hint_file_status.length as usize) + .await + .map_err(|e| { + iceberg::Error::new(ErrorKind::Unexpected, e.to_string()) + })?; + let table_version_hint = String::from_utf8_lossy(&buf); + + let metadata_location = format!( + "{}/{}/{}/metadata/v{}.metadata.json", + &warehouse_url[default_fs.len()..].to_string(), + table_ident.namespace.join("/"), + &table_name, + &table_version_hint + ); + let metadata_content = + self.file_io.new_input(&metadata_location)?.read().await?; + let metadata = + serde_json::from_slice::(&metadata_content)?; + + return Ok(Table::builder() + .file_io(self.file_io.clone()) + .metadata_location(metadata_location) + .metadata(metadata) + .identifier(table_ident.clone()) + .build()?); + } + Err(e) => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Failed to get table version hint: {}", e), + )); + } + } + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } } else { return Err(Error::new( ErrorKind::DataInvalid, - "s3 client is not initialized", + "s3 client or hdfs native client is not initialized", )); } } From f368bc483be2adda258252f67973d0380c2b3d5d Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Fri, 9 May 2025 16:32:04 +0800 Subject: [PATCH 21/24] feat:Add hadoop catalog mode(hdfs_native drop_table) --- crates/catalog/hadoop/src/catalog.rs | 36 
+++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index 70e8ccdab..15d076beb 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -1178,10 +1178,40 @@ impl Catalog for HadoopCatalog { return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); } } - } else { + } else if self.hdfs_native_client.is_some() { + let hdfs_native_client = self.hdfs_native_client.as_ref().unwrap(); + let default_fs = + self.config + .properties + .get(FS_DEFAULTFS) + .ok_or(iceberg::Error::new( + ErrorKind::DataInvalid, + " fs.defaultFS is null", + ))?; + + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let table_name = table.name.clone(); + let table_path = format!( + "{}/{}/{}", + &warehouse_url[default_fs.len()..].to_string(), + table.namespace.join("/"), + &table_name, + ); + + hdfs_native_client + .delete(&table_path, true) + .await + .map_err(|e| iceberg::Error::new(ErrorKind::Unexpected, e.to_string()))?; + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } + }else { return Err(Error::new( ErrorKind::DataInvalid, - "s3 client is not initialized", + "s3 client or hdfs native client is not initialized", )); } @@ -1232,7 +1262,7 @@ impl Catalog for HadoopCatalog { } else { return Err(Error::new( ErrorKind::DataInvalid, - "s3 client is not initialized", + "s3 client or hdfs native client is not initialized", )); } From 29b4f3cb0f3373ce4dbdb503e2833347fa4f8847 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Fri, 9 May 2025 16:35:22 +0800 Subject: [PATCH 22/24] feat:Add hadoop catalog mode(hdfs_native table_exists) --- crates/catalog/hadoop/src/catalog.rs | 31 +++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index 15d076beb..3e0c28def 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -1208,7 +1208,7 @@ impl Catalog for HadoopCatalog { return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); } } - }else { + } else { return Err(Error::new( ErrorKind::DataInvalid, "s3 client or hdfs native client is not initialized", @@ -1259,6 +1259,35 @@ impl Catalog for HadoopCatalog { return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); } } + } else if self.hdfs_native_client.is_some() { + let hdfs_native_client = self.hdfs_native_client.as_ref().unwrap(); + let default_fs = + self.config + .properties + .get(FS_DEFAULTFS) + .ok_or(iceberg::Error::new( + ErrorKind::DataInvalid, + " fs.defaultFS is null", + ))?; + + match self.config.warehouse.clone() { + Some(warehouse_url) => { + let table_name = table.name.clone(); + let table_version_hint_path = format!( + "{}/{}/{}", + &warehouse_url[default_fs.len()..].to_string(), + table.namespace.join("/"), + &table_name, + ); + hdfs_native_client + .get_file_info(&table_version_hint_path) + .await + .map_err(|e| iceberg::Error::new(ErrorKind::Unexpected, e.to_string()))?; + } + None => { + return Err(Error::new(ErrorKind::DataInvalid, "warehouse is required")); + } + } } else { return Err(Error::new( ErrorKind::DataInvalid, From 6ee7136faf8541e52bb4910a8894e6cbf7d8fd0a Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Mon, 12 May 2025 10:38:05 +0800 Subject: [PATCH 23/24] feat:Add hadoop catalog mode(hdfs_native create table) --- 
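
At this point the catalog covers the namespace and table lifecycle for both backends, so a rough walkthrough of the trait surface exercised by these patches is sketched below. The identifiers are made up and the helper is generic over any Catalog implementation.

    // Sketch only: namespace and table names are illustrative.
    use std::collections::HashMap;

    use iceberg::{Catalog, NamespaceIdent};

    async fn example_catalog_walkthrough(catalog: &impl Catalog) -> iceberg::Result<()> {
        let ns = NamespaceIdent::from_vec(vec!["db".to_string(), "sales".to_string()])?;
        catalog.create_namespace(&ns, HashMap::new()).await?;
        for table in catalog.list_tables(&ns).await? {
            let exists = catalog.table_exists(&table).await?;
            println!("{:?} exists = {}", table, exists);
        }
        catalog.drop_namespace(&ns).await?;
        Ok(())
    }
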
crates/catalog/hadoop/src/catalog.rs | 11 ++++++----- crates/iceberg/src/io/storage.rs | 4 ++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index 3e0c28def..0b86f6736 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -239,13 +239,13 @@ impl Catalog for HadoopCatalog { .map_err(|e| iceberg::Error::new(ErrorKind::Unexpected, e.to_string()))?; for f in list { let table_version_hint_path = - format!("{}/metadata/version-hint.text", f.path.clone(),); + format!("{}/metadata/version-hint.text", f.path.replace("\\","/").clone(),); if hdfs_native_client .get_file_info(&table_version_hint_path) .await .is_err() { - let file_relative_path = f.path[prefix.len()..].to_string(); + let file_relative_path = f.path.replace("\\","/")[prefix.len()..].to_string(); let namespaces = file_relative_path.split("/").collect::>(); result.push(NamespaceIdent::from_vec( namespaces @@ -810,19 +810,20 @@ impl Catalog for HadoopCatalog { ); let list = hdfs_native_client - .list_status(&prefix, true) + .list_status(&prefix, false) .await .map_err(|e| iceberg::Error::new(ErrorKind::Unexpected, e.to_string()))?; let mut result = Vec::new(); for f in list { let table_version_hint_path = - format!("{}/metadata/version-hint.text", f.path.clone(),); + format!("{}/metadata/version-hint.text", f.path.replace("\\","/").clone(),); + println!("table_version_hint_path: {}", &table_version_hint_path); if hdfs_native_client .get_file_info(&table_version_hint_path) .await .is_ok() { - let file_relative_path = f.path[prefix.len()..].to_string(); + let file_relative_path = f.path.replace("\\","/")[prefix.len()..].to_string(); let table_name = file_relative_path.split("/").last().unwrap_or(&""); result.push(TableIdent::new(namespace.clone(), table_name.to_string())); } diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index e1933b321..d5be62675 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -174,8 +174,8 @@ impl Storage { Storage::HdfsNative { config } => { let op = super::hdfs_native_config_build(config)?; - // Check prefix of oss path. - let prefix = format!("hdfs://{}/", op.info().name()); + // Check prefix of hdfs path. + let prefix = config.name_node.clone().unwrap_or_default(); if path.starts_with(&prefix) { Ok((op, &path[prefix.len()..])) } else { From a9ea6eac88b2034c3e9b37b803cc943cf544c851 Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Mon, 12 May 2025 13:26:40 +0800 Subject: [PATCH 24/24] feat:Add hadoop catalog mode(hdfs_native load table) --- crates/catalog/hadoop/src/catalog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/hadoop/src/catalog.rs b/crates/catalog/hadoop/src/catalog.rs index 0b86f6736..c8f0ba4e5 100644 --- a/crates/catalog/hadoop/src/catalog.rs +++ b/crates/catalog/hadoop/src/catalog.rs @@ -1083,7 +1083,7 @@ impl Catalog for HadoopCatalog { let metadata_location = format!( "{}/{}/{}/metadata/v{}.metadata.json", - &warehouse_url[default_fs.len()..].to_string(), + &warehouse_url, table_ident.namespace.join("/"), &table_name, &table_version_hint