From 71df14495ed247d1894058eb1b934edb6cc58fcc Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Fri, 11 Oct 2024 17:58:18 +0300 Subject: [PATCH 01/37] _deserialize_from_avro (no tests fixed) --- python/hsfs/engine/spark.py | 43 +++++++++++++++++++++++++------------ 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index c0d184faa..5464f548e 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -475,9 +475,7 @@ def save_stream_dataframe( write_options = kafka_engine.get_kafka_config( feature_group.feature_store_id, write_options, engine="spark" ) - serialized_df = self._online_fg_to_avro( - feature_group, self._encode_complex_features(feature_group, dataframe) - ) + serialized_df = self._serialize_to_avro(feature_group, dataframe) project_id = str(feature_group.feature_store.project_id) feature_group_id = str(feature_group._id) @@ -570,9 +568,7 @@ def _save_online_dataframe(self, feature_group, dataframe, write_options): feature_group.feature_store_id, write_options, engine="spark" ) - serialized_df = self._online_fg_to_avro( - feature_group, self._encode_complex_features(feature_group, dataframe) - ) + serialized_df = self._serialize_to_avro(feature_group, dataframe) project_id = str(feature_group.feature_store.project_id).encode("utf8") feature_group_id = str(feature_group._id).encode("utf8") @@ -592,13 +588,13 @@ def _save_online_dataframe(self, feature_group, dataframe, write_options): "topic", feature_group._online_topic_name ).save() - def _encode_complex_features( + def _serialize_to_avro( self, feature_group: Union[fg_mod.FeatureGroup, fg_mod.ExternalFeatureGroup], dataframe: Union[RDD, DataFrame], ): """Encodes all complex type features to binary using their avro type as schema.""" - return dataframe.select( + encoded_dataframe = dataframe.select( [ field["name"] if field["name"] not in feature_group.get_complex_features() @@ -609,15 +605,10 @@ def _encode_complex_features( ] ) - def _online_fg_to_avro( - self, - feature_group: Union[fg_mod.FeatureGroup, fg_mod.ExternalFeatureGroup], - dataframe: Union[DataFrame, RDD], - ): """Packs all features into named struct to be serialized to single avro/binary column. And packs primary key into arry to be serialized for partitioning. """ - return dataframe.select( + return encoded_dataframe.select( [ # be aware: primary_key array should always be sorted to_avro( @@ -639,6 +630,30 @@ def _online_fg_to_avro( ).alias("value"), ] ) + + def _deserialize_from_avro( + self, + feature_group: Union[fg_mod.FeatureGroup, fg_mod.ExternalFeatureGroup], + dataframe: Union[RDD, DataFrame], + ): + """ + Deserializes 'value' column from binary using avro schema and unpacks it into columns. 
+ """ + decoded_dataframe = dataframe.select( + from_avro("value", feature_group._get_encoded_avro_schema()).alias("value") + ).select(col("value.*")) + + """Decodes all complex type features from binary using their avro type as schema.""" + return decoded_dataframe.select( + [ + field["name"] + if field["name"] not in feature_group.get_complex_features() + else from_avro( + field["name"], feature_group._get_feature_avro_schema(field["name"]) + ).alias(field["name"]) + for field in json.loads(feature_group.avro_schema)["fields"] + ] + ) def get_training_data( self, From 4ca58ff3564c4e6112cab5b8d3488a464ab5192c Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Fri, 11 Oct 2024 19:31:48 +0300 Subject: [PATCH 02/37] dont limit to HUDI --- python/hsfs/feature_group.py | 34 ++++++++++++++-------------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index a3385afda..328dba4df 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2327,29 +2327,23 @@ def __init__( # for python engine we always use stream feature group if engine.get_type() == "python": self._stream = True - # for stream feature group time travel format is always HUDI - if self._stream: - expected_format = "HUDI" - if self._time_travel_format != expected_format: - warnings.warn( - ( - "The provided time travel format `{}` has been overwritten " - "because Stream enabled feature groups only support `{}`" - ).format(self._time_travel_format, expected_format), - util.FeatureGroupWarning, - stacklevel=1, - ) - self._time_travel_format = expected_format self.primary_key = primary_key self.partition_key = partition_key - self._hudi_precombine_key = ( - util.autofix_feature_name(hudi_precombine_key) - if hudi_precombine_key is not None - and self._time_travel_format is not None - and self._time_travel_format == "HUDI" - else None - ) + if ( + time_travel_format is not None + and time_travel_format.upper() == "HUDI" + and self._features + ): + self._hudi_precombine_key = ( + util.autofix_feature_name(hudi_precombine_key) + if hudi_precombine_key is not None + and self._time_travel_format is not None + and self._time_travel_format == "HUDI" + else None + ) + else: + self._hudi_precombine_key: Optional[str] = None self.statistics_config = statistics_config self._offline_backfill_every_hr = offline_backfill_every_hr From 97d6e35c9919bf1e536cb8355740c5cdae6dbce5 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Fri, 11 Oct 2024 21:42:39 +0300 Subject: [PATCH 03/37] hsfs utils changes --- utils/python/hsfs_utils.py | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py index 6b8c49311..4f28f0856 100644 --- a/utils/python/hsfs_utils.py +++ b/utils/python/hsfs_utils.py @@ -16,9 +16,10 @@ import hopsworks +from hsfs import engine from hsfs.constructor import query from hsfs.statistics_config import StatisticsConfig -from hsfs.core import feature_monitoring_config_engine, feature_view_engine +from hsfs.core import feature_monitoring_config_engine, feature_view_engine, kafka_engine def read_job_conf(path: str) -> Dict[Any, Any]: @@ -258,6 +259,32 @@ def delta_vacuum_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> None: entity.delta_vacuum() +def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any]) -> None: + """ + Run materialization job on a feature group. 
+ """ + feature_store = job_conf.pop("feature_store") + fs = get_feature_store_handle(feature_store) + + entity = fs.get_feature_group(name=job_conf["name"], version=job_conf["version"]) + + read_options = kafka_engine.get_kafka_config( + entity.feature_store_id, {}, engine="spark" + ) + + df = ( + spark.read.format("kafka") + .options(**read_options) + .option("subscribe", entity._online_topic_name) + .load() + ) + + # deserialize dataframe so that it can be properly saved + deserialized_df = engine.get_instance()._deserialize_from_avro(entity, df) + + entity.stream = False # to make sure we dont write to kafka + entity.insert(deserialized_df) + if __name__ == "__main__": # Setup spark first so it fails faster in case of args errors @@ -278,6 +305,7 @@ def delta_vacuum_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> None: "import_fg", "run_feature_monitoring", "delta_vacuum_fg", + "offline_fg_materialization", ], help="Operation type", ) @@ -318,6 +346,8 @@ def parse_isoformat_date(da: str) -> datetime: run_feature_monitoring(job_conf) elif args.op == "delta_vacuum_fg": delta_vacuum_fg(spark, job_conf) + elif args.op == "offline_fg_materialization": + offline_fg_materialization(spark, job_conf) success = True except Exception: From c744892c1ea77a80e5e0f8648de553dd758bf95f Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Mon, 14 Oct 2024 12:18:01 +0300 Subject: [PATCH 04/37] initial_check_point_string --- utils/python/hsfs_utils.py | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py index 4f28f0856..e1d3df9aa 100644 --- a/utils/python/hsfs_utils.py +++ b/utils/python/hsfs_utils.py @@ -259,7 +259,7 @@ def delta_vacuum_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> None: entity.delta_vacuum() -def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any]) -> None: +def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], initial_check_point_string: str) -> None: """ Run materialization job on a feature group. 
""" @@ -276,6 +276,7 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any]) -> spark.read.format("kafka") .options(**read_options) .option("subscribe", entity._online_topic_name) + .option("startingOffsets", _build_starting_offsets(initial_check_point_string)) \ .load() ) @@ -285,6 +286,20 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any]) -> entity.stream = False # to make sure we dont write to kafka entity.insert(deserialized_df) +def _build_starting_offsets(initial_check_point_string: str): + if not initial_check_point_string: + return "" + + # Split the input string into the topic and partition-offset pairs + topic, offsets = initial_check_point_string.split(',', 1) + + # Split the offsets and build a dictionary from them + offsets_dict = {partition: int(offset) for partition, offset in (pair.split(':') for pair in offsets.split(','))} + + # Create the final dictionary structure + result = {topic: offsets_dict} + + return json.dumps(result) if __name__ == "__main__": # Setup spark first so it fails faster in case of args errors @@ -325,6 +340,12 @@ def parse_isoformat_date(da: str) -> datetime: help="Job start time", ) + parser.add_argument( + "-initialCheckPointString", + type=str, + help="Kafka offset to start consuming from", + ) + args = parser.parse_args() job_conf = read_job_conf(args.path) @@ -347,7 +368,7 @@ def parse_isoformat_date(da: str) -> datetime: elif args.op == "delta_vacuum_fg": delta_vacuum_fg(spark, job_conf) elif args.op == "offline_fg_materialization": - offline_fg_materialization(spark, job_conf) + offline_fg_materialization(spark, job_conf, args.initialCheckPointString) success = True except Exception: From 0e923769ba47f83798300cb1cf74a78a416a2608 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Mon, 14 Oct 2024 16:05:47 +0300 Subject: [PATCH 05/37] save offsets? 
--- python/hsfs/core/kafka_engine.py | 2 +- python/hsfs/engine/python.py | 2 +- utils/python/hsfs_utils.py | 35 ++++++++++++++++++++++++++++++-- 3 files changed, 35 insertions(+), 4 deletions(-) diff --git a/python/hsfs/core/kafka_engine.py b/python/hsfs/core/kafka_engine.py index d21b6ec22..ee9e892be 100644 --- a/python/hsfs/core/kafka_engine.py +++ b/python/hsfs/core/kafka_engine.py @@ -141,7 +141,7 @@ def kafka_get_offsets( offsets += f",{partition_metadata.id}:{consumer.get_watermark_offsets(partition)[tuple_value]}" consumer.close() - return f" -initialCheckPointString {topic_name + offsets}" + return f"{topic_name + offsets}" return "" diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 44cf505fc..9d89eba81 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1401,7 +1401,7 @@ def _write_dataframe_kafka( now = datetime.now(timezone.utc) feature_group.materialization_job.run( args=feature_group.materialization_job.config.get("defaultArgs", "") - + initial_check_point, + + (f" -initialCheckPointString {initial_check_point}" if initial_check_point else ""), await_termination=offline_write_options.get("wait_for_job", False), ) offline_backfill_every_hr = offline_write_options.pop( diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py index e1d3df9aa..408acef86 100644 --- a/utils/python/hsfs_utils.py +++ b/utils/python/hsfs_utils.py @@ -13,6 +13,7 @@ hopsfs = pfs.HadoopFileSystem("default", user=os.environ["HADOOP_USER_NAME"]) from pyspark.sql import SparkSession from pyspark.sql.types import StructField, StructType, _parse_datatype_string +from pyspark.sql.functions import max import hopsworks @@ -272,20 +273,50 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in entity.feature_store_id, {}, engine="spark" ) + # get offsets + offset_location = entity.location + "_offsets" + try: + if initial_check_point_string: + offset_string = json.dumps(_build_starting_offsets(initial_check_point_string)) + else: + offset_string = spark.read.json(offset_location).toJSON().first() + except Exception as e: + print(f"An unexpected error occurred: {e}") + # if all else fails read from the beggining + initial_check_point_string = kafka_engine.kafka_get_offsets( + topic_name=entity._online_topic_name, + feature_store_id=entity.feature_store_id, + high=False, + ) + offset_string = json.dumps(_build_starting_offsets(initial_check_point_string)) + + # read kafka topic df = ( spark.read.format("kafka") .options(**read_options) .option("subscribe", entity._online_topic_name) - .option("startingOffsets", _build_starting_offsets(initial_check_point_string)) \ + .option("startingOffsets", offset_string) + .limit(5000000) .load() ) # deserialize dataframe so that it can be properly saved deserialized_df = engine.get_instance()._deserialize_from_avro(entity, df) + # insert data entity.stream = False # to make sure we dont write to kafka entity.insert(deserialized_df) + # update offsets + df_offsets = df.groupBy('partition').agg(max('offset').alias('offset')).collect() + offset_dict = json.loads(offset_string) + for offset_row in df_offsets: + offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset + + # save offsets + offset_df = spark.createDataFrame([offset_dict]) + offset_df.write.mode("overwrite").json(offset_location) + def _build_starting_offsets(initial_check_point_string: str): if not initial_check_point_string: return "" @@ -299,7 +330,7 @@ def 
_build_starting_offsets(initial_check_point_string: str): # Create the final dictionary structure result = {topic: offsets_dict} - return json.dumps(result) + return result if __name__ == "__main__": # Setup spark first so it fails faster in case of args errors From cf1c325021a8eb84ee0ce81283c9a5ff1ae5c68d Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Mon, 14 Oct 2024 17:25:39 +0300 Subject: [PATCH 06/37] add filter to kafka --- utils/python/hsfs_utils.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py index 408acef86..b46112595 100644 --- a/utils/python/hsfs_utils.py +++ b/utils/python/hsfs_utils.py @@ -13,7 +13,7 @@ hopsfs = pfs.HadoopFileSystem("default", user=os.environ["HADOOP_USER_NAME"]) from pyspark.sql import SparkSession from pyspark.sql.types import StructField, StructType, _parse_datatype_string -from pyspark.sql.functions import max +from pyspark.sql.functions import max, expr import hopsworks @@ -300,6 +300,10 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in .load() ) + # filter only the necassary entries + df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id)) + df = df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"])) + # deserialize dataframe so that it can be properly saved deserialized_df = engine.get_instance()._deserialize_from_avro(entity, df) From ddf72c555116e49bd6e56d9cc75b575600fb2c16 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Tue, 15 Oct 2024 10:16:16 +0300 Subject: [PATCH 07/37] initialCheckPointString fix --- python/hsfs/engine/python.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 9d89eba81..4924205e7 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1431,7 +1431,7 @@ def _write_dataframe_kafka( # provide the initial_check_point as it will reduce the read amplification of materialization job feature_group.materialization_job.run( args=feature_group.materialization_job.config.get("defaultArgs", "") - + initial_check_point, + + (f" -initialCheckPointString {initial_check_point}" if initial_check_point else ""), await_termination=offline_write_options.get("wait_for_job", False), ) return feature_group.materialization_job From aba04f7481367b0a0cb6587216ad3e05acd95dc3 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Tue, 15 Oct 2024 10:28:33 +0300 Subject: [PATCH 08/37] includeHeaders and limit fix --- utils/python/hsfs_utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py index b46112595..4d9a1e09e 100644 --- a/utils/python/hsfs_utils.py +++ b/utils/python/hsfs_utils.py @@ -296,8 +296,9 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in .options(**read_options) .option("subscribe", entity._online_topic_name) .option("startingOffsets", offset_string) - .limit(5000000) + .option("includeHeaders", "true") .load() + .limit(5000000) ) # filter only the necassary entries From 5a7c938fe60d7463f746767b8157ecddeecf4dd3 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Tue, 15 Oct 2024 11:02:57 +0300 Subject: [PATCH 09/37] small fix --- utils/python/hsfs_utils.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff 
--git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py index 4d9a1e09e..08d10bd48 100644 --- a/utils/python/hsfs_utils.py +++ b/utils/python/hsfs_utils.py @@ -286,6 +286,7 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in initial_check_point_string = kafka_engine.kafka_get_offsets( topic_name=entity._online_topic_name, feature_store_id=entity.feature_store_id, + offline_write_options={}, high=False, ) offset_string = json.dumps(_build_starting_offsets(initial_check_point_string)) @@ -316,11 +317,11 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in df_offsets = df.groupBy('partition').agg(max('offset').alias('offset')).collect() offset_dict = json.loads(offset_string) for offset_row in df_offsets: - offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset + offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset + 1 # save offsets offset_df = spark.createDataFrame([offset_dict]) - offset_df.write.mode("overwrite").json(offset_location) + offset_df.coalesce(1).write.mode("overwrite").json(offset_location) def _build_starting_offsets(initial_check_point_string: str): if not initial_check_point_string: From 8692113b6418a48a95ae609e169e7d156986cdbf Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Tue, 15 Oct 2024 11:33:21 +0300 Subject: [PATCH 10/37] java client fix --- .../logicalclocks/hsfs/beam/FeatureStore.java | 2 +- .../hsfs/beam/StreamFeatureGroup.java | 25 ++++++----- .../hsfs/flink/FeatureStore.java | 5 ++- .../hsfs/flink/StreamFeatureGroup.java | 28 ++++++------ .../logicalclocks/hsfs/FeatureStoreBase.java | 4 +- .../logicalclocks/hsfs/TimeTravelFormat.java | 3 +- .../hsfs/spark/FeatureStore.java | 16 ++++--- .../hsfs/spark/StreamFeatureGroup.java | 44 ++++++++++--------- .../hsfs/spark/engine/FeatureGroupEngine.java | 4 +- .../hsfs/spark/TestFeatureGroup.java | 9 ++-- 10 files changed, 78 insertions(+), 62 deletions(-) diff --git a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/FeatureStore.java b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/FeatureStore.java index c059520f7..db01f295a 100644 --- a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/FeatureStore.java +++ b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/FeatureStore.java @@ -160,7 +160,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver @Override public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, - StatisticsConfig statisticsConfig, String eventTime, OnlineConfig onlineConfig) + TimeTravelFormat timeTravelFormat, StatisticsConfig statisticsConfig, String eventTime, OnlineConfig onlineConfig) throws IOException, FeatureStoreException { throw new UnsupportedOperationException("Not supported for Beam"); } diff --git a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java index 9d3c41ee6..e74b51ade 100644 --- a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java +++ b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java @@ -17,6 +17,14 @@ package com.logicalclocks.hsfs.beam; +import java.io.IOException; +import java.text.ParseException; +import java.util.List; +import java.util.Map; +import 
java.util.stream.Collectors; + +import org.apache.beam.sdk.values.PCollection; + import com.logicalclocks.hsfs.Feature; import com.logicalclocks.hsfs.FeatureGroupBase; import com.logicalclocks.hsfs.FeatureStoreException; @@ -26,19 +34,14 @@ import com.logicalclocks.hsfs.StatisticsConfig; import com.logicalclocks.hsfs.Storage; import com.logicalclocks.hsfs.StorageConnector; -import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine; +import com.logicalclocks.hsfs.TimeTravelFormat; import com.logicalclocks.hsfs.beam.engine.BeamProducer; +import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine; import com.logicalclocks.hsfs.constructor.QueryBase; import com.logicalclocks.hsfs.metadata.Statistics; + import lombok.Builder; import lombok.NonNull; -import org.apache.beam.sdk.values.PCollection; - -import java.io.IOException; -import java.text.ParseException; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; public class StreamFeatureGroup extends FeatureGroupBase<PCollection<Object>> { @@ -48,8 +51,9 @@ public class StreamFeatureGroup extends FeatureGroupBase<PCollection<Object>> { @Builder public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, - boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName, - String eventTime, OnlineConfig onlineConfig, StorageConnector storageConnector, String path) { + boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List<Feature> features, + StatisticsConfig statisticsConfig, String onlineTopicName, String eventTime, + OnlineConfig onlineConfig, StorageConnector storageConnector, String path) { this(); this.featureStore = featureStore; this.name = name; @@ -61,6 +65,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ ? partitionKeys.stream().map(String::toLowerCase).collect(Collectors.toList()) : null; this.hudiPrecombineKey = hudiPrecombineKey != null ? hudiPrecombineKey.toLowerCase() : null; this.onlineEnabled = onlineEnabled; + this.timeTravelFormat = timeTravelFormat != null ? timeTravelFormat : TimeTravelFormat.HUDI; this.features = features; this.statisticsConfig = statisticsConfig != null ? 
statisticsConfig : new StatisticsConfig(); this.onlineTopicName = onlineTopicName; diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/FeatureStore.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/FeatureStore.java index b6314bad4..60dcbaeb6 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/FeatureStore.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/FeatureStore.java @@ -165,8 +165,9 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, - StatisticsConfig statisticsConfig, String eventTime, - OnlineConfig onlineConfig) + TimeTravelFormat timeTravelFormat, + StatisticsConfig statisticsConfig, + String eventTime, OnlineConfig onlineConfig) throws IOException, FeatureStoreException { throw new UnsupportedOperationException("Not supported for Flink"); } diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java index c3cd6cbd0..0fa821fb3 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java @@ -17,6 +17,15 @@ package com.logicalclocks.hsfs.flink; +import java.io.IOException; +import java.text.ParseException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; + import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.logicalclocks.hsfs.Feature; import com.logicalclocks.hsfs.FeatureGroupBase; @@ -27,22 +36,14 @@ import com.logicalclocks.hsfs.StatisticsConfig; import com.logicalclocks.hsfs.Storage; import com.logicalclocks.hsfs.StorageConnector; +import com.logicalclocks.hsfs.TimeTravelFormat; import com.logicalclocks.hsfs.constructor.QueryBase; - +import com.logicalclocks.hsfs.flink.engine.FeatureGroupEngine; import com.logicalclocks.hsfs.metadata.Statistics; -import com.logicalclocks.hsfs.flink.engine.FeatureGroupEngine; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.NonNull; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; - -import java.io.IOException; -import java.text.ParseException; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; @AllArgsConstructor @JsonIgnoreProperties(ignoreUnknown = true) @@ -53,9 +54,9 @@ public class StreamFeatureGroup extends FeatureGroupBase<DataStream<?>> { @Builder public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, - boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, - String onlineTopicName, String topicName, String notificationTopicName, String eventTime, - OnlineConfig onlineConfig, StorageConnector storageConnector, String path) { + boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List<Feature> features, + StatisticsConfig statisticsConfig, String onlineTopicName, String topicName, String notificationTopicName, + String eventTime, OnlineConfig 
onlineConfig, StorageConnector storageConnector, String path) { this(); this.featureStore = featureStore; this.name = name; @@ -67,6 +68,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ ? partitionKeys.stream().map(String::toLowerCase).collect(Collectors.toList()) : null; this.hudiPrecombineKey = hudiPrecombineKey != null ? hudiPrecombineKey.toLowerCase() : null; this.onlineEnabled = onlineEnabled; + this.timeTravelFormat = timeTravelFormat != null ? timeTravelFormat : TimeTravelFormat.HUDI; this.features = features; this.statisticsConfig = statisticsConfig != null ? statisticsConfig : new StatisticsConfig(); this.onlineTopicName = onlineTopicName; diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureStoreBase.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureStoreBase.java index ad391ef90..057838cad 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureStoreBase.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureStoreBase.java @@ -122,8 +122,8 @@ public abstract Object getOrCreateStreamFeatureGroup(String name, Integer versio public abstract Object getOrCreateStreamFeatureGroup(String name, Integer version, String description, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, - StatisticsConfig statisticsConfig, String eventTime, - OnlineConfig onlineConfig) + TimeTravelFormat timeTravelFormat, StatisticsConfig statisticsConfig, + String eventTime, OnlineConfig onlineConfig) throws IOException, FeatureStoreException; public abstract Object createExternalFeatureGroup(); diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/TimeTravelFormat.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/TimeTravelFormat.java index 4e0fb0419..d6c3d0b2e 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/TimeTravelFormat.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/TimeTravelFormat.java @@ -19,5 +19,6 @@ public enum TimeTravelFormat { NONE, - HUDI + HUDI, + DELTA } diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureStore.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureStore.java index 33e3b6058..65dbc66d7 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureStore.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureStore.java @@ -404,7 +404,7 @@ public StreamFeatureGroup.StreamFeatureGroupBuilder createStreamFeatureGroup() { public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version) throws IOException, FeatureStoreException { return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, null, - null, null, null, false, null, null, null); + null, null, null, false, TimeTravelFormat.HUDI, null, null, null); } /** @@ -438,7 +438,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver boolean onlineEnabled, String eventTime) throws IOException, FeatureStoreException { return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, null, - primaryKeys, null, null, onlineEnabled, null, eventTime, null); + primaryKeys, null, null, onlineEnabled, TimeTravelFormat.HUDI, null, eventTime, null); } /** @@ -477,7 +477,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, null, - primaryKeys, partitionKeys, null, onlineEnabled, null, eventTime, null); + primaryKeys, 
partitionKeys, null, onlineEnabled, TimeTravelFormat.HUDI, null, eventTime, null); } /** @@ -506,6 +506,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver * the first primary key of the feature group will be used as hudi precombine key. * @param onlineEnabled Define whether the feature group should be made available also in the online feature store * for low latency access. + * @param timeTravelFormat Format used for time travel, defaults to `"HUDI"`. * @param statisticsConfig A configuration object, to generally enable descriptive statistics computation for * this feature group, `"correlations`" to turn on feature correlation computation, * `"histograms"` to compute feature value frequencies and `"exact_uniqueness"` to compute @@ -523,13 +524,14 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, - StatisticsConfig statisticsConfig, String eventTime, - OnlineConfig onlineConfig) + TimeTravelFormat timeTravelFormat, + StatisticsConfig statisticsConfig, + String eventTime, OnlineConfig onlineConfig) throws IOException, FeatureStoreException { return featureGroupEngine.getOrCreateStreamFeatureGroup(this, name, version, description, - primaryKeys, partitionKeys, hudiPrecombineKey, onlineEnabled, statisticsConfig, eventTime, - onlineConfig); + primaryKeys, partitionKeys, hudiPrecombineKey, onlineEnabled, timeTravelFormat, + statisticsConfig, eventTime, onlineConfig); } /** diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java index 0c8b9bae3..4f423e8f3 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java @@ -17,13 +17,23 @@ package com.logicalclocks.hsfs.spark; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import java.io.IOException; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; -import com.logicalclocks.hsfs.spark.constructor.Query; -import com.logicalclocks.hsfs.spark.engine.FeatureGroupEngine; -import com.logicalclocks.hsfs.spark.engine.StatisticsEngine; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.streaming.StreamingQuery; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.logicalclocks.hsfs.EntityEndpointType; import com.logicalclocks.hsfs.Feature; +import com.logicalclocks.hsfs.FeatureGroupBase; import com.logicalclocks.hsfs.FeatureStoreException; import com.logicalclocks.hsfs.HudiOperationType; import com.logicalclocks.hsfs.JobConfiguration; @@ -31,26 +41,16 @@ import com.logicalclocks.hsfs.StatisticsConfig; import com.logicalclocks.hsfs.Storage; import com.logicalclocks.hsfs.StorageConnector; -import com.logicalclocks.hsfs.FeatureGroupBase; +import com.logicalclocks.hsfs.TimeTravelFormat; import com.logicalclocks.hsfs.metadata.Statistics; +import com.logicalclocks.hsfs.spark.constructor.Query; +import com.logicalclocks.hsfs.spark.engine.FeatureGroupEngine; +import 
com.logicalclocks.hsfs.spark.engine.StatisticsEngine; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.NonNull; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.streaming.StreamingQuery; - -import java.io.IOException; -import java.text.ParseException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - @AllArgsConstructor @JsonIgnoreProperties(ignoreUnknown = true) public class StreamFeatureGroup extends FeatureGroupBase<Dataset<Row>> { @@ -61,9 +61,10 @@ public class StreamFeatureGroup extends FeatureGroupBase<Dataset<Row>> { @Builder public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, - boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, - String onlineTopicName, String topicName, String notificationTopicName, String eventTime, - OnlineConfig onlineConfig, StorageConnector storageConnector, String path) { + boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List<Feature> features, + StatisticsConfig statisticsConfig, String onlineTopicName, String topicName, + String notificationTopicName, String eventTime, OnlineConfig onlineConfig, + StorageConnector storageConnector, String path) { this(); this.featureStore = featureStore; this.name = name; @@ -75,6 +76,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ ? partitionKeys.stream().map(String::toLowerCase).collect(Collectors.toList()) : null; this.hudiPrecombineKey = hudiPrecombineKey != null ? hudiPrecombineKey.toLowerCase() : null; this.onlineEnabled = onlineEnabled; + this.timeTravelFormat = timeTravelFormat != null ? timeTravelFormat : TimeTravelFormat.HUDI; this.features = features; this.statisticsConfig = statisticsConfig != null ? 
statisticsConfig : new StatisticsConfig(); this.onlineTopicName = onlineTopicName; diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/FeatureGroupEngine.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/FeatureGroupEngine.java index 96ddfd5f2..f791d8bcd 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/FeatureGroupEngine.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/FeatureGroupEngine.java @@ -364,7 +364,8 @@ public List<FeatureGroup> getFeatureGroups(FeatureStore featureStore, String fgN public StreamFeatureGroup getOrCreateStreamFeatureGroup(FeatureStore featureStore, String name, Integer version, String description, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, - boolean onlineEnabled, StatisticsConfig statisticsConfig, + boolean onlineEnabled, TimeTravelFormat timeTravelFormat, + StatisticsConfig statisticsConfig, String eventTime, OnlineConfig onlineConfig) throws IOException, FeatureStoreException { StreamFeatureGroup featureGroup; @@ -381,6 +382,7 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(FeatureStore featureStor .partitionKeys(partitionKeys) .hudiPrecombineKey(hudiPrecombineKey) .onlineEnabled(onlineEnabled) + .timeTravelFormat(timeTravelFormat) .statisticsConfig(statisticsConfig) .eventTime(eventTime) .onlineConfig(onlineConfig) diff --git a/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java b/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java index bedd9716e..86a85bbdc 100644 --- a/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java +++ b/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java @@ -20,6 +20,7 @@ import com.logicalclocks.hsfs.Feature; import com.logicalclocks.hsfs.FeatureStoreException; import com.logicalclocks.hsfs.Project; +import com.logicalclocks.hsfs.TimeTravelFormat; import com.logicalclocks.hsfs.metadata.FeatureGroupApi; import com.logicalclocks.hsfs.FeatureGroupBase; import com.logicalclocks.hsfs.metadata.HopsworksClient; @@ -67,7 +68,7 @@ public void testFeatureGroupPrimaryKey() { StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description", Collections.singletonList("primaryKey"), Collections.singletonList("partitionKey"), "hudiPrecombineKey", - true, features, null, "onlineTopicName", null, null, null, null, null, null); + true, TimeTravelFormat.HUDI, features, null, "onlineTopicName", null, null, null, null, null, null); Exception pkException = assertThrows(FeatureStoreException.class, () -> { featureGroupEngine.saveFeatureGroupMetaData(featureGroup, @@ -93,7 +94,7 @@ public void testFeatureGroupEventTimeFeature() { StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description", Collections.singletonList("featureA"), null, null, - true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null); + true, TimeTravelFormat.HUDI, features, null, "onlineTopicName", null, null, "eventTime", null, null, null); Exception eventTimeException = assertThrows(FeatureStoreException.class, () -> { streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup, @@ -119,7 +120,7 @@ public void testFeatureGroupPartitionPrecombineKeys() { StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description", Collections.singletonList("featureA"), Collections.singletonList("partitionKey"), "hudiPrecombineKey", - true, 
features, null, "onlineTopicName", null, null, null, null, null, null); + true, TimeTravelFormat.HUDI, features, null, "onlineTopicName", null, null, null, null, null, null); Exception partitionException = assertThrows(FeatureStoreException.class, () -> { streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup, @@ -164,7 +165,7 @@ public void testFeatureGroupAppendFeaturesResetSubject() throws FeatureStoreExce StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description", Collections.singletonList("featureA"), null, null, - true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null); + true, TimeTravelFormat.HUDI, features, null, "onlineTopicName", null, null, "eventTime", null, null, null); featureGroup.featureGroupEngine = featureGroupEngine; // Act From afe70f4bfb27c5c773ebabc35cd821e1e2967559 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Tue, 15 Oct 2024 11:34:59 +0300 Subject: [PATCH 11/37] ruff fix --- python/hsfs/engine/spark.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 5464f548e..4f10c31c1 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -630,7 +630,7 @@ def _serialize_to_avro( ).alias("value"), ] ) - + def _deserialize_from_avro( self, feature_group: Union[fg_mod.FeatureGroup, fg_mod.ExternalFeatureGroup], @@ -642,7 +642,7 @@ def _deserialize_from_avro( decoded_dataframe = dataframe.select( from_avro("value", feature_group._get_encoded_avro_schema()).alias("value") ).select(col("value.*")) - + """Decodes all complex type features from binary using their avro type as schema.""" return decoded_dataframe.select( [ From 32e49e79f10779423fbf3d20fb0c1d5cf8ce01f5 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Tue, 15 Oct 2024 12:09:51 +0300 Subject: [PATCH 12/37] some test fixes --- python/tests/core/test_kafka_engine.py | 4 +- python/tests/engine/test_python.py | 12 +- python/tests/engine/test_spark.py | 218 ++++++++----------------- 3 files changed, 80 insertions(+), 154 deletions(-) diff --git a/python/tests/core/test_kafka_engine.py b/python/tests/core/test_kafka_engine.py index e6bb48297..88085689e 100644 --- a/python/tests/core/test_kafka_engine.py +++ b/python/tests/core/test_kafka_engine.py @@ -340,7 +340,7 @@ def test_kafka_get_offsets_high(self, mocker): ) # Assert - assert result == f" -initialCheckPointString {topic_name},0:11" + assert result == f"{topic_name},0:11" def test_kafka_get_offsets_low(self, mocker): # Arrange @@ -372,7 +372,7 @@ def test_kafka_get_offsets_low(self, mocker): ) # Assert - assert result == f" -initialCheckPointString {topic_name},0:0" + assert result == f"{topic_name},0:0" def test_kafka_get_offsets_no_topic(self, mocker): # Arrange diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py index 49a66dc46..a64c9ecbd 100644 --- a/python/tests/engine/test_python.py +++ b/python/tests/engine/test_python.py @@ -2962,7 +2962,7 @@ def test_materialization_kafka_first_job_execution(self, mocker): mocker.patch("hsfs.util.get_job_url") mocker.patch( "hsfs.core.kafka_engine.kafka_get_offsets", - return_value=" tests_offsets", + return_value="tests_offsets", ) mocker.patch( "hsfs.core.job_api.JobApi.last_execution", @@ -3004,7 +3004,7 @@ def test_materialization_kafka_first_job_execution(self, mocker): # Assert assert mock_python_engine_kafka_produce.call_count == 4 job_mock.run.assert_called_once_with( - 
args="defaults tests_offsets", + args="defaults -initialCheckPointString tests_offsets", await_termination=False, ) @@ -3020,7 +3020,7 @@ def test_materialization_kafka_skip_offsets(self, mocker): mocker.patch("hsfs.util.get_job_url") mocker.patch( "hsfs.core.kafka_engine.kafka_get_offsets", - return_value=" tests_offsets", + return_value="tests_offsets", ) mocker.patch("hopsworks_common.client.get_instance") @@ -3061,7 +3061,7 @@ def test_materialization_kafka_skip_offsets(self, mocker): # Assert assert mock_python_engine_kafka_produce.call_count == 4 job_mock.run.assert_called_once_with( - args="defaults tests_offsets", + args="defaults -initialCheckPointString tests_offsets", await_termination=False, ) @@ -3077,7 +3077,7 @@ def test_materialization_kafka_topic_doesnt_exist(self, mocker): mocker.patch("hsfs.util.get_job_url") mocker.patch( "hsfs.core.kafka_engine.kafka_get_offsets", - side_effect=["", " tests_offsets"], + side_effect=["", "tests_offsets"], ) mocker.patch("hopsworks_common.client.get_instance") @@ -3115,7 +3115,7 @@ def test_materialization_kafka_topic_doesnt_exist(self, mocker): # Assert assert mock_python_engine_kafka_produce.call_count == 4 job_mock.run.assert_called_once_with( - args="defaults tests_offsets", + args="defaults -initialCheckPointString tests_offsets", await_termination=False, ) diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index 488232dee..97f5826c4 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -873,9 +873,8 @@ def test_save_stream_dataframe(self, mocker, backend_fixtures): "hopsworks_common.client.get_instance" ) mocker.patch("hopsworks_common.client._is_external", return_value=False) - mocker.patch("hsfs.engine.spark.Engine._encode_complex_features") - mock_spark_engine_online_fg_to_avro = mocker.patch( - "hsfs.engine.spark.Engine._online_fg_to_avro" + mock_spark_engine_serialize_to_avro = mocker.patch( + "hsfs.engine.spark.Engine._serialize_to_avro" ) mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance") @@ -921,35 +920,35 @@ def test_save_stream_dataframe(self, mocker, backend_fixtures): # Assert assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.call_args[0][0] + mock_spark_engine_serialize_to_avro.return_value.withColumn.call_args[0][0] == "headers" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.call_args[ 0 ][0] == "test_mode" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.call_args[ 0 ][0] == "kafka" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ 0 ][0] == "checkpointLocation" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ 0 ][1] == 
f"/Projects/test_project_name/Resources/{self._get_spark_query_name(project_id, fg)}-checkpoint" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.call_args[ 1 ] == { @@ -966,25 +965,25 @@ def test_save_stream_dataframe(self, mocker, backend_fixtures): } ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ 0 ][0] == "topic" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ 0 ][1] == "test_online_topic_name" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.call_args[ 0 ][0] == self._get_spark_query_name(project_id, fg) ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_count + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_count == 0 ) @@ -994,9 +993,8 @@ def test_save_stream_dataframe_transformations(self, mocker, backend_fixtures): "hopsworks_common.client.get_instance" ) mocker.patch("hopsworks_common.client._is_external", return_value=False) - mocker.patch("hsfs.engine.spark.Engine._encode_complex_features") - mock_spark_engine_online_fg_to_avro = mocker.patch( - "hsfs.engine.spark.Engine._online_fg_to_avro" + mock_spark_engine_serialize_to_avro = mocker.patch( + "hsfs.engine.spark.Engine._serialize_to_avro" ) mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance") @@ -1052,35 +1050,35 @@ def test(feature): # Assert assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.call_args[0][0] + mock_spark_engine_serialize_to_avro.return_value.withColumn.call_args[0][0] == "headers" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.call_args[ 0 ][0] == "test_mode" ) assert ( - 
mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.call_args[ 0 ][0] == "kafka" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ 0 ][0] == "checkpointLocation" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ 0 ][1] == f"/Projects/test_project_name/Resources/{self._get_spark_query_name(project_id, fg)}-checkpoint" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.call_args[ 1 ] == { @@ -1097,25 +1095,25 @@ def test(feature): } ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ 0 ][0] == "topic" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ 0 ][1] == "test_online_topic_name" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.call_args[ 0 ][0] == self._get_spark_query_name(project_id, fg) ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_count + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_count == 0 ) assert mock_spark_engine_apply_transformations.call_count == 1 @@ -1126,9 +1124,8 @@ def test_save_stream_dataframe_query_name(self, mocker, backend_fixtures): "hopsworks_common.client.get_instance" ) 
mocker.patch("hopsworks_common.client._is_external", return_value=False) - mocker.patch("hsfs.engine.spark.Engine._encode_complex_features") - mock_spark_engine_online_fg_to_avro = mocker.patch( - "hsfs.engine.spark.Engine._online_fg_to_avro" + mock_spark_engine_serialize_to_avro = mocker.patch( + "hsfs.engine.spark.Engine._serialize_to_avro" ) mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance") @@ -1172,35 +1169,35 @@ def test_save_stream_dataframe_query_name(self, mocker, backend_fixtures): # Assert assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.call_args[0][0] + mock_spark_engine_serialize_to_avro.return_value.withColumn.call_args[0][0] == "headers" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.call_args[ 0 ][0] == "test_mode" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.call_args[ 0 ][0] == "kafka" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ 0 ][0] == "checkpointLocation" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ 0 ][1] == "/Projects/test_project_name/Resources/test_query_name-checkpoint" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.call_args[ 1 ] == { @@ -1217,25 +1214,25 @@ def test_save_stream_dataframe_query_name(self, mocker, backend_fixtures): } ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ 0 ][0] == "topic" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ 0 ][1] == "test_online_topic_name" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.call_args[ + 
mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.call_args[ 0 ][0] == "test_query_name" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_count + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_count == 0 ) @@ -1251,9 +1248,8 @@ def test_save_stream_dataframe_checkpoint_dir(self, mocker, backend_fixtures): "hopsworks_common.client.get_instance" ) mocker.patch("hopsworks_common.client._is_external", return_value=False) - mocker.patch("hsfs.engine.spark.Engine._encode_complex_features") - mock_spark_engine_online_fg_to_avro = mocker.patch( - "hsfs.engine.spark.Engine._online_fg_to_avro" + mock_spark_engine_serialize_to_avro = mocker.patch( + "hsfs.engine.spark.Engine._serialize_to_avro" ) mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance") @@ -1299,35 +1295,35 @@ def test_save_stream_dataframe_checkpoint_dir(self, mocker, backend_fixtures): # Assert assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.call_args[0][0] + mock_spark_engine_serialize_to_avro.return_value.withColumn.call_args[0][0] == "headers" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.call_args[ 0 ][0] == "test_mode" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.call_args[ 0 ][0] == "kafka" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ 0 ][0] == "checkpointLocation" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ 0 ][1] == "test_checkpoint_dir" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.call_args[ 1 ] == { @@ -1344,25 +1340,25 @@ def test_save_stream_dataframe_checkpoint_dir(self, mocker, backend_fixtures): } ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ + 
mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ 0 ][0] == "topic" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ 0 ][1] == "test_online_topic_name" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.call_args[ 0 ][0] == self._get_spark_query_name(project_id, fg) ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_count + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_count == 0 ) @@ -1372,9 +1368,8 @@ def test_save_stream_dataframe_await_termination(self, mocker, backend_fixtures) "hopsworks_common.client.get_instance" ) mocker.patch("hopsworks_common.client._is_external", return_value=False) - mocker.patch("hsfs.engine.spark.Engine._encode_complex_features") - mock_spark_engine_online_fg_to_avro = mocker.patch( - "hsfs.engine.spark.Engine._online_fg_to_avro" + mock_spark_engine_serialize_to_avro = mocker.patch( + "hsfs.engine.spark.Engine._serialize_to_avro" ) mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance") @@ -1420,35 +1415,35 @@ def test_save_stream_dataframe_await_termination(self, mocker, backend_fixtures) # Assert assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.call_args[0][0] + mock_spark_engine_serialize_to_avro.return_value.withColumn.call_args[0][0] == "headers" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.call_args[ 0 ][0] == "test_mode" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.call_args[ 0 ][0] == "kafka" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ 0 ][0] == "checkpointLocation" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ + 
mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[ 0 ][1] == f"/Projects/test_project_name/Resources/{self._get_spark_query_name(project_id, fg)}-checkpoint" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.call_args[ 1 ] == { @@ -1465,29 +1460,29 @@ def test_save_stream_dataframe_await_termination(self, mocker, backend_fixtures) } ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ 0 ][0] == "topic" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[ 0 ][1] == "test_online_topic_name" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.call_args[ 0 ][0] == self._get_spark_query_name(project_id, fg) ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_count + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_count == 1 ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_args[ 0 ][0] == 123 @@ -1630,9 +1625,8 @@ def test_save_online_dataframe(self, mocker, backend_fixtures): # Arrange mocker.patch("hopsworks_common.client.get_instance") mocker.patch("hopsworks_common.client._is_external", return_value=False) - mocker.patch("hsfs.engine.spark.Engine._encode_complex_features") - mock_spark_engine_online_fg_to_avro = mocker.patch( - "hsfs.engine.spark.Engine._online_fg_to_avro" + 
mock_spark_engine_serialize_to_avro = mocker.patch( + "hsfs.engine.spark.Engine._serialize_to_avro" ) mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance") @@ -1668,19 +1662,19 @@ def test_save_online_dataframe(self, mocker, backend_fixtures): ) # Assert - assert mock_spark_engine_online_fg_to_avro.call_count == 1 + assert mock_spark_engine_serialize_to_avro.call_count == 1 assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.call_args[0][0] + mock_spark_engine_serialize_to_avro.return_value.withColumn.call_args[0][0] == "headers" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.write.format.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.write.format.call_args[ 0 ][0] == "kafka" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.write.format.return_value.options.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.write.format.return_value.options.call_args[ 1 ] == { @@ -1697,23 +1691,23 @@ def test_save_online_dataframe(self, mocker, backend_fixtures): } ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.write.format.return_value.options.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.write.format.return_value.options.return_value.option.call_args[ 0 ][0] == "topic" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.write.format.return_value.options.return_value.option.call_args[ + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.write.format.return_value.options.return_value.option.call_args[ 0 ][1] == "test_online_topic_name" ) assert ( - mock_spark_engine_online_fg_to_avro.return_value.withColumn.return_value.write.format.return_value.options.return_value.option.return_value.save.call_count + mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.write.format.return_value.options.return_value.option.return_value.save.call_count == 1 ) - def test_encode_complex_features(self, mocker): + def test_serialize_to_avro(self, mocker): # Arrange mocker.patch("hopsworks_common.client.get_instance") mocker.patch( @@ -1742,7 +1736,7 @@ def test_encode_complex_features(self, mocker): expected = pd.DataFrame(data={"col_0": ["test_1", "test_2"]}) # Act - result = spark_engine._encode_complex_features( + result = spark_engine._serialize_to_avro( feature_group=fg, dataframe=spark_df, ) @@ -1753,74 +1747,6 @@ def test_encode_complex_features(self, mocker): for column in list(result_df): assert result_df[column].equals(expected[column]) - def test_encode_complex_features_col_in_complex_features(self, mocker): - # Arrange - mocker.patch( - "hsfs.feature_group.FeatureGroup.get_complex_features", - return_value=["col_0"], - ) - mocker.patch("hsfs.feature_group.FeatureGroup._get_feature_avro_schema") - - spark_engine = spark.Engine() - - d = {"col_0": ["test_1", "test_2"], "col_1": ["test_1", "test_2"]} - df = pd.DataFrame(data=d) - - spark_df = spark_engine._spark_session.createDataFrame(df) - - fg = feature_group.FeatureGroup( - name="test", - version=1, - featurestore_id=99, - primary_key=[], - partition_key=[], - id=10, - ) - fg._subject = {"schema": '{"fields": [{"name": "col_0"}]}'} - - # Act - with pytest.raises( - TypeError - ) as e_info: # todo look into this (to_avro has to be mocked) - spark_engine._encode_complex_features( - feature_group=fg, - 
dataframe=spark_df, - ) - - # Assert - assert str(e_info.value) == "'JavaPackage' object is not callable" - - def test_online_fg_to_avro(self): - # Arrange - spark_engine = spark.Engine() - - d = {"col_0": ["test_1", "test_2"], "col_1": ["test_1", "test_2"]} - df = pd.DataFrame(data=d) - - spark_df = spark_engine._spark_session.createDataFrame(df) - - fg = feature_group.FeatureGroup( - name="test", - version=1, - featurestore_id=99, - primary_key=[], - partition_key=[], - id=10, - ) - fg._avro_schema = '{"fields": [{"name": "col_0"}]}' - - # Act - with pytest.raises( - TypeError - ) as e_info: # todo look into this (to_avro has to be mocked) - spark_engine._online_fg_to_avro( - feature_group=fg, - dataframe=spark_df, - ) - - # Assert - assert str(e_info.value) == "'JavaPackage' object is not callable" - def test_get_training_data(self, mocker): # Arrange mock_spark_engine_write_training_dataset = mocker.patch( From 2e52a99c2f1c34624e2e7c182f2ae8bf5b62983f Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Tue, 15 Oct 2024 14:21:19 +0300 Subject: [PATCH 13/37] small test fix --- python/hsfs/feature_group.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 328dba4df..6c89ecada 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2332,14 +2332,14 @@ def __init__( self.partition_key = partition_key if ( time_travel_format is not None - and time_travel_format.upper() == "HUDI" + and time_travel_format.upper() == "HUDI" or time_travel_format is None and self._features ): self._hudi_precombine_key = ( util.autofix_feature_name(hudi_precombine_key) if hudi_precombine_key is not None and self._time_travel_format is not None - and self._time_travel_format == "HUDI" + and self._time_travel_format == "HUDI" or time_travel_format is None else None ) else: From 55bf7585da88193b8cce71d8fd5e7e5eb21bc552 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Tue, 15 Oct 2024 14:36:29 +0300 Subject: [PATCH 14/37] continue test fix --- python/hsfs/feature_group.py | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 6c89ecada..2ccedad71 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2225,7 +2225,7 @@ def __init__( features: Optional[List[Union["feature.Feature", Dict[str, Any]]]] = None, location: Optional[str] = None, online_enabled: bool = False, - time_travel_format: Optional[str] = None, + time_travel_format: Optional[str] = "HUDI", statistics_config: Optional[Union["StatisticsConfig", Dict[str, Any]]] = None, online_topic_name: Optional[str] = None, topic_name: Optional[str] = None, @@ -2330,20 +2330,13 @@ def __init__( self.primary_key = primary_key self.partition_key = partition_key - if ( - time_travel_format is not None - and time_travel_format.upper() == "HUDI" or time_travel_format is None - and self._features - ): - self._hudi_precombine_key = ( - util.autofix_feature_name(hudi_precombine_key) - if hudi_precombine_key is not None - and self._time_travel_format is not None - and self._time_travel_format == "HUDI" or time_travel_format is None - else None - ) - else: - self._hudi_precombine_key: Optional[str] = None + self._hudi_precombine_key = ( + util.autofix_feature_name(hudi_precombine_key) + if hudi_precombine_key is not None + and self._time_travel_format is not None + and self._time_travel_format == "HUDI" + else None 
+ ) self.statistics_config = statistics_config self._offline_backfill_every_hr = offline_backfill_every_hr From d910f928d8986df7127605c3b31bc8f24148f1a4 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Tue, 15 Oct 2024 14:52:04 +0300 Subject: [PATCH 15/37] _time_travel_format test --- python/hsfs/feature_group.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 2ccedad71..9d286cc29 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -2225,7 +2225,7 @@ def __init__( features: Optional[List[Union["feature.Feature", Dict[str, Any]]]] = None, location: Optional[str] = None, online_enabled: bool = False, - time_travel_format: Optional[str] = "HUDI", + time_travel_format: Optional[str] = None, statistics_config: Optional[Union["StatisticsConfig", Dict[str, Any]]] = None, online_topic_name: Optional[str] = None, topic_name: Optional[str] = None, @@ -2333,8 +2333,8 @@ def __init__( self._hudi_precombine_key = ( util.autofix_feature_name(hudi_precombine_key) if hudi_precombine_key is not None - and self._time_travel_format is not None - and self._time_travel_format == "HUDI" + and (self._time_travel_format is None + or self._time_travel_format == "HUDI") else None ) self.statistics_config = statistics_config From ea8aef6207387377f4b4552e5de0a382c76ca081 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Wed, 16 Oct 2024 10:30:02 +0300 Subject: [PATCH 16/37] test_serialize_deserialize_avro --- python/tests/engine/test_spark.py | 52 +++++++++++++++++++------------ 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index 97f5826c4..27c3b04d8 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -15,6 +15,7 @@ # from __future__ import annotations +import datetime import hopsworks_common import numpy import pandas as pd @@ -1707,21 +1708,24 @@ def test_save_online_dataframe(self, mocker, backend_fixtures): == 1 ) - def test_serialize_to_avro(self, mocker): + def test_serialize_deserialize_avro(self, mocker): # Arrange - mocker.patch("hopsworks_common.client.get_instance") - mocker.patch( - "hsfs.feature_group.FeatureGroup.get_complex_features", - return_value=["col_1"], - ) - mocker.patch("hsfs.feature_group.FeatureGroup._get_feature_avro_schema") - spark_engine = spark.Engine() - d = {"col_0": ["test_1", "test_2"], "col_1": ["test_1", "test_2"]} - df = pd.DataFrame(data=d) + now = datetime.datetime.now() - spark_df = spark_engine._spark_session.createDataFrame(df) + fg_data = [] + fg_data.append(("ekarson", ["GRAVITY RUSH 2", "KING'S QUEST"], pd.Timestamp(now.timestamp()))) + fg_data.append(("ratmilkdrinker", ["NBA 2K", "CALL OF DUTY"], pd.Timestamp(now.timestamp()))) + pandas_df = pd.DataFrame(fg_data, columns =["account_id", "last_played_games", "event_time"]) + + df = spark_engine._spark_session.createDataFrame(pandas_df) + + features = [ + feature.Feature(name="account_id", type="str"), + feature.Feature(name="last_played_games", type="xx"), + feature.Feature(name="event_time", type="timestamp"), + ] fg = feature_group.FeatureGroup( name="test", @@ -1730,22 +1734,30 @@ def test_serialize_to_avro(self, mocker): primary_key=[], partition_key=[], id=10, + features=features, ) - fg._subject = {"schema": '{"fields": [{"name": "col_0"}]}'} - - expected = pd.DataFrame(data={"col_0": ["test_1", "test_2"]}) + fg._subject = { + 'id': 1025, + 
'subject': 'fg_1', + 'version': 1, + 'schema': '{"type":"record","name":"fg_1","namespace":"test_featurestore.db","fields":[{"name":"account_id","type":["null","string"]},{"name":"last_played_games","type":["null",{"type":"array","items":["null","string"]}]},{"name":"event_time","type":["null",{"type":"long","logicalType":"timestamp-micros"}]}]}' + } # Act - result = spark_engine._serialize_to_avro( + serialized_df = spark_engine._serialize_to_avro( feature_group=fg, - dataframe=spark_df, + dataframe=df, + ) + + deserialized_df = spark_engine._deserialize_from_avro( + feature_group=fg, + dataframe=serialized_df, ) # Assert - result_df = result.toPandas() - assert list(result_df) == list(expected) - for column in list(result_df): - assert result_df[column].equals(expected[column]) + assert serialized_df.schema.json() == '{"fields":[{"metadata":{},"name":"key","nullable":false,"type":"binary"},{"metadata":{},"name":"value","nullable":false,"type":"binary"}],"type":"struct"}' + assert df.schema == deserialized_df.schema + assert df.collect() == deserialized_df.collect() def test_get_training_data(self, mocker): # Arrange From 2135c5a779bf5876e3c9a494c283090eaaf31fbd Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Wed, 16 Oct 2024 10:30:59 +0300 Subject: [PATCH 17/37] ruff fix --- python/tests/engine/test_spark.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index 27c3b04d8..c8db9acb9 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -16,6 +16,7 @@ from __future__ import annotations import datetime + import hopsworks_common import numpy import pandas as pd @@ -1756,7 +1757,7 @@ def test_serialize_deserialize_avro(self, mocker): # Assert assert serialized_df.schema.json() == '{"fields":[{"metadata":{},"name":"key","nullable":false,"type":"binary"},{"metadata":{},"name":"value","nullable":false,"type":"binary"}],"type":"struct"}' - assert df.schema == deserialized_df.schema + assert df.schema == deserialized_df.schema assert df.collect() == deserialized_df.collect() def test_get_training_data(self, mocker): From fd63b1692ad1a98d134f599ab1918e626007d52c Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Wed, 16 Oct 2024 11:21:15 +0300 Subject: [PATCH 18/37] work on test --- python/tests/engine/test_spark.py | 102 +++++++++++++++++++++++++++++- 1 file changed, 99 insertions(+), 3 deletions(-) diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index c8db9acb9..d0605b7c7 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -1709,8 +1709,10 @@ def test_save_online_dataframe(self, mocker, backend_fixtures): == 1 ) - def test_serialize_deserialize_avro(self, mocker): + def test_serialize_to_avro(self, mocker): # Arrange + mocker.patch("pyspark.sql.avro.functions.to_avro") + spark_engine = spark.Engine() now = datetime.datetime.now() @@ -1724,7 +1726,7 @@ def test_serialize_deserialize_avro(self, mocker): features = [ feature.Feature(name="account_id", type="str"), - feature.Feature(name="last_played_games", type="xx"), + feature.Feature(name="last_played_games", type="array"), feature.Feature(name="event_time", type="timestamp"), ] @@ -1750,15 +1752,109 @@ def test_serialize_deserialize_avro(self, mocker): dataframe=df, ) + # Assert + assert serialized_df.schema.json() == 
'{"fields":[{"metadata":{},"name":"key","nullable":false,"type":"binary"},{"metadata":{},"name":"value","nullable":false,"type":"binary"}],"type":"struct"}' + + def test_deserialize_from_avro(self, mocker): + # Arrange + mocker.patch("pyspark.sql.avro.functions.from_avro") + + spark_engine = spark.Engine() + + data = [] + data.append((b"2121", b"21212121")) + data.append((b"1212", b"12121212")) + pandas_df = pd.DataFrame(data, columns =["key", "value"]) + + df = spark_engine._spark_session.createDataFrame(pandas_df) + + features = [ + feature.Feature(name="account_id", type="str"), + feature.Feature(name="last_played_games", type="array"), + feature.Feature(name="event_time", type="timestamp"), + ] + + fg = feature_group.FeatureGroup( + name="test", + version=1, + featurestore_id=99, + primary_key=[], + partition_key=[], + id=10, + features=features, + ) + fg._subject = { + 'id': 1025, + 'subject': 'fg_1', + 'version': 1, + 'schema': '{"type":"record","name":"fg_1","namespace":"test_featurestore.db","fields":[{"name":"account_id","type":["null","string"]},{"name":"last_played_games","type":["null",{"type":"array","items":["null","string"]}]},{"name":"event_time","type":["null",{"type":"long","logicalType":"timestamp-micros"}]}]}' + } + + # Act deserialized_df = spark_engine._deserialize_from_avro( feature_group=fg, - dataframe=serialized_df, + dataframe=df, ) # Assert + assert deserialized_df.schema.json() == '{"fields":[{"metadata":{},"name":"account_id","nullable":true,"type":"string"},{"metadata":{},"name":"last_played_games","nullable":true,"type":{"containsNull":true,"elementType":"string","type":"array"}},{"metadata":{},"name":"event_time","nullable":true,"type":"timestamp"}],"type":"struct"}' + + def test_serialize_deserialize_avro(self, mocker): + # Arrange + spark_engine = spark.Engine() + + now = datetime.datetime.now() + + fg_data = [] + fg_data.append(("ekarson", ["GRAVITY RUSH 2", "KING'S QUEST"], pd.Timestamp(now.timestamp()))) + fg_data.append(("ratmilkdrinker", ["NBA 2K", "CALL OF DUTY"], pd.Timestamp(now.timestamp()))) + pandas_df = pd.DataFrame(fg_data, columns =["account_id", "last_played_games", "event_time"]) + + df = spark_engine._spark_session.createDataFrame(pandas_df) + + features = [ + feature.Feature(name="account_id", type="str"), + feature.Feature(name="last_played_games", type="xx"), + feature.Feature(name="event_time", type="timestamp"), + ] + + fg = feature_group.FeatureGroup( + name="test", + version=1, + featurestore_id=99, + primary_key=[], + partition_key=[], + id=10, + features=features, + ) + fg._subject = { + 'id': 1025, + 'subject': 'fg_1', + 'version': 1, + 'schema': '{"type":"record","name":"fg_1","namespace":"test_featurestore.db","fields":[{"name":"account_id","type":["null","string"]},{"name":"last_played_games","type":["null",{"type":"array","items":["null","string"]}]},{"name":"event_time","type":["null",{"type":"long","logicalType":"timestamp-micros"}]}]}' + } + + # Act + with pytest.raises( + TypeError + ) as e_info: # todo look into this (to_avro from_avro has to be mocked) + serialized_df = spark_engine._serialize_to_avro( + feature_group=fg, + dataframe=df, + ) + + deserialized_df = spark_engine._deserialize_from_avro( + feature_group=fg, + dataframe=serialized_df, + ) + + # Assert + assert str(e_info.value) == "'JavaPackage' object is not callable" + ''' when to_avro/from_avro issue is resolved uncomment this line (it ensures that encoded df can be properly decoded) assert serialized_df.schema.json() == 
'{"fields":[{"metadata":{},"name":"key","nullable":false,"type":"binary"},{"metadata":{},"name":"value","nullable":false,"type":"binary"}],"type":"struct"}' assert df.schema == deserialized_df.schema assert df.collect() == deserialized_df.collect() + ''' def test_get_training_data(self, mocker): # Arrange From 1e162b6f2b926776ee168639ac09b01dd6a4da10 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Wed, 16 Oct 2024 11:23:33 +0300 Subject: [PATCH 19/37] ruff fix --- python/tests/engine/test_spark.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index d0605b7c7..0ceff6c63 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -1838,6 +1838,11 @@ def test_serialize_deserialize_avro(self, mocker): with pytest.raises( TypeError ) as e_info: # todo look into this (to_avro from_avro has to be mocked) + spark_engine._serialize_to_avro( + feature_group=fg, + dataframe=df, + ) + ''' serialized_df = spark_engine._serialize_to_avro( feature_group=fg, dataframe=df, @@ -1847,6 +1852,7 @@ def test_serialize_deserialize_avro(self, mocker): feature_group=fg, dataframe=serialized_df, ) + ''' # Assert assert str(e_info.value) == "'JavaPackage' object is not callable" From ba1fcacbe3a14ed4503edb8a66208d1e8d8d1100 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Wed, 16 Oct 2024 11:49:54 +0300 Subject: [PATCH 20/37] try setting spark session --- python/tests/engine/test_spark.py | 41 ++++++++++++++----------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index 0ceff6c63..ffcf294f8 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -40,7 +40,7 @@ from hsfs.hopsworks_udf import udf from hsfs.training_dataset_feature import TrainingDatasetFeature from hsfs.transformation_function import TransformationType -from pyspark.sql import DataFrame +from pyspark.sql import DataFrame, SparkSession from pyspark.sql.types import ( ArrayType, BinaryType, @@ -1711,7 +1711,9 @@ def test_save_online_dataframe(self, mocker, backend_fixtures): def test_serialize_to_avro(self, mocker): # Arrange - mocker.patch("pyspark.sql.avro.functions.to_avro") + SparkSession.builder \ + .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.1") \ + .getOrCreate() spark_engine = spark.Engine() @@ -1757,7 +1759,9 @@ def test_serialize_to_avro(self, mocker): def test_deserialize_from_avro(self, mocker): # Arrange - mocker.patch("pyspark.sql.avro.functions.from_avro") + SparkSession.builder \ + .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.1") \ + .getOrCreate() spark_engine = spark.Engine() @@ -1801,6 +1805,9 @@ def test_deserialize_from_avro(self, mocker): def test_serialize_deserialize_avro(self, mocker): # Arrange + SparkSession.builder \ + .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.1") \ + .getOrCreate() spark_engine = spark.Engine() now = datetime.datetime.now() @@ -1835,32 +1842,20 @@ def test_serialize_deserialize_avro(self, mocker): } # Act - with pytest.raises( - TypeError - ) as e_info: # todo look into this (to_avro from_avro has to be mocked) - spark_engine._serialize_to_avro( - feature_group=fg, - dataframe=df, - ) - ''' - serialized_df = spark_engine._serialize_to_avro( - feature_group=fg, - dataframe=df, - ) + serialized_df = spark_engine._serialize_to_avro( + feature_group=fg, + dataframe=df, + ) 
- deserialized_df = spark_engine._deserialize_from_avro( - feature_group=fg, - dataframe=serialized_df, - ) - ''' + deserialized_df = spark_engine._deserialize_from_avro( + feature_group=fg, + dataframe=serialized_df, + ) # Assert - assert str(e_info.value) == "'JavaPackage' object is not callable" - ''' when to_avro/from_avro issue is resolved uncomment this line (it ensures that encoded df can be properly decoded) assert serialized_df.schema.json() == '{"fields":[{"metadata":{},"name":"key","nullable":false,"type":"binary"},{"metadata":{},"name":"value","nullable":false,"type":"binary"}],"type":"struct"}' assert df.schema == deserialized_df.schema assert df.collect() == deserialized_df.collect() - ''' def test_get_training_data(self, mocker): # Arrange From 575c0102dfd736cf4774dcbc2943ee4f0f49d324 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Wed, 16 Oct 2024 11:58:37 +0300 Subject: [PATCH 21/37] try more _spark_session --- python/tests/engine/test_spark.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index ffcf294f8..bba587165 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -1711,12 +1711,14 @@ def test_save_online_dataframe(self, mocker, backend_fixtures): def test_serialize_to_avro(self, mocker): # Arrange - SparkSession.builder \ + spark_engine = spark.Engine() + + spark_engine._spark_session.stop() + + spark_engine._spark_session = SparkSession.builder \ .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.1") \ .getOrCreate() - spark_engine = spark.Engine() - now = datetime.datetime.now() fg_data = [] @@ -1759,12 +1761,14 @@ def test_serialize_to_avro(self, mocker): def test_deserialize_from_avro(self, mocker): # Arrange - SparkSession.builder \ + spark_engine = spark.Engine() + + spark_engine._spark_session.stop() + + spark_engine._spark_session = SparkSession.builder \ .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.1") \ .getOrCreate() - spark_engine = spark.Engine() - data = [] data.append((b"2121", b"21212121")) data.append((b"1212", b"12121212")) @@ -1805,10 +1809,13 @@ def test_deserialize_from_avro(self, mocker): def test_serialize_deserialize_avro(self, mocker): # Arrange - SparkSession.builder \ + spark_engine = spark.Engine() + + spark_engine._spark_session.stop() + + spark_engine._spark_session = SparkSession.builder \ .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.1") \ .getOrCreate() - spark_engine = spark.Engine() now = datetime.datetime.now() From ec4b848531e090b8e84fe42b295f67e8b9274acd Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Wed, 16 Oct 2024 13:31:21 +0300 Subject: [PATCH 22/37] small changes --- python/tests/engine/test_spark.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index bba587165..84c27f5d3 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -1716,6 +1716,7 @@ def test_serialize_to_avro(self, mocker): spark_engine._spark_session.stop() spark_engine._spark_session = SparkSession.builder \ + .master("local[*]") \ .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.1") \ .getOrCreate() @@ -1766,6 +1767,7 @@ def test_deserialize_from_avro(self, mocker): spark_engine._spark_session.stop() spark_engine._spark_session = SparkSession.builder \ + 
.master("local[*]") \ .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.1") \ .getOrCreate() @@ -1814,6 +1816,7 @@ def test_serialize_deserialize_avro(self, mocker): spark_engine._spark_session.stop() spark_engine._spark_session = SparkSession.builder \ + .master("local[*]") \ .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.1") \ .getOrCreate() @@ -1828,7 +1831,7 @@ def test_serialize_deserialize_avro(self, mocker): features = [ feature.Feature(name="account_id", type="str"), - feature.Feature(name="last_played_games", type="xx"), + feature.Feature(name="last_played_games", type="array"), feature.Feature(name="event_time", type="timestamp"), ] From 60573622c459808c8080f5d2b14100b4e1c5bad4 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Wed, 16 Oct 2024 13:39:40 +0300 Subject: [PATCH 23/37] remove SparkSession.builder from tests --- python/tests/engine/test_spark.py | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index 84c27f5d3..a82a098af 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -1713,13 +1713,6 @@ def test_serialize_to_avro(self, mocker): # Arrange spark_engine = spark.Engine() - spark_engine._spark_session.stop() - - spark_engine._spark_session = SparkSession.builder \ - .master("local[*]") \ - .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.1") \ - .getOrCreate() - now = datetime.datetime.now() fg_data = [] @@ -1764,13 +1757,6 @@ def test_deserialize_from_avro(self, mocker): # Arrange spark_engine = spark.Engine() - spark_engine._spark_session.stop() - - spark_engine._spark_session = SparkSession.builder \ - .master("local[*]") \ - .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.1") \ - .getOrCreate() - data = [] data.append((b"2121", b"21212121")) data.append((b"1212", b"12121212")) @@ -1813,13 +1799,6 @@ def test_serialize_deserialize_avro(self, mocker): # Arrange spark_engine = spark.Engine() - spark_engine._spark_session.stop() - - spark_engine._spark_session = SparkSession.builder \ - .master("local[*]") \ - .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.1") \ - .getOrCreate() - now = datetime.datetime.now() fg_data = [] From d40b2d132ee0dd67926af477c8979ef6ace87254 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Wed, 16 Oct 2024 13:41:23 +0300 Subject: [PATCH 24/37] remove unused import --- python/tests/engine/test_spark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index a82a098af..8bb0bce14 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -40,7 +40,7 @@ from hsfs.hopsworks_udf import udf from hsfs.training_dataset_feature import TrainingDatasetFeature from hsfs.transformation_function import TransformationType -from pyspark.sql import DataFrame, SparkSession +from pyspark.sql import DataFrame from pyspark.sql.types import ( ArrayType, BinaryType, From 854eaffd1a60098776dbf4a133bcfe03ba2e6c6a Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Wed, 16 Oct 2024 15:48:33 +0300 Subject: [PATCH 25/37] Set spark jars --- .github/workflows/python.yml | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index b9566d4a1..fdac2eb66 100644 --- a/.github/workflows/python.yml +++ 
b/.github/workflows/python.yml @@ -86,6 +86,9 @@ jobs: - name: Display Python version run: python --version + - name: Set spark jars + run: wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar -P $SPARK_HOME/jars + - name: Run Pytest suite run: pytest python/tests @@ -116,6 +119,9 @@ jobs: cache-dependency-path: "python/setup.py" - run: pip install -e python[python,dev-no-opt] + - name: Set spark jars + run: wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar -P $SPARK_HOME/jars + - name: Run Pytest suite run: pytest python/tests @@ -149,6 +155,9 @@ jobs: - name: Display Python version run: python --version + - name: Set spark jars + run: wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar -P $SPARK_HOME/jars + - name: Run Pytest suite run: pytest python/tests @@ -181,6 +190,9 @@ jobs: - name: Display Python version run: python --version + + - name: Set spark jars + run: wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar -P $SPARK_HOME/jars - name: Run Pytest suite run: pytest python/tests @@ -203,6 +215,9 @@ jobs: cache-dependency-path: "python/setup.py" - run: pip install -e python[python,dev,docs] + - name: Set spark jars + run: wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar -P $SPARK_HOME/jars + - name: Run Pytest suite env: HOPSWORKS_RUN_WITH_TYPECHECK: "true" @@ -233,6 +248,9 @@ jobs: - name: Display Python version run: python --version + - name: Set spark jars + run: wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar -P $SPARK_HOME/jars + - name: Run Pytest suite run: pytest python/tests @@ -263,5 +281,8 @@ jobs: - name: Display pip freeze run: pip freeze + - name: Set spark jars + run: wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar -P $SPARK_HOME/jars + - name: Run Pytest suite run: pytest python/tests From 35933ef1adf71c47e26de993d4c4b13ae88437f3 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Wed, 16 Oct 2024 16:54:33 +0300 Subject: [PATCH 26/37] tmp test fix --- .github/workflows/python.yml | 21 --------------------- python/tests/engine/test_spark.py | 6 ++++++ 2 files changed, 6 insertions(+), 21 deletions(-) diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index fdac2eb66..b9566d4a1 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -86,9 +86,6 @@ jobs: - name: Display Python version run: python --version - - name: Set spark jars - run: wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar -P $SPARK_HOME/jars - - name: Run Pytest suite run: pytest python/tests @@ -119,9 +116,6 @@ jobs: cache-dependency-path: "python/setup.py" - run: pip install -e python[python,dev-no-opt] - - name: Set spark jars - run: wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar -P $SPARK_HOME/jars - - name: Run Pytest suite run: pytest python/tests @@ -155,9 +149,6 @@ jobs: - name: Display Python version run: python --version - - name: Set spark jars - run: wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar -P $SPARK_HOME/jars - - name: Run Pytest suite run: pytest python/tests @@ -190,9 +181,6 @@ jobs: - name: Display Python version run: 
python --version - - - name: Set spark jars - run: wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar -P $SPARK_HOME/jars - name: Run Pytest suite run: pytest python/tests @@ -215,9 +203,6 @@ jobs: cache-dependency-path: "python/setup.py" - run: pip install -e python[python,dev,docs] - - name: Set spark jars - run: wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar -P $SPARK_HOME/jars - - name: Run Pytest suite env: HOPSWORKS_RUN_WITH_TYPECHECK: "true" @@ -248,9 +233,6 @@ jobs: - name: Display Python version run: python --version - - name: Set spark jars - run: wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar -P $SPARK_HOME/jars - - name: Run Pytest suite run: pytest python/tests @@ -281,8 +263,5 @@ jobs: - name: Display pip freeze run: pip freeze - - name: Set spark jars - run: wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar -P $SPARK_HOME/jars - - name: Run Pytest suite run: pytest python/tests diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index 52fa0871a..517e40c7a 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -41,6 +41,7 @@ from hsfs.training_dataset_feature import TrainingDatasetFeature from hsfs.transformation_function import TransformationType from pyspark.sql import DataFrame +from pyspark.sql.functions import lit from pyspark.sql.types import ( ArrayType, BinaryType, @@ -1713,6 +1714,9 @@ def test_serialize_to_avro(self, mocker): # Arrange spark_engine = spark.Engine() + mock_to_avro = mocker.patch('hsfs.engine.spark.to_avro') + mock_to_avro.return_value = lit(b'111') + now = datetime.datetime.now() fg_data = [] @@ -1753,6 +1757,7 @@ def test_serialize_to_avro(self, mocker): # Assert assert serialized_df.schema.json() == '{"fields":[{"metadata":{},"name":"key","nullable":false,"type":"binary"},{"metadata":{},"name":"value","nullable":false,"type":"binary"}],"type":"struct"}' + ''' Need spark to run these tests properly def test_deserialize_from_avro(self, mocker): # Arrange spark_engine = spark.Engine() @@ -1845,6 +1850,7 @@ def test_serialize_deserialize_avro(self, mocker): assert serialized_df.schema.json() == '{"fields":[{"metadata":{},"name":"key","nullable":false,"type":"binary"},{"metadata":{},"name":"value","nullable":false,"type":"binary"}],"type":"struct"}' assert df.schema == deserialized_df.schema assert df.collect() == deserialized_df.collect() + ''' def test_get_training_data(self, mocker): # Arrange From 7cf4193d3d57f760e0cccb3fd3096cddb7bbbc6c Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Wed, 16 Oct 2024 17:13:09 +0300 Subject: [PATCH 27/37] remove datetime --- python/tests/engine/test_spark.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index 517e40c7a..22ec537de 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -15,8 +15,6 @@ # from __future__ import annotations -import datetime - import hopsworks_common import numpy import pandas as pd @@ -1717,19 +1715,16 @@ def test_serialize_to_avro(self, mocker): mock_to_avro = mocker.patch('hsfs.engine.spark.to_avro') mock_to_avro.return_value = lit(b'111') - now = datetime.datetime.now() - fg_data = [] - fg_data.append(("ekarson", ["GRAVITY RUSH 2", "KING'S QUEST"], 
pd.Timestamp(now.timestamp()))) - fg_data.append(("ratmilkdrinker", ["NBA 2K", "CALL OF DUTY"], pd.Timestamp(now.timestamp()))) - pandas_df = pd.DataFrame(fg_data, columns =["account_id", "last_played_games", "event_time"]) + fg_data.append(("ekarson", ["GRAVITY RUSH 2", "KING'S QUEST"])) + fg_data.append(("ratmilkdrinker", ["NBA 2K", "CALL OF DUTY"])) + pandas_df = pd.DataFrame(fg_data, columns =["account_id", "last_played_games"]) df = spark_engine._spark_session.createDataFrame(pandas_df) features = [ feature.Feature(name="account_id", type="str"), feature.Feature(name="last_played_games", type="array"), - feature.Feature(name="event_time", type="timestamp"), ] fg = feature_group.FeatureGroup( @@ -1745,7 +1740,7 @@ def test_serialize_to_avro(self, mocker): 'id': 1025, 'subject': 'fg_1', 'version': 1, - 'schema': '{"type":"record","name":"fg_1","namespace":"test_featurestore.db","fields":[{"name":"account_id","type":["null","string"]},{"name":"last_played_games","type":["null",{"type":"array","items":["null","string"]}]},{"name":"event_time","type":["null",{"type":"long","logicalType":"timestamp-micros"}]}]}' + 'schema': '{"type":"record","name":"fg_1","namespace":"test_featurestore.db","fields":[{"name":"account_id","type":["null","string"]},{"name":"last_played_games","type":["null",{"type":"array","items":["null","string"]}]}]}' } # Act From 25d2f929989ff1360a661f7d1bf08155f2aac0ad Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Wed, 16 Oct 2024 18:08:35 +0300 Subject: [PATCH 28/37] change location for saving kafka_offsets --- utils/python/hsfs_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py index 08d10bd48..7d38c4716 100644 --- a/utils/python/hsfs_utils.py +++ b/utils/python/hsfs_utils.py @@ -274,7 +274,7 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in ) # get offsets - offset_location = entity.location + "_offsets" + offset_location = entity.prepare_spark_location() + "/kafka_offsets" try: if initial_check_point_string: offset_string = json.dumps(_build_starting_offsets(initial_check_point_string)) From dea2e252618ac063242bacbc6319aa2bc16e5e51 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Thu, 17 Oct 2024 15:24:42 +0300 Subject: [PATCH 29/37] python engine add_cols_to_delta_table no-op --- python/hsfs/engine/python.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index b0f354db8..b25eecb95 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1218,6 +1218,10 @@ def save_empty_dataframe( """Wrapper around save_dataframe in order to provide no-op.""" pass + def add_cols_to_delta_table(self, feature_group: FeatureGroup, new_features) -> None: + """Wrapper around add_cols_to_delta_table in order to provide no-op.""" + pass + def _get_app_options( self, user_write_options: Optional[Dict[str, Any]] = None ) -> ingestion_job_conf.IngestionJobConf: From 714981e51f6c330eabf8301e26c98df4c2b03035 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Fri, 18 Oct 2024 10:57:37 +0300 Subject: [PATCH 30/37] update to append_features --- python/hsfs/core/feature_group_engine.py | 4 ++-- python/hsfs/engine/python.py | 6 ++---- python/hsfs/engine/spark.py | 23 ++++++++--------------- 3 files changed, 12 insertions(+), 21 deletions(-) diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index 
f00a044e1..ba01bc4e3 100644 --- a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -297,9 +297,9 @@ def append_features(self, feature_group, new_features): # write empty dataframe to update parquet schema if feature_group.time_travel_format == "DELTA": - engine.get_instance().add_cols_to_delta_table(feature_group, new_features) + engine.get_instance().add_cols_to_delta_table(feature_group) else: - engine.get_instance().save_empty_dataframe(feature_group, new_features=new_features) + engine.get_instance().save_empty_dataframe(feature_group) def update_description(self, feature_group, description): """Updates the description of a feature group.""" diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index b25eecb95..9b08432e2 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1212,13 +1212,11 @@ def save_stream_dataframe( "Stream ingestion is not available on Python environments, because it requires Spark as engine." ) - def save_empty_dataframe( - self, feature_group: Union[FeatureGroup, ExternalFeatureGroup], new_features=None - ) -> None: + def save_empty_dataframe(self, feature_group: Union[FeatureGroup, ExternalFeatureGroup]) -> None: """Wrapper around save_dataframe in order to provide no-op.""" pass - def add_cols_to_delta_table(self, feature_group: FeatureGroup, new_features) -> None: + def add_cols_to_delta_table(self, feature_group: FeatureGroup) -> None: """Wrapper around add_cols_to_delta_table in order to provide no-op.""" pass diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 471133186..26c0a6cee 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -1324,18 +1324,14 @@ def is_spark_dataframe(self, dataframe): return True return False - def save_empty_dataframe(self, feature_group, new_features=None): + def save_empty_dataframe(self, feature_group): location = feature_group.prepare_spark_location() dataframe = self._spark_session.read.format("hudi").load(location) - if (new_features is not None): - if isinstance(new_features, list): - for new_feature in new_features: - dataframe = dataframe.withColumn(new_feature.name, lit(None).cast(new_feature.type)) - else: - dataframe = dataframe.withColumn(new_features.name, lit(None).cast(new_features.type)) - + for feature in feature_group.features: + if feature.name not in dataframe.columns: + dataframe = dataframe.withColumn(feature.name, lit(None).cast(feature.type)) self.save_dataframe( feature_group, @@ -1347,17 +1343,14 @@ def save_empty_dataframe(self, feature_group, new_features=None): {}, ) - def add_cols_to_delta_table(self, feature_group, new_features): + def add_cols_to_delta_table(self, feature_group): location = feature_group.prepare_spark_location() dataframe = self._spark_session.read.format("delta").load(location) - if (new_features is not None): - if isinstance(new_features, list): - for new_feature in new_features: - dataframe = dataframe.withColumn(new_feature.name, lit("").cast(new_feature.type)) - else: - dataframe = dataframe.withColumn(new_features.name, lit("").cast(new_features.type)) + for feature in feature_group.features: + if feature.name not in dataframe.columns: + dataframe = dataframe.withColumn(feature.name, lit(None).cast(feature.type)) dataframe.limit(0).write.format("delta").mode( "append" From 34c6963ab26ad691787743e43b3b04b168676fa0 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Fri, 18 Oct 2024 11:02:01 +0300 Subject: [PATCH 31/37] ruff fix 
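The only change here is renaming the loop variable in save_empty_dataframe and
add_cols_to_delta_table from feature to _feature to satisfy ruff (presumably
because the name collides with a module-level import in spark.py); the behaviour
is identical, and after this commit the loop reads:

    for _feature in feature_group.features:
        if _feature.name not in dataframe.columns:
            dataframe = dataframe.withColumn(_feature.name, lit(None).cast(_feature.type))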
--- python/hsfs/engine/spark.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 26c0a6cee..2c6d25b76 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -1329,9 +1329,9 @@ def save_empty_dataframe(self, feature_group): dataframe = self._spark_session.read.format("hudi").load(location) - for feature in feature_group.features: - if feature.name not in dataframe.columns: - dataframe = dataframe.withColumn(feature.name, lit(None).cast(feature.type)) + for _feature in feature_group.features: + if _feature.name not in dataframe.columns: + dataframe = dataframe.withColumn(_feature.name, lit(None).cast(_feature.type)) self.save_dataframe( feature_group, @@ -1348,9 +1348,9 @@ def add_cols_to_delta_table(self, feature_group): dataframe = self._spark_session.read.format("delta").load(location) - for feature in feature_group.features: - if feature.name not in dataframe.columns: - dataframe = dataframe.withColumn(feature.name, lit(None).cast(feature.type)) + for _feature in feature_group.features: + if _feature.name not in dataframe.columns: + dataframe = dataframe.withColumn(_feature.name, lit(None).cast(_feature.type)) dataframe.limit(0).write.format("delta").mode( "append" From 4b64f18bbe4754886e7c2bec576cb8e1054cb582 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Fri, 18 Oct 2024 14:06:36 +0300 Subject: [PATCH 32/37] update_table_schema also for delta table --- python/hsfs/core/feature_group_engine.py | 5 +- python/hsfs/core/hudi_engine.py | 19 ------- python/hsfs/engine/python.py | 8 +-- python/hsfs/engine/spark.py | 38 +++++++++++-- python/tests/client/test_base_client.py | 1 - .../tests/core/test_feature_group_engine.py | 2 +- python/tests/engine/test_python.py | 4 +- python/tests/engine/test_spark.py | 57 ++++++++++++++++++- 8 files changed, 95 insertions(+), 39 deletions(-) diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index ba01bc4e3..ea52355f7 100644 --- a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -296,10 +296,7 @@ def append_features(self, feature_group, new_features): ) # write empty dataframe to update parquet schema - if feature_group.time_travel_format == "DELTA": - engine.get_instance().add_cols_to_delta_table(feature_group) - else: - engine.get_instance().save_empty_dataframe(feature_group) + engine.get_instance().update_table_schema(feature_group) def update_description(self, feature_group, description): """Updates the description of a feature group.""" diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py index 4492f0a19..e96b8ea56 100644 --- a/python/hsfs/core/hudi_engine.py +++ b/python/hsfs/core/hudi_engine.py @@ -234,25 +234,6 @@ def _setup_hudi_read_opts(self, hudi_fg_alias, read_options): return hudi_options - def reconcile_hudi_schema( - self, save_empty_dataframe_callback, hudi_fg_alias, read_options - ): - if sorted(self._spark_session.table(hudi_fg_alias.alias).columns) != sorted( - [feature.name for feature in hudi_fg_alias.feature_group._features] + self.HUDI_SPEC_FEATURE_NAMES - ): - full_fg = self._feature_group_api.get( - feature_store_id=hudi_fg_alias.feature_group._feature_store_id, - name=hudi_fg_alias.feature_group.name, - version=hudi_fg_alias.feature_group.version, - ) - - save_empty_dataframe_callback(full_fg) - - self.register_temporary_table( - hudi_fg_alias, - read_options, - ) - @staticmethod def 
_get_last_commit_metadata(spark_context, base_path): hopsfs_conf = spark_context._jvm.org.apache.hadoop.fs.FileSystem.get( diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 9b08432e2..574ab46fb 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1212,12 +1212,8 @@ def save_stream_dataframe( "Stream ingestion is not available on Python environments, because it requires Spark as engine." ) - def save_empty_dataframe(self, feature_group: Union[FeatureGroup, ExternalFeatureGroup]) -> None: - """Wrapper around save_dataframe in order to provide no-op.""" - pass - - def add_cols_to_delta_table(self, feature_group: FeatureGroup) -> None: - """Wrapper around add_cols_to_delta_table in order to provide no-op.""" + def update_table_schema(self, feature_group: Union[FeatureGroup, ExternalFeatureGroup]) -> None: + """Wrapper around update_table_schema in order to provide no-op.""" pass def _get_app_options( diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 2c6d25b76..e5cdf47c5 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -221,8 +221,8 @@ def register_hudi_temporary_table( read_options, ) - hudi_engine_instance.reconcile_hudi_schema( - self.save_empty_dataframe, hudi_fg_alias, read_options + self.reconcile_schema( + hudi_fg_alias, read_options, hudi_engine_instance ) def register_delta_temporary_table( @@ -241,6 +241,30 @@ def register_delta_temporary_table( read_options, ) + self.reconcile_schema( + delta_fg_alias, read_options, delta_engine_instance + ) + + def reconcile_schema( + self, fg_alias, read_options, engine_instance + ): + if sorted(self._spark_session.table(fg_alias.alias).columns) != sorted( + [feature.name for feature in fg_alias.feature_group._features] + + self.HUDI_SPEC_FEATURE_NAMES if fg_alias.feature_group.time_travel_format == "HUDI" else [] + ): + full_fg = self._feature_group_api.get( + feature_store_id=fg_alias.feature_group._feature_store_id, + name=fg_alias.feature_group.name, + version=fg_alias.feature_group.version, + ) + + self.update_table_schema(full_fg) + + engine_instance.register_temporary_table( + fg_alias, + read_options, + ) + def _return_dataframe_type(self, dataframe, dataframe_type): if dataframe_type.lower() in ["default", "spark"]: return dataframe @@ -1324,7 +1348,13 @@ def is_spark_dataframe(self, dataframe): return True return False - def save_empty_dataframe(self, feature_group): + def update_table_schema(self, feature_group): + if feature_group.time_travel_format == "DELTA": + self._add_cols_to_delta_table(feature_group) + else: + self._save_empty_dataframe(feature_group) + + def _save_empty_dataframe(self, feature_group): location = feature_group.prepare_spark_location() dataframe = self._spark_session.read.format("hudi").load(location) @@ -1343,7 +1373,7 @@ def save_empty_dataframe(self, feature_group): {}, ) - def add_cols_to_delta_table(self, feature_group): + def _add_cols_to_delta_table(self, feature_group): location = feature_group.prepare_spark_location() dataframe = self._spark_session.read.format("delta").load(location) diff --git a/python/tests/client/test_base_client.py b/python/tests/client/test_base_client.py index 2a4e5a3e1..6ea90a7e8 100644 --- a/python/tests/client/test_base_client.py +++ b/python/tests/client/test_base_client.py @@ -20,7 +20,6 @@ import requests from hsfs.client.base import Client from hsfs.client.exceptions import RestAPIError - from tests.util import changes_environ diff --git 
diff --git a/python/tests/core/test_feature_group_engine.py b/python/tests/core/test_feature_group_engine.py
index 91f1086ed..cdeaa1eec 100644
--- a/python/tests/core/test_feature_group_engine.py
+++ b/python/tests/core/test_feature_group_engine.py
@@ -709,7 +709,7 @@ def test_append_features(self, mocker):

         # Assert
         assert (
-            mock_engine_get_instance.return_value.save_empty_dataframe.call_count == 1
+            mock_engine_get_instance.return_value.update_table_schema.call_count == 1
         )
         assert len(mock_fg_engine_update_features_metadata.call_args[0][1]) == 4

diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py
index 5515cfdb4..80784a236 100644
--- a/python/tests/engine/test_python.py
+++ b/python/tests/engine/test_python.py
@@ -2565,12 +2565,12 @@ def test_save_stream_dataframe(self):
             == "Stream ingestion is not available on Python environments, because it requires Spark as engine."
         )

-    def test_save_empty_dataframe(self):
+    def test_update_table_schema(self):
         # Arrange
         python_engine = python.Engine()

         # Act
-        result = python_engine.save_empty_dataframe(feature_group=None)
+        result = python_engine.update_table_schema(feature_group=None)

         # Assert
         assert result is None
diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py
index 22ec537de..5cf30b712 100644
--- a/python/tests/engine/test_spark.py
+++ b/python/tests/engine/test_spark.py
@@ -203,6 +203,7 @@ def test_register_hudi_temporary_table(self, mocker):
         # Arrange
         mock_hudi_engine = mocker.patch("hsfs.core.hudi_engine.HudiEngine")
         mocker.patch("hsfs.feature_group.FeatureGroup.from_response_json")
+        mock_reconcile_schema = mocker.patch("hsfs.engine.spark.Engine.reconcile_schema")

         spark_engine = spark.Engine()

@@ -220,6 +221,31 @@ def test_register_hudi_temporary_table(self, mocker):

         # Assert
         assert mock_hudi_engine.return_value.register_temporary_table.call_count == 1
+        assert mock_reconcile_schema.call_count == 1
+
+    def test_register_delta_temporary_table(self, mocker):
+        # Arrange
+        mock_delta_engine = mocker.patch("hsfs.core.delta_engine.DeltaEngine")
+        mocker.patch("hsfs.feature_group.FeatureGroup.from_response_json")
+        mock_reconcile_schema = mocker.patch("hsfs.engine.spark.Engine.reconcile_schema")
+
+        spark_engine = spark.Engine()
+
+        hudi_fg_alias = hudi_feature_group_alias.HudiFeatureGroupAlias(
+            feature_group=None, alias=None
+        )
+
+        # Act
+        spark_engine.register_delta_temporary_table(
+            delta_fg_alias=hudi_fg_alias,
+            feature_store_id=None,
+            feature_store_name=None,
+            read_options=None,
+        )
+
+        # Assert
+        assert mock_delta_engine.return_value.register_temporary_table.call_count == 1
+        assert mock_reconcile_schema.call_count == 1

     def test_return_dataframe_type_default(self, mocker):
         # Arrange
@@ -4540,7 +4566,7 @@ def test_is_spark_dataframe_spark_dataframe(self):

         # Assert
         assert result is True

-    def test_save_empty_dataframe(self, mocker):
+    def test_update_table_schema_hudi(self, mocker):
         # Arrange
         mock_spark_engine_save_dataframe = mocker.patch(
             "hsfs.engine.spark.Engine.save_dataframe"
         )
@@ -4560,15 +4586,42 @@ def test_save_empty_dataframe(self, mocker):
             partition_key=[],
             id=10,
             featurestore_name="test_featurestore",
+            time_travel_format="HUDI",
         )

         # Act
-        spark_engine.save_empty_dataframe(feature_group=fg)
+        spark_engine.update_table_schema(feature_group=fg)

         # Assert
         assert mock_spark_engine_save_dataframe.call_count == 1
         assert mock_spark_read.format.call_count == 1

+    def test_update_table_schema_delta(self, mocker):
+        # Arrange
mocker.patch("pyspark.sql.SparkSession.read") + mock_format = mocker.Mock() + mock_spark_read.format.return_value = mock_format + + # Arrange + spark_engine = spark.Engine() + + fg = feature_group.FeatureGroup( + name="test", + version=1, + featurestore_id=99, + primary_key=[], + partition_key=[], + id=10, + featurestore_name="test_featurestore", + time_travel_format="DELTA", + ) + + # Act + spark_engine.update_table_schema(feature_group=fg) + + # Assert + assert mock_spark_read.format.call_count == 1 + def test_apply_transformation_function_single_output_udf_default_mode(self, mocker): # Arrange mocker.patch("hopsworks_common.client.get_instance") From 0a0a5606f8ea3c8f5f6bdd84f1bc1bad99540aff Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Fri, 18 Oct 2024 14:09:36 +0300 Subject: [PATCH 33/37] ruff fix --- python/tests/client/test_base_client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/tests/client/test_base_client.py b/python/tests/client/test_base_client.py index 6ea90a7e8..2a4e5a3e1 100644 --- a/python/tests/client/test_base_client.py +++ b/python/tests/client/test_base_client.py @@ -20,6 +20,7 @@ import requests from hsfs.client.base import Client from hsfs.client.exceptions import RestAPIError + from tests.util import changes_environ From b859e1684500e9f1cd2ac0e627b814636ba0f4d8 Mon Sep 17 00:00:00 2001 From: bubriks <bubriks@gmail.com> Date: Fri, 18 Oct 2024 14:18:24 +0300 Subject: [PATCH 34/37] small fix --- python/hsfs/engine/spark.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index e5cdf47c5..0582aa585 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -35,6 +35,7 @@ import tzlocal from hopsworks_common.core.constants import HAS_NUMPY, HAS_PANDAS from hsfs.constructor import query +from hsfs.core import feature_group_api # in case importing in %%local from hsfs.core.vector_db_client import VectorDbClient @@ -252,7 +253,7 @@ def reconcile_schema( [feature.name for feature in fg_alias.feature_group._features] + self.HUDI_SPEC_FEATURE_NAMES if fg_alias.feature_group.time_travel_format == "HUDI" else [] ): - full_fg = self._feature_group_api.get( + full_fg = feature_group_api.FeatureGroupApi().get( feature_store_id=fg_alias.feature_group._feature_store_id, name=fg_alias.feature_group.name, version=fg_alias.feature_group.version, From 774d5d66bdae4f66aa5926844704f830ec34b363 Mon Sep 17 00:00:00 2001 From: Ralf <bubriks@gmail.com> Date: Mon, 21 Oct 2024 14:06:35 +0300 Subject: [PATCH 35/37] update table schema job --- python/hsfs/core/feature_group_api.py | 37 +++++++++++++++++++++++- python/hsfs/core/feature_group_engine.py | 2 ++ python/hsfs/engine/python.py | 6 ++-- python/hsfs/engine/spark.py | 2 +- python/tests/engine/test_python.py | 9 +++++- utils/python/hsfs_utils.py | 15 ++++++++++ 6 files changed, 66 insertions(+), 5 deletions(-) diff --git a/python/hsfs/core/feature_group_api.py b/python/hsfs/core/feature_group_api.py index ab05fb9b5..037228c73 100644 --- a/python/hsfs/core/feature_group_api.py +++ b/python/hsfs/core/feature_group_api.py @@ -21,7 +21,12 @@ from hopsworks_common import client from hsfs import feature_group as fg_mod from hsfs import feature_group_commit, util -from hsfs.core import explicit_provenance, ingestion_job, ingestion_job_conf +from hsfs.core import ( + explicit_provenance, + ingestion_job, + ingestion_job_conf, + job, +) class FeatureGroupApi: @@ -416,6 +421,36 @@ def ingestion( ), ) + def 
From 774d5d66bdae4f66aa5926844704f830ec34b363 Mon Sep 17 00:00:00 2001
From: Ralf <bubriks@gmail.com>
Date: Mon, 21 Oct 2024 14:06:35 +0300
Subject: [PATCH 35/37] update table schema job

---
 python/hsfs/core/feature_group_api.py    | 37 +++++++++++++++++++++++-
 python/hsfs/core/feature_group_engine.py |  2 ++
 python/hsfs/engine/python.py             |  6 ++--
 python/hsfs/engine/spark.py              |  2 +-
 python/tests/engine/test_python.py       |  9 +++++-
 utils/python/hsfs_utils.py               | 15 ++++++++++
 6 files changed, 66 insertions(+), 5 deletions(-)

diff --git a/python/hsfs/core/feature_group_api.py b/python/hsfs/core/feature_group_api.py
index ab05fb9b5..037228c73 100644
--- a/python/hsfs/core/feature_group_api.py
+++ b/python/hsfs/core/feature_group_api.py
@@ -21,7 +21,12 @@
 from hopsworks_common import client
 from hsfs import feature_group as fg_mod
 from hsfs import feature_group_commit, util
-from hsfs.core import explicit_provenance, ingestion_job, ingestion_job_conf
+from hsfs.core import (
+    explicit_provenance,
+    ingestion_job,
+    ingestion_job_conf,
+    job,
+)


 class FeatureGroupApi:
@@ -416,6 +421,36 @@ def ingestion(
             ),
         )

+    def update_table_schema(
+        self,
+        feature_group_instance: fg_mod.FeatureGroup,
+    ) -> job.Job:
+        """
+        Set up a Hopsworks job to update the table schema.
+
+        Args:
+            feature_group_instance: FeatureGroup, required
+                metadata object of the feature group.
+        """
+
+        _client = client.get_instance()
+        path_params = [
+            "project",
+            _client._project_id,
+            "featurestores",
+            feature_group_instance.feature_store_id,
+            "featuregroups",
+            feature_group_instance.id,
+            "updatetableschema",
+        ]
+
+        headers = {"content-type": "application/json"}
+        return job.Job.from_response_json(
+            _client._send_request(
+                "POST", path_params, headers=headers
+            ),
+        )
+
     def get_parent_feature_groups(
         self,
         feature_group_instance: Union[
diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py
index ea52355f7..541934d97 100644
--- a/python/hsfs/core/feature_group_engine.py
+++ b/python/hsfs/core/feature_group_engine.py
@@ -249,6 +249,8 @@ def commit_delete(feature_group, delete_df, write_options):
     @staticmethod
     def delta_vacuum(feature_group, retention_hours):
         if feature_group.time_travel_format == "DELTA":
+            # TODO: This should change, DeltaEngine and HudiEngine always assume a Spark client!
+            # Cannot properly manage what should happen when using Python.
             delta_engine_instance = delta_engine.DeltaEngine(
                 feature_group.feature_store_id,
                 feature_group.feature_store_name,
diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py
index 574ab46fb..3406ea7d6 100644
--- a/python/hsfs/engine/python.py
+++ b/python/hsfs/engine/python.py
@@ -1213,8 +1213,10 @@ def save_stream_dataframe(
         )

     def update_table_schema(self, feature_group: Union[FeatureGroup, ExternalFeatureGroup]) -> None:
-        """Wrapper around update_table_schema in order to provide no-op."""
-        pass
+        _job = self._feature_group_api.update_table_schema(feature_group)
+        _job._wait_for_job(
+            await_termination=True
+        )

     def _get_app_options(
         self, user_write_options: Optional[Dict[str, Any]] = None
diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index 0582aa585..c4bb0bc35 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -251,7 +251,7 @@ def reconcile_schema(
     ):
         if sorted(self._spark_session.table(fg_alias.alias).columns) != sorted(
             [feature.name for feature in fg_alias.feature_group._features] +
-            self.HUDI_SPEC_FEATURE_NAMES if fg_alias.feature_group.time_travel_format == "HUDI" else []
+            hudi_engine.HudiEngine.HUDI_SPEC_FEATURE_NAMES if fg_alias.feature_group.time_travel_format == "HUDI" else []
         ):
             full_fg = feature_group_api.FeatureGroupApi().get(
                 feature_store_id=fg_alias.feature_group._feature_store_id,
                 name=fg_alias.feature_group.name,
                 version=fg_alias.feature_group.version,
diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py
index 80784a236..250342a12 100644
--- a/python/tests/engine/test_python.py
+++ b/python/tests/engine/test_python.py
@@ -2565,15 +2565,22 @@ def test_save_stream_dataframe(self):
             == "Stream ingestion is not available on Python environments, because it requires Spark as engine."
         )

-    def test_update_table_schema(self):
+    def test_update_table_schema(self, mocker):
         # Arrange
+        mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi")
+
         python_engine = python.Engine()

+        mock_fg_api.return_value.update_table_schema.return_value.job = job.Job(
+            1, "test_job", None, None, None, None
+        )
+
         # Act
         result = python_engine.update_table_schema(feature_group=None)

         # Assert
         assert result is None
+        assert mock_fg_api.return_value.update_table_schema.call_count == 1

     def test_get_app_options(self, mocker):
         # Arrange
diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py
index 7d38c4716..687a1db0c 100644
--- a/utils/python/hsfs_utils.py
+++ b/utils/python/hsfs_utils.py
@@ -323,6 +323,18 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
     offset_df = spark.createDataFrame([offset_dict])
     offset_df.coalesce(1).write.mode("overwrite").json(offset_location)

+def update_table_schema_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> None:
+    """
+    Run table schema update job on a feature group.
+    """
+    feature_store = job_conf.pop("feature_store")
+    fs = get_feature_store_handle(feature_store)
+
+    entity = fs.get_feature_group(name=job_conf["name"], version=job_conf["version"])
+
+    entity.stream = False
+    engine.get_instance().update_table_schema(entity)
+
 def _build_starting_offsets(initial_check_point_string: str):
     if not initial_check_point_string:
         return ""
@@ -358,6 +370,7 @@ def _build_starting_offsets(initial_check_point_string: str):
             "run_feature_monitoring",
             "delta_vacuum_fg",
             "offline_fg_materialization",
+            "update_table_schema_fg",
         ],
         help="Operation type",
     )
@@ -406,6 +419,8 @@ def parse_isoformat_date(da: str) -> datetime:
             delta_vacuum_fg(spark, job_conf)
         elif args.op == "offline_fg_materialization":
             offline_fg_materialization(spark, job_conf, args.initialCheckPointString)
+        elif args.op == "update_table_schema_fg":
+            update_table_schema_fg(spark, job_conf)

         success = True
     except Exception:
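The new `update_table_schema_fg` operation reads the same kind of job configuration as the other operations in this utility. A hypothetical configuration it could consume, with field names taken from the code above (the real configuration is generated by the Hopsworks backend when the job is created):

    job_conf = {
        "feature_store": "dev_featurestore",  # popped first to resolve the feature store handle
        "name": "transactions_fg",            # feature group name
        "version": 1,                         # feature group version
    }
    # update_table_schema_fg(spark, job_conf) then fetches the feature group and
    # calls engine.get_instance().update_table_schema(entity) on it.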
From 317ece065454c27c20530d72dd275187044ddc76 Mon Sep 17 00:00:00 2001
From: bubriks <bubriks@gmail.com>
Date: Mon, 21 Oct 2024 17:59:54 +0300
Subject: [PATCH 36/37] add startingOffsets log

---
 utils/python/hsfs_utils.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py
index 687a1db0c..15d7c5d9e 100644
--- a/utils/python/hsfs_utils.py
+++ b/utils/python/hsfs_utils.py
@@ -290,6 +290,7 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
         high=False,
     )
     offset_string = json.dumps(_build_starting_offsets(initial_check_point_string))
+    print(f"startingOffsets: {offset_string}")

     # read kafka topic
     df = (
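The logged value follows Spark's Kafka `startingOffsets` JSON convention, i.e. a map from topic name to partition/offset pairs. A hypothetical example of the printed line (topic name and offsets are made up):

    startingOffsets: {"119_fg1_1_onlinefs": {"0": 15, "1": 20}}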
From 542b09618a5cdc1961ce247a874eb5a11a5a9297 Mon Sep 17 00:00:00 2001
From: bubriks <bubriks@gmail.com>
Date: Tue, 29 Oct 2024 14:07:53 +0200
Subject: [PATCH 37/37] MERGE FIX

---
 python/tests/engine/test_spark.py | 131 ------------------------------
 1 file changed, 131 deletions(-)

diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py
index e9259d778..05bb33180 100644
--- a/python/tests/engine/test_spark.py
+++ b/python/tests/engine/test_spark.py
@@ -969,137 +969,6 @@ def test_save_stream_dataframe(self, mocker, backend_fixtures):
             == 0
         )

-    def test_save_stream_dataframe_transformations(self, mocker, backend_fixtures):
-        # Arrange
-        mock_common_client_get_instance = mocker.patch(
-            "hopsworks_common.client.get_instance"
-        )
-        mocker.patch("hopsworks_common.client._is_external", return_value=False)
-
-        mock_spark_engine_serialize_to_avro = mocker.patch(
-            "hsfs.engine.spark.Engine._serialize_to_avro"
-        )
-
-        mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance")
-        mock_engine_get_instance.return_value.add_file.return_value = (
-            "result_from_add_file"
-        )
-
-        mock_storage_connector_api = mocker.patch(
-            "hsfs.core.storage_connector_api.StorageConnectorApi"
-        )
-
-        mock_spark_engine_apply_transformations = mocker.patch(
-            "hsfs.engine.spark.Engine._apply_transformation_function"
-        )
-
-        json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"]
-        sc = storage_connector.StorageConnector.from_response_json(json)
-        mock_storage_connector_api.return_value.get_kafka_connector.return_value = sc
-
-        spark_engine = spark.Engine()
-
-        @udf(int)
-        def test(feature):
-            return feature + 1
-
-        fg = feature_group.FeatureGroup(
-            name="test",
-            version=1,
-            featurestore_id=99,
-            primary_key=[],
-            partition_key=[],
-            id=10,
-            online_topic_name="test_online_topic_name",
-            transformation_functions=[test],
-        )
-        fg.feature_store = mocker.Mock()
-        project_id = 1
-        fg.feature_store.project_id = project_id
-
-        mock_common_client_get_instance.return_value._project_name = "test_project_name"
-
-        # Act
-        spark_engine.save_stream_dataframe(
-            feature_group=fg,
-            dataframe=None,
-            query_name=None,
-            output_mode="test_mode",
-            await_termination=None,
-            timeout=None,
-            checkpoint_dir=None,
-            write_options={"test_name": "test_value"},
-        )
-
-        # Assert
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.call_args[0][0]
-            == "headers"
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.call_args[
-                0
-            ][0]
-            == "test_mode"
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.call_args[
-                0
-            ][0]
-            == "kafka"
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[
-                0
-            ][0]
-            == "checkpointLocation"
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[
-                0
-            ][1]
-            == f"/Projects/test_project_name/Resources/{self._get_spark_query_name(project_id, fg)}-checkpoint"
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.call_args[
-                1
-            ]
-            == {
-                "kafka.bootstrap.servers": "test_bootstrap_servers",
-                "kafka.security.protocol": "test_security_protocol",
-                "kafka.ssl.endpoint.identification.algorithm": "test_ssl_endpoint_identification_algorithm",
-                "kafka.ssl.key.password": "test_ssl_key_password",
-                "kafka.ssl.keystore.location": "result_from_add_file",
-                "kafka.ssl.keystore.password": "test_ssl_keystore_password",
-                "kafka.ssl.truststore.location": "result_from_add_file",
-                "kafka.ssl.truststore.password": "test_ssl_truststore_password",
-                "kafka.test_option_name": "test_option_value",
-                "test_name": "test_value",
-            }
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[
-                0
-            ][0]
-            == "topic"
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[
-                0
-            ][1]
-            == "test_online_topic_name"
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.call_args[
-                0
-            ][0]
-            == self._get_spark_query_name(project_id, fg)
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_count
-            == 0
-        )
-        assert mock_spark_engine_apply_transformations.call_count == 1
-
     def test_save_stream_dataframe_query_name(self, mocker, backend_fixtures):
         # Arrange
         mock_common_client_get_instance = mocker.patch(