From 11ed24613d98b8567ee66cd373e96f7cfea717fd Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 11 Sep 2024 12:03:35 +0300 Subject: [PATCH 01/33] init --- .../hsfs/beam/StreamFeatureGroup.java | 5 ++- .../hsfs/flink/StreamFeatureGroup.java | 5 ++- .../logicalclocks/hsfs/FeatureGroupBase.java | 8 ++++ .../hsfs/engine/FeatureGroupUtils.java | 2 +- .../hsfs/spark/ExternalFeatureGroup.java | 8 ---- .../hsfs/spark/FeatureGroup.java | 6 ++- .../hsfs/spark/StreamFeatureGroup.java | 5 ++- .../hsfs/spark/TestFeatureGroup.java | 12 +++--- python/hsfs/feature_group.py | 42 ++++++++++++------- python/hsfs/feature_store.py | 16 +++++++ 10 files changed, 75 insertions(+), 34 deletions(-) diff --git a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java index c4904ddc3..9d3c41ee6 100644 --- a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java +++ b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java @@ -25,6 +25,7 @@ import com.logicalclocks.hsfs.OnlineConfig; import com.logicalclocks.hsfs.StatisticsConfig; import com.logicalclocks.hsfs.Storage; +import com.logicalclocks.hsfs.StorageConnector; import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine; import com.logicalclocks.hsfs.beam.engine.BeamProducer; import com.logicalclocks.hsfs.constructor.QueryBase; @@ -48,7 +49,7 @@ public class StreamFeatureGroup extends FeatureGroupBase> { public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description, List primaryKeys, List partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, List features, StatisticsConfig statisticsConfig, String onlineTopicName, - String eventTime, OnlineConfig onlineConfig) { + String eventTime, OnlineConfig onlineConfig, StorageConnector storageConnector, String path) { this(); this.featureStore = featureStore; this.name = name; @@ -65,6 +66,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ this.onlineTopicName = onlineTopicName; this.eventTime = eventTime; this.onlineConfig = onlineConfig; + this.storageConnector = storageConnector; + this.path = path; } public StreamFeatureGroup() { diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java index d13a9c8e1..c3cd6cbd0 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java @@ -26,6 +26,7 @@ import com.logicalclocks.hsfs.OnlineConfig; import com.logicalclocks.hsfs.StatisticsConfig; import com.logicalclocks.hsfs.Storage; +import com.logicalclocks.hsfs.StorageConnector; import com.logicalclocks.hsfs.constructor.QueryBase; import com.logicalclocks.hsfs.metadata.Statistics; @@ -54,7 +55,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ List primaryKeys, List partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, List features, StatisticsConfig statisticsConfig, String onlineTopicName, String topicName, String notificationTopicName, String eventTime, - OnlineConfig onlineConfig) { + OnlineConfig onlineConfig, StorageConnector storageConnector, String path) { this(); this.featureStore = featureStore; this.name = name; @@ -73,6 +74,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ 
this.notificationTopicName = notificationTopicName; this.eventTime = eventTime; this.onlineConfig = onlineConfig; + this.storageConnector = storageConnector; + this.path = path; } public StreamFeatureGroup() { diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java index 585f4780e..61ca2f5a9 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java @@ -132,6 +132,14 @@ public abstract class FeatureGroupBase { @Setter protected OnlineConfig onlineConfig; + @Getter + @Setter + protected StorageConnector storageConnector; + + @Getter + @Setter + protected String path; + @JsonIgnore // These are only used in the client. In the server they are aggregated in the `features` field protected List partitionKeys; diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java index da4bf655a..a937cca1a 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java @@ -128,7 +128,7 @@ public static Long getTimeStampFromDateString(String inputDate) throws FeatureSt } SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatPattern); - Long commitTimeStamp = dateFormat.parse(tempDate).getTime();; + Long commitTimeStamp = dateFormat.parse(tempDate).getTime(); return commitTimeStamp; } diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java index 13f907ff4..e62a5e83e 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java @@ -57,10 +57,6 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class ExternalFeatureGroup extends FeatureGroupBase> { - @Getter - @Setter - private StorageConnector storageConnector; - @Getter @Setter private String query; @@ -69,10 +65,6 @@ public class ExternalFeatureGroup extends FeatureGroupBase> { @Setter private ExternalDataFormat dataFormat; - @Getter - @Setter - private String path; - @Getter @Setter private List options; diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java index 9c968b3ba..07fe98ff6 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java @@ -31,6 +31,7 @@ import com.logicalclocks.hsfs.OnlineConfig; import com.logicalclocks.hsfs.StatisticsConfig; import com.logicalclocks.hsfs.Storage; +import com.logicalclocks.hsfs.StorageConnector; import com.logicalclocks.hsfs.TimeTravelFormat; import com.logicalclocks.hsfs.FeatureGroupBase; import com.logicalclocks.hsfs.metadata.Statistics; @@ -64,7 +65,8 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver String description, List primaryKeys, List partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List features, StatisticsConfig statisticsConfig, String onlineTopicName, - String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig) { + String topicName, 
String notificationTopicName, String eventTime, OnlineConfig onlineConfig, + StorageConnector storageConnector, String path) { this(); this.featureStore = featureStore; this.name = name; @@ -85,6 +87,8 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver this.notificationTopicName = notificationTopicName; this.eventTime = eventTime; this.onlineConfig = onlineConfig; + this.storageConnector = storageConnector; + this.path = path; } public FeatureGroup() { diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java index 597e6e3ad..0c8b9bae3 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java @@ -30,6 +30,7 @@ import com.logicalclocks.hsfs.OnlineConfig; import com.logicalclocks.hsfs.StatisticsConfig; import com.logicalclocks.hsfs.Storage; +import com.logicalclocks.hsfs.StorageConnector; import com.logicalclocks.hsfs.FeatureGroupBase; import com.logicalclocks.hsfs.metadata.Statistics; @@ -62,7 +63,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ List primaryKeys, List partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, List features, StatisticsConfig statisticsConfig, String onlineTopicName, String topicName, String notificationTopicName, String eventTime, - OnlineConfig onlineConfig) { + OnlineConfig onlineConfig, StorageConnector storageConnector, String path) { this(); this.featureStore = featureStore; this.name = name; @@ -81,6 +82,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ this.notificationTopicName = notificationTopicName; this.eventTime = eventTime; this.onlineConfig = onlineConfig; + this.storageConnector = storageConnector; + this.path = path; } public StreamFeatureGroup() { diff --git a/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java b/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java index adddd8310..bedd9716e 100644 --- a/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java +++ b/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java @@ -67,11 +67,11 @@ public void testFeatureGroupPrimaryKey() { StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description", Collections.singletonList("primaryKey"), Collections.singletonList("partitionKey"), "hudiPrecombineKey", - true, features, null, "onlineTopicName", null, null, null, null); + true, features, null, "onlineTopicName", null, null, null, null, null, null); Exception pkException = assertThrows(FeatureStoreException.class, () -> { featureGroupEngine.saveFeatureGroupMetaData(featureGroup, - null, null, null, null, null);;; + null, null, null, null, null); }); // Assert @@ -93,11 +93,11 @@ public void testFeatureGroupEventTimeFeature() { StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description", Collections.singletonList("featureA"), null, null, - true, features, null, "onlineTopicName", null, null, "eventTime", null); + true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null); Exception eventTimeException = assertThrows(FeatureStoreException.class, () -> { streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup, - null, null, null, null, null);;; + null, null, null, null, null); 
}); // Assert @@ -119,7 +119,7 @@ public void testFeatureGroupPartitionPrecombineKeys() { StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description", Collections.singletonList("featureA"), Collections.singletonList("partitionKey"), "hudiPrecombineKey", - true, features, null, "onlineTopicName", null, null, null, null); + true, features, null, "onlineTopicName", null, null, null, null, null, null); Exception partitionException = assertThrows(FeatureStoreException.class, () -> { streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup, @@ -164,7 +164,7 @@ public void testFeatureGroupAppendFeaturesResetSubject() throws FeatureStoreExce StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description", Collections.singletonList("featureA"), null, null, - true, features, null, "onlineTopicName", null, null, "eventTime", null); + true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null); featureGroup.featureGroupEngine = featureGroupEngine; // Act diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 7eb7062c3..df1cb68e1 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -137,6 +137,8 @@ def __init__( Dict[str, Any], ] ] = None, + storage_connector: Union[sc.StorageConnector, Dict[str, Any]] = None, + path: Optional[str] = None, **kwargs, ) -> None: self._version = version @@ -153,6 +155,14 @@ def __init__( self._feature_store_id = featurestore_id self._feature_store = None self._variable_api: VariableApi = VariableApi() + self._path = path + + if storage_connector is not None and isinstance(storage_connector, dict): + self._storage_connector = sc.StorageConnector.from_response_json( + storage_connector + ) + else: + self._storage_connector: "sc.StorageConnector" = storage_connector self._online_config = ( OnlineConfig.from_response_json(online_config) @@ -1941,6 +1951,14 @@ def online_enabled(self) -> bool: def online_enabled(self, online_enabled: bool) -> None: self._online_enabled = online_enabled + @property + def path(self) -> Optional[str]: + return self._path + + @property + def storage_connector(self) -> "sc.StorageConnector": + return self._storage_connector + @property def topic_name(self) -> Optional[str]: """The topic used for feature group data ingestion.""" @@ -2127,6 +2145,8 @@ def __init__( ] ] = None, offline_backfill_every_hr: Optional[Union[str, int]] = None, + storage_connector: Union[sc.StorageConnector, Dict[str, Any]] = None, + path: Optional[str] = None, **kwargs, ) -> None: super().__init__( @@ -2144,6 +2164,8 @@ def __init__( notification_topic_name=notification_topic_name, deprecated=deprecated, online_config=online_config, + storage_connector=storage_connector, + path=path, ) self._feature_store_name: Optional[str] = featurestore_name self._description: Optional[str] = description @@ -3447,6 +3469,7 @@ def to_dict(self) -> Dict[str, Any]: "notificationTopicName": self.notification_topic_name, "deprecated": self.deprecated, "transformationFunctions": self._transformation_functions, + "path": self._path, } if self._online_config: fg_meta_dict["onlineConfig"] = self._online_config.to_dict() @@ -3454,6 +3477,8 @@ def to_dict(self) -> Dict[str, Any]: fg_meta_dict["embeddingIndex"] = self.embedding_index.to_dict() if self._stream: fg_meta_dict["deltaStreamerJobConf"] = self._deltastreamer_jobconf + if self._storage_connector: + fg_meta_dict["storageConnector"] = self._storage_connector.to_dict() return fg_meta_dict def 
_get_table_name(self) -> str: @@ -3691,6 +3716,8 @@ def __init__( notification_topic_name=notification_topic_name, deprecated=deprecated, online_config=online_config, + storage_connector=storage_connector, + path=path, ) self._feature_store_name = featurestore_name @@ -3699,7 +3726,6 @@ def __init__( self._creator = user.User.from_response_json(creator) self._query = query self._data_format = data_format.upper() if data_format else None - self._path = path self._features = [ feature.Feature.from_response_json(feat) if isinstance(feat, dict) else feat @@ -3740,12 +3766,6 @@ def __init__( self._features = features self._options = options or {} - if storage_connector is not None and isinstance(storage_connector, dict): - self._storage_connector = sc.StorageConnector.from_response_json( - storage_connector - ) - else: - self._storage_connector: "sc.StorageConnector" = storage_connector self._vector_db_client: Optional["VectorDbClient"] = None self._href: Optional[str] = href @@ -4135,18 +4155,10 @@ def query(self) -> Optional[str]: def data_format(self) -> Optional[str]: return self._data_format - @property - def path(self) -> Optional[str]: - return self._path - @property def options(self) -> Optional[Dict[str, Any]]: return self._options - @property - def storage_connector(self) -> "sc.StorageConnector": - return self._storage_connector - @property def creator(self) -> Optional["user.User"]: return self._creator diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index 6cf52e87f..d49356cbf 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -521,6 +521,8 @@ def create_feature_group( ] ] = None, offline_backfill_every_hr: Optional[Union[int, str]] = None, + storage_connector: Union[storage_connector.StorageConnector, Dict[str, Any]] = None, + path: Optional[str] = None, ) -> feature_group.FeatureGroup: """Create a feature group metadata object. @@ -625,6 +627,10 @@ def plus_two(value): periodically. The value can be either an integer representing the number of hours between each run or a string representing a cron expression. Set the value to None to avoid scheduling the materialization job. Defaults to None (i.e no scheduling). + storage_connector: the storage connector to use to establish connectivity + with the data source. + path: The location within the scope of the storage connector, from where to read + the data for the external feature group # Returns `FeatureGroup`. The feature group metadata object. @@ -652,6 +658,8 @@ def plus_two(value): transformation_functions=transformation_functions, online_config=online_config, offline_backfill_every_hr=offline_backfill_every_hr, + storage_connector=storage_connector, + path=path, ) feature_group_object.feature_store = self return feature_group_object @@ -686,6 +694,8 @@ def get_or_create_feature_group( ] = None, online_config: Optional[Union[OnlineConfig, Dict[str, Any]]] = None, offline_backfill_every_hr: Optional[Union[int, str]] = None, + storage_connector: Union[storage_connector.StorageConnector, Dict[str, Any]] = None, + path: Optional[str] = None, ) -> Union[ feature_group.FeatureGroup, feature_group.ExternalFeatureGroup, @@ -780,6 +790,10 @@ def get_or_create_feature_group( periodically. The value can be either an integer representing the number of hours between each run or a string representing a cron expression. Set the value to None to avoid scheduling the materialization job. Defaults to None (i.e no automatic scheduling). Applies only on Feature Group creation. 
+ storage_connector: the storage connector to use to establish connectivity + with the data source. + path: The location within the scope of the storage connector, from where to read + the data for the external feature group # Returns `FeatureGroup`. The feature group metadata object. @@ -816,6 +830,8 @@ def get_or_create_feature_group( transformation_functions=transformation_functions, online_config=online_config, offline_backfill_every_hr=offline_backfill_every_hr, + storage_connector=storage_connector, + path=path, ) feature_group_object.feature_store = self return feature_group_object From ec6aa89651518a268eeef4428755df288a007919 Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 11 Sep 2024 17:42:24 +0300 Subject: [PATCH 02/33] add region --- .../main/java/com/logicalclocks/hsfs/StorageConnector.java | 3 +++ python/hsfs/storage_connector.py | 7 +++++++ 2 files changed, 10 insertions(+) diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java index 71be4d995..91bd36819 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java @@ -124,6 +124,9 @@ public static class S3Connector extends StorageConnector { @Getter @Setter protected String bucket; + @Getter @Setter + protected String region; + @Getter @Setter protected String sessionToken; diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index 9f959b1da..c4e496317 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -267,6 +267,7 @@ def __init__( server_encryption_algorithm: Optional[str] = None, server_encryption_key: Optional[str] = None, bucket: Optional[str] = None, + region: Optional[str] = None, session_token: Optional[str] = None, iam_role: Optional[str] = None, arguments: Optional[Dict[str, Any]] = None, @@ -280,6 +281,7 @@ def __init__( self._server_encryption_algorithm = server_encryption_algorithm self._server_encryption_key = server_encryption_key self._bucket = bucket + self._region = region self._session_token = session_token self._iam_role = iam_role self._arguments = ( @@ -310,6 +312,11 @@ def server_encryption_key(self) -> Optional[str]: def bucket(self) -> Optional[str]: """Return the bucket for S3 connectors.""" return self._bucket + + @property + def region(self) -> Optional[str]: + """Return the region for S3 connectors.""" + return self._region @property def session_token(self) -> Optional[str]: From 6bc8e799649cf13bf0584036ca232add707f3e70 Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 11 Sep 2024 17:45:37 +0300 Subject: [PATCH 03/33] fix ruff --- python/hsfs/storage_connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index c4e496317..ba7568233 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -312,7 +312,7 @@ def server_encryption_key(self) -> Optional[str]: def bucket(self) -> Optional[str]: """Return the bucket for S3 connectors.""" return self._bucket - + @property def region(self) -> Optional[str]: """Return the region for S3 connectors.""" From 6b669890e92f839d8f65e64aba3d90f2b2a9ea49 Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 18 Sep 2024 17:10:28 +0300 Subject: [PATCH 04/33] temp --- python/hsfs/constructor/fs_query.py | 25 +++++++++++++++---------- python/hsfs/core/hudi_engine.py | 9 +++++++-- 
python/hsfs/engine/spark.py | 9 ++++++++- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/python/hsfs/constructor/fs_query.py b/python/hsfs/constructor/fs_query.py index b944006f3..fb3c4ddf1 100644 --- a/python/hsfs/constructor/fs_query.py +++ b/python/hsfs/constructor/fs_query.py @@ -99,16 +99,21 @@ def register_external( Union[TypeVar("pyspark.sql.DataFrame"), TypeVar("pyspark.RDD")] ] = None, ) -> None: - if self._on_demand_fg_aliases is None: - return - - for external_fg_alias in self._on_demand_fg_aliases: - if type(external_fg_alias.on_demand_feature_group).__name__ == "SpineGroup": - external_fg_alias.on_demand_feature_group.dataframe = spine - engine.get_instance().register_external_temporary_table( - external_fg_alias.on_demand_feature_group, - external_fg_alias.alias, - ) + if self._on_demand_fg_aliases is not None: + for external_fg_alias in self._on_demand_fg_aliases: + if type(external_fg_alias.on_demand_feature_group).__name__ == "SpineGroup": + external_fg_alias.on_demand_feature_group.dataframe = spine + engine.get_instance().register_external_temporary_table( + external_fg_alias.on_demand_feature_group, + external_fg_alias.alias, + ) + + if self._hudi_cached_feature_groups is not None: + for external_fg_alias in self._hudi_cached_feature_groups: + engine.get_instance().register_external_temporary_table( + external_fg_alias.feature_group, + external_fg_alias.alias, + ) def register_hudi_tables( self, diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py index 04c2c5a6b..a4707ebc2 100644 --- a/python/hsfs/core/hudi_engine.py +++ b/python/hsfs/core/hudi_engine.py @@ -108,13 +108,18 @@ def register_temporary_table(self, hudi_fg_alias, read_options): ) def _write_hudi_dataset(self, dataset, save_mode, operation, write_options): + if (self._feature_group.storage_connector is None): + location = self._feature_group.location + else: + location = self._feature_group.storage_connector._get_path(self._feature_group.path) + hudi_options = self._setup_hudi_write_opts(operation, write_options) dataset.write.format(HudiEngine.HUDI_SPARK_FORMAT).options(**hudi_options).mode( save_mode - ).save(self._feature_group.location) + ).save(location) feature_group_commit = self._get_last_commit_metadata( - self._spark_context, self._feature_group.location + self._spark_context, location ) return feature_group_commit diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index a1fe19a62..7fd8dd1f0 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -188,13 +188,20 @@ def set_job_group(self, group_id, description): self._spark_session.sparkContext.setJobGroup(group_id, description) def register_external_temporary_table(self, external_fg, alias): - if not isinstance(external_fg, fg_mod.SpineGroup): + if isinstance(external_fg, fg_mod.ExternalFeatureGroup): external_dataset = external_fg.storage_connector.read( external_fg.query, external_fg.data_format, external_fg.options, external_fg.storage_connector._get_path(external_fg.path), ) + elif isinstance(external_fg, fg_mod.FeatureGroup): + external_dataset = external_fg.storage_connector.read( + None, + external_fg.timeTravelFormat, + None, + external_fg.storage_connector._get_path(external_fg.path), + ) else: external_dataset = external_fg.dataframe if external_fg.location: From b3fa7e3695a3286f40967ed652377d498d80541d Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 18 Sep 2024 17:20:04 +0300 Subject: [PATCH 05/33] s3 -> s3a --- python/hsfs/storage_connector.py | 2 +- 
1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index ba7568233..6a4e9709d 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -331,7 +331,7 @@ def iam_role(self) -> Optional[str]: @property def path(self) -> Optional[str]: """If the connector refers to a path (e.g. S3) - return the path of the connector""" - return "s3://" + self._bucket + return "s3a://" + self._bucket @property def arguments(self) -> Optional[Dict[str, Any]]: From 07f3a35b25fa24b77066a5bc0c165d82e61cdc24 Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 18 Sep 2024 17:26:56 +0300 Subject: [PATCH 06/33] time_travel_format --- python/hsfs/engine/spark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 7fd8dd1f0..fb48a5b9a 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -198,7 +198,7 @@ def register_external_temporary_table(self, external_fg, alias): elif isinstance(external_fg, fg_mod.FeatureGroup): external_dataset = external_fg.storage_connector.read( None, - external_fg.timeTravelFormat, + external_fg.time_travel_format, None, external_fg.storage_connector._get_path(external_fg.path), ) From 5d7dff65ba22a778f1576eb0e6105cd1a82ac8f6 Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 18 Sep 2024 17:46:50 +0300 Subject: [PATCH 07/33] s3 -> s3a --- python/hsfs/storage_connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index 6a4e9709d..277b1842f 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -397,7 +397,7 @@ def read( if options is not None else self.spark_options() ) - if not path.startswith("s3://"): + if not path.startswith("s3a://"): path = self._get_path(path) print( "Prepending default bucket specified on connector, final path: {}".format( From 23670c768a6661a1f7443a2894ff9419e6d745a2 Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 18 Sep 2024 17:58:19 +0300 Subject: [PATCH 08/33] s3 issue continues --- python/hsfs/engine/spark.py | 2 +- python/hsfs/storage_connector.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index fb48a5b9a..332e2b44c 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -1228,7 +1228,7 @@ def _setup_s3_hadoop_conf(self, storage_connector, path): FS_S3_ENDPOINT, storage_connector.spark_options().get(FS_S3_ENDPOINT) ) - return path.replace("s3", "s3a", 1) if path is not None else None + return path.replace("s3:", "s3a:", 1) if path is not None else None def _setup_adls_hadoop_conf(self, storage_connector, path): for k, v in storage_connector.spark_options().items(): diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index 277b1842f..ba7568233 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -331,7 +331,7 @@ def iam_role(self) -> Optional[str]: @property def path(self) -> Optional[str]: """If the connector refers to a path (e.g. 
S3) - return the path of the connector""" - return "s3a://" + self._bucket + return "s3://" + self._bucket @property def arguments(self) -> Optional[Dict[str, Any]]: @@ -397,7 +397,7 @@ def read( if options is not None else self.spark_options() ) - if not path.startswith("s3a://"): + if not path.startswith("s3://"): path = self._get_path(path) print( "Prepending default bucket specified on connector, final path: {}".format( From 8922c81800c8d2a7a08c9f5c4e8fb4463023d80e Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 18 Sep 2024 18:08:22 +0300 Subject: [PATCH 09/33] prepare_spark --- python/hsfs/core/hudi_engine.py | 2 +- python/hsfs/engine/spark.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py index a4707ebc2..47e8b6674 100644 --- a/python/hsfs/core/hudi_engine.py +++ b/python/hsfs/core/hudi_engine.py @@ -111,7 +111,7 @@ def _write_hudi_dataset(self, dataset, save_mode, operation, write_options): if (self._feature_group.storage_connector is None): location = self._feature_group.location else: - location = self._feature_group.storage_connector._get_path(self._feature_group.path) + location = self._feature_group.storage_connector.prepare_spark(self._feature_group.path) hudi_options = self._setup_hudi_write_opts(operation, write_options) dataset.write.format(HudiEngine.HUDI_SPARK_FORMAT).options(**hudi_options).mode( diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 332e2b44c..1d28137da 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -200,7 +200,7 @@ def register_external_temporary_table(self, external_fg, alias): None, external_fg.time_travel_format, None, - external_fg.storage_connector._get_path(external_fg.path), + external_fg.storage_connector.prepare_spark(external_fg.path), ) else: external_dataset = external_fg.dataframe From 6b1a53c289a249fa1d45c02f5380ee2d7f3dd1bd Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 18 Sep 2024 18:17:54 +0300 Subject: [PATCH 10/33] path fix? 
--- python/hsfs/core/hudi_engine.py | 4 +++- python/hsfs/engine/spark.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py index 47e8b6674..feea12ad4 100644 --- a/python/hsfs/core/hudi_engine.py +++ b/python/hsfs/core/hudi_engine.py @@ -111,7 +111,9 @@ def _write_hudi_dataset(self, dataset, save_mode, operation, write_options): if (self._feature_group.storage_connector is None): location = self._feature_group.location else: - location = self._feature_group.storage_connector.prepare_spark(self._feature_group.path) + location = self._feature_group.storage_connector.prepare_spark( + self._feature_group.storage_connector._get_path(self._feature_group.path) + ) hudi_options = self._setup_hudi_write_opts(operation, write_options) dataset.write.format(HudiEngine.HUDI_SPARK_FORMAT).options(**hudi_options).mode( diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 1d28137da..ca64522af 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -200,7 +200,7 @@ def register_external_temporary_table(self, external_fg, alias): None, external_fg.time_travel_format, None, - external_fg.storage_connector.prepare_spark(external_fg.path), + external_fg.path, ) else: external_dataset = external_fg.dataframe From 150f836dba3c60003323a30337c82154257c884b Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 18 Sep 2024 18:44:59 +0300 Subject: [PATCH 11/33] reconcile_hudi_schema --- python/hsfs/core/hudi_engine.py | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py index feea12ad4..0e22d80de 100644 --- a/python/hsfs/core/hudi_engine.py +++ b/python/hsfs/core/hudi_engine.py @@ -233,22 +233,23 @@ def _setup_hudi_read_opts(self, hudi_fg_alias, read_options): def reconcile_hudi_schema( self, save_empty_dataframe_callback, hudi_fg_alias, read_options ): - fg_table_name = hudi_fg_alias.feature_group._get_table_name() - if sorted(self._spark_session.table(hudi_fg_alias.alias).columns) != sorted( - self._spark_session.table(fg_table_name).columns - ): - full_fg = self._feature_group_api.get( - feature_store_id=hudi_fg_alias.feature_group._feature_store_id, - name=hudi_fg_alias.feature_group.name, - version=hudi_fg_alias.feature_group.version, - ) - - save_empty_dataframe_callback(full_fg) - - self.register_temporary_table( - hudi_fg_alias, - read_options, - ) + if (hudi_fg_alias._feature_group.storage_connector is None): + fg_table_name = hudi_fg_alias.feature_group._get_table_name() + if sorted(self._spark_session.table(hudi_fg_alias.alias).columns) != sorted( + self._spark_session.table(fg_table_name).columns + ): + full_fg = self._feature_group_api.get( + feature_store_id=hudi_fg_alias.feature_group._feature_store_id, + name=hudi_fg_alias.feature_group.name, + version=hudi_fg_alias.feature_group.version, + ) + + save_empty_dataframe_callback(full_fg) + + self.register_temporary_table( + hudi_fg_alias, + read_options, + ) @staticmethod def _get_last_commit_metadata(spark_context, base_path): From d7ea12f84cd6a6379f9888460cb75e229f676eff Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 18 Sep 2024 19:02:59 +0300 Subject: [PATCH 12/33] small undo --- python/hsfs/engine/spark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index ca64522af..7465bb2fd 100644 --- a/python/hsfs/engine/spark.py +++ 
b/python/hsfs/engine/spark.py @@ -1228,7 +1228,7 @@ def _setup_s3_hadoop_conf(self, storage_connector, path): FS_S3_ENDPOINT, storage_connector.spark_options().get(FS_S3_ENDPOINT) ) - return path.replace("s3:", "s3a:", 1) if path is not None else None + return path.replace("s3", "s3a", 1) if path is not None else None def _setup_adls_hadoop_conf(self, storage_connector, path): for k, v in storage_connector.spark_options().items(): From 6db0d1c6737f743c6022c354d75e1492090dae76 Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 19 Sep 2024 11:48:17 +0300 Subject: [PATCH 13/33] remove textFile --- python/hsfs/engine/spark.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 7465bb2fd..3238a6dd4 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -204,8 +204,6 @@ def register_external_temporary_table(self, external_fg, alias): ) else: external_dataset = external_fg.dataframe - if external_fg.location: - self._spark_session.sparkContext.textFile(external_fg.location).collect() external_dataset.createOrReplaceTempView(alias) return external_dataset From 600b726956d8d8c745520015356d8753aa567176 Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 19 Sep 2024 12:02:14 +0300 Subject: [PATCH 14/33] register_temporary_table --- python/hsfs/core/hudi_engine.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py index 0e22d80de..3bbe8da6a 100644 --- a/python/hsfs/core/hudi_engine.py +++ b/python/hsfs/core/hudi_engine.py @@ -100,10 +100,17 @@ def delete_record(self, delete_df, write_options): return self._feature_group_api.commit(self._feature_group, fg_commit) def register_temporary_table(self, hudi_fg_alias, read_options): + if (self._feature_group.storage_connector is None): + location = self._feature_group.location + else: + location = self._feature_group.storage_connector.prepare_spark( + self._feature_group.storage_connector._get_path(self._feature_group.path) + ) + hudi_options = self._setup_hudi_read_opts(hudi_fg_alias, read_options) self._spark_session.read.format(self.HUDI_SPARK_FORMAT).options( **hudi_options - ).load(self._feature_group.location).createOrReplaceTempView( + ).load(location).createOrReplaceTempView( hudi_fg_alias.alias ) From d0738d46ed7e88cb4f6b44748dc42825a6b2dd2d Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 19 Sep 2024 14:47:14 +0300 Subject: [PATCH 15/33] add uri to FG --- python/hsfs/core/delta_engine.py | 16 +++++----- python/hsfs/core/hudi_engine.py | 53 ++++++++++++-------------------- python/hsfs/engine/spark.py | 25 +++++++++------ python/hsfs/feature_group.py | 10 ++++++ 4 files changed, 52 insertions(+), 52 deletions(-) diff --git a/python/hsfs/core/delta_engine.py b/python/hsfs/core/delta_engine.py index 56cce3a30..f5fa39e41 100644 --- a/python/hsfs/core/delta_engine.py +++ b/python/hsfs/core/delta_engine.py @@ -55,7 +55,7 @@ def register_temporary_table(self, delta_fg_alias, read_options): delta_options = self._setup_delta_read_opts(delta_fg_alias, read_options) self._spark_session.read.format(self.DELTA_SPARK_FORMAT).options( **delta_options - ).load(self._feature_group.location).createOrReplaceTempView( + ).load(self._feature_group.uri).createOrReplaceTempView( delta_fg_alias.alias ) @@ -86,14 +86,14 @@ def _setup_delta_read_opts(self, delta_fg_alias, read_options): def delete_record(self, delete_df): if not DeltaTable.isDeltaTable( - self._spark_session, self._feature_group.location 
+ self._spark_session, self._feature_group.uri ): raise FeatureStoreException( f"This is no data available in Feature group {self._feature_group.name}, or it not DELTA enabled " ) else: fg_source_table = DeltaTable.forPath( - self._spark_session, self._feature_group.location + self._spark_session, self._feature_group.uri ) source_alias = ( @@ -109,7 +109,7 @@ def delete_record(self, delete_df): ).whenMatchedDelete().execute() fg_commit = self._get_last_commit_metadata( - self._spark_session, self._feature_group.location + self._spark_session, self._feature_group.uri ) return self._feature_group_api.commit(self._feature_group, fg_commit) @@ -118,7 +118,7 @@ def _write_delta_dataset(self, dataset, write_options): write_options = {} if not DeltaTable.isDeltaTable( - self._spark_session, self._feature_group.location + self._spark_session, self._feature_group.uri ): ( dataset.write.format(DeltaEngine.DELTA_SPARK_FORMAT) @@ -129,11 +129,11 @@ def _write_delta_dataset(self, dataset, write_options): else [] ) .mode("append") - .save(self._feature_group.location) + .save(self._feature_group.uri) ) else: fg_source_table = DeltaTable.forPath( - self._spark_session, self._feature_group.location + self._spark_session, self._feature_group.uri ) source_alias = ( @@ -149,7 +149,7 @@ def _write_delta_dataset(self, dataset, write_options): ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute() return self._get_last_commit_metadata( - self._spark_session, self._feature_group.location + self._spark_session, self._feature_group.uri ) def _generate_merge_query(self, source_alias, updates_alias): diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py index 3bbe8da6a..f34143dc0 100644 --- a/python/hsfs/core/hudi_engine.py +++ b/python/hsfs/core/hudi_engine.py @@ -100,35 +100,21 @@ def delete_record(self, delete_df, write_options): return self._feature_group_api.commit(self._feature_group, fg_commit) def register_temporary_table(self, hudi_fg_alias, read_options): - if (self._feature_group.storage_connector is None): - location = self._feature_group.location - else: - location = self._feature_group.storage_connector.prepare_spark( - self._feature_group.storage_connector._get_path(self._feature_group.path) - ) - hudi_options = self._setup_hudi_read_opts(hudi_fg_alias, read_options) self._spark_session.read.format(self.HUDI_SPARK_FORMAT).options( **hudi_options - ).load(location).createOrReplaceTempView( + ).load(self._feature_group.uri).createOrReplaceTempView( hudi_fg_alias.alias ) def _write_hudi_dataset(self, dataset, save_mode, operation, write_options): - if (self._feature_group.storage_connector is None): - location = self._feature_group.location - else: - location = self._feature_group.storage_connector.prepare_spark( - self._feature_group.storage_connector._get_path(self._feature_group.path) - ) - hudi_options = self._setup_hudi_write_opts(operation, write_options) dataset.write.format(HudiEngine.HUDI_SPARK_FORMAT).options(**hudi_options).mode( save_mode - ).save(location) + ).save(self._feature_group.uri) feature_group_commit = self._get_last_commit_metadata( - self._spark_context, location + self._spark_context, self._feature_group.uri ) return feature_group_commit @@ -240,23 +226,22 @@ def _setup_hudi_read_opts(self, hudi_fg_alias, read_options): def reconcile_hudi_schema( self, save_empty_dataframe_callback, hudi_fg_alias, read_options ): - if (hudi_fg_alias._feature_group.storage_connector is None): - fg_table_name = hudi_fg_alias.feature_group._get_table_name() - if 
sorted(self._spark_session.table(hudi_fg_alias.alias).columns) != sorted( - self._spark_session.table(fg_table_name).columns - ): - full_fg = self._feature_group_api.get( - feature_store_id=hudi_fg_alias.feature_group._feature_store_id, - name=hudi_fg_alias.feature_group.name, - version=hudi_fg_alias.feature_group.version, - ) - - save_empty_dataframe_callback(full_fg) - - self.register_temporary_table( - hudi_fg_alias, - read_options, - ) + fg_table_name = hudi_fg_alias.feature_group._get_table_name() + if sorted(self._spark_session.table(hudi_fg_alias.alias).columns) != sorted( + self._spark_session.table(fg_table_name).columns + ): + full_fg = self._feature_group_api.get( + feature_store_id=hudi_fg_alias.feature_group._feature_store_id, + name=hudi_fg_alias.feature_group.name, + version=hudi_fg_alias.feature_group.version, + ) + + save_empty_dataframe_callback(full_fg) + + self.register_temporary_table( + hudi_fg_alias, + read_options, + ) @staticmethod def _get_last_commit_metadata(spark_context, base_path): diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 3238a6dd4..60914e390 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -193,17 +193,19 @@ def register_external_temporary_table(self, external_fg, alias): external_fg.query, external_fg.data_format, external_fg.options, - external_fg.storage_connector._get_path(external_fg.path), + external_fg.uri, ) - elif isinstance(external_fg, fg_mod.FeatureGroup): + elif isinstance(external_fg, fg_mod.SpineGroup): + external_dataset = external_fg.dataframe + else: external_dataset = external_fg.storage_connector.read( None, external_fg.time_travel_format, None, - external_fg.path, + external_fg.uri, ) - else: - external_dataset = external_fg.dataframe + if external_fg.location: + self._spark_session.sparkContext.textFile(external_fg.location).collect() external_dataset.createOrReplaceTempView(alias) return external_dataset @@ -218,13 +220,16 @@ def register_hudi_temporary_table( self._spark_context, self._spark_session, ) + hudi_engine_instance.register_temporary_table( hudi_fg_alias, read_options, ) - hudi_engine_instance.reconcile_hudi_schema( - self.save_empty_dataframe, hudi_fg_alias, read_options - ) + + if (hudi_fg_alias._feature_group.storage_connector is None): + hudi_engine_instance.reconcile_hudi_schema( + self.save_empty_dataframe, hudi_fg_alias, read_options + ) def register_delta_temporary_table( self, delta_fg_alias, feature_store_id, feature_store_name, read_options @@ -1262,12 +1267,12 @@ def add_cols_to_delta_table(self, feature_group, new_features): new_features_map[new_features.name] = lit("").cast(new_features.type) self._spark_session.read.format("delta").load( - feature_group.location + feature_group.uri ).withColumns(new_features_map).limit(0).write.format("delta").mode( "append" ).option("mergeSchema", "true").option( "spark.databricks.delta.schema.autoMerge.enabled", "true" - ).save(feature_group.location) + ).save(feature_group.uri) def _apply_transformation_function( self, diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 7333f0efc..628ecae03 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2063,6 +2063,16 @@ def path(self) -> Optional[str]: @property def storage_connector(self) -> "sc.StorageConnector": return self._storage_connector + + @property + def uri(self) -> str: + """Location of data.""" + if (self.storage_connector is None): + return self.location + else: + return 
self.storage_connector.prepare_spark( + self.storage_connector._get_path(self.path) + ) @property def topic_name(self) -> Optional[str]: From f6da8a3c3751abb25b09605fb8272da35152b68a Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 19 Sep 2024 14:56:32 +0300 Subject: [PATCH 16/33] hive_sync --- python/hsfs/core/hudi_engine.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py index f34143dc0..facf12625 100644 --- a/python/hsfs/core/hudi_engine.py +++ b/python/hsfs/core/hudi_engine.py @@ -144,6 +144,9 @@ def _setup_hudi_write_opts(self, operation, write_options): else self._feature_group.primary_key[0] ) + # dont enable hive sync when using managed FG + hive_sync = self._feature_group.storage_connector is None + hudi_options = { self.HUDI_KEY_GENERATOR_OPT_KEY: self.HUDI_COMPLEX_KEY_GENERATOR_OPT_VAL, self.HUDI_PRECOMBINE_FIELD: pre_combine_key, @@ -153,7 +156,7 @@ def _setup_hudi_write_opts(self, operation, write_options): self.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY: self.DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL if len(partition_key) >= 1 else self.HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL, - self.HUDI_HIVE_SYNC_ENABLE: "true", + self.HUDI_HIVE_SYNC_ENABLE: hive_sync, self.HUDI_HIVE_SYNC_MODE: self.HUDI_HIVE_SYNC_MODE_VAL, self.HUDI_HIVE_SYNC_DB: self._feature_store_name, self.HUDI_HIVE_SYNC_TABLE: table_name, From d54c3701ed60a76badcc6b358aad6016cffa713e Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 19 Sep 2024 15:09:09 +0300 Subject: [PATCH 17/33] fix s3 replace --- python/hsfs/engine/spark.py | 2 +- python/hsfs/storage_connector.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 60914e390..bb6d7df7b 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -1231,7 +1231,7 @@ def _setup_s3_hadoop_conf(self, storage_connector, path): FS_S3_ENDPOINT, storage_connector.spark_options().get(FS_S3_ENDPOINT) ) - return path.replace("s3", "s3a", 1) if path is not None else None + return path.replace("s3://", "s3a://", 1) if path is not None else None def _setup_adls_hadoop_conf(self, storage_connector, path): for k, v in storage_connector.spark_options().items(): diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index ba7568233..aa483696b 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -397,7 +397,7 @@ def read( if options is not None else self.spark_options() ) - if not path.startswith("s3://"): + if not path.startswith(("s3://", "s3a://")): path = self._get_path(path) print( "Prepending default bucket specified on connector, final path: {}".format( From da9d1a49ea4e7725111a79e61364fb4dec9c7a87 Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 19 Sep 2024 15:11:35 +0300 Subject: [PATCH 18/33] fix ruff --- python/hsfs/feature_group.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 628ecae03..0f1090f4b 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2063,7 +2063,7 @@ def path(self) -> Optional[str]: @property def storage_connector(self) -> "sc.StorageConnector": return self._storage_connector - + @property def uri(self) -> str: """Location of data.""" From 4ea01410c310d606e7da7b10e8c5f5413a255ab8 Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 19 Sep 2024 15:34:17 +0300 Subject: [PATCH 19/33] fix some tests --- 
python/hsfs/core/hudi_engine.py | 2 +- python/tests/engine/test_spark.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py index facf12625..7212dbe79 100644 --- a/python/hsfs/core/hudi_engine.py +++ b/python/hsfs/core/hudi_engine.py @@ -156,7 +156,7 @@ def _setup_hudi_write_opts(self, operation, write_options): self.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY: self.DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL if len(partition_key) >= 1 else self.HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL, - self.HUDI_HIVE_SYNC_ENABLE: hive_sync, + self.HUDI_HIVE_SYNC_ENABLE: str(hive_sync).lower(), self.HUDI_HIVE_SYNC_MODE: self.HUDI_HIVE_SYNC_MODE_VAL, self.HUDI_HIVE_SYNC_DB: self._feature_store_name, self.HUDI_HIVE_SYNC_TABLE: table_name, diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index f334de165..b24629737 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -4395,11 +4395,11 @@ def test_setup_s3_hadoop_conf(self, mocker): # Act result = spark_engine._setup_s3_hadoop_conf( storage_connector=s3_connector, - path="s3_test_path", + path="s3://_test_path", ) # Assert - assert result == "s3a_test_path" + assert result == "s3a://_test_path" assert ( mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.call_count == 7 From 91bea60e50c2acac4977826a06e8fa37a1ec44c9 Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 19 Sep 2024 15:45:59 +0300 Subject: [PATCH 20/33] get_uri() --- python/hsfs/core/delta_engine.py | 16 ++++++++-------- python/hsfs/core/hudi_engine.py | 6 +++--- python/hsfs/engine/spark.py | 8 ++++---- python/hsfs/feature_group.py | 3 +-- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/python/hsfs/core/delta_engine.py b/python/hsfs/core/delta_engine.py index f5fa39e41..f0fcc67ba 100644 --- a/python/hsfs/core/delta_engine.py +++ b/python/hsfs/core/delta_engine.py @@ -55,7 +55,7 @@ def register_temporary_table(self, delta_fg_alias, read_options): delta_options = self._setup_delta_read_opts(delta_fg_alias, read_options) self._spark_session.read.format(self.DELTA_SPARK_FORMAT).options( **delta_options - ).load(self._feature_group.uri).createOrReplaceTempView( + ).load(self._feature_group.get_uri()).createOrReplaceTempView( delta_fg_alias.alias ) @@ -86,14 +86,14 @@ def _setup_delta_read_opts(self, delta_fg_alias, read_options): def delete_record(self, delete_df): if not DeltaTable.isDeltaTable( - self._spark_session, self._feature_group.uri + self._spark_session, self._feature_group.get_uri() ): raise FeatureStoreException( f"This is no data available in Feature group {self._feature_group.name}, or it not DELTA enabled " ) else: fg_source_table = DeltaTable.forPath( - self._spark_session, self._feature_group.uri + self._spark_session, self._feature_group.get_uri() ) source_alias = ( @@ -109,7 +109,7 @@ def delete_record(self, delete_df): ).whenMatchedDelete().execute() fg_commit = self._get_last_commit_metadata( - self._spark_session, self._feature_group.uri + self._spark_session, self._feature_group.get_uri() ) return self._feature_group_api.commit(self._feature_group, fg_commit) @@ -118,7 +118,7 @@ def _write_delta_dataset(self, dataset, write_options): write_options = {} if not DeltaTable.isDeltaTable( - self._spark_session, self._feature_group.uri + self._spark_session, self._feature_group.get_uri() ): ( dataset.write.format(DeltaEngine.DELTA_SPARK_FORMAT) @@ -129,11 +129,11 @@ def 
_write_delta_dataset(self, dataset, write_options): else [] ) .mode("append") - .save(self._feature_group.uri) + .save(self._feature_group.get_uri()) ) else: fg_source_table = DeltaTable.forPath( - self._spark_session, self._feature_group.uri + self._spark_session, self._feature_group.get_uri() ) source_alias = ( @@ -149,7 +149,7 @@ def _write_delta_dataset(self, dataset, write_options): ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute() return self._get_last_commit_metadata( - self._spark_session, self._feature_group.uri + self._spark_session, self._feature_group.get_uri() ) def _generate_merge_query(self, source_alias, updates_alias): diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py index 7212dbe79..36070b91d 100644 --- a/python/hsfs/core/hudi_engine.py +++ b/python/hsfs/core/hudi_engine.py @@ -103,7 +103,7 @@ def register_temporary_table(self, hudi_fg_alias, read_options): hudi_options = self._setup_hudi_read_opts(hudi_fg_alias, read_options) self._spark_session.read.format(self.HUDI_SPARK_FORMAT).options( **hudi_options - ).load(self._feature_group.uri).createOrReplaceTempView( + ).load(self._feature_group.get_uri()).createOrReplaceTempView( hudi_fg_alias.alias ) @@ -111,10 +111,10 @@ def _write_hudi_dataset(self, dataset, save_mode, operation, write_options): hudi_options = self._setup_hudi_write_opts(operation, write_options) dataset.write.format(HudiEngine.HUDI_SPARK_FORMAT).options(**hudi_options).mode( save_mode - ).save(self._feature_group.uri) + ).save(self._feature_group.get_uri()) feature_group_commit = self._get_last_commit_metadata( - self._spark_context, self._feature_group.uri + self._spark_context, self._feature_group.get_uri() ) return feature_group_commit diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index bb6d7df7b..ff62c57ad 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -193,7 +193,7 @@ def register_external_temporary_table(self, external_fg, alias): external_fg.query, external_fg.data_format, external_fg.options, - external_fg.uri, + external_fg.get_uri(), ) elif isinstance(external_fg, fg_mod.SpineGroup): external_dataset = external_fg.dataframe @@ -202,7 +202,7 @@ def register_external_temporary_table(self, external_fg, alias): None, external_fg.time_travel_format, None, - external_fg.uri, + external_fg.get_uri(), ) if external_fg.location: self._spark_session.sparkContext.textFile(external_fg.location).collect() @@ -1267,12 +1267,12 @@ def add_cols_to_delta_table(self, feature_group, new_features): new_features_map[new_features.name] = lit("").cast(new_features.type) self._spark_session.read.format("delta").load( - feature_group.uri + feature_group.get_uri() ).withColumns(new_features_map).limit(0).write.format("delta").mode( "append" ).option("mergeSchema", "true").option( "spark.databricks.delta.schema.autoMerge.enabled", "true" - ).save(feature_group.uri) + ).save(feature_group.get_uri()) def _apply_transformation_function( self, diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 0f1090f4b..85d40016c 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2064,8 +2064,7 @@ def path(self) -> Optional[str]: def storage_connector(self) -> "sc.StorageConnector": return self._storage_connector - @property - def uri(self) -> str: + def get_uri(self) -> str: """Location of data.""" if (self.storage_connector is None): return self.location From 3447281579122c5a30221b4d88c41d10c2b29e1c Mon Sep 17 00:00:00 2001 From: 
bubriks Date: Thu, 19 Sep 2024 16:54:15 +0300 Subject: [PATCH 21/33] setup_storage_connector --- python/hsfs/feature_group.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 85d40016c..79c9ec20e 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2069,9 +2069,10 @@ def get_uri(self) -> str: if (self.storage_connector is None): return self.location else: - return self.storage_connector.prepare_spark( - self.storage_connector._get_path(self.path) - ) + path = self.storage_connector._get_path(self.path) + if engine.get_type().startswith("spark"): + path = engine.get_instance().setup_storage_connector(self.storage_connector, path) + return path @property def topic_name(self) -> Optional[str]: From 700e2a8adeaa42273a0ef7bddd52429bdc5f6b0f Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 19 Sep 2024 17:08:49 +0300 Subject: [PATCH 22/33] fix merge --- python/hsfs/engine/spark.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 8eb741d00..3798afaf0 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -1206,7 +1206,7 @@ def _setup_s3_hadoop_conf(self, storage_connector, path): self._set_s3_hadoop_conf( storage_connector, f"fs.s3a.bucket.{storage_connector.bucket}" ) - return path.replace("s3", "s3a", 1) if path is not None else None + return path.replace("s3://", "s3a://", 1) if path is not None else None def _set_s3_hadoop_conf(self, storage_connector, prefix): if storage_connector.access_key: @@ -1246,8 +1246,6 @@ def _set_s3_hadoop_conf(self, storage_connector, prefix): storage_connector.spark_options().get(FS_S3_ENDPOINT), ) - return path.replace("s3://", "s3a://", 1) if path is not None else None - def _setup_adls_hadoop_conf(self, storage_connector, path): for k, v in storage_connector.spark_options().items(): self._spark_context._jsc.hadoopConfiguration().set(k, v) From 59f2c98d9397b904dd8fa68fe1c8d44d0c084e34 Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 19 Sep 2024 17:14:33 +0300 Subject: [PATCH 23/33] test fix --- python/tests/engine/test_spark.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index c3585e001..3fb939064 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -4453,11 +4453,11 @@ def test_setup_s3_hadoop_conf_bucket_scope(self, mocker): # Act result = spark_engine._setup_s3_hadoop_conf( storage_connector=s3_connector, - path="s3_test_path", + path="s3://_test_path", ) # Assert - assert result == "s3a_test_path" + assert result == "s3a://_test_path" assert ( mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.call_count == 14 From 9ddd618ef72a2a06a014cf74af47d08a5f9eabff Mon Sep 17 00:00:00 2001 From: Ralf Date: Mon, 23 Sep 2024 18:42:24 +0300 Subject: [PATCH 24/33] small fixes * tmp1 * tmp 2 * tmp 3 * prepare_spark --- python/hsfs/constructor/fs_query.py | 25 ++++++++++--------------- python/hsfs/engine/spark.py | 11 ++--------- python/hsfs/feature_group.py | 2 +- 3 files changed, 13 insertions(+), 25 deletions(-) diff --git a/python/hsfs/constructor/fs_query.py b/python/hsfs/constructor/fs_query.py index fb3c4ddf1..b944006f3 100644 --- a/python/hsfs/constructor/fs_query.py +++ b/python/hsfs/constructor/fs_query.py @@ -99,21 +99,16 @@ def register_external( 
Union[TypeVar("pyspark.sql.DataFrame"), TypeVar("pyspark.RDD")] ] = None, ) -> None: - if self._on_demand_fg_aliases is not None: - for external_fg_alias in self._on_demand_fg_aliases: - if type(external_fg_alias.on_demand_feature_group).__name__ == "SpineGroup": - external_fg_alias.on_demand_feature_group.dataframe = spine - engine.get_instance().register_external_temporary_table( - external_fg_alias.on_demand_feature_group, - external_fg_alias.alias, - ) - - if self._hudi_cached_feature_groups is not None: - for external_fg_alias in self._hudi_cached_feature_groups: - engine.get_instance().register_external_temporary_table( - external_fg_alias.feature_group, - external_fg_alias.alias, - ) + if self._on_demand_fg_aliases is None: + return + + for external_fg_alias in self._on_demand_fg_aliases: + if type(external_fg_alias.on_demand_feature_group).__name__ == "SpineGroup": + external_fg_alias.on_demand_feature_group.dataframe = spine + engine.get_instance().register_external_temporary_table( + external_fg_alias.on_demand_feature_group, + external_fg_alias.alias, + ) def register_hudi_tables( self, diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 3798afaf0..8819a91a1 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -188,22 +188,15 @@ def set_job_group(self, group_id, description): self._spark_session.sparkContext.setJobGroup(group_id, description) def register_external_temporary_table(self, external_fg, alias): - if isinstance(external_fg, fg_mod.ExternalFeatureGroup): + if not isinstance(external_fg, fg_mod.SpineGroup): external_dataset = external_fg.storage_connector.read( external_fg.query, external_fg.data_format, external_fg.options, external_fg.get_uri(), ) - elif isinstance(external_fg, fg_mod.SpineGroup): - external_dataset = external_fg.dataframe else: - external_dataset = external_fg.storage_connector.read( - None, - external_fg.time_travel_format, - None, - external_fg.get_uri(), - ) + external_dataset = external_fg.dataframe if external_fg.location: self._spark_session.sparkContext.textFile(external_fg.location).collect() diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index ed09c56df..10ac668f7 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2074,7 +2074,7 @@ def get_uri(self) -> str: else: path = self.storage_connector._get_path(self.path) if engine.get_type().startswith("spark"): - path = engine.get_instance().setup_storage_connector(self.storage_connector, path) + path = self.storage_connector.prepare_spark(path) return path @property From 44d2f6f9e55f9ee31a24135ad82b02f2befc778f Mon Sep 17 00:00:00 2001 From: Ralf Date: Mon, 23 Sep 2024 21:10:30 +0300 Subject: [PATCH 25/33] fix reconcile and prepare_spark --- python/hsfs/core/feature_group_engine.py | 2 +- python/hsfs/core/hudi_engine.py | 7 +++++-- python/hsfs/engine/python.py | 2 +- python/hsfs/engine/spark.py | 24 ++++++++++++++++-------- python/hsfs/storage_connector.py | 6 ++++++ python/tests/engine/test_spark.py | 6 ++++-- 6 files changed, 33 insertions(+), 14 deletions(-) diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index 63184695f..166c3287c 100644 --- a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -285,7 +285,7 @@ def append_features(self, feature_group, new_features): if feature_group.time_travel_format == "DELTA": engine.get_instance().add_cols_to_delta_table(feature_group, new_features) else: - 
engine.get_instance().save_empty_dataframe(feature_group) + engine.get_instance().save_empty_dataframe(feature_group, new_features=new_features) def update_description(self, feature_group, description): """Updates the description of a feature group.""" diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py index 36070b91d..aa301a6f4 100644 --- a/python/hsfs/core/hudi_engine.py +++ b/python/hsfs/core/hudi_engine.py @@ -20,6 +20,10 @@ class HudiEngine: + + HUDI_SPEC_FEATURE_NAMES = ["_hoodie_record_key", "_hoodie_partition_path", + "_hoodie_commit_time", "_hoodie_file_name", "_hoodie_commit_seqno"] + HUDI_SPARK_FORMAT = "org.apache.hudi" HUDI_TABLE_NAME = "hoodie.table.name" HUDI_TABLE_STORAGE_TYPE = "hoodie.datasource.write.storage.type" @@ -229,9 +233,8 @@ def _setup_hudi_read_opts(self, hudi_fg_alias, read_options): def reconcile_hudi_schema( self, save_empty_dataframe_callback, hudi_fg_alias, read_options ): - fg_table_name = hudi_fg_alias.feature_group._get_table_name() if sorted(self._spark_session.table(hudi_fg_alias.alias).columns) != sorted( - self._spark_session.table(fg_table_name).columns + [feature.name for feature in hudi_fg_alias.feature_group._features] + self.HUDI_SPEC_FEATURE_NAMES ): full_fg = self._feature_group_api.get( feature_store_id=hudi_fg_alias.feature_group._feature_store_id, diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 81fae1c6f..d472f3866 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1182,7 +1182,7 @@ def save_stream_dataframe( ) def save_empty_dataframe( - self, feature_group: Union[FeatureGroup, ExternalFeatureGroup] + self, feature_group: Union[FeatureGroup, ExternalFeatureGroup], new_features=None ) -> None: """Wrapper around save_dataframe in order to provide no-op.""" pass diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 8819a91a1..7ae445d9d 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -219,10 +219,9 @@ def register_hudi_temporary_table( read_options, ) - if (hudi_fg_alias._feature_group.storage_connector is None): - hudi_engine_instance.reconcile_hudi_schema( - self.save_empty_dataframe, hudi_fg_alias, read_options - ) + hudi_engine_instance.reconcile_hudi_schema( + self.save_empty_dataframe, hudi_fg_alias, read_options + ) def register_delta_temporary_table( self, delta_fg_alias, feature_store_id, feature_store_name, read_options @@ -1250,13 +1249,22 @@ def is_spark_dataframe(self, dataframe): return True return False - def save_empty_dataframe(self, feature_group): - fg_table_name = feature_group._get_table_name() - dataframe = self._spark_session.table(fg_table_name).limit(0) + def save_empty_dataframe(self, feature_group, new_features=None): + dataframe = self._spark_session.read.format("hudi").load( + feature_group.get_uri() + ) + + if (new_features is not None): + if isinstance(new_features, list): + for new_feature in new_features: + dataframe = dataframe.withColumn(new_feature.name, lit("").cast(new_feature.type)) + else: + dataframe = dataframe.withColumn(new_features.name, lit("").cast(new_features.type)) + self.save_dataframe( feature_group, - dataframe, + dataframe.limit(0), "upsert", feature_group.online_enabled, "offline", diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index eeeca4d2a..750d25c45 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -134,6 +134,12 @@ def description(self) -> Optional[str]: def 
spark_options(self) -> None: pass + def prepare_spark(self, path: Optional[str] = None) -> Optional[str]: + _logger.info( + "This Storage Connector cannot be prepared for Spark." + ) + return path + def read( self, query: Optional[str] = None, diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index 3fb939064..e602f53e0 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -4559,7 +4559,9 @@ def test_save_empty_dataframe(self, mocker): mock_spark_engine_save_dataframe = mocker.patch( "hsfs.engine.spark.Engine.save_dataframe" ) - mock_spark_table = mocker.patch("pyspark.sql.session.SparkSession.table") + mock_spark_read = mocker.patch("pyspark.sql.SparkSession.read") + mock_format = mocker.Mock() + mock_spark_read.format.return_value = mock_format # Arrange spark_engine = spark.Engine() @@ -4579,7 +4581,7 @@ def test_save_empty_dataframe(self, mocker): # Assert assert mock_spark_engine_save_dataframe.call_count == 1 - assert mock_spark_table.call_count == 1 + assert mock_spark_read.format.call_count == 1 def test_apply_transformation_function_single_output(self, mocker): # Arrange From 8aeeb03f3b8f604da977d49ee00f6b3c088f7af0 Mon Sep 17 00:00:00 2001 From: bubriks Date: Mon, 23 Sep 2024 21:21:52 +0300 Subject: [PATCH 26/33] fix add_cols_to_delta_table --- python/hsfs/engine/spark.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 7ae445d9d..4ee42e404 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -1273,16 +1273,18 @@ def save_empty_dataframe(self, feature_group, new_features=None): ) def add_cols_to_delta_table(self, feature_group, new_features): - new_features_map = {} - if isinstance(new_features, list): - for new_feature in new_features: - new_features_map[new_feature.name] = lit("").cast(new_feature.type) - else: - new_features_map[new_features.name] = lit("").cast(new_features.type) - - self._spark_session.read.format("delta").load( + dataframe = self._spark_session.read.format("delta").load( feature_group.get_uri() - ).withColumns(new_features_map).limit(0).write.format("delta").mode( + ) + + if (new_features is not None): + if isinstance(new_features, list): + for new_feature in new_features: + dataframe = dataframe.withColumn(new_feature.name, lit("").cast(new_feature.type)) + else: + dataframe = dataframe.withColumn(new_features.name,
lit("").cast(new_features.type)) + dataframe = dataframe.withColumn(new_features.name, lit(None).cast(new_features.type)) self.save_dataframe( From c92a1d5199b623857d6dcfb2721acd763369d24a Mon Sep 17 00:00:00 2001 From: Ralf Date: Tue, 24 Sep 2024 15:46:18 +0300 Subject: [PATCH 28/33] reduce number of get_uri calls --- python/hsfs/core/delta_engine.py | 18 +++++++++++------- python/hsfs/core/hudi_engine.py | 6 ++++-- python/hsfs/engine/spark.py | 8 ++++---- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/python/hsfs/core/delta_engine.py b/python/hsfs/core/delta_engine.py index f0fcc67ba..99653c83e 100644 --- a/python/hsfs/core/delta_engine.py +++ b/python/hsfs/core/delta_engine.py @@ -85,15 +85,17 @@ def _setup_delta_read_opts(self, delta_fg_alias, read_options): return delta_options def delete_record(self, delete_df): + uri = self._feature_group.get_uri() + if not DeltaTable.isDeltaTable( - self._spark_session, self._feature_group.get_uri() + self._spark_session, uri ): raise FeatureStoreException( f"This is no data available in Feature group {self._feature_group.name}, or it not DELTA enabled " ) else: fg_source_table = DeltaTable.forPath( - self._spark_session, self._feature_group.get_uri() + self._spark_session, uri ) source_alias = ( @@ -109,16 +111,18 @@ def delete_record(self, delete_df): ).whenMatchedDelete().execute() fg_commit = self._get_last_commit_metadata( - self._spark_session, self._feature_group.get_uri() + self._spark_session, uri ) return self._feature_group_api.commit(self._feature_group, fg_commit) def _write_delta_dataset(self, dataset, write_options): + uri = self._feature_group.get_uri() + if write_options is None: write_options = {} if not DeltaTable.isDeltaTable( - self._spark_session, self._feature_group.get_uri() + self._spark_session, uri ): ( dataset.write.format(DeltaEngine.DELTA_SPARK_FORMAT) @@ -129,11 +133,11 @@ def _write_delta_dataset(self, dataset, write_options): else [] ) .mode("append") - .save(self._feature_group.get_uri()) + .save(uri) ) else: fg_source_table = DeltaTable.forPath( - self._spark_session, self._feature_group.get_uri() + self._spark_session, uri ) source_alias = ( @@ -149,7 +153,7 @@ def _write_delta_dataset(self, dataset, write_options): ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute() return self._get_last_commit_metadata( - self._spark_session, self._feature_group.get_uri() + self._spark_session, uri ) def _generate_merge_query(self, source_alias, updates_alias): diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py index aa301a6f4..d51a769f6 100644 --- a/python/hsfs/core/hudi_engine.py +++ b/python/hsfs/core/hudi_engine.py @@ -112,13 +112,15 @@ def register_temporary_table(self, hudi_fg_alias, read_options): ) def _write_hudi_dataset(self, dataset, save_mode, operation, write_options): + uri = self._feature_group.get_uri() + hudi_options = self._setup_hudi_write_opts(operation, write_options) dataset.write.format(HudiEngine.HUDI_SPARK_FORMAT).options(**hudi_options).mode( save_mode - ).save(self._feature_group.get_uri()) + ).save(uri) feature_group_commit = self._get_last_commit_metadata( - self._spark_context, self._feature_group.get_uri() + self._spark_context, uri ) return feature_group_commit diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 4e966af0a..50ef01a7c 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -1273,9 +1273,9 @@ def save_empty_dataframe(self, feature_group, new_features=None): ) def 
add_cols_to_delta_table(self, feature_group, new_features): - dataframe = self._spark_session.read.format("delta").load( - feature_group.get_uri() - ) + uri = self._feature_group.get_uri() + + dataframe = self._spark_session.read.format("delta").load(uri) if (new_features is not None): if isinstance(new_features, list): @@ -1288,7 +1288,7 @@ def add_cols_to_delta_table(self, feature_group, new_features): "append" ).option("mergeSchema", "true").option( "spark.databricks.delta.schema.autoMerge.enabled", "true" - ).save(feature_group.get_uri()) + ).save(uri) def _apply_transformation_function( self, From 4447e3b5a6eeb90fc86b0529793ef86878c08c14 Mon Sep 17 00:00:00 2001 From: Ralf Date: Wed, 25 Sep 2024 20:49:41 +0300 Subject: [PATCH 29/33] get_uri -> prepare_spark_location --- .../hsfs/spark/engine/SparkEngine.java | 4 -- python/hsfs/core/delta_engine.py | 22 +++++----- python/hsfs/core/hudi_engine.py | 10 +++-- python/hsfs/engine/spark.py | 16 ++++---- python/hsfs/feature_group.py | 14 +++---- python/tests/engine/test_spark.py | 40 ------------------- 6 files changed, 30 insertions(+), 76 deletions(-) diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java index b30bca98c..1aed9a650 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java @@ -219,10 +219,6 @@ public Dataset registerOnDemandTemporaryTable(ExternalFeatureGroup onDemand ? onDemandFeatureGroup.getDataFormat().toString() : null, getOnDemandOptions(onDemandFeatureGroup), onDemandFeatureGroup.getStorageConnector().getPath(onDemandFeatureGroup.getPath())); - if (!Strings.isNullOrEmpty(onDemandFeatureGroup.getLocation())) { - sparkSession.sparkContext().textFile(onDemandFeatureGroup.getLocation(), 0).collect(); - } - dataset.createOrReplaceTempView(alias); return dataset; } diff --git a/python/hsfs/core/delta_engine.py b/python/hsfs/core/delta_engine.py index 99653c83e..939289c3a 100644 --- a/python/hsfs/core/delta_engine.py +++ b/python/hsfs/core/delta_engine.py @@ -52,10 +52,12 @@ def save_delta_fg(self, dataset, write_options, validation_id=None): return self._feature_group_api.commit(self._feature_group, fg_commit) def register_temporary_table(self, delta_fg_alias, read_options): + location = self._feature_group.prepare_spark_location() + delta_options = self._setup_delta_read_opts(delta_fg_alias, read_options) self._spark_session.read.format(self.DELTA_SPARK_FORMAT).options( **delta_options - ).load(self._feature_group.get_uri()).createOrReplaceTempView( + ).load(location).createOrReplaceTempView( delta_fg_alias.alias ) @@ -85,17 +87,17 @@ def _setup_delta_read_opts(self, delta_fg_alias, read_options): return delta_options def delete_record(self, delete_df): - uri = self._feature_group.get_uri() + location = self._feature_group.prepare_spark_location() if not DeltaTable.isDeltaTable( - self._spark_session, uri + self._spark_session, location ): raise FeatureStoreException( f"This is no data available in Feature group {self._feature_group.name}, or it not DELTA enabled " ) else: fg_source_table = DeltaTable.forPath( - self._spark_session, uri + self._spark_session, location ) source_alias = ( @@ -111,18 +113,18 @@ def delete_record(self, delete_df): ).whenMatchedDelete().execute() fg_commit = self._get_last_commit_metadata( - self._spark_session, uri + self._spark_session, location ) return 
self._feature_group_api.commit(self._feature_group, fg_commit) def _write_delta_dataset(self, dataset, write_options): - uri = self._feature_group.get_uri() + location = self._feature_group.prepare_spark_location() if write_options is None: write_options = {} if not DeltaTable.isDeltaTable( - self._spark_session, uri + self._spark_session, location ): ( dataset.write.format(DeltaEngine.DELTA_SPARK_FORMAT) @@ -133,11 +135,11 @@ def _write_delta_dataset(self, dataset, write_options): else [] ) .mode("append") - .save(uri) + .save(location) ) else: fg_source_table = DeltaTable.forPath( - self._spark_session, uri + self._spark_session, location ) source_alias = ( @@ -153,7 +155,7 @@ def _write_delta_dataset(self, dataset, write_options): ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute() return self._get_last_commit_metadata( - self._spark_session, uri + self._spark_session, location ) def _generate_merge_query(self, source_alias, updates_alias): diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py index d51a769f6..4492f0a19 100644 --- a/python/hsfs/core/hudi_engine.py +++ b/python/hsfs/core/hudi_engine.py @@ -104,23 +104,25 @@ def delete_record(self, delete_df, write_options): return self._feature_group_api.commit(self._feature_group, fg_commit) def register_temporary_table(self, hudi_fg_alias, read_options): + location = self._feature_group.prepare_spark_location() + hudi_options = self._setup_hudi_read_opts(hudi_fg_alias, read_options) self._spark_session.read.format(self.HUDI_SPARK_FORMAT).options( **hudi_options - ).load(self._feature_group.get_uri()).createOrReplaceTempView( + ).load(location).createOrReplaceTempView( hudi_fg_alias.alias ) def _write_hudi_dataset(self, dataset, save_mode, operation, write_options): - uri = self._feature_group.get_uri() + location = self._feature_group.prepare_spark_location() hudi_options = self._setup_hudi_write_opts(operation, write_options) dataset.write.format(HudiEngine.HUDI_SPARK_FORMAT).options(**hudi_options).mode( save_mode - ).save(uri) + ).save(location) feature_group_commit = self._get_last_commit_metadata( - self._spark_context, uri + self._spark_context, location ) return feature_group_commit diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 50ef01a7c..85071df22 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -193,12 +193,10 @@ def register_external_temporary_table(self, external_fg, alias): external_fg.query, external_fg.data_format, external_fg.options, - external_fg.get_uri(), + external_fg.prepare_spark_location(), ) else: external_dataset = external_fg.dataframe - if external_fg.location: - self._spark_session.sparkContext.textFile(external_fg.location).collect() external_dataset.createOrReplaceTempView(alias) return external_dataset @@ -1250,9 +1248,9 @@ def is_spark_dataframe(self, dataframe): return False def save_empty_dataframe(self, feature_group, new_features=None): - dataframe = self._spark_session.read.format("hudi").load( - feature_group.get_uri() - ) + location = feature_group.prepare_spark_location() + + dataframe = self._spark_session.read.format("hudi").load(location) if (new_features is not None): if isinstance(new_features, list): @@ -1273,9 +1271,9 @@ def save_empty_dataframe(self, feature_group, new_features=None): ) def add_cols_to_delta_table(self, feature_group, new_features): - uri = self._feature_group.get_uri() + location = feature_group.prepare_spark_location() - dataframe = 
self._spark_session.read.format("delta").load(uri) + dataframe = self._spark_session.read.format("delta").load(location) if (new_features is not None): if isinstance(new_features, list): @@ -1288,7 +1286,7 @@ def add_cols_to_delta_table(self, feature_group, new_features): "append" ).option("mergeSchema", "true").option( "spark.databricks.delta.schema.autoMerge.enabled", "true" - ).save(uri) + ).save(location) def _apply_transformation_function( self, diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 10ac668f7..575d58022 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2067,15 +2067,11 @@ def path(self) -> Optional[str]: def storage_connector(self) -> "sc.StorageConnector": return self._storage_connector - def get_uri(self) -> str: - """Location of data.""" - if (self.storage_connector is None): - return self.location - else: - path = self.storage_connector._get_path(self.path) - if engine.get_type().startswith("spark"): - path = self.storage_connector.prepare_spark(path) - return path + def prepare_spark_location(self) -> str: + location = self.location + if (self.storage_connector is not None): + location = self.storage_connector.prepare_spark(location) + return location @property def topic_name(self) -> Optional[str]: diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index e602f53e0..488232dee 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -174,43 +174,6 @@ def test_register_external_temporary_table(self, mocker): # Arrange mocker.patch("hopsworks_common.client.get_instance") mock_sc_read = mocker.patch("hsfs.storage_connector.JdbcConnector.read") - mock_pyspark_getOrCreate = mocker.patch( - "pyspark.sql.session.SparkSession.builder.getOrCreate" - ) - - spark_engine = spark.Engine() - - jdbc_connector = storage_connector.JdbcConnector( - id=1, - name="test_connector", - featurestore_id=1, - connection_string="", - arguments="", - ) - - external_fg = feature_group.ExternalFeatureGroup( - storage_connector=jdbc_connector, id=10 - ) - - # Act - spark_engine.register_external_temporary_table( - external_fg=external_fg, - alias=None, - ) - - # Assert - assert ( - mock_pyspark_getOrCreate.return_value.sparkContext.textFile.call_count == 0 - ) - assert mock_sc_read.return_value.createOrReplaceTempView.call_count == 1 - - def test_register_external_temporary_table_external_fg_location(self, mocker): - # Arrange - mocker.patch("hopsworks_common.client.get_instance") - mock_sc_read = mocker.patch("hsfs.storage_connector.JdbcConnector.read") - mock_pyspark_getOrCreate = mocker.patch( - "pyspark.sql.session.SparkSession.builder.getOrCreate" - ) spark_engine = spark.Engine() @@ -233,9 +196,6 @@ def test_register_external_temporary_table_external_fg_location(self, mocker): ) # Assert - assert ( - mock_pyspark_getOrCreate.return_value.sparkContext.textFile.call_count == 1 - ) assert mock_sc_read.return_value.createOrReplaceTempView.call_count == 1 def test_register_hudi_temporary_table(self, mocker): From 0303ad90d354864a3575582ce79ddabac406790a Mon Sep 17 00:00:00 2001 From: Ralf Date: Mon, 30 Sep 2024 18:26:19 +0300 Subject: [PATCH 30/33] add clean fg (#7) --- python/hsfs/core/delta_engine.py | 9 +++++++++ python/hsfs/core/feature_group_engine.py | 14 ++++++++++++++ python/hsfs/engine/python.py | 6 ++++++ python/hsfs/feature_group.py | 24 ++++++++++++++++++++++++ utils/python/hsfs_utils.py | 15 +++++++++++++++ 5 files changed, 68 insertions(+) diff --git 
a/python/hsfs/core/delta_engine.py b/python/hsfs/core/delta_engine.py index 939289c3a..7960b2ec6 100644 --- a/python/hsfs/core/delta_engine.py +++ b/python/hsfs/core/delta_engine.py @@ -158,6 +158,15 @@ def _write_delta_dataset(self, dataset, write_options): self._spark_session, location ) + def vacuum(self, retention_hours): + location = self._feature_group.prepare_spark_location() + + delta_table = DeltaTable.forPath(self._spark_session, location) + + # Vacuum the table + # https://docs.delta.io/1.0.1/api/python/index.html#delta.tables.DeltaTable.vacuum + delta_table.vacuum(retention_hours) + def _generate_merge_query(self, source_alias, updates_alias): merge_query_list = [] primary_key = self._feature_group.primary_key diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index 5248c0b20..234829879 100644 --- a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -246,6 +246,20 @@ def commit_delete(feature_group, delete_df, write_options): ) return hudi_engine_instance.delete_record(delete_df, write_options) + @staticmethod + def clean(feature_group, write_options): + if feature_group.time_travel_format == "DELTA": + delta_engine_instance = delta_engine.DeltaEngine( + feature_group.feature_store_id, + feature_group.feature_store_name, + feature_group, + engine.get_instance()._spark_session, + engine.get_instance()._spark_context, + ) + return delta_engine_instance.vacuum(write_options.get("retention_hours", None)) + else: + return None + def sql(self, query, feature_store_name, dataframe_type, online, read_options): if online and self._online_conn is None: self._online_conn = self._storage_connector_api.get_online_connector( diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index cb4c5151f..512803ebd 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -481,6 +481,12 @@ def register_external_temporary_table( # No op to avoid query failure pass + def register_delta_temporary_table( + self, delta_fg_alias, feature_store_id, feature_store_name, read_options + ): + # No op to avoid query failure + pass + def register_hudi_temporary_table( self, hudi_fg_alias: "hsfs.constructor.hudi_feature_group_alias.HudiFeatureGroupAlias", diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 941aebf19..f47f1d962 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -3277,6 +3277,30 @@ def commit_delete_record( """ self._feature_group_engine.commit_delete(self, delete_df, write_options or {}) + def clean( + self, + write_options: Optional[Dict[Any, Any]] = None, + ) -> None: + """ Clean up old files. This method can only be used on feature groups stored as DELTA. + + !!! example + ```python + # connect to the Feature Store + fs = ... + + # get the Feature Group instance + fg = fs.get_or_create_feature_group(...) + + commit_details = fg.clean(write_options = {"retention_hours": 100}) + + # Arguments + write_options: User provided write options. Defaults to `{}`. + + # Raises + `hsfs.client.exceptions.RestAPIError`. 
+ """ + self._feature_group_engine.clean(self, write_options or {}) + def as_of( self, wallclock_time: Optional[Union[str, int, datetime, date]] = None, diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py index 5af468873..4e73f1604 100644 --- a/utils/python/hsfs_utils.py +++ b/utils/python/hsfs_utils.py @@ -247,6 +247,18 @@ def run_feature_monitoring(job_conf: Dict[str, str]) -> None: raise e +def clean_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> None: + """ + Run clean on a feature group. + """ + feature_store = job_conf.pop("feature_store") + fs = get_feature_store_handle(feature_store) + + entity = fs.get_feature_group(name=job_conf["name"], version=job_conf["version"]) + + entity.clean() + + if __name__ == "__main__": # Setup spark first so it fails faster in case of args errors # Otherwise the resource manager will wait until the spark application master @@ -265,6 +277,7 @@ def run_feature_monitoring(job_conf: Dict[str, str]) -> None: "ge_validate", "import_fg", "run_feature_monitoring", + "clean_fg", ], help="Operation type", ) @@ -303,6 +316,8 @@ def parse_isoformat_date(da: str) -> datetime: import_fg(job_conf) elif args.op == "run_feature_monitoring": run_feature_monitoring(job_conf) + elif args.op == "clean_fg": + clean_fg(spark, job_conf) success = True except Exception: From 626fb1277274f381dc2cb992b10f1ca66fa49b25 Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 2 Oct 2024 17:21:14 +0300 Subject: [PATCH 31/33] add tests for clean --- .../tests/core/test_feature_group_engine.py | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/python/tests/core/test_feature_group_engine.py b/python/tests/core/test_feature_group_engine.py index 2847d5219..2ab5dad16 100644 --- a/python/tests/core/test_feature_group_engine.py +++ b/python/tests/core/test_feature_group_engine.py @@ -525,6 +525,54 @@ def test_commit_delete(self, mocker): # Assert assert mock_hudi_engine.return_value.delete_record.call_count == 1 + def test_clean_delta(self, mocker): + # Arrange + feature_store_id = 99 + + mocker.patch("hsfs.engine.get_instance") + mock_hudi_engine = mocker.patch("hsfs.core.delta_engine.DeltaEngine") + + fg_engine = feature_group_engine.FeatureGroupEngine( + feature_store_id=feature_store_id + ) + + fg = feature_group.FeatureGroup( + name="test", + version=1, + featurestore_id=feature_store_id, + primary_key=[], + partition_key=[], + id=10, + time_travel_format="DELTA", + ) + + # Act + fg_engine.clean(feature_group=fg, write_options={}) + + # Assert + assert mock_hudi_engine.return_value.vacuum.call_count == 1 + + def test_clean_hudi(self, mocker): + # Arrange + feature_store_id = 99 + + fg_engine = feature_group_engine.FeatureGroupEngine( + feature_store_id=feature_store_id + ) + + fg = feature_group.FeatureGroup( + name="test", + version=1, + featurestore_id=feature_store_id, + primary_key=[], + partition_key=[], + id=10, + time_travel_format="HUDI", + ) + + # Act + fg_engine.clean(feature_group=fg, write_options={}) + def test_sql(self, mocker): # Arrange feature_store_id = 99 From de7e29a205ebd389acf06fd68676daf4f0abe428 Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 2 Oct 2024 17:45:09 +0300 Subject: [PATCH 32/33] test_prepare_spark_location --- python/tests/test_feature_group.py | 49 +++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/python/tests/test_feature_group.py b/python/tests/test_feature_group.py index bb2944f97..5e01b5a10 100644 --- a/python/tests/test_feature_group.py +++ 
b/python/tests/test_feature_group.py @@ -32,7 +32,7 @@ ) from hsfs.client.exceptions import FeatureStoreException, RestAPIError from hsfs.core.constants import HAS_GREAT_EXPECTATIONS -from hsfs.engine import python +from hsfs.engine import python, spark from hsfs.transformation_function import TransformationType @@ -908,3 +908,50 @@ def test_from_response_json_transformation_functions(self, backend_fixtures): assert ( fg.expectation_suite.expectation_suite_name == "test_expectation_suite_name" ) + + def test_prepare_spark_location(self, mocker, backend_fixtures): + # Arrange + engine = spark.Engine() + engine_instance = mocker.patch("hsfs.engine.get_instance", return_value=engine) + json = backend_fixtures["feature_group"]["get_basic_info"]["response"] + fg = feature_group.FeatureGroup.from_response_json(json) + fg._location = f"{fg.name}_{fg.version}" + + # Act + path = fg.prepare_spark_location() + + # Assert + assert fg.location == path + engine_instance.assert_not_called() + + def test_prepare_spark_location_with_s3_connector(self, mocker, backend_fixtures): + # Arrange + engine = spark.Engine() + engine_instance = mocker.patch("hsfs.engine.get_instance", return_value=engine) + json = backend_fixtures["feature_group"]["get_basic_info"]["response"] + fg = feature_group.FeatureGroup.from_response_json(json) + fg._location = f"{fg.name}_{fg.version}" + fg._storage_connector = storage_connector.S3Connector(id=1, name="s3_conn", featurestore_id=fg.feature_store_id) + + # Act + path = fg.prepare_spark_location() + + # Assert + assert fg.location == path + engine_instance.assert_called_once() + + def test_prepare_spark_location_with_s3_connector_python(self, mocker, backend_fixtures): + # Arrange + engine = python.Engine() + engine_instance = mocker.patch("hsfs.engine.get_instance", return_value=engine) + json = backend_fixtures["feature_group"]["get_basic_info"]["response"] + fg = feature_group.FeatureGroup.from_response_json(json) + fg._location = f"{fg.name}_{fg.version}" + fg._storage_connector = storage_connector.S3Connector(id=1, name="s3_conn", featurestore_id=fg.feature_store_id) + + # Act + with pytest.raises(AttributeError): + fg.prepare_spark_location() + + # Assert + engine_instance.assert_called_once() From abec75e47662fbd76be86fa1a11a8faed5ab78d5 Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 2 Oct 2024 17:47:32 +0300 Subject: [PATCH 33/33] ruff fix --- python/tests/core/test_feature_group_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/core/test_feature_group_engine.py b/python/tests/core/test_feature_group_engine.py index 2ab5dad16..f93e44abd 100644 --- a/python/tests/core/test_feature_group_engine.py +++ b/python/tests/core/test_feature_group_engine.py @@ -551,7 +551,7 @@ def test_clean_delta(self, mocker): # Assert assert mock_hudi_engine.return_value.vacuum.call_count == 1 - + def test_clean_hudi(self, mocker): # Arrange feature_store_id = 99