@@ -969,137 +969,6 @@ def test_save_stream_dataframe(self, mocker, backend_fixtures):
             == 0
         )
 
-    def test_save_stream_dataframe_transformations(self, mocker, backend_fixtures):
-        # Arrange
-        mock_common_client_get_instance = mocker.patch(
-            "hopsworks_common.client.get_instance"
-        )
-        mocker.patch("hopsworks_common.client._is_external", return_value=False)
-        mock_spark_engine_serialize_to_avro = mocker.patch(
-            "hsfs.engine.spark.Engine._serialize_to_avro"
-        )
-
-        mock_engine_get_instance = mocker.patch("hsfs.engine.get_instance")
-        mock_engine_get_instance.return_value.add_file.return_value = (
-            "result_from_add_file"
-        )
-
-        mock_storage_connector_api = mocker.patch(
-            "hsfs.core.storage_connector_api.StorageConnectorApi"
-        )
-
-        mock_spark_engine_apply_transformations = mocker.patch(
-            "hsfs.engine.spark.Engine._apply_transformation_function"
-        )
-
-        json = backend_fixtures["storage_connector"]["get_kafka_external"]["response"]
-        sc = storage_connector.StorageConnector.from_response_json(json)
-        mock_storage_connector_api.return_value.get_kafka_connector.return_value = sc
-
-        spark_engine = spark.Engine()
-
-        @udf(int)
-        def test(feature):
-            return feature + 1
-
-        fg = feature_group.FeatureGroup(
-            name="test",
-            version=1,
-            featurestore_id=99,
-            primary_key=[],
-            partition_key=[],
-            id=10,
-            online_topic_name="test_online_topic_name",
-            transformation_functions=[test],
-        )
-        fg.feature_store = mocker.Mock()
-        project_id = 1
-        fg.feature_store.project_id = project_id
-
-        mock_common_client_get_instance.return_value._project_name = "test_project_name"
-
-        # Act
-        spark_engine.save_stream_dataframe(
-            feature_group=fg,
-            dataframe=None,
-            query_name=None,
-            output_mode="test_mode",
-            await_termination=None,
-            timeout=None,
-            checkpoint_dir=None,
-            write_options={"test_name": "test_value"},
-        )
-
-        # Assert
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.call_args[0][0]
-            == "headers"
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.call_args[
-                0
-            ][0]
-            == "test_mode"
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.call_args[
-                0
-            ][0]
-            == "kafka"
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[
-                0
-            ][0]
-            == "checkpointLocation"
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.call_args[
-                0
-            ][1]
-            == f"/Projects/test_project_name/Resources/{self._get_spark_query_name(project_id, fg)}-checkpoint"
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.call_args[
-                1
-            ]
-            == {
-                "kafka.bootstrap.servers": "test_bootstrap_servers",
-                "kafka.security.protocol": "test_security_protocol",
-                "kafka.ssl.endpoint.identification.algorithm": "test_ssl_endpoint_identification_algorithm",
-                "kafka.ssl.key.password": "test_ssl_key_password",
-                "kafka.ssl.keystore.location": "result_from_add_file",
-                "kafka.ssl.keystore.password": "test_ssl_keystore_password",
-                "kafka.ssl.truststore.location": "result_from_add_file",
-                "kafka.ssl.truststore.password": "test_ssl_truststore_password",
-                "kafka.test_option_name": "test_option_value",
-                "test_name": "test_value",
-            }
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[
-                0
-            ][0]
-            == "topic"
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.call_args[
-                0
-            ][1]
-            == "test_online_topic_name"
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.call_args[
-                0
-            ][0]
-            == self._get_spark_query_name(project_id, fg)
-        )
-        assert (
-            mock_spark_engine_serialize_to_avro.return_value.withColumn.return_value.writeStream.outputMode.return_value.format.return_value.option.return_value.options.return_value.option.return_value.queryName.return_value.start.return_value.awaitTermination.call_count
-            == 0
-        )
-        assert mock_spark_engine_apply_transformations.call_count == 1
-
     def test_save_stream_dataframe_query_name(self, mocker, backend_fixtures):
         # Arrange
         mock_common_client_get_instance = mocker.patch(