diff --git a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java index 9d3c41ee6..c4904ddc3 100644 --- a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java +++ b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java @@ -25,7 +25,6 @@ import com.logicalclocks.hsfs.OnlineConfig; import com.logicalclocks.hsfs.StatisticsConfig; import com.logicalclocks.hsfs.Storage; -import com.logicalclocks.hsfs.StorageConnector; import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine; import com.logicalclocks.hsfs.beam.engine.BeamProducer; import com.logicalclocks.hsfs.constructor.QueryBase; @@ -49,7 +48,7 @@ public class StreamFeatureGroup extends FeatureGroupBase> { public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description, List primaryKeys, List partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, List features, StatisticsConfig statisticsConfig, String onlineTopicName, - String eventTime, OnlineConfig onlineConfig, StorageConnector storageConnector, String path) { + String eventTime, OnlineConfig onlineConfig) { this(); this.featureStore = featureStore; this.name = name; @@ -66,8 +65,6 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ this.onlineTopicName = onlineTopicName; this.eventTime = eventTime; this.onlineConfig = onlineConfig; - this.storageConnector = storageConnector; - this.path = path; } public StreamFeatureGroup() { diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java index c3cd6cbd0..d13a9c8e1 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java @@ -26,7 +26,6 @@ import com.logicalclocks.hsfs.OnlineConfig; import com.logicalclocks.hsfs.StatisticsConfig; import com.logicalclocks.hsfs.Storage; -import com.logicalclocks.hsfs.StorageConnector; import com.logicalclocks.hsfs.constructor.QueryBase; import com.logicalclocks.hsfs.metadata.Statistics; @@ -55,7 +54,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ List primaryKeys, List partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, List features, StatisticsConfig statisticsConfig, String onlineTopicName, String topicName, String notificationTopicName, String eventTime, - OnlineConfig onlineConfig, StorageConnector storageConnector, String path) { + OnlineConfig onlineConfig) { this(); this.featureStore = featureStore; this.name = name; @@ -74,8 +73,6 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ this.notificationTopicName = notificationTopicName; this.eventTime = eventTime; this.onlineConfig = onlineConfig; - this.storageConnector = storageConnector; - this.path = path; } public StreamFeatureGroup() { diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java index 61ca2f5a9..585f4780e 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java @@ -132,14 +132,6 @@ public abstract class FeatureGroupBase { @Setter protected OnlineConfig onlineConfig; - @Getter - @Setter - protected 
StorageConnector storageConnector; - - @Getter - @Setter - protected String path; - @JsonIgnore // These are only used in the client. In the server they are aggregated in the `features` field protected List partitionKeys; diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java index 91bd36819..71be4d995 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java @@ -124,9 +124,6 @@ public static class S3Connector extends StorageConnector { @Getter @Setter protected String bucket; - @Getter @Setter - protected String region; - @Getter @Setter protected String sessionToken; diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java index a937cca1a..da4bf655a 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java @@ -128,7 +128,7 @@ public static Long getTimeStampFromDateString(String inputDate) throws FeatureSt } SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatPattern); - Long commitTimeStamp = dateFormat.parse(tempDate).getTime(); + Long commitTimeStamp = dateFormat.parse(tempDate).getTime(); return commitTimeStamp; } diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java index e62a5e83e..13f907ff4 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java @@ -57,6 +57,10 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class ExternalFeatureGroup extends FeatureGroupBase> { + @Getter + @Setter + private StorageConnector storageConnector; + @Getter @Setter private String query; @@ -65,6 +69,10 @@ public class ExternalFeatureGroup extends FeatureGroupBase> { @Setter private ExternalDataFormat dataFormat; + @Getter + @Setter + private String path; + @Getter @Setter private List options; diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java index 07fe98ff6..9c968b3ba 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java @@ -31,7 +31,6 @@ import com.logicalclocks.hsfs.OnlineConfig; import com.logicalclocks.hsfs.StatisticsConfig; import com.logicalclocks.hsfs.Storage; -import com.logicalclocks.hsfs.StorageConnector; import com.logicalclocks.hsfs.TimeTravelFormat; import com.logicalclocks.hsfs.FeatureGroupBase; import com.logicalclocks.hsfs.metadata.Statistics; @@ -65,8 +64,7 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver String description, List primaryKeys, List partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List features, StatisticsConfig statisticsConfig, String onlineTopicName, - String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig, - StorageConnector storageConnector, String path) { + String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig) { this(); 
this.featureStore = featureStore; this.name = name; @@ -87,8 +85,6 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver this.notificationTopicName = notificationTopicName; this.eventTime = eventTime; this.onlineConfig = onlineConfig; - this.storageConnector = storageConnector; - this.path = path; } public FeatureGroup() { diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java index 0c8b9bae3..597e6e3ad 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java @@ -30,7 +30,6 @@ import com.logicalclocks.hsfs.OnlineConfig; import com.logicalclocks.hsfs.StatisticsConfig; import com.logicalclocks.hsfs.Storage; -import com.logicalclocks.hsfs.StorageConnector; import com.logicalclocks.hsfs.FeatureGroupBase; import com.logicalclocks.hsfs.metadata.Statistics; @@ -63,7 +62,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ List primaryKeys, List partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, List features, StatisticsConfig statisticsConfig, String onlineTopicName, String topicName, String notificationTopicName, String eventTime, - OnlineConfig onlineConfig, StorageConnector storageConnector, String path) { + OnlineConfig onlineConfig) { this(); this.featureStore = featureStore; this.name = name; @@ -82,8 +81,6 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ this.notificationTopicName = notificationTopicName; this.eventTime = eventTime; this.onlineConfig = onlineConfig; - this.storageConnector = storageConnector; - this.path = path; } public StreamFeatureGroup() { diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java index 1aed9a650..b30bca98c 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java @@ -219,6 +219,10 @@ public Dataset registerOnDemandTemporaryTable(ExternalFeatureGroup onDemand ? 
onDemandFeatureGroup.getDataFormat().toString() : null, getOnDemandOptions(onDemandFeatureGroup), onDemandFeatureGroup.getStorageConnector().getPath(onDemandFeatureGroup.getPath())); + if (!Strings.isNullOrEmpty(onDemandFeatureGroup.getLocation())) { + sparkSession.sparkContext().textFile(onDemandFeatureGroup.getLocation(), 0).collect(); + } + dataset.createOrReplaceTempView(alias); return dataset; } diff --git a/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java b/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java index bedd9716e..adddd8310 100644 --- a/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java +++ b/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java @@ -67,11 +67,11 @@ public void testFeatureGroupPrimaryKey() { StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description", Collections.singletonList("primaryKey"), Collections.singletonList("partitionKey"), "hudiPrecombineKey", - true, features, null, "onlineTopicName", null, null, null, null, null, null); + true, features, null, "onlineTopicName", null, null, null, null); Exception pkException = assertThrows(FeatureStoreException.class, () -> { featureGroupEngine.saveFeatureGroupMetaData(featureGroup, - null, null, null, null, null); + null, null, null, null, null); }); // Assert @@ -93,11 +93,11 @@ public void testFeatureGroupEventTimeFeature() { StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description", Collections.singletonList("featureA"), null, null, - true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null); + true, features, null, "onlineTopicName", null, null, "eventTime", null); Exception eventTimeException = assertThrows(FeatureStoreException.class, () -> { streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup, - null, null, null, null, null); + null, null, null, null, null); }); // Assert @@ -119,7 +119,7 @@ public void testFeatureGroupPartitionPrecombineKeys() { StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description", Collections.singletonList("featureA"), Collections.singletonList("partitionKey"), "hudiPrecombineKey", - true, features, null, "onlineTopicName", null, null, null, null, null, null); + true, features, null, "onlineTopicName", null, null, null, null); Exception partitionException = assertThrows(FeatureStoreException.class, () -> { streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup, @@ -164,7 +164,7 @@ public void testFeatureGroupAppendFeaturesResetSubject() throws FeatureStoreExce StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description", Collections.singletonList("featureA"), null, null, - true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null); + true, features, null, "onlineTopicName", null, null, "eventTime", null); featureGroup.featureGroupEngine = featureGroupEngine; // Act diff --git a/python/hsfs/constructor/fs_query.py b/python/hsfs/constructor/fs_query.py index b367d5b13..b944006f3 100644 --- a/python/hsfs/constructor/fs_query.py +++ b/python/hsfs/constructor/fs_query.py @@ -35,7 +35,6 @@ def __init__( expand: Optional[List[str]] = None, items: Optional[List[Dict[str, Any]]] = None, type: Optional[str] = None, - delta_cached_feature_groups: Optional[List[Dict[str, Any]]] = None, **kwargs, ) -> None: self._query = query @@ -61,14 +60,6 @@ def __init__( else: 
self._hudi_cached_feature_groups = [] - if delta_cached_feature_groups is not None: - self._delta_cached_feature_groups = [ - hudi_feature_group_alias.HudiFeatureGroupAlias.from_response_json(fg) - for fg in delta_cached_feature_groups - ] - else: - self._delta_cached_feature_groups = [] - @classmethod def from_response_json(cls, json_dict: Dict[str, Any]) -> "FsQuery": json_decamelized = humps.decamelize(json_dict) @@ -136,7 +127,7 @@ def register_delta_tables( feature_store_name: str, read_options: Optional[Dict[str, Any]], ) -> None: - for hudi_fg in self._delta_cached_feature_groups: + for hudi_fg in self._hudi_cached_feature_groups: engine.get_instance().register_delta_temporary_table( hudi_fg, feature_store_id, feature_store_name, read_options ) diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index d4a4602a8..53bb96a86 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -124,13 +124,7 @@ def _is_query_supported_rec(query: query.Query): and query._left_feature_group.storage_connector.type in ArrowFlightClient.SUPPORTED_EXTERNAL_CONNECTORS ) - delta_s3 = ( - isinstance(query._left_feature_group, feature_group.FeatureGroup) - and query._left_feature_group.time_travel_format == "DELTA" - and query._left_feature_group.storage_connector - and query._left_feature_group.storage_connector.type == StorageConnector.S3 - ) - supported = hudi_no_time_travel or supported_connector or delta_s3 + supported = hudi_no_time_travel or supported_connector for j in query._joins: supported &= _is_query_supported_rec(j._query) return supported @@ -555,7 +549,6 @@ def enabled_on_cluster(self) -> bool: def _serialize_featuregroup_connector(fg, query, on_demand_fg_aliases): connector = {} if isinstance(fg, feature_group.ExternalFeatureGroup): - connector["time_travel_type"] = None connector["type"] = fg.storage_connector.type connector["options"] = fg.storage_connector.connector_options() connector["query"] = fg.query[:-1] if fg.query.endswith(";") else fg.query @@ -573,25 +566,8 @@ def _serialize_featuregroup_connector(fg, query, on_demand_fg_aliases): connector["filters"] = _serialize_filter_expression( join_obj._query._filter, join_obj._query, True ) - elif fg.time_travel_format == "DELTA": - connector["time_travel_type"] = "delta" - connector["type"] = fg.storage_connector.type - connector["options"] = fg.storage_connector.connector_options() - if fg.storage_connector.type == StorageConnector.S3: - connector["options"]["path"] = fg.location - connector["query"] = "" - if query._left_feature_group == fg: - connector["filters"] = _serialize_filter_expression( - query._filter, query, True - ) - else: - for join_obj in query._joins: - if join_obj._query._left_feature_group == fg: - connector["filters"] = _serialize_filter_expression( - join_obj._query._filter, join_obj._query, True - ) else: - connector["time_travel_type"] = "hudi" + connector["type"] = "hudi" return connector diff --git a/python/hsfs/core/delta_engine.py b/python/hsfs/core/delta_engine.py index 452052c54..95ea7f37b 100644 --- a/python/hsfs/core/delta_engine.py +++ b/python/hsfs/core/delta_engine.py @@ -52,12 +52,10 @@ def save_delta_fg(self, dataset, write_options, validation_id=None): return self._feature_group_api.commit(self._feature_group, fg_commit) def register_temporary_table(self, delta_fg_alias, read_options): - location = self._feature_group.prepare_spark_location() - delta_options = self._setup_delta_read_opts(delta_fg_alias, 
read_options) self._spark_session.read.format(self.DELTA_SPARK_FORMAT).options( **delta_options - ).load(location).createOrReplaceTempView( + ).load(self._feature_group.location).createOrReplaceTempView( delta_fg_alias.alias ) @@ -87,17 +85,15 @@ def _setup_delta_read_opts(self, delta_fg_alias, read_options): return delta_options def delete_record(self, delete_df): - location = self._feature_group.prepare_spark_location() - if not DeltaTable.isDeltaTable( - self._spark_session, location + self._spark_session, self._feature_group.location ): raise FeatureStoreException( f"This is no data available in Feature group {self._feature_group.name}, or it not DELTA enabled " ) else: fg_source_table = DeltaTable.forPath( - self._spark_session, location + self._spark_session, self._feature_group.location ) source_alias = ( @@ -113,18 +109,16 @@ def delete_record(self, delete_df): ).whenMatchedDelete().execute() fg_commit = self._get_last_commit_metadata( - self._spark_session, location + self._spark_session, self._feature_group.location ) return self._feature_group_api.commit(self._feature_group, fg_commit) def _write_delta_dataset(self, dataset, write_options): - location = self._feature_group.prepare_spark_location() - if write_options is None: write_options = {} if not DeltaTable.isDeltaTable( - self._spark_session, location + self._spark_session, self._feature_group.location ): ( dataset.write.format(DeltaEngine.DELTA_SPARK_FORMAT) @@ -135,11 +129,11 @@ def _write_delta_dataset(self, dataset, write_options): else [] ) .mode("append") - .save(location) + .save(self._feature_group.location) ) else: fg_source_table = DeltaTable.forPath( - self._spark_session, location + self._spark_session, self._feature_group.location ) source_alias = ( @@ -155,14 +149,9 @@ def _write_delta_dataset(self, dataset, write_options): ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute() return self._get_last_commit_metadata( - self._spark_session, location + self._spark_session, self._feature_group.location ) - def vacuum(self, retention_hours: int): - location = self._feature_group.prepare_spark_location() - retention = f"RETAIN {retention_hours} HOURS" if retention_hours is not None else "" - self._spark_session.sql(f"VACUUM '{location}' {retention}") - def _generate_merge_query(self, source_alias, updates_alias): merge_query_list = [] primary_key = self._feature_group.primary_key diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index f00a044e1..a06d12ca9 100644 --- a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -246,20 +246,6 @@ def commit_delete(feature_group, delete_df, write_options): ) return hudi_engine_instance.delete_record(delete_df, write_options) - @staticmethod - def delta_vacuum(feature_group, retention_hours): - if feature_group.time_travel_format == "DELTA": - delta_engine_instance = delta_engine.DeltaEngine( - feature_group.feature_store_id, - feature_group.feature_store_name, - feature_group, - engine.get_instance()._spark_session, - engine.get_instance()._spark_context, - ) - return delta_engine_instance.vacuum(retention_hours) - else: - return None - def sql(self, query, feature_store_name, dataframe_type, online, read_options): if online and self._online_conn is None: self._online_conn = self._storage_connector_api.get_online_connector( @@ -299,7 +285,7 @@ def append_features(self, feature_group, new_features): if feature_group.time_travel_format == "DELTA": 
engine.get_instance().add_cols_to_delta_table(feature_group, new_features) else: - engine.get_instance().save_empty_dataframe(feature_group, new_features=new_features) + engine.get_instance().save_empty_dataframe(feature_group) def update_description(self, feature_group, description): """Updates the description of a feature group.""" diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py index 4492f0a19..04c2c5a6b 100644 --- a/python/hsfs/core/hudi_engine.py +++ b/python/hsfs/core/hudi_engine.py @@ -20,10 +20,6 @@ class HudiEngine: - - HUDI_SPEC_FEATURE_NAMES = ["_hoodie_record_key", "_hoodie_partition_path", - "_hoodie_commit_time", "_hoodie_file_name", "_hoodie_commit_seqno"] - HUDI_SPARK_FORMAT = "org.apache.hudi" HUDI_TABLE_NAME = "hoodie.table.name" HUDI_TABLE_STORAGE_TYPE = "hoodie.datasource.write.storage.type" @@ -104,25 +100,21 @@ def delete_record(self, delete_df, write_options): return self._feature_group_api.commit(self._feature_group, fg_commit) def register_temporary_table(self, hudi_fg_alias, read_options): - location = self._feature_group.prepare_spark_location() - hudi_options = self._setup_hudi_read_opts(hudi_fg_alias, read_options) self._spark_session.read.format(self.HUDI_SPARK_FORMAT).options( **hudi_options - ).load(location).createOrReplaceTempView( + ).load(self._feature_group.location).createOrReplaceTempView( hudi_fg_alias.alias ) def _write_hudi_dataset(self, dataset, save_mode, operation, write_options): - location = self._feature_group.prepare_spark_location() - hudi_options = self._setup_hudi_write_opts(operation, write_options) dataset.write.format(HudiEngine.HUDI_SPARK_FORMAT).options(**hudi_options).mode( save_mode - ).save(location) + ).save(self._feature_group.location) feature_group_commit = self._get_last_commit_metadata( - self._spark_context, location + self._spark_context, self._feature_group.location ) return feature_group_commit @@ -152,9 +144,6 @@ def _setup_hudi_write_opts(self, operation, write_options): else self._feature_group.primary_key[0] ) - # dont enable hive sync when using managed FG - hive_sync = self._feature_group.storage_connector is None - hudi_options = { self.HUDI_KEY_GENERATOR_OPT_KEY: self.HUDI_COMPLEX_KEY_GENERATOR_OPT_VAL, self.HUDI_PRECOMBINE_FIELD: pre_combine_key, @@ -164,7 +153,7 @@ def _setup_hudi_write_opts(self, operation, write_options): self.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY: self.DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL if len(partition_key) >= 1 else self.HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL, - self.HUDI_HIVE_SYNC_ENABLE: str(hive_sync).lower(), + self.HUDI_HIVE_SYNC_ENABLE: "true", self.HUDI_HIVE_SYNC_MODE: self.HUDI_HIVE_SYNC_MODE_VAL, self.HUDI_HIVE_SYNC_DB: self._feature_store_name, self.HUDI_HIVE_SYNC_TABLE: table_name, @@ -237,8 +226,9 @@ def _setup_hudi_read_opts(self, hudi_fg_alias, read_options): def reconcile_hudi_schema( self, save_empty_dataframe_callback, hudi_fg_alias, read_options ): + fg_table_name = hudi_fg_alias.feature_group._get_table_name() if sorted(self._spark_session.table(hudi_fg_alias.alias).columns) != sorted( - [feature.name for feature in hudi_fg_alias.feature_group._features] + self.HUDI_SPEC_FEATURE_NAMES + self._spark_session.table(fg_table_name).columns ): full_fg = self._feature_group_api.get( feature_store_id=hudi_fg_alias.feature_group._feature_store_id, diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index a287ba4d1..5db04ab8e 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -484,12 
+484,6 @@ def register_external_temporary_table( # No op to avoid query failure pass - def register_delta_temporary_table( - self, delta_fg_alias, feature_store_id, feature_store_name, read_options - ): - # No op to avoid query failure - pass - def register_hudi_temporary_table( self, hudi_fg_alias: "hsfs.constructor.hudi_feature_group_alias.HudiFeatureGroupAlias", @@ -1212,7 +1206,7 @@ def save_stream_dataframe( ) def save_empty_dataframe( - self, feature_group: Union[FeatureGroup, ExternalFeatureGroup], new_features=None + self, feature_group: Union[FeatureGroup, ExternalFeatureGroup] ) -> None: """Wrapper around save_dataframe in order to provide no-op.""" pass @@ -1473,9 +1467,9 @@ def _start_offline_materialization(offline_write_options: Dict[str, Any]) -> boo def _convert_feature_log_to_df(feature_log, cols) -> pd.DataFrame: if feature_log is None and cols: return pd.DataFrame(columns=cols) - if not (isinstance(feature_log, (list, pd.DataFrame, pl.DataFrame)) or ( + if not (isinstance(feature_log, (list, pd.DataFrame, pl.DataFrame))) or ( HAS_NUMPY and isinstance(feature_log, np.ndarray) - )): + ): raise ValueError(f"Type '{type(feature_log)}' not accepted") if isinstance(feature_log, list) or ( HAS_NUMPY and isinstance(feature_log, np.ndarray) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index c0d184faa..3a990fe18 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -197,10 +197,12 @@ def register_external_temporary_table(self, external_fg, alias): external_fg.query, external_fg.data_format, external_fg.options, - external_fg.prepare_spark_location(), + external_fg.storage_connector._get_path(external_fg.path), ) else: external_dataset = external_fg.dataframe + if external_fg.location: + self._spark_session.sparkContext.textFile(external_fg.location).collect() external_dataset.createOrReplaceTempView(alias) return external_dataset @@ -215,12 +217,10 @@ def register_hudi_temporary_table( self._spark_context, self._spark_session, ) - hudi_engine_instance.register_temporary_table( hudi_fg_alias, read_options, ) - hudi_engine_instance.reconcile_hudi_schema( self.save_empty_dataframe, hudi_fg_alias, read_options ) @@ -1258,7 +1258,7 @@ def _setup_s3_hadoop_conf(self, storage_connector, path): self._set_s3_hadoop_conf( storage_connector, f"fs.s3a.bucket.{storage_connector.bucket}" ) - return path.replace("s3://", "s3a://", 1) if path is not None else None + return path.replace("s3", "s3a", 1) if path is not None else None def _set_s3_hadoop_conf(self, storage_connector, prefix): if storage_connector.access_key: @@ -1309,22 +1309,13 @@ def is_spark_dataframe(self, dataframe): return True return False - def save_empty_dataframe(self, feature_group, new_features=None): - location = feature_group.prepare_spark_location() - - dataframe = self._spark_session.read.format("hudi").load(location) - - if (new_features is not None): - if isinstance(new_features, list): - for new_feature in new_features: - dataframe = dataframe.withColumn(new_feature.name, lit(None).cast(new_feature.type)) - else: - dataframe = dataframe.withColumn(new_features.name, lit(None).cast(new_features.type)) - + def save_empty_dataframe(self, feature_group): + fg_table_name = feature_group._get_table_name() + dataframe = self._spark_session.table(fg_table_name).limit(0) self.save_dataframe( feature_group, - dataframe.limit(0), + dataframe, "upsert", feature_group.online_enabled, "offline", @@ -1333,22 +1324,20 @@ def save_empty_dataframe(self, feature_group, 
new_features=None): ) def add_cols_to_delta_table(self, feature_group, new_features): - location = feature_group.prepare_spark_location() - - dataframe = self._spark_session.read.format("delta").load(location) - - if (new_features is not None): - if isinstance(new_features, list): - for new_feature in new_features: - dataframe = dataframe.withColumn(new_feature.name, lit("").cast(new_feature.type)) - else: - dataframe = dataframe.withColumn(new_features.name, lit("").cast(new_features.type)) + new_features_map = {} + if isinstance(new_features, list): + for new_feature in new_features: + new_features_map[new_feature.name] = lit("").cast(new_feature.type) + else: + new_features_map[new_features.name] = lit("").cast(new_features.type) - dataframe.limit(0).write.format("delta").mode( + self._spark_session.read.format("delta").load( + feature_group.location + ).withColumns(new_features_map).limit(0).write.format("delta").mode( "append" ).option("mergeSchema", "true").option( "spark.databricks.delta.schema.autoMerge.enabled", "true" - ).save(location) + ).save(feature_group.location) def _apply_transformation_function( self, diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index a3385afda..4a1db2c57 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -137,8 +137,6 @@ def __init__( Dict[str, Any], ] ] = None, - storage_connector: Union[sc.StorageConnector, Dict[str, Any]] = None, - path: Optional[str] = None, **kwargs, ) -> None: self._version = version @@ -155,14 +153,7 @@ def __init__( self._feature_store_id = featurestore_id self._feature_store = None self._variable_api: VariableApi = VariableApi() - self._path = path - if storage_connector is not None and isinstance(storage_connector, dict): - self._storage_connector = sc.StorageConnector.from_response_json( - storage_connector - ) - else: - self._storage_connector: "sc.StorageConnector" = storage_connector self._online_config = ( OnlineConfig.from_response_json(online_config) if isinstance(online_config, dict) @@ -2055,20 +2046,6 @@ def online_enabled(self) -> bool: def online_enabled(self, online_enabled: bool) -> None: self._online_enabled = online_enabled - @property - def path(self) -> Optional[str]: - return self._path - - @property - def storage_connector(self) -> "sc.StorageConnector": - return self._storage_connector - - def prepare_spark_location(self) -> str: - location = self.location - if (self.storage_connector is not None): - location = self.storage_connector.prepare_spark(location) - return location - @property def topic_name(self) -> Optional[str]: """The topic used for feature group data ingestion.""" @@ -2255,8 +2232,6 @@ def __init__( ] ] = None, offline_backfill_every_hr: Optional[Union[str, int]] = None, - storage_connector: Union[sc.StorageConnector, Dict[str, Any]] = None, - path: Optional[str] = None, **kwargs, ) -> None: super().__init__( @@ -2274,8 +2249,6 @@ def __init__( notification_topic_name=notification_topic_name, deprecated=deprecated, online_config=online_config, - storage_connector=storage_connector, - path=path, ) self._feature_store_name: Optional[str] = featurestore_name self._description: Optional[str] = description @@ -3273,31 +3246,6 @@ def commit_delete_record( """ self._feature_group_engine.commit_delete(self, delete_df, write_options or {}) - def delta_vacuum( - self, - retention_hours: int = None, - ) -> None: - """ Vacuum files that are no longer referenced by a Delta table and are older than the retention threshold. 
- This method can only be used on feature groups stored as DELTA. - - !!! example - ```python - # connect to the Feature Store - fs = ... - - # get the Feature Group instance - fg = fs.get_or_create_feature_group(...) - - commit_details = fg.delta_vacuum(retention_hours = 168) - - # Arguments - retention_hours: User provided retention period. The default retention threshold for the files is 7 days. - - # Raises - `hsfs.client.exceptions.RestAPIError`. - """ - self._feature_group_engine.delta_vacuum(self, retention_hours) - def as_of( self, wallclock_time: Optional[Union[str, int, datetime, date]] = None, @@ -3606,7 +3554,6 @@ def to_dict(self) -> Dict[str, Any]: "transformationFunctions": [ tf.to_dict() for tf in self._transformation_functions ], - "path": self._path, } if self._online_config: fg_meta_dict["onlineConfig"] = self._online_config.to_dict() @@ -3614,8 +3561,6 @@ def to_dict(self) -> Dict[str, Any]: fg_meta_dict["embeddingIndex"] = self.embedding_index.to_dict() if self._stream: fg_meta_dict["deltaStreamerJobConf"] = self._deltastreamer_jobconf - if self._storage_connector: - fg_meta_dict["storageConnector"] = self._storage_connector.to_dict() return fg_meta_dict def _get_table_name(self) -> str: @@ -3853,8 +3798,6 @@ def __init__( notification_topic_name=notification_topic_name, deprecated=deprecated, online_config=online_config, - storage_connector=storage_connector, - path=path, ) self._feature_store_name = featurestore_name @@ -3863,6 +3806,7 @@ def __init__( self._creator = user.User.from_response_json(creator) self._query = query self._data_format = data_format.upper() if data_format else None + self._path = path self._features = [ feature.Feature.from_response_json(feat) if isinstance(feat, dict) else feat @@ -3903,6 +3847,12 @@ def __init__( self._features = features self._options = options or {} + if storage_connector is not None and isinstance(storage_connector, dict): + self._storage_connector = sc.StorageConnector.from_response_json( + storage_connector + ) + else: + self._storage_connector: "sc.StorageConnector" = storage_connector self._vector_db_client: Optional["VectorDbClient"] = None self._href: Optional[str] = href @@ -4292,10 +4242,18 @@ def query(self) -> Optional[str]: def data_format(self) -> Optional[str]: return self._data_format + @property + def path(self) -> Optional[str]: + return self._path + @property def options(self) -> Optional[Dict[str, Any]]: return self._options + @property + def storage_connector(self) -> "sc.StorageConnector": + return self._storage_connector + @property def creator(self) -> Optional["user.User"]: return self._creator diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index c1ef352f9..77c48ae1a 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -526,8 +526,6 @@ def create_feature_group( ] ] = None, offline_backfill_every_hr: Optional[Union[int, str]] = None, - storage_connector: Union[storage_connector.StorageConnector, Dict[str, Any]] = None, - path: Optional[str] = None, ) -> feature_group.FeatureGroup: """Create a feature group metadata object. @@ -632,10 +630,6 @@ def plus_two(value): periodically. The value can be either an integer representing the number of hours between each run or a string representing a cron expression. Set the value to None to avoid scheduling the materialization job. Defaults to None (i.e no scheduling). - storage_connector: the storage connector used to establish connectivity - with the data source. 
- path: The location within the scope of the storage connector, from where to read - the data for the external feature group # Returns `FeatureGroup`. The feature group metadata object. @@ -663,8 +657,6 @@ def plus_two(value): transformation_functions=transformation_functions, online_config=online_config, offline_backfill_every_hr=offline_backfill_every_hr, - storage_connector=storage_connector, - path=path, ) feature_group_object.feature_store = self return feature_group_object @@ -699,8 +691,6 @@ def get_or_create_feature_group( ] = None, online_config: Optional[Union[OnlineConfig, Dict[str, Any]]] = None, offline_backfill_every_hr: Optional[Union[int, str]] = None, - storage_connector: Union[storage_connector.StorageConnector, Dict[str, Any]] = None, - path: Optional[str] = None, ) -> Union[ feature_group.FeatureGroup, feature_group.ExternalFeatureGroup, @@ -795,10 +785,6 @@ def get_or_create_feature_group( periodically. The value can be either an integer representing the number of hours between each run or a string representing a cron expression. Set the value to None to avoid scheduling the materialization job. Defaults to None (i.e no automatic scheduling). Applies only on Feature Group creation. - storage_connector: the storage connector used to establish connectivity - with the data source. - path: The location within the scope of the storage connector, from where to read - the data for the external feature group # Returns `FeatureGroup`. The feature group metadata object. @@ -835,8 +821,6 @@ def get_or_create_feature_group( transformation_functions=transformation_functions, online_config=online_config, offline_backfill_every_hr=offline_backfill_every_hr, - storage_connector=storage_connector, - path=path, ) feature_group_object.feature_store = self return feature_group_object @@ -879,7 +863,7 @@ def create_on_demand_feature_group( # Arguments name: Name of the external feature group to create. - storage_connector: the storage connector used to establish connectivity + storage_connector: the storage connector to use to establish connectivity with the data source. query: A string containing a SQL query valid for the target data source. the query will be used to pull data from the data sources when the @@ -1036,7 +1020,7 @@ def create_external_feature_group( # Arguments name: Name of the external feature group to create. - storage_connector: the storage connector used to establish connectivity + storage_connector: the storage connector to use to establish connectivity with the data source. query: A string containing a SQL query valid for the target data source. the query will be used to pull data from the data sources when the diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index 7ed887cd9..5f953737f 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -136,12 +136,6 @@ def description(self) -> Optional[str]: def spark_options(self) -> None: pass - def prepare_spark(self, path: Optional[str] = None) -> Optional[str]: - _logger.info( - "This Storage Connector cannot be prepared for Spark." 
- ) - return path - def read( self, query: Optional[str] = None, @@ -278,7 +272,6 @@ def __init__( server_encryption_algorithm: Optional[str] = None, server_encryption_key: Optional[str] = None, bucket: Optional[str] = None, - region: Optional[str] = None, session_token: Optional[str] = None, iam_role: Optional[str] = None, arguments: Optional[Dict[str, Any]] = None, @@ -292,7 +285,6 @@ def __init__( self._server_encryption_algorithm = server_encryption_algorithm self._server_encryption_key = server_encryption_key self._bucket = bucket - self._region = region self._session_token = session_token self._iam_role = iam_role self._arguments = ( @@ -324,11 +316,6 @@ def bucket(self) -> Optional[str]: """Return the bucket for S3 connectors.""" return self._bucket - @property - def region(self) -> Optional[str]: - """Return the region for S3 connectors.""" - return self._region - @property def session_token(self) -> Optional[str]: """Session token.""" @@ -371,19 +358,6 @@ def prepare_spark(self, path: Optional[str] = None) -> Optional[str]: """ return engine.get_instance().setup_storage_connector(self, path) - def connector_options(self) -> Dict[str, Any]: - """Return options to be passed to an external S3 connector library""" - self.refetch() - options = { - "access_key": self.access_key, - "secret_key": self.secret_key, - "session_token": self.session_token, - "region": self.region, - } - if self.arguments.get("fs.s3a.endpoint"): - options["endpoint"] = self.arguments.get("fs.s3a.endpoint") - return options - def read( self, query: Optional[str] = None, @@ -421,7 +395,7 @@ def read( if options is not None else self.spark_options() ) - if not path.startswith(("s3://", "s3a://")): + if not path.startswith("s3://"): path = self._get_path(path) print( "Prepending default bucket specified on connector, final path: {}".format( diff --git a/python/pyproject.toml b/python/pyproject.toml index a66d15115..d655534df 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -74,7 +74,6 @@ dev-no-opt = [ "pyspark==3.1.1", "moto[s3]==5.0.0", "typeguard==4.2.1", - "delta-spark==1.0.1" ] dev-pandas1 = [ "hopsworks[python]", @@ -85,7 +84,6 @@ dev-pandas1 = [ "moto[s3]==5.0.0", "pandas<=1.5.3", "sqlalchemy<=1.4.48", - "delta-spark==1.0.1" ] dev = ["hopsworks[dev-no-opt,great-expectations,polars]"] polars=[ diff --git a/python/tests/core/test_arrow_flight_client.py b/python/tests/core/test_arrow_flight_client.py index e67006d67..0f480d12f 100644 --- a/python/tests/core/test_arrow_flight_client.py +++ b/python/tests/core/test_arrow_flight_client.py @@ -252,7 +252,7 @@ def test_construct_query_object(self, mocker, backend_fixtures): "right_filter": None, }, }, - "connectors": {"test.fg_test_1": {"time_travel_type": "hudi"}}, + "connectors": {"test.fg_test_1": {"type": "hudi"}}, } query_object["features"] = { @@ -293,7 +293,7 @@ def test_construct_query_object_datetime_filter(self, mocker, backend_fixtures): }, "right_filter": None, }, - "connectors": {"test.fg_test_1": {"time_travel_type": "hudi"}}, + "connectors": {"test.fg_test_1": {"type": "hudi"}}, } query_object["features"] = { @@ -331,7 +331,7 @@ def test_construct_query_object_without_fs(self, mocker, backend_fixtures): }, "right_filter": None, }, - "connectors": {"test.fg_test_1": {"time_travel_type": "hudi"}}, + "connectors": {"test.fg_test_1": {"type": "hudi"}}, } query_object["features"] = { @@ -369,7 +369,7 @@ def test_construct_query_object_without_fs_excluded(self, mocker, backend_fixtur }, "right_filter": None, }, - "connectors": 
{"test.fg_test_1": {"time_travel_type": "hudi"}}, + "connectors": {"test.fg_test_1": {"type": "hudi"}}, } query_object["features"] = { @@ -430,8 +430,7 @@ def test_construct_query_object_snowflake(self, mocker, backend_fixtures): }, "connectors": { "test.tpch1snowflake_1": { - "time_travel_type": None, - "type": 'SNOWFLAKE', + "type": "SNOWFLAKE", "options": { "user": "test_user", "account": "test_url", diff --git a/python/tests/core/test_feature_group_engine.py b/python/tests/core/test_feature_group_engine.py index 91f1086ed..2847d5219 100644 --- a/python/tests/core/test_feature_group_engine.py +++ b/python/tests/core/test_feature_group_engine.py @@ -525,54 +525,6 @@ def test_commit_delete(self, mocker): # Assert assert mock_hudi_engine.return_value.delete_record.call_count == 1 - def test_clean_delta(self, mocker): - # Arrange - feature_store_id = 99 - - mocker.patch("hsfs.engine.get_instance") - mock_hudi_engine = mocker.patch("hsfs.core.delta_engine.DeltaEngine") - - fg_engine = feature_group_engine.FeatureGroupEngine( - feature_store_id=feature_store_id - ) - - fg = feature_group.FeatureGroup( - name="test", - version=1, - featurestore_id=feature_store_id, - primary_key=[], - partition_key=[], - id=10, - time_travel_format="DELTA", - ) - - # Act - fg_engine.delta_vacuum(feature_group=fg, retention_hours=200) - - # Assert - assert mock_hudi_engine.return_value.vacuum.call_count == 1 - - def test_clean_hudi(self, mocker): - # Arrange - feature_store_id = 99 - - fg_engine = feature_group_engine.FeatureGroupEngine( - feature_store_id=feature_store_id - ) - - fg = feature_group.FeatureGroup( - name="test", - version=1, - featurestore_id=feature_store_id, - primary_key=[], - partition_key=[], - id=10, - time_travel_format="HUDI", - ) - - # Act - fg_engine.delta_vacuum(feature_group=fg, retention_hours=200) - def test_sql(self, mocker): # Arrange feature_store_id = 99 diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index 488232dee..3c7fa999c 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -174,6 +174,43 @@ def test_register_external_temporary_table(self, mocker): # Arrange mocker.patch("hopsworks_common.client.get_instance") mock_sc_read = mocker.patch("hsfs.storage_connector.JdbcConnector.read") + mock_pyspark_getOrCreate = mocker.patch( + "pyspark.sql.session.SparkSession.builder.getOrCreate" + ) + + spark_engine = spark.Engine() + + jdbc_connector = storage_connector.JdbcConnector( + id=1, + name="test_connector", + featurestore_id=1, + connection_string="", + arguments="", + ) + + external_fg = feature_group.ExternalFeatureGroup( + storage_connector=jdbc_connector, id=10 + ) + + # Act + spark_engine.register_external_temporary_table( + external_fg=external_fg, + alias=None, + ) + + # Assert + assert ( + mock_pyspark_getOrCreate.return_value.sparkContext.textFile.call_count == 0 + ) + assert mock_sc_read.return_value.createOrReplaceTempView.call_count == 1 + + def test_register_external_temporary_table_external_fg_location(self, mocker): + # Arrange + mocker.patch("hopsworks_common.client.get_instance") + mock_sc_read = mocker.patch("hsfs.storage_connector.JdbcConnector.read") + mock_pyspark_getOrCreate = mocker.patch( + "pyspark.sql.session.SparkSession.builder.getOrCreate" + ) spark_engine = spark.Engine() @@ -196,6 +233,9 @@ def test_register_external_temporary_table(self, mocker): ) # Assert + assert ( + mock_pyspark_getOrCreate.return_value.sparkContext.textFile.call_count == 1 + ) assert 
mock_sc_read.return_value.createOrReplaceTempView.call_count == 1 def test_register_hudi_temporary_table(self, mocker): @@ -4356,11 +4396,11 @@ def test_setup_s3_hadoop_conf_legacy(self, mocker): # Act result = spark_engine._setup_s3_hadoop_conf( storage_connector=s3_connector, - path="s3://_test_path", + path="s3_test_path", ) # Assert - assert result == "s3a://_test_path" + assert result == "s3a_test_path" assert ( mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.call_count == 14 @@ -4413,11 +4453,11 @@ def test_setup_s3_hadoop_conf_bucket_scope(self, mocker): # Act result = spark_engine._setup_s3_hadoop_conf( storage_connector=s3_connector, - path="s3://_test_path", + path="s3_test_path", ) # Assert - assert result == "s3a://_test_path" + assert result == "s3a_test_path" assert ( mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.call_count == 14 @@ -4519,9 +4559,7 @@ def test_save_empty_dataframe(self, mocker): mock_spark_engine_save_dataframe = mocker.patch( "hsfs.engine.spark.Engine.save_dataframe" ) - mock_spark_read = mocker.patch("pyspark.sql.SparkSession.read") - mock_format = mocker.Mock() - mock_spark_read.format.return_value = mock_format + mock_spark_table = mocker.patch("pyspark.sql.session.SparkSession.table") # Arrange spark_engine = spark.Engine() @@ -4541,7 +4579,7 @@ def test_save_empty_dataframe(self, mocker): # Assert assert mock_spark_engine_save_dataframe.call_count == 1 - assert mock_spark_read.format.call_count == 1 + assert mock_spark_table.call_count == 1 def test_apply_transformation_function_single_output(self, mocker): # Arrange diff --git a/python/tests/test_feature_group.py b/python/tests/test_feature_group.py index 5e01b5a10..bb2944f97 100644 --- a/python/tests/test_feature_group.py +++ b/python/tests/test_feature_group.py @@ -32,7 +32,7 @@ ) from hsfs.client.exceptions import FeatureStoreException, RestAPIError from hsfs.core.constants import HAS_GREAT_EXPECTATIONS -from hsfs.engine import python, spark +from hsfs.engine import python from hsfs.transformation_function import TransformationType @@ -908,50 +908,3 @@ def test_from_response_json_transformation_functions(self, backend_fixtures): assert ( fg.expectation_suite.expectation_suite_name == "test_expectation_suite_name" ) - - def test_prepare_spark_location(self, mocker, backend_fixtures): - # Arrange - engine = spark.Engine() - engine_instance = mocker.patch("hsfs.engine.get_instance", return_value=engine) - json = backend_fixtures["feature_group"]["get_basic_info"]["response"] - fg = feature_group.FeatureGroup.from_response_json(json) - fg._location = f"{fg.name}_{fg.version}" - - # Act - path = fg.prepare_spark_location() - - # Assert - assert fg.location == path - engine_instance.assert_not_called() - - def test_prepare_spark_location_with_s3_connector(self, mocker, backend_fixtures): - # Arrange - engine = spark.Engine() - engine_instance = mocker.patch("hsfs.engine.get_instance", return_value=engine) - json = backend_fixtures["feature_group"]["get_basic_info"]["response"] - fg = feature_group.FeatureGroup.from_response_json(json) - fg._location = f"{fg.name}_{fg.version}" - fg._storage_connector = storage_connector.S3Connector(id=1, name="s3_conn", featurestore_id=fg.feature_store_id) - - # Act - path = fg.prepare_spark_location() - - # Assert - assert fg.location == path - engine_instance.assert_called_once() - - def test_prepare_spark_location_with_s3_connector_python(self, mocker, backend_fixtures): - # 
Arrange - engine = python.Engine() - engine_instance = mocker.patch("hsfs.engine.get_instance", return_value=engine) - json = backend_fixtures["feature_group"]["get_basic_info"]["response"] - fg = feature_group.FeatureGroup.from_response_json(json) - fg._location = f"{fg.name}_{fg.version}" - fg._storage_connector = storage_connector.S3Connector(id=1, name="s3_conn", featurestore_id=fg.feature_store_id) - - # Act - with pytest.raises(AttributeError): - fg.prepare_spark_location() - - # Assert - engine_instance.assert_called_once() diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py index 6b8c49311..dfc0badfb 100644 --- a/utils/python/hsfs_utils.py +++ b/utils/python/hsfs_utils.py @@ -247,18 +247,6 @@ def run_feature_monitoring(job_conf: Dict[str, str]) -> None: raise e -def delta_vacuum_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> None: - """ - Run delta vacuum on a feature group. - """ - feature_store = job_conf.pop("feature_store") - fs = get_feature_store_handle(feature_store) - - entity = fs.get_feature_group(name=job_conf["name"], version=job_conf["version"]) - - entity.delta_vacuum() - - if __name__ == "__main__": # Setup spark first so it fails faster in case of args errors # Otherwise the resource manager will wait until the spark application master @@ -277,7 +265,6 @@ def delta_vacuum_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> None: "ge_validate", "import_fg", "run_feature_monitoring", - "delta_vacuum_fg", ], help="Operation type", ) @@ -316,8 +303,6 @@ def parse_isoformat_date(da: str) -> datetime: import_fg(job_conf) elif args.op == "run_feature_monitoring": run_feature_monitoring(job_conf) - elif args.op == "delta_vacuum_fg": - delta_vacuum_fg(spark, job_conf) success = True except Exception:
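For reference, the replacement add_cols_to_delta_table in python/hsfs/engine/spark.py above evolves the Delta table schema by appending an empty dataframe with mergeSchema enabled, rather than resolving a storage-connector location first. Below is a minimal standalone sketch of that pattern; the SparkSession configuration, the /tmp/example_delta_table path and the new_feature column are illustrative assumptions, not values taken from this diff.

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Assumptions for the sketch: delta-spark is on the classpath and the path below
# already holds a Delta table; both the path and the column name are hypothetical.
spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

table_path = "/tmp/example_delta_table"  # hypothetical location of an existing Delta table

# New columns are added as typed empty literals, mirroring the approach in the diff.
new_columns = {"new_feature": lit("").cast("string")}

(
    spark.read.format("delta").load(table_path)   # read the current table
    .withColumns(new_columns)                     # add the new column(s)
    .limit(0)                                     # keep zero rows: schema-only change
    .write.format("delta")
    .mode("append")
    .option("mergeSchema", "true")                # let Delta merge the widened schema
    .save(table_path)
)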