21
21
22
22
import humps
23
23
from hopsworks_common .client .exceptions import FeatureStoreException
24
+ from hopsworks_common .constants import FEATURES
24
25
from hsfs import util
25
26
from hsfs .core import transformation_function_engine
26
27
from hsfs .core .feature_descriptive_statistics import FeatureDescriptiveStatistics
@@ -77,8 +78,13 @@ def __init__(
77
78
raise FeatureStoreException (
78
79
"Please use the hopsworks_udf decorator when defining transformation functions."
79
80
)
81
+ if not id and hopsworks_udf .output_column_names :
82
+ # Create a copy and reset the output column names of the UDF if the transformation function is newly created and the UDF has output column names assigned already.
83
+ # This happens, for example, if the same UDF is used in an on-demand and a model-dependent transformation function.
84
+ hopsworks_udf ._output_column_names = []
85
+ hopsworks_udf = copy .copy (hopsworks_udf )
80
86
81
- self ._hopsworks_udf : HopsworksUdf = hopsworks_udf
87
+ self .__hopsworks_udf : HopsworksUdf = hopsworks_udf
82
88
TransformationFunction ._validate_transformation_type (
83
89
transformation_type = transformation_type , hopsworks_udf = hopsworks_udf
84
90
)
@@ -152,11 +158,8 @@ def __call__(self, *features: List[str]) -> TransformationFunction:
152
158
"""
153
159
# Deep copy so that the same transformation function can be used to create multiple new transformation functions with different features.
154
160
transformation = copy .deepcopy (self )
155
- transformation ._hopsworks_udf = transformation ._hopsworks_udf (* features )
156
- # Regenerate output column names when setting new transformation features.
157
- transformation ._hopsworks_udf .output_column_names = (
158
- transformation ._get_output_column_names ()
159
- )
161
+ transformation .__hopsworks_udf = transformation .__hopsworks_udf (* features )
162
+
160
163
return transformation
161
164
162
165
@classmethod
@@ -227,9 +230,17 @@ def to_dict(self) -> Dict[str, Any]:
227
230
"id" : self ._id ,
228
231
"version" : self ._version ,
229
232
"featurestoreId" : self ._featurestore_id ,
230
- "hopsworksUdf" : self ._hopsworks_udf .to_dict (),
233
+ "hopsworksUdf" : self .hopsworks_udf .to_dict (),
231
234
}
232
235
236
+ def alias (self , * args : str ):
237
+ """
238
+ Set the names of the transformed features output by the transformation function.
239
+ """
240
+ self .__hopsworks_udf .alias (* args )
241
+
242
+ return self
243
+
233
244
def _get_output_column_names (self ) -> str :
234
245
"""
235
246
Function that generates feature names for the transformed features
@@ -240,33 +251,43 @@ def _get_output_column_names(self) -> str:
240
251
# If function name matches the name of an input feature and the transformation function only returns one output feature then
241
252
# the transformed output feature would have the same name as the input feature, i.e. the input feature will get overwritten.
242
253
if (
243
- len (self ._hopsworks_udf .return_types ) == 1
254
+ len (self .__hopsworks_udf .return_types ) == 1
244
255
and any (
245
256
[
246
- self .hopsworks_udf .function_name
257
+ self .__hopsworks_udf .function_name
247
258
== transformation_feature .feature_name
248
- for transformation_feature in self .hopsworks_udf ._transformation_features
259
+ for transformation_feature in self .__hopsworks_udf ._transformation_features
249
260
]
250
261
)
251
262
and (
252
- not self .hopsworks_udf .dropped_features
253
- or self .hopsworks_udf .function_name
254
- not in self .hopsworks_udf .dropped_features
263
+ not self .__hopsworks_udf .dropped_features
264
+ or self .__hopsworks_udf .function_name
265
+ not in self .__hopsworks_udf .dropped_features
255
266
)
256
267
):
257
- return [self .hopsworks_udf .function_name ]
268
+ output_col_names = [self .__hopsworks_udf .function_name ]
258
269
259
270
if self .transformation_type == TransformationType .MODEL_DEPENDENT :
260
- _BASE_COLUMN_NAME = f'{ self ._hopsworks_udf .function_name } _{ "_" .join (self ._hopsworks_udf .transformation_features )} _'
261
- if len (self ._hopsworks_udf .return_types ) > 1 :
262
- return [
271
+ _BASE_COLUMN_NAME = f'{ self .__hopsworks_udf .function_name } _{ "_" .join (self .__hopsworks_udf .transformation_features )} _'
272
+ if len (self .__hopsworks_udf .return_types ) > 1 :
273
+ output_col_names = [
263
274
f"{ _BASE_COLUMN_NAME } { i } "
264
- for i in range (len (self ._hopsworks_udf .return_types ))
275
+ for i in range (len (self .__hopsworks_udf .return_types ))
265
276
]
266
277
else :
267
- return [f"{ _BASE_COLUMN_NAME } " ]
278
+ output_col_names = [f"{ _BASE_COLUMN_NAME } " ]
268
279
elif self .transformation_type == TransformationType .ON_DEMAND :
269
- return [self ._hopsworks_udf .function_name ]
280
+ output_col_names = [self .__hopsworks_udf .function_name ]
281
+
282
+ if any (
283
+ len (output_col_name ) > FEATURES .MAX_LENGTH_NAME
284
+ for output_col_name in output_col_names
285
+ ):
286
+ raise FeatureStoreException (
287
+ f"The default names for output features generated by the transformation function `{ repr (self .__hopsworks_udf )} ` exceeds the maximum length of { FEATURES .MAX_LENGTH_NAME } characters. Please use the `alias` function to assign shorter names to the output features."
288
+ )
289
+
290
+ return output_col_names
270
291
271
292
@staticmethod
272
293
def _validate_transformation_type (
@@ -311,7 +332,10 @@ def version(self, version: int) -> None:
311
332
@property
312
333
def hopsworks_udf (self ) -> HopsworksUdf :
313
334
"""Meta data class for the user defined transformation function."""
314
- return self ._hopsworks_udf
335
+ # Make sure that the output column names are generated for a model-dependent or on-demand transformation function when the UDF is accessed externally from the class.
336
+ if self .transformation_type and not self .__hopsworks_udf .output_column_names :
337
+ self .__hopsworks_udf .output_column_names = self ._get_output_column_names ()
338
+ return self .__hopsworks_udf
315
339
316
340
@property
317
341
def transformation_type (self ) -> TransformationType :
@@ -321,41 +345,39 @@ def transformation_type(self) -> TransformationType:
321
345
@transformation_type .setter
322
346
def transformation_type (self , transformation_type ) -> None :
323
347
self ._transformation_type = transformation_type
324
- # Generate output column names when setting transformation type
325
- self ._hopsworks_udf .output_column_names = self ._get_output_column_names ()
326
348
327
349
@property
328
350
def transformation_statistics (
329
351
self ,
330
352
) -> Optional [TransformationStatistics ]:
331
353
"""Feature statistics required for the defined UDF"""
332
- return self .hopsworks_udf .transformation_statistics
354
+ return self .__hopsworks_udf .transformation_statistics
333
355
334
356
@transformation_statistics .setter
335
357
def transformation_statistics (
336
358
self , statistics : List [FeatureDescriptiveStatistics ]
337
359
) -> None :
338
- self .hopsworks_udf .transformation_statistics = statistics
360
+ self .__hopsworks_udf .transformation_statistics = statistics
339
361
# Generate output column names for one-hot encoder after transformation statistics is set.
340
362
# This is done because the number of output columns for one-hot encoding depends on the number of unique values in the training dataset statistics.
341
- if self .hopsworks_udf .function_name == "one_hot_encoder" :
342
- self ._hopsworks_udf .output_column_names = self ._get_output_column_names ()
363
+ if self .__hopsworks_udf .function_name == "one_hot_encoder" :
364
+ self .__hopsworks_udf .output_column_names = self ._get_output_column_names ()
343
365
344
366
@property
345
367
def output_column_names (self ) -> List [str ]:
346
368
"""Names of the output columns generated by the transformation functions"""
347
- if self ._hopsworks_udf .function_name == "one_hot_encoder" and len (
348
- self ._hopsworks_udf .output_column_names
349
- ) != len (self ._hopsworks_udf .return_types ):
350
- self ._hopsworks_udf .output_column_names = self ._get_output_column_names ()
351
- return self ._hopsworks_udf .output_column_names
369
+ if (
370
+ self .__hopsworks_udf .function_name == "one_hot_encoder"
371
+ and len (self .__hopsworks_udf .output_column_names )
372
+ != len (self .__hopsworks_udf .return_types )
373
+ ) or not self .__hopsworks_udf .output_column_names :
374
+ self .__hopsworks_udf .output_column_names = self ._get_output_column_names ()
375
+ return self .__hopsworks_udf .output_column_names
352
376
353
377
def __repr__ (self ):
354
378
if self .transformation_type == TransformationType .MODEL_DEPENDENT :
355
- return (
356
- f"Model-Dependent Transformation Function : { repr (self .hopsworks_udf )} "
357
- )
379
+ return f"Model-Dependent Transformation Function : { repr (self .__hopsworks_udf )} "
358
380
elif self .transformation_type == TransformationType .ON_DEMAND :
359
- return f"On-Demand Transformation Function : { repr (self .hopsworks_udf )} "
381
+ return f"On-Demand Transformation Function : { repr (self .__hopsworks_udf )} "
360
382
else :
361
- return f"Transformation Function : { repr (self .hopsworks_udf )} "
383
+ return f"Transformation Function : { repr (self .__hopsworks_udf )} "
0 commit comments