diff --git a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java index c4904ddc3..9d3c41ee6 100644 --- a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java +++ b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java @@ -25,6 +25,7 @@ import com.logicalclocks.hsfs.OnlineConfig; import com.logicalclocks.hsfs.StatisticsConfig; import com.logicalclocks.hsfs.Storage; +import com.logicalclocks.hsfs.StorageConnector; import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine; import com.logicalclocks.hsfs.beam.engine.BeamProducer; import com.logicalclocks.hsfs.constructor.QueryBase; @@ -48,7 +49,7 @@ public class StreamFeatureGroup extends FeatureGroupBase> { public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description, List primaryKeys, List partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, List features, StatisticsConfig statisticsConfig, String onlineTopicName, - String eventTime, OnlineConfig onlineConfig) { + String eventTime, OnlineConfig onlineConfig, StorageConnector storageConnector, String path) { this(); this.featureStore = featureStore; this.name = name; @@ -65,6 +66,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ this.onlineTopicName = onlineTopicName; this.eventTime = eventTime; this.onlineConfig = onlineConfig; + this.storageConnector = storageConnector; + this.path = path; } public StreamFeatureGroup() { diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java index d13a9c8e1..c3cd6cbd0 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java @@ -26,6 +26,7 @@ import com.logicalclocks.hsfs.OnlineConfig; import com.logicalclocks.hsfs.StatisticsConfig; import com.logicalclocks.hsfs.Storage; +import com.logicalclocks.hsfs.StorageConnector; import com.logicalclocks.hsfs.constructor.QueryBase; import com.logicalclocks.hsfs.metadata.Statistics; @@ -54,7 +55,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ List primaryKeys, List partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, List features, StatisticsConfig statisticsConfig, String onlineTopicName, String topicName, String notificationTopicName, String eventTime, - OnlineConfig onlineConfig) { + OnlineConfig onlineConfig, StorageConnector storageConnector, String path) { this(); this.featureStore = featureStore; this.name = name; @@ -73,6 +74,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ this.notificationTopicName = notificationTopicName; this.eventTime = eventTime; this.onlineConfig = onlineConfig; + this.storageConnector = storageConnector; + this.path = path; } public StreamFeatureGroup() { diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java index 585f4780e..61ca2f5a9 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java @@ -132,6 +132,14 @@ public abstract class FeatureGroupBase { @Setter protected OnlineConfig onlineConfig; + @Getter + @Setter + protected 
StorageConnector storageConnector; + + @Getter + @Setter + protected String path; + @JsonIgnore // These are only used in the client. In the server they are aggregated in the `features` field protected List partitionKeys; diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java index 71be4d995..91bd36819 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java @@ -124,6 +124,9 @@ public static class S3Connector extends StorageConnector { @Getter @Setter protected String bucket; + @Getter @Setter + protected String region; + @Getter @Setter protected String sessionToken; diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java index da4bf655a..a937cca1a 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java @@ -128,7 +128,7 @@ public static Long getTimeStampFromDateString(String inputDate) throws FeatureSt } SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatPattern); - Long commitTimeStamp = dateFormat.parse(tempDate).getTime();; + Long commitTimeStamp = dateFormat.parse(tempDate).getTime(); return commitTimeStamp; } diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java index 13f907ff4..e62a5e83e 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java @@ -57,10 +57,6 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class ExternalFeatureGroup extends FeatureGroupBase> { - @Getter - @Setter - private StorageConnector storageConnector; - @Getter @Setter private String query; @@ -69,10 +65,6 @@ public class ExternalFeatureGroup extends FeatureGroupBase> { @Setter private ExternalDataFormat dataFormat; - @Getter - @Setter - private String path; - @Getter @Setter private List options; diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java index 9c968b3ba..07fe98ff6 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java @@ -31,6 +31,7 @@ import com.logicalclocks.hsfs.OnlineConfig; import com.logicalclocks.hsfs.StatisticsConfig; import com.logicalclocks.hsfs.Storage; +import com.logicalclocks.hsfs.StorageConnector; import com.logicalclocks.hsfs.TimeTravelFormat; import com.logicalclocks.hsfs.FeatureGroupBase; import com.logicalclocks.hsfs.metadata.Statistics; @@ -64,7 +65,8 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver String description, List primaryKeys, List partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List features, StatisticsConfig statisticsConfig, String onlineTopicName, - String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig) { + String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig, + StorageConnector storageConnector, String path) { this(); 
this.featureStore = featureStore; this.name = name; @@ -85,6 +87,8 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver this.notificationTopicName = notificationTopicName; this.eventTime = eventTime; this.onlineConfig = onlineConfig; + this.storageConnector = storageConnector; + this.path = path; } public FeatureGroup() { diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java index 597e6e3ad..0c8b9bae3 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java @@ -30,6 +30,7 @@ import com.logicalclocks.hsfs.OnlineConfig; import com.logicalclocks.hsfs.StatisticsConfig; import com.logicalclocks.hsfs.Storage; +import com.logicalclocks.hsfs.StorageConnector; import com.logicalclocks.hsfs.FeatureGroupBase; import com.logicalclocks.hsfs.metadata.Statistics; @@ -62,7 +63,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ List primaryKeys, List partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, List features, StatisticsConfig statisticsConfig, String onlineTopicName, String topicName, String notificationTopicName, String eventTime, - OnlineConfig onlineConfig) { + OnlineConfig onlineConfig, StorageConnector storageConnector, String path) { this(); this.featureStore = featureStore; this.name = name; @@ -81,6 +82,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ this.notificationTopicName = notificationTopicName; this.eventTime = eventTime; this.onlineConfig = onlineConfig; + this.storageConnector = storageConnector; + this.path = path; } public StreamFeatureGroup() { diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java index b30bca98c..1aed9a650 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java @@ -219,10 +219,6 @@ public Dataset registerOnDemandTemporaryTable(ExternalFeatureGroup onDemand ? 
onDemandFeatureGroup.getDataFormat().toString() : null, getOnDemandOptions(onDemandFeatureGroup), onDemandFeatureGroup.getStorageConnector().getPath(onDemandFeatureGroup.getPath())); - if (!Strings.isNullOrEmpty(onDemandFeatureGroup.getLocation())) { - sparkSession.sparkContext().textFile(onDemandFeatureGroup.getLocation(), 0).collect(); - } - dataset.createOrReplaceTempView(alias); return dataset; } diff --git a/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java b/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java index adddd8310..bedd9716e 100644 --- a/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java +++ b/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java @@ -67,11 +67,11 @@ public void testFeatureGroupPrimaryKey() { StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description", Collections.singletonList("primaryKey"), Collections.singletonList("partitionKey"), "hudiPrecombineKey", - true, features, null, "onlineTopicName", null, null, null, null); + true, features, null, "onlineTopicName", null, null, null, null, null, null); Exception pkException = assertThrows(FeatureStoreException.class, () -> { featureGroupEngine.saveFeatureGroupMetaData(featureGroup, - null, null, null, null, null);;; + null, null, null, null, null); }); // Assert @@ -93,11 +93,11 @@ public void testFeatureGroupEventTimeFeature() { StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description", Collections.singletonList("featureA"), null, null, - true, features, null, "onlineTopicName", null, null, "eventTime", null); + true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null); Exception eventTimeException = assertThrows(FeatureStoreException.class, () -> { streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup, - null, null, null, null, null);;; + null, null, null, null, null); }); // Assert @@ -119,7 +119,7 @@ public void testFeatureGroupPartitionPrecombineKeys() { StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description", Collections.singletonList("featureA"), Collections.singletonList("partitionKey"), "hudiPrecombineKey", - true, features, null, "onlineTopicName", null, null, null, null); + true, features, null, "onlineTopicName", null, null, null, null, null, null); Exception partitionException = assertThrows(FeatureStoreException.class, () -> { streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup, @@ -164,7 +164,7 @@ public void testFeatureGroupAppendFeaturesResetSubject() throws FeatureStoreExce StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description", Collections.singletonList("featureA"), null, null, - true, features, null, "onlineTopicName", null, null, "eventTime", null); + true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null); featureGroup.featureGroupEngine = featureGroupEngine; // Act diff --git a/python/hsfs/core/delta_engine.py b/python/hsfs/core/delta_engine.py index 95ea7f37b..0032daddf 100644 --- a/python/hsfs/core/delta_engine.py +++ b/python/hsfs/core/delta_engine.py @@ -52,10 +52,12 @@ def save_delta_fg(self, dataset, write_options, validation_id=None): return self._feature_group_api.commit(self._feature_group, fg_commit) def register_temporary_table(self, delta_fg_alias, read_options): + location = self._feature_group.prepare_spark_location() + delta_options = 
self._setup_delta_read_opts(delta_fg_alias, read_options) self._spark_session.read.format(self.DELTA_SPARK_FORMAT).options( **delta_options - ).load(self._feature_group.location).createOrReplaceTempView( + ).load(location).createOrReplaceTempView( delta_fg_alias.alias ) @@ -85,15 +87,17 @@ def _setup_delta_read_opts(self, delta_fg_alias, read_options): return delta_options def delete_record(self, delete_df): + location = self._feature_group.prepare_spark_location() + if not DeltaTable.isDeltaTable( - self._spark_session, self._feature_group.location + self._spark_session, location ): raise FeatureStoreException( f"This is no data available in Feature group {self._feature_group.name}, or it not DELTA enabled " ) else: fg_source_table = DeltaTable.forPath( - self._spark_session, self._feature_group.location + self._spark_session, location ) source_alias = ( @@ -109,16 +113,18 @@ def delete_record(self, delete_df): ).whenMatchedDelete().execute() fg_commit = self._get_last_commit_metadata( - self._spark_session, self._feature_group.location + self._spark_session, location ) return self._feature_group_api.commit(self._feature_group, fg_commit) def _write_delta_dataset(self, dataset, write_options): + location = self._feature_group.prepare_spark_location() + if write_options is None: write_options = {} if not DeltaTable.isDeltaTable( - self._spark_session, self._feature_group.location + self._spark_session, location ): ( dataset.write.format(DeltaEngine.DELTA_SPARK_FORMAT) @@ -129,11 +135,11 @@ def _write_delta_dataset(self, dataset, write_options): else [] ) .mode("append") - .save(self._feature_group.location) + .save(location) ) else: fg_source_table = DeltaTable.forPath( - self._spark_session, self._feature_group.location + self._spark_session, location ) source_alias = ( @@ -149,9 +155,18 @@ def _write_delta_dataset(self, dataset, write_options): ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute() return self._get_last_commit_metadata( - self._spark_session, self._feature_group.location + self._spark_session, location ) + def vacuum(self, retention_hours): + location = self._feature_group.prepare_spark_location() + + delta_table = DeltaTable.forPath(self._spark_session, location) + + # Vacuum the table + # https://docs.delta.io/1.0.1/api/python/index.html#delta.tables.DeltaTable.vacuum + delta_table.vacuum(retention_hours) + def _generate_merge_query(self, source_alias, updates_alias): merge_query_list = [] primary_key = self._feature_group.primary_key diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index a06d12ca9..234829879 100644 --- a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -246,6 +246,20 @@ def commit_delete(feature_group, delete_df, write_options): ) return hudi_engine_instance.delete_record(delete_df, write_options) + @staticmethod + def clean(feature_group, write_options): + if feature_group.time_travel_format == "DELTA": + delta_engine_instance = delta_engine.DeltaEngine( + feature_group.feature_store_id, + feature_group.feature_store_name, + feature_group, + engine.get_instance()._spark_session, + engine.get_instance()._spark_context, + ) + return delta_engine_instance.vacuum(write_options.get("retention_hours", None)) + else: + return None + def sql(self, query, feature_store_name, dataframe_type, online, read_options): if online and self._online_conn is None: self._online_conn = self._storage_connector_api.get_online_connector( @@ -285,7 +299,7 @@ def 
append_features(self, feature_group, new_features):
         if feature_group.time_travel_format == "DELTA":
             engine.get_instance().add_cols_to_delta_table(feature_group, new_features)
         else:
-            engine.get_instance().save_empty_dataframe(feature_group)
+            engine.get_instance().save_empty_dataframe(feature_group, new_features=new_features)
 
     def update_description(self, feature_group, description):
         """Updates the description of a feature group."""
diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py
index 04c2c5a6b..4492f0a19 100644
--- a/python/hsfs/core/hudi_engine.py
+++ b/python/hsfs/core/hudi_engine.py
@@ -20,6 +20,10 @@
 class HudiEngine:
+
+    HUDI_SPEC_FEATURE_NAMES = ["_hoodie_record_key", "_hoodie_partition_path",
+                               "_hoodie_commit_time", "_hoodie_file_name", "_hoodie_commit_seqno"]
+
     HUDI_SPARK_FORMAT = "org.apache.hudi"
     HUDI_TABLE_NAME = "hoodie.table.name"
     HUDI_TABLE_STORAGE_TYPE = "hoodie.datasource.write.storage.type"
@@ -100,21 +104,25 @@ def delete_record(self, delete_df, write_options):
         return self._feature_group_api.commit(self._feature_group, fg_commit)
 
     def register_temporary_table(self, hudi_fg_alias, read_options):
+        location = self._feature_group.prepare_spark_location()
+
         hudi_options = self._setup_hudi_read_opts(hudi_fg_alias, read_options)
         self._spark_session.read.format(self.HUDI_SPARK_FORMAT).options(
             **hudi_options
-        ).load(self._feature_group.location).createOrReplaceTempView(
+        ).load(location).createOrReplaceTempView(
             hudi_fg_alias.alias
         )
 
     def _write_hudi_dataset(self, dataset, save_mode, operation, write_options):
+        location = self._feature_group.prepare_spark_location()
+
         hudi_options = self._setup_hudi_write_opts(operation, write_options)
         dataset.write.format(HudiEngine.HUDI_SPARK_FORMAT).options(**hudi_options).mode(
             save_mode
-        ).save(self._feature_group.location)
+        ).save(location)
 
         feature_group_commit = self._get_last_commit_metadata(
-            self._spark_context, self._feature_group.location
+            self._spark_context, location
         )
         return feature_group_commit
@@ -144,6 +152,9 @@ def _setup_hudi_write_opts(self, operation, write_options):
             else self._feature_group.primary_key[0]
         )
 
+        # only enable Hive sync for managed feature groups; skip it when the feature group is backed by a storage connector
+        hive_sync = self._feature_group.storage_connector is None
+
         hudi_options = {
             self.HUDI_KEY_GENERATOR_OPT_KEY: self.HUDI_COMPLEX_KEY_GENERATOR_OPT_VAL,
             self.HUDI_PRECOMBINE_FIELD: pre_combine_key,
@@ -153,7 +164,7 @@
             self.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY: self.DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL
             if len(partition_key) >= 1
             else self.HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL,
-            self.HUDI_HIVE_SYNC_ENABLE: "true",
+            self.HUDI_HIVE_SYNC_ENABLE: str(hive_sync).lower(),
             self.HUDI_HIVE_SYNC_MODE: self.HUDI_HIVE_SYNC_MODE_VAL,
             self.HUDI_HIVE_SYNC_DB: self._feature_store_name,
             self.HUDI_HIVE_SYNC_TABLE: table_name,
@@ -226,9 +237,8 @@ def _setup_hudi_read_opts(self, hudi_fg_alias, read_options):
     def reconcile_hudi_schema(
         self, save_empty_dataframe_callback, hudi_fg_alias, read_options
     ):
-        fg_table_name = hudi_fg_alias.feature_group._get_table_name()
         if sorted(self._spark_session.table(hudi_fg_alias.alias).columns) != sorted(
-            self._spark_session.table(fg_table_name).columns
+            [feature.name for feature in hudi_fg_alias.feature_group._features] + self.HUDI_SPEC_FEATURE_NAMES
         ):
             full_fg = self._feature_group_api.get(
                 feature_store_id=hudi_fg_alias.feature_group._feature_store_id,
diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index
96c4f9ecc..512803ebd 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -481,6 +481,12 @@ def register_external_temporary_table( # No op to avoid query failure pass + def register_delta_temporary_table( + self, delta_fg_alias, feature_store_id, feature_store_name, read_options + ): + # No op to avoid query failure + pass + def register_hudi_temporary_table( self, hudi_fg_alias: "hsfs.constructor.hudi_feature_group_alias.HudiFeatureGroupAlias", @@ -1203,7 +1209,7 @@ def save_stream_dataframe( ) def save_empty_dataframe( - self, feature_group: Union[FeatureGroup, ExternalFeatureGroup] + self, feature_group: Union[FeatureGroup, ExternalFeatureGroup], new_features=None ) -> None: """Wrapper around save_dataframe in order to provide no-op.""" pass diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 53ea4dc0f..85071df22 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -193,12 +193,10 @@ def register_external_temporary_table(self, external_fg, alias): external_fg.query, external_fg.data_format, external_fg.options, - external_fg.storage_connector._get_path(external_fg.path), + external_fg.prepare_spark_location(), ) else: external_dataset = external_fg.dataframe - if external_fg.location: - self._spark_session.sparkContext.textFile(external_fg.location).collect() external_dataset.createOrReplaceTempView(alias) return external_dataset @@ -213,10 +211,12 @@ def register_hudi_temporary_table( self._spark_context, self._spark_session, ) + hudi_engine_instance.register_temporary_table( hudi_fg_alias, read_options, ) + hudi_engine_instance.reconcile_hudi_schema( self.save_empty_dataframe, hudi_fg_alias, read_options ) @@ -1196,7 +1196,7 @@ def _setup_s3_hadoop_conf(self, storage_connector, path): self._set_s3_hadoop_conf( storage_connector, f"fs.s3a.bucket.{storage_connector.bucket}" ) - return path.replace("s3", "s3a", 1) if path is not None else None + return path.replace("s3://", "s3a://", 1) if path is not None else None def _set_s3_hadoop_conf(self, storage_connector, prefix): if storage_connector.access_key: @@ -1247,13 +1247,22 @@ def is_spark_dataframe(self, dataframe): return True return False - def save_empty_dataframe(self, feature_group): - fg_table_name = feature_group._get_table_name() - dataframe = self._spark_session.table(fg_table_name).limit(0) + def save_empty_dataframe(self, feature_group, new_features=None): + location = feature_group.prepare_spark_location() + + dataframe = self._spark_session.read.format("hudi").load(location) + + if (new_features is not None): + if isinstance(new_features, list): + for new_feature in new_features: + dataframe = dataframe.withColumn(new_feature.name, lit(None).cast(new_feature.type)) + else: + dataframe = dataframe.withColumn(new_features.name, lit(None).cast(new_features.type)) + self.save_dataframe( feature_group, - dataframe, + dataframe.limit(0), "upsert", feature_group.online_enabled, "offline", @@ -1262,20 +1271,22 @@ def save_empty_dataframe(self, feature_group): ) def add_cols_to_delta_table(self, feature_group, new_features): - new_features_map = {} - if isinstance(new_features, list): - for new_feature in new_features: - new_features_map[new_feature.name] = lit("").cast(new_feature.type) - else: - new_features_map[new_features.name] = lit("").cast(new_features.type) + location = feature_group.prepare_spark_location() + + dataframe = self._spark_session.read.format("delta").load(location) + + if (new_features is not None): + if isinstance(new_features, 
list):
+                for new_feature in new_features:
+                    dataframe = dataframe.withColumn(new_feature.name, lit("").cast(new_feature.type))
+            else:
+                dataframe = dataframe.withColumn(new_features.name, lit("").cast(new_features.type))
 
-        self._spark_session.read.format("delta").load(
-            feature_group.location
-        ).withColumns(new_features_map).limit(0).write.format("delta").mode(
+        dataframe.limit(0).write.format("delta").mode(
             "append"
         ).option("mergeSchema", "true").option(
             "spark.databricks.delta.schema.autoMerge.enabled", "true"
-        ).save(feature_group.location)
+        ).save(location)
 
     def _apply_transformation_function(
         self,
diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py
index c3407ea2f..f47f1d962 100644
--- a/python/hsfs/feature_group.py
+++ b/python/hsfs/feature_group.py
@@ -140,6 +140,8 @@ def __init__(
                 Dict[str, Any],
             ]
         ] = None,
+        storage_connector: Union[sc.StorageConnector, Dict[str, Any]] = None,
+        path: Optional[str] = None,
         **kwargs,
     ) -> None:
         self._version = version
@@ -156,6 +158,14 @@ def __init__(
         self._feature_store_id = featurestore_id
         self._feature_store = None
         self._variable_api: VariableApi = VariableApi()
+        self._path = path
+
+        if storage_connector is not None and isinstance(storage_connector, dict):
+            self._storage_connector = sc.StorageConnector.from_response_json(
+                storage_connector
+            )
+        else:
+            self._storage_connector: "sc.StorageConnector" = storage_connector
 
         self._online_config = (
             OnlineConfig.from_response_json(online_config)
@@ -2049,6 +2059,20 @@ def online_enabled(self) -> bool:
     def online_enabled(self, online_enabled: bool) -> None:
         self._online_enabled = online_enabled
 
+    @property
+    def path(self) -> Optional[str]:
+        return self._path
+
+    @property
+    def storage_connector(self) -> "sc.StorageConnector":
+        return self._storage_connector
+
+    def prepare_spark_location(self) -> str:
+        location = self.location
+        if (self.storage_connector is not None):
+            location = self.storage_connector.prepare_spark(location)
+        return location
+
     @property
     def topic_name(self) -> Optional[str]:
         """The topic used for feature group data ingestion."""
@@ -2235,6 +2259,8 @@ def __init__(
             ]
         ] = None,
         offline_backfill_every_hr: Optional[Union[str, int]] = None,
+        storage_connector: Union[sc.StorageConnector, Dict[str, Any]] = None,
+        path: Optional[str] = None,
         **kwargs,
     ) -> None:
         super().__init__(
@@ -2252,6 +2278,8 @@ def __init__(
             notification_topic_name=notification_topic_name,
             deprecated=deprecated,
             online_config=online_config,
+            storage_connector=storage_connector,
+            path=path,
         )
         self._feature_store_name: Optional[str] = featurestore_name
         self._description: Optional[str] = description
@@ -3249,6 +3277,31 @@ def commit_delete_record(
         """
         self._feature_group_engine.commit_delete(self, delete_df, write_options or {})
 
+    def clean(
+        self,
+        write_options: Optional[Dict[Any, Any]] = None,
+    ) -> None:
+        """ Clean up old files. This method can only be used on feature groups stored as DELTA.
+
+        !!! example
+            ```python
+            # connect to the Feature Store
+            fs = ...
+
+            # get the Feature Group instance
+            fg = fs.get_or_create_feature_group(...)
+
+            commit_details = fg.clean(write_options = {"retention_hours": 100})
+            ```
+
+        # Arguments
+            write_options: User provided write options. Defaults to `{}`.
+
+        # Raises
+            `hsfs.client.exceptions.RestAPIError`.
+ """ + self._feature_group_engine.clean(self, write_options or {}) + def as_of( self, wallclock_time: Optional[Union[str, int, datetime, date]] = None, @@ -3557,6 +3609,7 @@ def to_dict(self) -> Dict[str, Any]: "transformationFunctions": [ tf.to_dict() for tf in self._transformation_functions ], + "path": self._path, } if self._online_config: fg_meta_dict["onlineConfig"] = self._online_config.to_dict() @@ -3564,6 +3617,8 @@ def to_dict(self) -> Dict[str, Any]: fg_meta_dict["embeddingIndex"] = self.embedding_index.to_dict() if self._stream: fg_meta_dict["deltaStreamerJobConf"] = self._deltastreamer_jobconf + if self._storage_connector: + fg_meta_dict["storageConnector"] = self._storage_connector.to_dict() return fg_meta_dict def _get_table_name(self) -> str: @@ -3801,6 +3856,8 @@ def __init__( notification_topic_name=notification_topic_name, deprecated=deprecated, online_config=online_config, + storage_connector=storage_connector, + path=path, ) self._feature_store_name = featurestore_name @@ -3809,7 +3866,6 @@ def __init__( self._creator = user.User.from_response_json(creator) self._query = query self._data_format = data_format.upper() if data_format else None - self._path = path self._features = [ feature.Feature.from_response_json(feat) if isinstance(feat, dict) else feat @@ -3850,12 +3906,6 @@ def __init__( self._features = features self._options = options or {} - if storage_connector is not None and isinstance(storage_connector, dict): - self._storage_connector = sc.StorageConnector.from_response_json( - storage_connector - ) - else: - self._storage_connector: "sc.StorageConnector" = storage_connector self._vector_db_client: Optional["VectorDbClient"] = None self._href: Optional[str] = href @@ -4245,18 +4295,10 @@ def query(self) -> Optional[str]: def data_format(self) -> Optional[str]: return self._data_format - @property - def path(self) -> Optional[str]: - return self._path - @property def options(self) -> Optional[Dict[str, Any]]: return self._options - @property - def storage_connector(self) -> "sc.StorageConnector": - return self._storage_connector - @property def creator(self) -> Optional["user.User"]: return self._creator diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index 2a384c961..d5c85cc52 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -524,6 +524,8 @@ def create_feature_group( ] ] = None, offline_backfill_every_hr: Optional[Union[int, str]] = None, + storage_connector: Union[storage_connector.StorageConnector, Dict[str, Any]] = None, + path: Optional[str] = None, ) -> feature_group.FeatureGroup: """Create a feature group metadata object. @@ -628,6 +630,10 @@ def plus_two(value): periodically. The value can be either an integer representing the number of hours between each run or a string representing a cron expression. Set the value to None to avoid scheduling the materialization job. Defaults to None (i.e no scheduling). + storage_connector: the storage connector to use to establish connectivity + with the data source. + path: The location within the scope of the storage connector, from where to read + the data for the external feature group # Returns `FeatureGroup`. The feature group metadata object. 
@@ -655,6 +661,8 @@ def plus_two(value):
             transformation_functions=transformation_functions,
             online_config=online_config,
             offline_backfill_every_hr=offline_backfill_every_hr,
+            storage_connector=storage_connector,
+            path=path,
         )
         feature_group_object.feature_store = self
         return feature_group_object
@@ -689,6 +697,8 @@ def get_or_create_feature_group(
         ] = None,
         online_config: Optional[Union[OnlineConfig, Dict[str, Any]]] = None,
         offline_backfill_every_hr: Optional[Union[int, str]] = None,
+        storage_connector: Union[storage_connector.StorageConnector, Dict[str, Any]] = None,
+        path: Optional[str] = None,
     ) -> Union[
         feature_group.FeatureGroup,
         feature_group.ExternalFeatureGroup,
@@ -783,6 +793,10 @@ def get_or_create_feature_group(
                 periodically. The value can be either an integer representing the number of hours between each run
                 or a string representing a cron expression. Set the value to None to avoid scheduling the materialization job.
                 Defaults to None (i.e no automatic scheduling). Applies only on Feature Group creation.
+            storage_connector: the storage connector to use to establish connectivity
+                with the data source.
+            path: The location within the scope of the storage connector, from where to read
+                the data for the external feature group
 
         # Returns
             `FeatureGroup`. The feature group metadata object.
@@ -819,6 +833,8 @@ def get_or_create_feature_group(
             transformation_functions=transformation_functions,
             online_config=online_config,
             offline_backfill_every_hr=offline_backfill_every_hr,
+            storage_connector=storage_connector,
+            path=path,
         )
         feature_group_object.feature_store = self
         return feature_group_object
diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py
index 3618e2e68..c7ba70dc2 100644
--- a/python/hsfs/storage_connector.py
+++ b/python/hsfs/storage_connector.py
@@ -134,6 +134,12 @@ def description(self) -> Optional[str]:
     def spark_options(self) -> None:
         pass
 
+    def prepare_spark(self, path: Optional[str] = None) -> Optional[str]:
+        _logger.info(
+            "This Storage Connector cannot be prepared for Spark."
+ ) + return path + def read( self, query: Optional[str] = None, @@ -270,6 +276,7 @@ def __init__( server_encryption_algorithm: Optional[str] = None, server_encryption_key: Optional[str] = None, bucket: Optional[str] = None, + region: Optional[str] = None, session_token: Optional[str] = None, iam_role: Optional[str] = None, arguments: Optional[Dict[str, Any]] = None, @@ -283,6 +290,7 @@ def __init__( self._server_encryption_algorithm = server_encryption_algorithm self._server_encryption_key = server_encryption_key self._bucket = bucket + self._region = region self._session_token = session_token self._iam_role = iam_role self._arguments = ( @@ -314,6 +322,11 @@ def bucket(self) -> Optional[str]: """Return the bucket for S3 connectors.""" return self._bucket + @property + def region(self) -> Optional[str]: + """Return the region for S3 connectors.""" + return self._region + @property def session_token(self) -> Optional[str]: """Session token.""" @@ -393,7 +406,7 @@ def read( if options is not None else self.spark_options() ) - if not path.startswith("s3://"): + if not path.startswith(("s3://", "s3a://")): path = self._get_path(path) print( "Prepending default bucket specified on connector, final path: {}".format( diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index 3c7fa999c..488232dee 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -174,43 +174,6 @@ def test_register_external_temporary_table(self, mocker): # Arrange mocker.patch("hopsworks_common.client.get_instance") mock_sc_read = mocker.patch("hsfs.storage_connector.JdbcConnector.read") - mock_pyspark_getOrCreate = mocker.patch( - "pyspark.sql.session.SparkSession.builder.getOrCreate" - ) - - spark_engine = spark.Engine() - - jdbc_connector = storage_connector.JdbcConnector( - id=1, - name="test_connector", - featurestore_id=1, - connection_string="", - arguments="", - ) - - external_fg = feature_group.ExternalFeatureGroup( - storage_connector=jdbc_connector, id=10 - ) - - # Act - spark_engine.register_external_temporary_table( - external_fg=external_fg, - alias=None, - ) - - # Assert - assert ( - mock_pyspark_getOrCreate.return_value.sparkContext.textFile.call_count == 0 - ) - assert mock_sc_read.return_value.createOrReplaceTempView.call_count == 1 - - def test_register_external_temporary_table_external_fg_location(self, mocker): - # Arrange - mocker.patch("hopsworks_common.client.get_instance") - mock_sc_read = mocker.patch("hsfs.storage_connector.JdbcConnector.read") - mock_pyspark_getOrCreate = mocker.patch( - "pyspark.sql.session.SparkSession.builder.getOrCreate" - ) spark_engine = spark.Engine() @@ -233,9 +196,6 @@ def test_register_external_temporary_table_external_fg_location(self, mocker): ) # Assert - assert ( - mock_pyspark_getOrCreate.return_value.sparkContext.textFile.call_count == 1 - ) assert mock_sc_read.return_value.createOrReplaceTempView.call_count == 1 def test_register_hudi_temporary_table(self, mocker): @@ -4396,11 +4356,11 @@ def test_setup_s3_hadoop_conf_legacy(self, mocker): # Act result = spark_engine._setup_s3_hadoop_conf( storage_connector=s3_connector, - path="s3_test_path", + path="s3://_test_path", ) # Assert - assert result == "s3a_test_path" + assert result == "s3a://_test_path" assert ( mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.call_count == 14 @@ -4453,11 +4413,11 @@ def test_setup_s3_hadoop_conf_bucket_scope(self, mocker): # Act result = spark_engine._setup_s3_hadoop_conf( 
storage_connector=s3_connector, - path="s3_test_path", + path="s3://_test_path", ) # Assert - assert result == "s3a_test_path" + assert result == "s3a://_test_path" assert ( mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.call_count == 14 @@ -4559,7 +4519,9 @@ def test_save_empty_dataframe(self, mocker): mock_spark_engine_save_dataframe = mocker.patch( "hsfs.engine.spark.Engine.save_dataframe" ) - mock_spark_table = mocker.patch("pyspark.sql.session.SparkSession.table") + mock_spark_read = mocker.patch("pyspark.sql.SparkSession.read") + mock_format = mocker.Mock() + mock_spark_read.format.return_value = mock_format # Arrange spark_engine = spark.Engine() @@ -4579,7 +4541,7 @@ def test_save_empty_dataframe(self, mocker): # Assert assert mock_spark_engine_save_dataframe.call_count == 1 - assert mock_spark_table.call_count == 1 + assert mock_spark_read.format.call_count == 1 def test_apply_transformation_function_single_output(self, mocker): # Arrange diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py index 5af468873..4e73f1604 100644 --- a/utils/python/hsfs_utils.py +++ b/utils/python/hsfs_utils.py @@ -247,6 +247,18 @@ def run_feature_monitoring(job_conf: Dict[str, str]) -> None: raise e +def clean_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> None: + """ + Run clean on a feature group. + """ + feature_store = job_conf.pop("feature_store") + fs = get_feature_store_handle(feature_store) + + entity = fs.get_feature_group(name=job_conf["name"], version=job_conf["version"]) + + entity.clean() + + if __name__ == "__main__": # Setup spark first so it fails faster in case of args errors # Otherwise the resource manager will wait until the spark application master @@ -265,6 +277,7 @@ def run_feature_monitoring(job_conf: Dict[str, str]) -> None: "ge_validate", "import_fg", "run_feature_monitoring", + "clean_fg", ], help="Operation type", ) @@ -303,6 +316,8 @@ def parse_isoformat_date(da: str) -> datetime: import_fg(job_conf) elif args.op == "run_feature_monitoring": run_feature_monitoring(job_conf) + elif args.op == "clean_fg": + clean_fg(spark, job_conf) success = True except Exception:
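
Below is a minimal usage sketch of the API surface this change set adds: a feature group whose offline data is written through a user-supplied storage connector (`storage_connector`/`path` on `create_feature_group`), followed by the new `clean()` maintenance call that vacuums old DELTA files. The project, connector and feature group names, as well as the dataframe, are illustrative placeholders and not part of this diff.

```python
import hopsworks

project = hopsworks.login()
fs = project.get_feature_store()

# existing S3 connector registered in the feature store
# (the new `region` field is read from the connector metadata)
s3_conn = fs.get_storage_connector("my_s3_connector")

fg = fs.create_feature_group(
    name="transactions",
    version=1,
    primary_key=["cc_num"],
    event_time="datetime",
    time_travel_format="DELTA",
    storage_connector=s3_conn,            # new: offline data lives in the connector's bucket
    path="feature_groups/transactions",   # new: location within the scope of the connector
)

transactions_df = ...  # a Spark dataframe with the feature data (placeholder)
fg.insert(transactions_df)

# new maintenance entry point (DELTA only): vacuum files older than 7 days
fg.clean(write_options={"retention_hours": 168})
```

The same operation can be scheduled as a job through `hsfs_utils.py` with the new `clean_fg` op, which resolves the feature group from the job configuration and calls `clean()` on it.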