Skip to content

Commit 236ead9

Browse files
committed
adding alias function to modify output column names
1 parent ad441ee commit 236ead9

File tree

5 files changed

+135
-49
lines changed

5 files changed

+135
-49
lines changed

python/hopsworks_common/constants.py

+8
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,14 @@ class OPENSEARCH_CONFIG:
7070
CA_CERTS = "ca_certs"
7171

7272

73+
class FEATURES:
74+
"""
75+
Class that stores constants about a feature.
76+
"""
77+
78+
MAX_LENGTH_NAME = 63
79+
80+
7381
class KAFKA_SSL_CONFIG:
7482
"""
7583
Kafka SSL constant strings for configuration

python/hsfs/core/feature_monitoring_config.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import humps
2424
from hopsworks_common.client.exceptions import FeatureStoreException
25+
from hopsworks_common.constants import FEATURES
2526
from hsfs import util
2627
from hsfs.core import (
2728
feature_monitoring_config_engine,
@@ -34,7 +35,6 @@
3435
from hsfs.core.job_schedule import JobSchedule
3536

3637

37-
MAX_LENGTH_NAME = 63
3838
MAX_LENGTH_DESCRIPTION = 2000
3939

4040

@@ -686,8 +686,10 @@ def name(self, name: str):
686686
raise AttributeError("The name of a registered config is read-only.")
687687
elif not isinstance(name, str):
688688
raise TypeError("name must be of type str")
689-
if len(name) > MAX_LENGTH_NAME:
690-
raise ValueError("name must be less than {MAX_LENGTH_NAME} characters.")
689+
if len(name) > FEATURES.MAX_LENGTH_NAME:
690+
raise ValueError(
691+
"name must be less than {FEATURES.MAX_LENGTH_NAME} characters."
692+
)
691693
self._name = name
692694

693695
@property

python/hsfs/hopsworks_udf.py

+57-8
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import humps
2929
from hopsworks_common.client.exceptions import FeatureStoreException
30+
from hopsworks_common.constants import FEATURES
3031
from hsfs import engine, util
3132
from hsfs.core.feature_descriptive_statistics import FeatureDescriptiveStatistics
3233
from hsfs.decorators import typechecked
@@ -173,6 +174,7 @@ def __init__(
173174
dropped_argument_names: Optional[List[str]] = None,
174175
dropped_feature_names: Optional[List[str]] = None,
175176
feature_name_prefix: Optional[str] = None,
177+
output_column_names: Optional[str] = None,
176178
):
177179
self._return_types: List[str] = HopsworksUdf._validate_and_convert_output_types(
178180
return_types
@@ -191,6 +193,12 @@ def __init__(
191193
if isinstance(func, Callable)
192194
else func
193195
)
196+
197+
# The parameter `output_column_names` is initialized lazily.
198+
# It is set when the output column names are retrieved from the backend, when they are explicitly specified using the `alias` function, or when default column names are generated because the UDF is accessed from a transformation function.
199+
# Output column names are only stored in the backend when a model-dependent or on-demand transformation function is created using the defined UDF.
200+
self._output_column_names: List[str] = []
201+
194202
if not transformation_features:
195203
# New transformation function being declared so extract source code from function
196204
self._transformation_features: List[TransformationFeature] = (
@@ -211,6 +219,7 @@ def __init__(
211219
)
212220
)
213221
self._dropped_features = self._dropped_argument_names
222+
214223
else:
215224
self._transformation_features = transformation_features
216225
self._transformation_function_argument_names = (
@@ -222,15 +231,16 @@ def __init__(
222231
if dropped_feature_names
223232
else dropped_argument_names
224233
)
234+
self._output_column_names = (
235+
output_column_names if output_column_names else []
236+
)
225237

226238
self._formatted_function_source, self._module_imports = (
227239
HopsworksUdf._format_source_code(self._function_source)
228240
)
229241

230242
self._statistics: Optional[TransformationStatistics] = None
231243

232-
self._output_column_names: List[str] = []
233-
234244
@staticmethod
235245
def _validate_and_convert_drop_features(
236246
dropped_features: Union[str, List[str]],
@@ -691,6 +701,41 @@ def __call__(self, *features: List[str]) -> "HopsworksUdf":
691701
udf.dropped_features = updated_dropped_features
692702
return udf
693703

704+
def alias(self, *args: str):
705+
"""
706+
Set the names of the transformed features output by the UDF.
707+
"""
708+
if len(args) == 1 and isinstance(args[0], list):
709+
# If a single list is passed, use it directly
710+
output_col_names = args[0]
711+
else:
712+
# Otherwise, use the individual arguments as a list
713+
output_col_names = list(args)
714+
if any(
715+
not isinstance(output_col_name, str) for output_col_name in output_col_names
716+
):
717+
raise FeatureStoreException(
718+
f"Invalid output feature names provided for the transformation function '{repr(self)}'. Please ensure all arguments are strings."
719+
)
720+
721+
self.output_column_names = output_col_names
722+
723+
return self
724+
725+
def _validate_output_col_name(self, output_col_names):
726+
if any(
727+
len(output_col_name) > FEATURES.MAX_LENGTH_NAME
728+
for output_col_name in output_col_names
729+
):
730+
raise FeatureStoreException(
731+
f"Invalid output feature names specified for the transformation function '{repr(self)}'. Please provide names shorter than {FEATURES.MAX_LENGTH_NAME} characters."
732+
)
733+
734+
if output_col_names and len(output_col_names) != len(self.return_types):
735+
raise FeatureStoreException(
736+
f"Provided names for output columns does not match the number of columns returned from the UDF. Please provide {len(self.return_types)} names."
737+
)
738+
694739
def update_return_type_one_hot(self):
695740
self._return_types = [
696741
self._return_types[0]
@@ -765,6 +810,7 @@ def to_dict(self) -> Dict[str, Any]:
765810
"name": self.function_name,
766811
"featureNamePrefix": self._feature_name_prefix,
767812
"executionMode": self.execution_mode.value.upper(),
813+
"outputColumnNames": self.output_column_names,
768814
}
769815

770816
def json(self) -> str:
@@ -826,6 +872,12 @@ def from_response_json(
826872
else None
827873
)
828874

875+
output_column_names = (
876+
[feature.strip() for feature in json_decamelized["output_column_names"]]
877+
if json_decamelized.get("output_column_names", None)
878+
else None
879+
)
880+
829881
# Reconstructing statistics arguments.
830882
arg_list, _, _, _ = HopsworksUdf._parse_function_signature(function_source_code)
831883

@@ -870,6 +922,7 @@ def from_response_json(
870922
execution_mode=UDFExecutionMode.from_string(
871923
json_decamelized["execution_mode"]
872924
),
925+
output_column_names=output_column_names,
873926
)
874927

875928
# Set transformation features if already set.
@@ -998,12 +1051,8 @@ def transformation_statistics(
9981051
def output_column_names(self, output_col_names: Union[str, List[str]]) -> None:
9991052
if not isinstance(output_col_names, List):
10001053
output_col_names = [output_col_names]
1001-
if not output_col_names and len(output_col_names) != len(self.return_types):
1002-
raise FeatureStoreException(
1003-
f"Provided names for output columns does not match the number of columns returned from the UDF. Please provide {len(self.return_types)} names."
1004-
)
1005-
else:
1006-
self._output_column_names = output_col_names
1054+
self._validate_output_col_name(output_col_names)
1055+
self._output_column_names = output_col_names
10071056

10081057
def __repr__(self):
10091058
return f'{self.function_name}({", ".join(self.transformation_features)})'

python/hsfs/transformation_function.py

+59-37
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import humps
2323
from hopsworks_common.client.exceptions import FeatureStoreException
24+
from hopsworks_common.constants import FEATURES
2425
from hsfs import util
2526
from hsfs.core import transformation_function_engine
2627
from hsfs.core.feature_descriptive_statistics import FeatureDescriptiveStatistics
@@ -77,8 +78,13 @@ def __init__(
7778
raise FeatureStoreException(
7879
"Please use the hopsworks_udf decorator when defining transformation functions."
7980
)
81+
if not id and hopsworks_udf.output_column_names:
82+
# Create a copy and reset the output column names of the UDF if the transformation function is newly created and the UDF has output column names assigned already.
83+
# This happens, for example, if the same UDF is used in an on-demand and a model-dependent transformation function.
84+
hopsworks_udf._output_column_names = []
85+
hopsworks_udf = copy.copy(hopsworks_udf)
8086

81-
self._hopsworks_udf: HopsworksUdf = hopsworks_udf
87+
self.__hopsworks_udf: HopsworksUdf = hopsworks_udf
8288
TransformationFunction._validate_transformation_type(
8389
transformation_type=transformation_type, hopsworks_udf=hopsworks_udf
8490
)
@@ -152,11 +158,8 @@ def __call__(self, *features: List[str]) -> TransformationFunction:
152158
"""
153159
# Deep copy so that the same transformation function can be used to create multiple new transformation functions with different features.
154160
transformation = copy.deepcopy(self)
155-
transformation._hopsworks_udf = transformation._hopsworks_udf(*features)
156-
# Regenerate output column names when setting new transformation features.
157-
transformation._hopsworks_udf.output_column_names = (
158-
transformation._get_output_column_names()
159-
)
161+
transformation.__hopsworks_udf = transformation.__hopsworks_udf(*features)
162+
160163
return transformation
161164

