diff --git a/docs/templates/api/hopsworks_udf.md b/docs/templates/api/hopsworks_udf.md index cc155c42e..a408a946b 100644 --- a/docs/templates/api/hopsworks_udf.md +++ b/docs/templates/api/hopsworks_udf.md @@ -6,6 +6,10 @@ {{hopsworks_udf_properties}} +## Methods + +{{hopsworks_udf_methods}} + ## TransformationFeature -{{transformation_feature}} \ No newline at end of file +{{transformation_feature}} diff --git a/python/auto_doc.py b/python/auto_doc.py index 49129f52a..fe40e02f2 100644 --- a/python/auto_doc.py +++ b/python/auto_doc.py @@ -451,6 +451,15 @@ "hopsworks_udf_properties": keras_autodoc.get_properties( "hsfs.hopsworks_udf.HopsworksUdf" ), + "hopsworks_udf_methods": keras_autodoc.get_methods( + "hsfs.hopsworks_udf.HopsworksUdf", + exclude=[ + "update_return_type_one_hot", + "python_udf_wrapper", + "pandas_udf_wrapper", + "get_udf", + ], + ), "transformation_feature": ["hsfs.hopsworks_udf.TransformationFeature"], }, "api/transformation_statistics.md": { diff --git a/python/hopsworks_common/constants.py b/python/hopsworks_common/constants.py index b98ed8497..20ec72e77 100644 --- a/python/hopsworks_common/constants.py +++ b/python/hopsworks_common/constants.py @@ -70,6 +70,14 @@ class OPENSEARCH_CONFIG: CA_CERTS = "ca_certs" +class FEATURES: + """ + Class that stores constants about a feature. 
+ """ + + MAX_LENGTH_NAME = 63 + + class KAFKA_SSL_CONFIG: """ Kafka SSL constant strings for configuration diff --git a/python/hsfs/core/feature_monitoring_config.py b/python/hsfs/core/feature_monitoring_config.py index 5b7f9fb71..5668dcaf3 100644 --- a/python/hsfs/core/feature_monitoring_config.py +++ b/python/hsfs/core/feature_monitoring_config.py @@ -22,6 +22,7 @@ import humps from hopsworks_common.client.exceptions import FeatureStoreException +from hopsworks_common.constants import FEATURES from hsfs import util from hsfs.core import ( feature_monitoring_config_engine, @@ -34,7 +35,6 @@ from hsfs.core.job_schedule import JobSchedule -MAX_LENGTH_NAME = 63 MAX_LENGTH_DESCRIPTION = 2000 @@ -686,8 +686,10 @@ def name(self, name: str): raise AttributeError("The name of a registered config is read-only.") elif not isinstance(name, str): raise TypeError("name must be of type str") - if len(name) > MAX_LENGTH_NAME: - raise ValueError("name must be less than {MAX_LENGTH_NAME} characters.") + if len(name) > FEATURES.MAX_LENGTH_NAME: + raise ValueError( + f"name must be less than {FEATURES.MAX_LENGTH_NAME} characters." + ) self._name = name @property diff --git a/python/hsfs/hopsworks_udf.py b/python/hsfs/hopsworks_udf.py index 057c16b88..d71191dcf 100644 --- a/python/hsfs/hopsworks_udf.py +++ b/python/hsfs/hopsworks_udf.py @@ -27,6 +27,7 @@ import humps from hopsworks_common.client.exceptions import FeatureStoreException +from hopsworks_common.constants import FEATURES from hsfs import engine, util from hsfs.core.feature_descriptive_statistics import FeatureDescriptiveStatistics from hsfs.decorators import typechecked @@ -146,7 +147,9 @@ class HopsworksUdf: transformation_function_argument_names : `Optional[List[TransformationFeature]]`. The argument names of the transformation function. dropped_argument_names : `Optional[List[str]]`. The arguments to be dropped from the finial DataFrame after the transformation functions are applied. 
dropped_feature_names : `Optional[List[str]]`. The feature name corresponding to the arguments names that are dropped - feature_name_prefix: `Optional[str]` = None. Prefixes if any used in the feature view. + feature_name_prefix: `Optional[str]`. Prefixes if any used in the feature view. + output_column_names: `Optional[List[str]]`. The names of the output columns returned from the transformation function. + generate_output_col_names: `bool`. Generate default output column names for the transformation function. Defaults to True. """ # Mapping for converting python types to spark types - required for creating pandas UDF's. @@ -173,6 +176,8 @@ def __init__( dropped_argument_names: Optional[List[str]] = None, dropped_feature_names: Optional[List[str]] = None, feature_name_prefix: Optional[str] = None, + output_column_names: Optional[List[str]] = None, + generate_output_col_names: bool = True, ): self._return_types: List[str] = HopsworksUdf._validate_and_convert_output_types( return_types @@ -191,6 +196,12 @@ def __init__( if isinstance(func, Callable) else func ) + + # The parameter `output_column_names` is initialized lazily. + # It is only initialized if the output column names are retrieved from the backend or explicitly specified using the `alias` function or is initialized with default column names if the UDF is accessed from a transformation function. + # Output column names are only stored in the backend when a model dependent or on demand transformation function is created using the defined UDF. 
+ self._output_column_names: List[str] = [] + if not transformation_features: # New transformation function being declared so extract source code from function self._transformation_features: List[TransformationFeature] = ( @@ -211,6 +222,7 @@ def __init__( ) ) self._dropped_features = self._dropped_argument_names + else: self._transformation_features = transformation_features self._transformation_function_argument_names = ( @@ -222,6 +234,9 @@ def __init__( if dropped_feature_names else dropped_argument_names ) + self._output_column_names = ( + output_column_names if output_column_names else [] + ) self._formatted_function_source, self._module_imports = ( HopsworksUdf._format_source_code(self._function_source) @@ -229,7 +244,9 @@ def __init__( self._statistics: Optional[TransformationStatistics] = None - self._output_column_names: List[str] = [] + # Denote if the output feature names have to be generated. + # Set to `False` if the output column names are saved in the backend and the udf is constructed from it using `from_response_json` function or if user has specified the output feature names using the `alias`` function. + self._generate_output_col_name: bool = generate_output_col_names @staticmethod def _validate_and_convert_drop_features( @@ -691,6 +708,47 @@ def __call__(self, *features: List[str]) -> "HopsworksUdf": udf.dropped_features = updated_dropped_features return udf + def alias(self, *args: str): + """ + Set the names of the transformed features output by the UDF. + """ + if len(args) == 1 and isinstance(args[0], list): + # If a single list is passed, use it directly + output_col_names = args[0] + else: + # Otherwise, use the individual arguments as a list + output_col_names = list(args) + if any( + not isinstance(output_col_name, str) for output_col_name in output_col_names + ): + raise FeatureStoreException( + f"Invalid output feature names provided for the transformation function '{repr(self)}'. Please ensure all arguments are strings." 
+ ) + + self._generate_output_col_name = False + self.output_column_names = output_col_names + + return self + + def _validate_output_col_name(self, output_col_names): + if any( + len(output_col_name) > FEATURES.MAX_LENGTH_NAME + for output_col_name in output_col_names + ): + raise FeatureStoreException( + f"Invalid output feature names specified for the transformation function '{repr(self)}'. Please provide names shorter than {FEATURES.MAX_LENGTH_NAME} characters." + ) + + if len(output_col_names) != len(set(output_col_names)): + raise FeatureStoreException( + f"Duplicate output feature names provided for the transformation function '{repr(self)}'. Please ensure all arguments names are unique." + ) + + if output_col_names and len(output_col_names) != len(self.return_types): + raise FeatureStoreException( + f"The number of output feature names provided does not match the number of features returned by the transformation function '{repr(self)}'. Pease provide exactly {len(self.return_types)} feature name(s) to match the output." + ) + def update_return_type_one_hot(self): self._return_types = [ self._return_types[0] @@ -765,6 +823,7 @@ def to_dict(self) -> Dict[str, Any]: "name": self.function_name, "featureNamePrefix": self._feature_name_prefix, "executionMode": self.execution_mode.value.upper(), + "outputColumnNames": self.output_column_names, } def json(self) -> str: @@ -826,6 +885,12 @@ def from_response_json( else None ) + output_column_names = ( + [feature.strip() for feature in json_decamelized["output_column_names"]] + if json_decamelized.get("output_column_names", None) + else None + ) + # Reconstructing statistics arguments. 
arg_list, _, _, _ = HopsworksUdf._parse_function_signature(function_source_code) @@ -858,7 +923,7 @@ def from_response_json( for arg_index in range(len(arg_list)) ] - hopsworks_udf = cls( + hopsworks_udf: HopsworksUdf = cls( func=function_source_code, return_types=output_types, name=function_name, @@ -870,6 +935,8 @@ def from_response_json( execution_mode=UDFExecutionMode.from_string( json_decamelized["execution_mode"] ), + output_column_names=output_column_names, + generate_output_col_names=not output_column_names, # Do not generate output column names if they are retrieved from the back ) # Set transformation features if already set. @@ -998,12 +1065,8 @@ def transformation_statistics( def output_column_names(self, output_col_names: Union[str, List[str]]) -> None: if not isinstance(output_col_names, List): output_col_names = [output_col_names] - if not output_col_names and len(output_col_names) != len(self.return_types): - raise FeatureStoreException( - f"Provided names for output columns does not match the number of columns returned from the UDF. Please provide {len(self.return_types)} names." 
- ) - else: - self._output_column_names = output_col_names + self._validate_output_col_name(output_col_names) + self._output_column_names = output_col_names def __repr__(self): return f'{self.function_name}({", ".join(self.transformation_features)})' diff --git a/python/hsfs/transformation_function.py b/python/hsfs/transformation_function.py index d39097b9a..a480ee521 100644 --- a/python/hsfs/transformation_function.py +++ b/python/hsfs/transformation_function.py @@ -16,11 +16,13 @@ import copy import json +import logging from enum import Enum from typing import Any, Dict, List, Optional, Union import humps from hopsworks_common.client.exceptions import FeatureStoreException +from hopsworks_common.constants import FEATURES from hsfs import util from hsfs.core import transformation_function_engine from hsfs.core.feature_descriptive_statistics import FeatureDescriptiveStatistics @@ -29,6 +31,9 @@ from hsfs.transformation_statistics import TransformationStatistics +_logger = logging.getLogger(__name__) + + class TransformationType(Enum): """ Class that store the possible types of transformation functions. @@ -78,12 +83,17 @@ def __init__( "Please use the hopsworks_udf decorator when defining transformation functions." ) - self._hopsworks_udf: HopsworksUdf = hopsworks_udf + self.__hopsworks_udf: HopsworksUdf = hopsworks_udf TransformationFunction._validate_transformation_type( transformation_type=transformation_type, hopsworks_udf=hopsworks_udf ) self.transformation_type = transformation_type + if self.__hopsworks_udf._generate_output_col_name: + # Reset output column names so that they would be regenerated. + # Handles the use case in which the same UDF is used to define both on-demand and model dependent transformations. + self.__hopsworks_udf._output_column_names = [] + def save(self) -> None: """Save a transformation function into the backend. 
@@ -152,11 +162,8 @@ def __call__(self, *features: List[str]) -> TransformationFunction: """ # Deep copy so that the same transformation function can be used to create multiple new transformation function with different features. transformation = copy.deepcopy(self) - transformation._hopsworks_udf = transformation._hopsworks_udf(*features) - # Regenerate output column names when setting new transformation features. - transformation._hopsworks_udf.output_column_names = ( - transformation._get_output_column_names() - ) + transformation.__hopsworks_udf = transformation.__hopsworks_udf(*features) + return transformation @classmethod @@ -227,9 +234,17 @@ def to_dict(self) -> Dict[str, Any]: "id": self._id, "version": self._version, "featurestoreId": self._featurestore_id, - "hopsworksUdf": self._hopsworks_udf.to_dict(), + "hopsworksUdf": self.hopsworks_udf.to_dict(), } + def alias(self, *args: str): + """ + Set the names of the transformed features output by the transformation function. + """ + self.__hopsworks_udf.alias(*args) + + return self + def _get_output_column_names(self) -> str: """ Function that generates feature names for the transformed features @@ -240,33 +255,53 @@ def _get_output_column_names(self) -> str: # If function name matches the name of an input feature and the transformation function only returns one output feature then # then the transformed output feature would have the same name as the input feature. i.e the input feature will get overwritten. 
if ( - len(self._hopsworks_udf.return_types) == 1 + len(self.__hopsworks_udf.return_types) == 1 and any( [ - self.hopsworks_udf.function_name + self.__hopsworks_udf.function_name == transformation_feature.feature_name - for transformation_feature in self.hopsworks_udf._transformation_features + for transformation_feature in self.__hopsworks_udf._transformation_features ] ) and ( - not self.hopsworks_udf.dropped_features - or self.hopsworks_udf.function_name - not in self.hopsworks_udf.dropped_features + not self.__hopsworks_udf.dropped_features + or self.__hopsworks_udf.function_name + not in self.__hopsworks_udf.dropped_features ) ): - return [self.hopsworks_udf.function_name] + output_col_names = [self.__hopsworks_udf.function_name] if self.transformation_type == TransformationType.MODEL_DEPENDENT: - _BASE_COLUMN_NAME = f'{self._hopsworks_udf.function_name}_{"_".join(self._hopsworks_udf.transformation_features)}_' - if len(self._hopsworks_udf.return_types) > 1: - return [ + _BASE_COLUMN_NAME = f'{self.__hopsworks_udf.function_name}_{"_".join(self.__hopsworks_udf.transformation_features)}_' + if len(self.__hopsworks_udf.return_types) > 1: + output_col_names = [ f"{_BASE_COLUMN_NAME}{i}" - for i in range(len(self._hopsworks_udf.return_types)) + for i in range(len(self.__hopsworks_udf.return_types)) ] else: - return [f"{_BASE_COLUMN_NAME}"] + output_col_names = [f"{_BASE_COLUMN_NAME}"] elif self.transformation_type == TransformationType.ON_DEMAND: - return [self._hopsworks_udf.function_name] + output_col_names = [self.__hopsworks_udf.function_name] + + if any( + len(output_col_name) > FEATURES.MAX_LENGTH_NAME + for output_col_name in output_col_names + ): + _logger.warning( + f"The default output feature names generated by the transformation function {repr(self.__hopsworks_udf)} exceed the maximum allowed length of {FEATURES.MAX_LENGTH_NAME} characters. Default names have been truncated to fit within the size limit. 
To avoid this, consider using the alias function to explicitly specify output column names." + ) + if len(output_col_names) > 1: + # Slicing the output column names + for index, output_col_name in enumerate(output_col_names): + output_col_names[index] = ( + f"{output_col_name[:FEATURES.MAX_LENGTH_NAME-(len(output_col_names) + 1)]}_{str(index)}" + if len(output_col_name) > FEATURES.MAX_LENGTH_NAME + else output_col_name + ) + else: + output_col_names = [output_col_names[0][: FEATURES.MAX_LENGTH_NAME]] + + return output_col_names @staticmethod def _validate_transformation_type( @@ -311,7 +346,10 @@ def version(self, version: int) -> None: @property def hopsworks_udf(self) -> HopsworksUdf: """Meta data class for the user defined transformation function.""" - return self._hopsworks_udf + # Make sure that the output column names for a model-dependent or on-demand transformation function, when accessed externally from the class. + if self.transformation_type and not self.__hopsworks_udf.output_column_names: + self.__hopsworks_udf.output_column_names = self._get_output_column_names() + return self.__hopsworks_udf @property def transformation_type(self) -> TransformationType: @@ -321,41 +359,39 @@ def transformation_type(self) -> TransformationType: @transformation_type.setter def transformation_type(self, transformation_type) -> None: self._transformation_type = transformation_type - # Generate output column names when setting transformation type - self._hopsworks_udf.output_column_names = self._get_output_column_names() @property def transformation_statistics( self, ) -> Optional[TransformationStatistics]: """Feature statistics required for the defined UDF""" - return self.hopsworks_udf.transformation_statistics + return self.__hopsworks_udf.transformation_statistics @transformation_statistics.setter def transformation_statistics( self, statistics: List[FeatureDescriptiveStatistics] ) -> None: - self.hopsworks_udf.transformation_statistics = statistics + 
self.__hopsworks_udf.transformation_statistics = statistics # Generate output column names for one-hot encoder after transformation statistics is set. # This is done because the number of output columns for one-hot encoding dependents on number of unique values in training dataset statistics. - if self.hopsworks_udf.function_name == "one_hot_encoder": - self._hopsworks_udf.output_column_names = self._get_output_column_names() + if self.__hopsworks_udf.function_name == "one_hot_encoder": + self.__hopsworks_udf.output_column_names = self._get_output_column_names() @property def output_column_names(self) -> List[str]: """Names of the output columns generated by the transformation functions""" - if self._hopsworks_udf.function_name == "one_hot_encoder" and len( - self._hopsworks_udf.output_column_names - ) != len(self._hopsworks_udf.return_types): - self._hopsworks_udf.output_column_names = self._get_output_column_names() - return self._hopsworks_udf.output_column_names + if ( + self.__hopsworks_udf.function_name == "one_hot_encoder" + and len(self.__hopsworks_udf.output_column_names) + != len(self.__hopsworks_udf.return_types) + ) or not self.__hopsworks_udf.output_column_names: + self.__hopsworks_udf.output_column_names = self._get_output_column_names() + return self.__hopsworks_udf.output_column_names def __repr__(self): if self.transformation_type == TransformationType.MODEL_DEPENDENT: - return ( - f"Model-Dependent Transformation Function : {repr(self.hopsworks_udf)}" - ) + return f"Model-Dependent Transformation Function : {repr(self.__hopsworks_udf)}" elif self.transformation_type == TransformationType.ON_DEMAND: - return f"On-Demand Transformation Function : {repr(self.hopsworks_udf)}" + return f"On-Demand Transformation Function : {repr(self.__hopsworks_udf)}" else: - return f"Transformation Function : {repr(self.hopsworks_udf)}" + return f"Transformation Function : {repr(self.__hopsworks_udf)}" diff --git a/python/tests/test_hopswork_udf.py 
b/python/tests/test_hopswork_udf.py index d85e5425c..44de7cced 100644 --- a/python/tests/test_hopswork_udf.py +++ b/python/tests/test_hopswork_udf.py @@ -1236,3 +1236,103 @@ def test_validate_and_convert_drop_features_dropped_prefix_invalid(self): str(exp.value) == "Cannot drop features 'test_feature1', 'test_feature2' as they are not features given as arguments in the defined UDF." ) + + def test_alias_one_output(self): + @udf(int) + def add_one(feature): + return feature + 1 + + add_one = add_one.alias("feature_plus_one") + + assert add_one.output_column_names == ["feature_plus_one"] + + def test_alias_one_output_list(self): + @udf(int) + def add_one(feature): + return feature + 1 + + add_one = add_one.alias(["feature_plus_one"]) + + assert add_one.output_column_names == ["feature_plus_one"] + + def test_alias_multiple_output(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + add_one = add_and_sub.alias("feature_plus_one", "feature_minus_one") + + assert add_one.output_column_names == ["feature_plus_one", "feature_minus_one"] + + def test_alias_multiple_output_list(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + add_one = add_and_sub.alias(["feature_plus_one", "feature_minus_one"]) + + assert add_one.output_column_names == ["feature_plus_one", "feature_minus_one"] + + def test_alias_invalid_number_column_names(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + with pytest.raises(FeatureStoreException) as exp: + add_and_sub.alias(["feature_plus_one", "feature_minus_one", "invalid_col"]) + + assert ( + str(exp.value) + == "The number of output feature names provided does not match the number of features returned by the transformation function 'add_and_sub(feature)'. Pease provide exactly 2 feature name(s) to match the output." 
+ ) + + def test_alias_invalid_type(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + with pytest.raises(FeatureStoreException) as exp: + add_and_sub.alias("feature_plus_one", {"name": "col1"}) + + assert ( + str(exp.value) + == "Invalid output feature names provided for the transformation function 'add_and_sub(feature)'. Please ensure all arguments are strings." + ) + + def test_alias_duplicates(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + with pytest.raises(FeatureStoreException) as exp: + add_and_sub.alias("feature_plus_one", "feature_plus_one") + + assert ( + str(exp.value) + == "Duplicate output feature names provided for the transformation function 'add_and_sub(feature)'. Please ensure all arguments names are unique." + ) + + def test_call_and_alias(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + add_one = add_and_sub("feature2").alias( + ["feature_plus_one", "feature_minus_one"] + ) + + assert add_one.output_column_names == ["feature_plus_one", "feature_minus_one"] + assert add_one.transformation_features == ["feature2"] + + def test_alias_invalid_length(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + with pytest.raises(FeatureStoreException) as exp: + add_and_sub.alias(["invalid" * 10, "feature_minus_one"]) + + assert ( + str(exp.value) + == "Invalid output feature names specified for the transformation function 'add_and_sub(feature)'. Please provide names shorter than 63 characters." 
+ ) diff --git a/python/tests/test_transformation_function.py b/python/tests/test_transformation_function.py index 305fe04da..2f10d9924 100644 --- a/python/tests/test_transformation_function.py +++ b/python/tests/test_transformation_function.py @@ -15,6 +15,8 @@ # +import logging + import pandas as pd import pytest from hsfs.client.exceptions import FeatureStoreException @@ -237,9 +239,14 @@ def test2(col1): transformation_type=TransformationType.MODEL_DEPENDENT, ) - assert tf.hopsworks_udf == test2 + # Creating dict representation of udf. + udf_json = test2.to_dict() + # Adding output column names to dict for testing since it would be generated when UDF is accessed out the transformation function. + udf_json["outputColumnNames"] = ["test2_col1_"] + + assert tf.hopsworks_udf.to_dict() == udf_json - def test_generate_output_column_names_one_argument_one_output_type(self): + def test_generate_output_column_names_one_argument_one_output_type_mdt(self): @udf(int) def test_func(col1): return col1 + 1 @@ -251,6 +258,11 @@ def test_func(col1): ) assert mdt._get_output_column_names() == ["test_func_col1_"] + def test_generate_output_column_names_one_argument_one_output_type_odt(self): + @udf(int) + def test_func(col1): + return col1 + 1 + odt = TransformationFunction( featurestore_id=10, hopsworks_udf=test_func, @@ -258,7 +270,7 @@ def test_func(col1): ) assert odt._get_output_column_names() == ["test_func"] - def test_generate_output_column_names_one_argument_one_output_type_prefix(self): + def test_generate_output_column_names_one_argument_one_output_type_prefix_mdt(self): @udf(int) def test_func(col1): return col1 + 1 @@ -271,7 +283,13 @@ def test_func(col1): transformation_type=TransformationType.MODEL_DEPENDENT, ) assert mdt._get_output_column_names() == ["test_func_prefix_col1_"] - assert mdt.output_column_names == ["prefix_test_func_prefix_col1_"] + + def test_generate_output_column_names_one_argument_one_output_type_prefix_odt(self): + @udf(int) + def 
test_func(col1): + return col1 + 1 + + test_func._feature_name_prefix = "prefix_" odt = TransformationFunction( featurestore_id=10, @@ -281,7 +299,7 @@ def test_func(col1): assert odt._get_output_column_names() == ["test_func"] assert odt.output_column_names == ["prefix_test_func"] - def test_generate_output_column_names_multiple_argument_one_output_type(self): + def test_generate_output_column_names_multiple_argument_one_output_type_mdt(self): @udf(int) def test_func(col1, col2, col3): return col1 + 1 @@ -292,6 +310,12 @@ def test_func(col1, col2, col3): transformation_type=TransformationType.MODEL_DEPENDENT, ) assert mdt._get_output_column_names() == ["test_func_col1_col2_col3_"] + + def test_generate_output_column_names_multiple_argument_one_output_type_odt(self): + @udf(int) + def test_func(col1, col2, col3): + return col1 + 1 + odt = TransformationFunction( featurestore_id=10, hopsworks_udf=test_func, @@ -299,7 +323,7 @@ def test_func(col1, col2, col3): ) assert odt._get_output_column_names() == ["test_func"] - def test_generate_output_column_names_multiple_argument_one_output_type_prefix( + def test_generate_output_column_names_multiple_argument_one_output_type_prefix_mdt( self, ): @udf(int) @@ -319,6 +343,16 @@ def test_func(col1, col2, col3): assert mdt.output_column_names == [ "prefix_test_func_prefix_col1_prefix_col2_prefix_col3_" ] + + def test_generate_output_column_names_multiple_argument_one_output_type_prefix_odt( + self, + ): + @udf(int) + def test_func(col1, col2, col3): + return col1 + 1 + + test_func._feature_name_prefix = "prefix_" + odt = TransformationFunction( featurestore_id=10, hopsworks_udf=test_func, @@ -327,7 +361,9 @@ def test_func(col1, col2, col3): assert odt._get_output_column_names() == ["test_func"] assert odt.output_column_names == ["prefix_test_func"] - def test_generate_output_column_names_single_argument_multiple_output_type(self): + def test_generate_output_column_names_single_argument_multiple_output_type_mdt( + self, + ): 
@udf([int, float, int]) def test_func(col1): return pd.DataFrame( @@ -345,7 +381,7 @@ def test_func(col1): "test_func_col1_2", ] - def test_generate_output_column_names_single_argument_multiple_output_type_prefix( + def test_generate_output_column_names_single_argument_multiple_output_type_prefix_mdt( self, ): @udf([int, float, int]) @@ -372,7 +408,9 @@ def test_func(col1): "prefix_test_func_prefix_col1_2", ] - def test_generate_output_column_names_multiple_argument_multiple_output_type(self): + def test_generate_output_column_names_multiple_argument_multiple_output_type_mdt( + self, + ): @udf([int, float, int]) def test_func(col1, col2, col3): return pd.DataFrame( @@ -390,7 +428,7 @@ def test_func(col1, col2, col3): "test_func_col1_col2_col3_2", ] - def test_generate_output_column_names_multiple_argument_multiple_output_type_prefix( + def test_generate_output_column_names_multiple_argument_multiple_output_type_prefix_mdt( self, ): @udf([int, float, int]) @@ -454,3 +492,379 @@ def test_func(col1, statistics=stats): str(exe.value) == "On-Demand Transformation functions cannot use statistics, please remove statistics parameters from the functions" ) + + def test_alias_one_output_mdt(self): + @udf(int) + def add_one(feature): + return feature + 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + mdt = mdt.alias("feature_plus_one_mdt") + + assert mdt.output_column_names == ["feature_plus_one_mdt"] + + def test_alias_one_output_odt(self): + @udf(int) + def add_one(feature): + return feature + 1 + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.ON_DEMAND, + ) + + odt = odt.alias("feature_plus_one_odt") + + assert odt.output_column_names == ["feature_plus_one_odt"] + + def test_alias_one_output_list_mdt(self): + @udf(int) + def add_one(feature): + return feature + 1 + + mdt = TransformationFunction( + 
featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + mdt = mdt.alias(["feature_plus_one_mdt"]) + + assert mdt.output_column_names == ["feature_plus_one_mdt"] + + def test_alias_one_output_list_odt(self): + @udf(int) + def add_one(feature): + return feature + 1 + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.ON_DEMAND, + ) + + odt = odt.alias(["feature_plus_one_odt"]) + + assert odt.output_column_names == ["feature_plus_one_odt"] + + def test_alias_multiple_output_mdt(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_and_sub, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + mdt = mdt.alias("feature_plus_one_mdt", "feature_minus_one_mdt") + + assert mdt.output_column_names == [ + "feature_plus_one_mdt", + "feature_minus_one_mdt", + ] + + def test_alias_multiple_output_list_mdt(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_and_sub, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + mdt = mdt.alias(["feature_plus_one_mdt", "feature_minus_one_mdt"]) + + assert mdt.output_column_names == [ + "feature_plus_one_mdt", + "feature_minus_one_mdt", + ] + + def test_alias_invalid_number_column_names_mdt(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_and_sub, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + with pytest.raises(FeatureStoreException) as exp: + mdt.alias(["feature_plus_one", "feature_minus_one", "invalid_col"]) + + assert ( + str(exp.value) + == "The number of output feature names provided does not match the number of features returned by the 
transformation function 'add_and_sub(feature)'. Pease provide exactly 2 feature name(s) to match the output." + ) + + def test_alias_invalid_number_column_names_odt(self): + @udf(int) + def add_one(feature): + return feature + 1 + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.ON_DEMAND, + ) + + with pytest.raises(FeatureStoreException) as exp: + odt.alias(["feature_plus_one", "feature_minus_one", "invalid_col"]) + + assert ( + str(exp.value) + == "The number of output feature names provided does not match the number of features returned by the transformation function 'add_one(feature)'. Pease provide exactly 1 feature name(s) to match the output." + ) + + def test_alias_invalid_type_mdt(self): + @udf([int]) + def add_one(feature): + return feature + 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + with pytest.raises(FeatureStoreException) as exp: + mdt.alias({"name": "col1"}) + + assert ( + str(exp.value) + == "Invalid output feature names provided for the transformation function 'add_one(feature)'. Please ensure all arguments are strings." + ) + + def test_alias_invalid_type_odt(self): + @udf([int]) + def add_one(feature): + return feature + 1 + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.ON_DEMAND, + ) + + with pytest.raises(FeatureStoreException) as exp: + odt.alias({"name": "col1"}) + + assert ( + str(exp.value) + == "Invalid output feature names provided for the transformation function 'add_one(feature)'. Please ensure all arguments are strings." 
+ ) + + def test_alias_duplicates_mdt(self): + @udf([int, int]) + def add_and_sub(feature): + return feature + 1, feature - 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_and_sub, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + with pytest.raises(FeatureStoreException) as exp: + mdt.alias("feature_plus_one", "feature_plus_one") + + assert ( + str(exp.value) + == "Duplicate output feature names provided for the transformation function 'add_and_sub(feature)'. Please ensure all arguments names are unique." + ) + + def test_call_and_alias_mdt(self): + @udf(int) + def add_one(feature): + return feature + 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + mdt = mdt("feature2_mdt").alias(["feature_plus_one_mdt"]) + + assert mdt.output_column_names == ["feature_plus_one_mdt"] + assert mdt.hopsworks_udf.transformation_features == ["feature2_mdt"] + + def test_call_and_alias_odt(self): + @udf(int) + def add_one(feature): + return feature + 1 + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.ON_DEMAND, + ) + + odt = odt("feature2_odt").alias(["feature_plus_one_odt"]) + + assert odt.output_column_names == ["feature_plus_one_odt"] + assert odt.hopsworks_udf.transformation_features == ["feature2_odt"] + + def test_alias_invalid_length_mdt(self): + @udf(int) + def add_one(feature): + return feature + 1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + with pytest.raises(FeatureStoreException) as exp: + mdt.alias(["invalid" * 10]) + + assert ( + str(exp.value) + == "Invalid output feature names specified for the transformation function 'add_one(feature)'. Please provide names shorter than 63 characters." 
+ ) + + def test_alias_invalid_length_odt(self): + @udf(int) + def add_one(feature): + return feature + 1 + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=add_one, + transformation_type=TransformationType.ON_DEMAND, + ) + + with pytest.raises(FeatureStoreException) as exp: + odt.alias(["invalid" * 10]) + + assert ( + str(exp.value) + == "Invalid output feature names specified for the transformation function 'add_one(feature)'. Please provide names shorter than 63 characters." + ) + + def test_generated_output_col_name_invalid_mdt(self, caplog): + @udf(int) + def test( + long_feature_name1, + long_feature_name2, + long_feature_name3, + long_feature_name4, + long_feature_name5, + long_feature_name6, + long_feature_name7, + ): + return long_feature_name1 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=test, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + with caplog.at_level(logging.WARNING): + mdt._get_output_column_names() + + assert ( + "The default output feature names generated by the transformation function test(long_feature_name1, long_feature_name2, long_feature_name3, long_feature_name4, long_feature_name5, long_feature_name6, long_feature_name7) exceed the maximum allowed length of 63 characters. Default names have been truncated to fit within the size limit. To avoid this, consider using the alias function to explicitly specify output column names." 
+ in caplog.text + ) + + assert mdt.output_column_names == [ + "test_long_feature_name1_long_feature_name2_long_feature_name3_l" + ] + + def test_generated_output_col_name_invalid_mdt_multiple_returns(self, caplog): + @udf([int, int, int]) + def test( + long_feature_name1, + long_feature_name2, + long_feature_name3, + long_feature_name4, + long_feature_name5, + long_feature_name6, + long_feature_name7, + ): + return long_feature_name1, long_feature_name2, long_feature_name3 + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=test, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + with caplog.at_level(logging.WARNING): + mdt._get_output_column_names() + + assert ( + "The default output feature names generated by the transformation function test(long_feature_name1, long_feature_name2, long_feature_name3, long_feature_name4, long_feature_name5, long_feature_name6, long_feature_name7) exceed the maximum allowed length of 63 characters. Default names have been truncated to fit within the size limit. To avoid this, consider using the alias function to explicitly specify output column names." 
+ in caplog.text + ) + + assert mdt.output_column_names == [ + "test_long_feature_name1_long_feature_name2_long_feature_nam_0", + "test_long_feature_name1_long_feature_name2_long_feature_nam_1", + "test_long_feature_name1_long_feature_name2_long_feature_nam_2", + ] + + def test_generate_output_col_name_invalid_odt(self, caplog): + @udf(int) + def really_long_function_name_that_exceed_63_characters_causing_invalid_name_for_on_demand_features( + features, + ): + return features + + odt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=really_long_function_name_that_exceed_63_characters_causing_invalid_name_for_on_demand_features, + transformation_type=TransformationType.ON_DEMAND, + ) + + with caplog.at_level(logging.WARNING): + odt._get_output_column_names() + + assert ( + "The default output feature names generated by the transformation function really_long_function_name_that_exceed_63_characters_causing_invalid_name_for_on_demand_features(features) exceed the maximum allowed length of 63 characters. Default names have been truncated to fit within the size limit. To avoid this, consider using the alias function to explicitly specify output column names." 
+ in caplog.text + ) + + assert odt.hopsworks_udf.output_column_names == [ + "really_long_function_name_that_exceed_63_characters_causing_inv" + ] + + def test_generate_output_col_name_invalid_mdt(self, caplog): + @udf(int) + def really_long_function_name_that_exceed_63_characters_causing_invalid_name_for_on_demand_features( + features, + ): + return features + + mdt = TransformationFunction( + featurestore_id=10, + hopsworks_udf=really_long_function_name_that_exceed_63_characters_causing_invalid_name_for_on_demand_features, + transformation_type=TransformationType.MODEL_DEPENDENT, + ) + + with caplog.at_level(logging.WARNING): + mdt._get_output_column_names() + + assert ( + "The default output feature names generated by the transformation function really_long_function_name_that_exceed_63_characters_causing_invalid_name_for_on_demand_features(features) exceed the maximum allowed length of 63 characters. Default names have been truncated to fit within the size limit. To avoid this, consider using the alias function to explicitly specify output column names." + in caplog.text + ) + + # Asserting that the generated output column names are truncated to the required length + assert mdt.hopsworks_udf.output_column_names == [ + "really_long_function_name_that_exceed_63_characters_causing_inv" + ]