162165
@classmethod
@@ -227,9 +230,17 @@ def to_dict(self) -> Dict[str, Any]:
227230
"id": self._id,
228231
"version": self._version,
229232
"featurestoreId": self._featurestore_id,
230-
"hopsworksUdf": self._hopsworks_udf.to_dict(),
233+
"hopsworksUdf": self.hopsworks_udf.to_dict(),
231234
}
232235

236+
def alias(self, *args: str):
237+
"""
238+
Set the names of the transformed features output by the transformation function.
239+
"""
240+
self.__hopsworks_udf.alias(*args)
241+
242+
return self
243+
233244
def _get_output_column_names(self) -> str:
234245
"""
235246
Function that generates feature names for the transformed features
@@ -240,33 +251,43 @@ def _get_output_column_names(self) -> str:
240251
# If function name matches the name of an input feature and the transformation function only returns one output feature then
241252
# the transformed output feature would have the same name as the input feature, i.e. the input feature will get overwritten.
242253
if (
243-
len(self._hopsworks_udf.return_types) == 1
254+
len(self.__hopsworks_udf.return_types) == 1
244255
and any(
245256
[
246-
self.hopsworks_udf.function_name
257+
self.__hopsworks_udf.function_name
247258
== transformation_feature.feature_name
248-
for transformation_feature in self.hopsworks_udf._transformation_features
259+
for transformation_feature in self.__hopsworks_udf._transformation_features
249260
]
250261
)
251262
and (
252-
not self.hopsworks_udf.dropped_features
253-
or self.hopsworks_udf.function_name
254-
not in self.hopsworks_udf.dropped_features
263+
not self.__hopsworks_udf.dropped_features
264+
or self.__hopsworks_udf.function_name
265+
not in self.__hopsworks_udf.dropped_features
255266
)
256267
):
257-
return [self.hopsworks_udf.function_name]
268+
output_col_names = [self.__hopsworks_udf.function_name]
258269

259270
if self.transformation_type == TransformationType.MODEL_DEPENDENT:
260-
_BASE_COLUMN_NAME = f'{self._hopsworks_udf.function_name}_{"_".join(self._hopsworks_udf.transformation_features)}_'
261-
if len(self._hopsworks_udf.return_types) > 1:
262-
return [
271+
_BASE_COLUMN_NAME = f'{self.__hopsworks_udf.function_name}_{"_".join(self.__hopsworks_udf.transformation_features)}_'
272+
if len(self.__hopsworks_udf.return_types) > 1:
273+
output_col_names = [
263274
f"{_BASE_COLUMN_NAME}{i}"
264-
for i in range(len(self._hopsworks_udf.return_types))
275+
for i in range(len(self.__hopsworks_udf.return_types))
265276
]
266277
else:
267-
return [f"{_BASE_COLUMN_NAME}"]
278+
output_col_names = [f"{_BASE_COLUMN_NAME}"]
268279
elif self.transformation_type == TransformationType.ON_DEMAND:
269-
return [self._hopsworks_udf.function_name]
280+
output_col_names = [self.__hopsworks_udf.function_name]
281+
282+
if any(
283+
len(output_col_name) > FEATURES.MAX_LENGTH_NAME
284+
for output_col_name in output_col_names
285+
):
286+
raise FeatureStoreException(
287+
f"The default names for output features generated by the transformation function `{repr(self.__hopsworks_udf)}` exceeds the maximum length of {FEATURES.MAX_LENGTH_NAME} characters. Please use the `alias` function to assign shorter names to the output features."
288+
)
289+
290+
return output_col_names
270291

271292
@staticmethod
272293
def _validate_transformation_type(
@@ -311,7 +332,10 @@ def version(self, version: int) -> None:
311332
@property
312333
def hopsworks_udf(self) -> HopsworksUdf:
313334
"""Meta data class for the user defined transformation function."""
314-
return self._hopsworks_udf
335+
# Make sure that the output column names are generated for a model-dependent or on-demand transformation function when the UDF is accessed from outside the class.
336+
if self.transformation_type and not self.__hopsworks_udf.output_column_names:
337+
self.__hopsworks_udf.output_column_names = self._get_output_column_names()
338+
return self.__hopsworks_udf
315339

316340
@property
317341
def transformation_type(self) -> TransformationType:
@@ -321,41 +345,39 @@ def transformation_type(self) -> TransformationType:
321345
@transformation_type.setter
322346
def transformation_type(self, transformation_type) -> None:
323347
self._transformation_type = transformation_type
324-
# Generate output column names when setting transformation type
325-
self._hopsworks_udf.output_column_names = self._get_output_column_names()
326348

327349
@property
328350
def transformation_statistics(
329351
self,
330352
) -> Optional[TransformationStatistics]:
331353
"""Feature statistics required for the defined UDF"""
332-
return self.hopsworks_udf.transformation_statistics
354+
return self.__hopsworks_udf.transformation_statistics
333355

334356
@transformation_statistics.setter
335357
def transformation_statistics(
336358
self, statistics: List[FeatureDescriptiveStatistics]
337359
) -> None:
338-
self.hopsworks_udf.transformation_statistics = statistics
360+
self.__hopsworks_udf.transformation_statistics = statistics
339361
# Generate output column names for one-hot encoder after transformation statistics is set.
340362
# This is done because the number of output columns for one-hot encoding depends on the number of unique values in the training dataset statistics.
341-
if self.hopsworks_udf.function_name == "one_hot_encoder":
342-
self._hopsworks_udf.output_column_names = self._get_output_column_names()
363+
if self.__hopsworks_udf.function_name == "one_hot_encoder":
364+
self.__hopsworks_udf.output_column_names = self._get_output_column_names()
343365

344366
@property
345367
def output_column_names(self) -> List[str]:
346368
"""Names of the output columns generated by the transformation functions"""
347-
if self._hopsworks_udf.function_name == "one_hot_encoder" and len(
348-
self._hopsworks_udf.output_column_names
349-
) != len(self._hopsworks_udf.return_types):
350-
self._hopsworks_udf.output_column_names = self._get_output_column_names()
351-
return self._hopsworks_udf.output_column_names
369+
if (
370+
self.__hopsworks_udf.function_name == "one_hot_encoder"
371+
and len(self.__hopsworks_udf.output_column_names)
372+
!= len(self.__hopsworks_udf.return_types)
373+
) or not self.__hopsworks_udf.output_column_names:
374+
self.__hopsworks_udf.output_column_names = self._get_output_column_names()
375+
return self.__hopsworks_udf.output_column_names
352376

353377
def __repr__(self):
354378
if self.transformation_type == TransformationType.MODEL_DEPENDENT:
355-
return (
356-
f"Model-Dependent Transformation Function : {repr(self.hopsworks_udf)}"
357-
)
379+
return f"Model-Dependent Transformation Function : {repr(self.__hopsworks_udf)}"
358380
elif self.transformation_type == TransformationType.ON_DEMAND:
359-
return f"On-Demand Transformation Function : {repr(self.hopsworks_udf)}"
381+
return f"On-Demand Transformation Function : {repr(self.__hopsworks_udf)}"
360382
else:
361-
return f"Transformation Function : {repr(self.hopsworks_udf)}"
383+
return f"Transformation Function : {repr(self.__hopsworks_udf)}"

python/tests/test_transformation_function.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,12 @@ def test2(col1):
237237
transformation_type=TransformationType.MODEL_DEPENDENT,
238238
)
239239

240-
assert tf.hopsworks_udf == test2
240+
# Creating dict representation of udf.
241+
udf_json = test2.to_dict()
242+
# Adding output column names to the dict for testing, since they are generated when the UDF is accessed from the transformation function.
243+
udf_json["outputColumnNames"] = ["test2_col1_"]
244+
245+
assert tf.hopsworks_udf.to_dict() == udf_json
241246

242247
def test_generate_output_column_names_one_argument_one_output_type(self):
243248
@udf(int)

0 commit comments

Comments
 (0)