Commit 39eb6e6

[FSTORE-1507] Python UDF's (#300)
* python udf working code
* adding comments and type hints
* adding unit tests
* fixing unit tests
* adding support for python udfs in vector server
* adding unit tests and allowing features to be overwritten
* removing prints and also updating renaming string
* adding unit test for date time tests
* fixing engine not found issue in tests
* fixing unit tests
* making datetime unit tests consistent with the earlier existing tests
* enabling support to overwrite features in spark
* correcting the order of elements when overwriting features for python kernel
* updated attribute names as per review comments
* converting execution mode to upper to match enum in backend
* skipping polars transformation function tests if polars is not installed
1 parent 1c40438 commit 39eb6e6

13 files changed: +3,226 −272 lines
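For context, FSTORE-1507 lets a Hopsworks UDF declare an execution mode, so the same transformation function can run either as a pandas UDF (vectorized over Series/DataFrames) or as a plain Python UDF (scalar, row by row), with the mode resolved per online/offline path via `execution_mode.get_current_execution_mode()`. A minimal usage sketch follows; the `from hsfs import udf` import path and the `mode` keyword with values "pandas"/"python"/"default" are assumptions inferred from this diff, not shown in the excerpt itself.

import pandas as pd

from hsfs import udf  # assumed import path for the UDF decorator

# Pandas-mode UDF: receives pandas Series and returns a Series (vectorized).
@udf(float, mode="pandas")  # `mode` keyword assumed from this commit
def plus_one_pandas(amount: pd.Series) -> pd.Series:
    return amount + 1

# Python-mode UDF: receives plain scalars and returns a scalar (row by row).
@udf(float, mode="python")
def plus_one_python(amount: float) -> float:
    return amount + 1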

python/hsfs/core/vector_server.py

+67 −26
@@ -51,6 +51,7 @@
 from hsfs.core import (
     transformation_function_engine as tf_engine_mod,
 )
+from hsfs.hopsworks_udf import UDFExecutionMode


 if HAS_NUMPY:
@@ -999,43 +1000,83 @@ def apply_on_demand_transformations(
     ) -> dict:
         _logger.debug("Applying On-Demand transformation functions.")
         for tf in self._on_demand_transformation_functions:
-            # Check if feature provided as request parameter if not get it from retrieved feature vector.
-            features = [
-                pd.Series(request_parameter[feature])
-                if feature in request_parameter.keys()
-                else (
-                    pd.Series(
-                        rows[feature]
-                        if (not isinstance(rows[feature], pd.Series))
-                        else rows[feature]
+            if (
+                tf.hopsworks_udf.execution_mode.get_current_execution_mode(online=True)
+                == UDFExecutionMode.PANDAS
+            ):
+                # Check if feature provided as request parameter if not get it from retrieved feature vector.
+                features = [
+                    pd.Series(request_parameter[feature])
+                    if feature in request_parameter.keys()
+                    else (
+                        pd.Series(
+                            rows[feature]
+                            if (not isinstance(rows[feature], pd.Series))
+                            else rows[feature]
+                        )
                     )
-                )
-                for feature in tf.hopsworks_udf.transformation_features
-            ]
-            on_demand_feature = tf.hopsworks_udf.get_udf(force_python_udf=True)(
+                    for feature in tf.hopsworks_udf.transformation_features
+                ]
+            else:
+                # No need to cast to pandas Series for Python UDF's
+                features = [
+                    request_parameter[feature]
+                    if feature in request_parameter.keys()
+                    else rows[feature]
+                    for feature in tf.hopsworks_udf.transformation_features
+                ]
+
+            on_demand_feature = tf.hopsworks_udf.get_udf(online=True)(
                 *features
             )  # Get only python compatible UDF irrespective of engine
-
-            rows[on_demand_feature.name] = on_demand_feature.values[0]
+            if (
+                tf.hopsworks_udf.execution_mode.get_current_execution_mode(online=True)
+                == UDFExecutionMode.PANDAS
+            ):
+                rows[on_demand_feature.name] = on_demand_feature.values[0]
+            else:
+                rows[tf.output_column_names[0]] = on_demand_feature
         return rows

     def apply_model_dependent_transformations(self, rows: Union[dict, pd.DataFrame]):
         _logger.debug("Applying Model-Dependent transformation functions.")
         for tf in self.model_dependent_transformation_functions:
-            features = [
-                pd.Series(rows[feature])
-                if (not isinstance(rows[feature], pd.Series))
-                else rows[feature]
-                for feature in tf.hopsworks_udf.transformation_features
-            ]
-            transformed_result = tf.hopsworks_udf.get_udf(force_python_udf=True)(
+            if (
+                tf.hopsworks_udf.execution_mode.get_current_execution_mode(online=True)
+                == UDFExecutionMode.PANDAS
+            ):
+                features = [
+                    pd.Series(rows[feature])
+                    if (not isinstance(rows[feature], pd.Series))
+                    else rows[feature]
+                    for feature in tf.hopsworks_udf.transformation_features
+                ]
+            else:
+                # No need to cast to pandas Series for Python UDF's
+                # print("executing as python udfs")
+                features = [
+                    rows[feature]
+                    for feature in tf.hopsworks_udf.transformation_features
+                ]
+            transformed_result = tf.hopsworks_udf.get_udf(online=True)(
                 *features
             )  # Get only python compatible UDF irrespective of engine
-            if isinstance(transformed_result, pd.Series):
-                rows[transformed_result.name] = transformed_result.values[0]
+
+            if (
+                tf.hopsworks_udf.execution_mode.get_current_execution_mode(online=True)
+                == UDFExecutionMode.PANDAS
+            ):
+                if isinstance(transformed_result, pd.Series):
+                    rows[transformed_result.name] = transformed_result.values[0]
+                else:
+                    for col in transformed_result:
+                        rows[col] = transformed_result[col].values[0]
             else:
-                for col in transformed_result:
-                    rows[col] = transformed_result[col].values[0]
+                if isinstance(transformed_result, tuple):
+                    for index, result in enumerate(transformed_result):
+                        rows[tf.output_column_names[index]] = result
+                else:
+                    rows[tf.output_column_names[0]] = transformed_result
         return rows

     def apply_transformation(
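To make the online-serving change above concrete: in PANDAS mode the UDF returns a pandas Series whose single value is unwrapped with .values[0], while in PYTHON mode the UDF returns a plain scalar (or tuple) that is written straight into the row dict under the declared output column name(s). A standalone sketch of the two write-back paths, using toy names and plain pandas rather than hsfs objects:

import pandas as pd

rows = {"amount": 10.0, "city": "stockholm"}  # one retrieved feature vector
output_column_names = ["amount_plus_one"]

# Pandas mode: input is a pd.Series, output is a named pd.Series of length 1.
def plus_one_pandas(amount: pd.Series) -> pd.Series:
    return (amount + 1).rename("amount_plus_one")

on_demand_feature = plus_one_pandas(pd.Series(rows["amount"]))
rows[on_demand_feature.name] = on_demand_feature.values[0]

# Python mode: input is a scalar, output is a scalar stored under the
# transformation's declared output column name.
def plus_one_python(amount: float) -> float:
    return amount + 1

rows[output_column_names[0]] = plus_one_python(rows["amount"])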

python/hsfs/engine/python.py

+122 −13
@@ -90,6 +90,7 @@
 from hsfs.core.type_systems import PYARROW_HOPSWORKS_DTYPE_MAPPING
 from hsfs.core.vector_db_client import VectorDbClient
 from hsfs.feature_group import ExternalFeatureGroup, FeatureGroup
+from hsfs.hopsworks_udf import HopsworksUdf, UDFExecutionMode
 from hsfs.training_dataset import TrainingDataset
 from hsfs.training_dataset_feature import TrainingDatasetFeature
 from hsfs.training_dataset_split import TrainingDatasetSplit
@@ -1262,6 +1263,7 @@ def _apply_transformation_function(
         self,
         transformation_functions: List[transformation_function.TransformationFunction],
         dataset: Union[pd.DataFrame, pl.DataFrame],
+        online_inference: bool = False,
     ) -> Union[pd.DataFrame, pl.DataFrame]:
         """
         Apply transformation function to the dataframe.
@@ -1299,22 +1301,129 @@ def _apply_transformation_function(
                 )
             if tf.hopsworks_udf.dropped_features:
                 dropped_features.update(tf.hopsworks_udf.dropped_features)
-            dataset = pd.concat(
+
+            if (
+                hopsworks_udf.execution_mode.get_current_execution_mode(
+                    online=online_inference
+                )
+                == UDFExecutionMode.PANDAS
+            ):
+                dataset = self._apply_pandas_udf(
+                    hopsworks_udf=hopsworks_udf, dataframe=dataset
+                )
+            else:
+                dataset = self._apply_python_udf(
+                    hopsworks_udf=hopsworks_udf, dataframe=dataset
+                )
+        dataset = dataset.drop(dropped_features, axis=1)
+        return dataset
+
+    def _apply_python_udf(
+        self,
+        hopsworks_udf: HopsworksUdf,
+        dataframe: Union[pd.DataFrame, pl.DataFrame],
+    ) -> Union[pd.DataFrame, pl.DataFrame]:
+        """
+        Apply a python udf to a dataframe
+
+        # Arguments
+            hopsworks_udf `HopsworksUdf`: The UDF to apply to the dataframe.
+            dataframe `Union[pd.DataFrame, pl.DataFrame]`: A pandas or polars dataframe.
+        # Returns
+            `DataFrame`: A pandas dataframe with the transformed data.
+        # Raises
+            `FeatureStoreException`: If any of the features mentioned in the transformation function is not present in the Feature View.
+        """
+        udf = hopsworks_udf.get_udf(online=False)
+        if isinstance(dataframe, pd.DataFrame):
+            if len(hopsworks_udf.return_types) > 1:
+                dataframe[hopsworks_udf.output_column_names] = dataframe.apply(
+                    lambda x: udf(*x[hopsworks_udf.transformation_features]),
+                    axis=1,
+                    result_type="expand",
+                )
+            else:
+                dataframe[hopsworks_udf.output_column_names[0]] = dataframe.apply(
+                    lambda x: udf(*x[hopsworks_udf.transformation_features]),
+                    axis=1,
+                    result_type="expand",
+                )
+                if hopsworks_udf.output_column_names[0] in dataframe.columns:
+                    # Overwriting features so reordering dataframe to move overwritten column to the end of the dataframe
+                    cols = dataframe.columns.tolist()
+                    cols.append(
+                        cols.pop(cols.index(hopsworks_udf.output_column_names[0]))
+                    )
+                    dataframe = dataframe[cols]
+        else:
+            # Dynamically creating lambda function so that we do not need to loop through to extract features required for the udf.
+            # This is done because polars 'map_rows' provides rows as tuples to the udf.
+            transformation_features = ", ".join(
                 [
-                    dataset.reset_index(drop=True),
-                    tf.hopsworks_udf.get_udf()(
-                        *(
-                            [
-                                dataset[feature]
-                                for feature in tf.hopsworks_udf.transformation_features
-                            ]
+                    f"x[{dataframe.columns.index(feature)}]"
+                    for feature in hopsworks_udf.transformation_features
+                ]
+            )
+            feature_mapping_wrapper = eval(
+                f"lambda x: udf({transformation_features})", locals()
+            )
+            transformed_features = dataframe.map_rows(feature_mapping_wrapper)
+            dataframe = dataframe.with_columns(
+                transformed_features.rename(
+                    dict(
+                        zip(
+                            transformed_features.columns,
+                            hopsworks_udf.output_column_names,
                         )
-                    ).reset_index(drop=True),
-                ],
-                axis=1,
+                    )
+                )
             )
-        dataset = dataset.drop(dropped_features, axis=1)
-        return dataset
+        return dataframe
+
+    def _apply_pandas_udf(
+        self,
+        hopsworks_udf: HopsworksUdf,
+        dataframe: Union[pd.DataFrame, pl.DataFrame],
+    ) -> Union[pd.DataFrame, pl.DataFrame]:
+        """
+        Apply a pandas udf to a dataframe
+
+        # Arguments
+            hopsworks_udf `HopsworksUdf`: The UDF to apply to the dataframe.
+            dataframe `Union[pd.DataFrame, pl.DataFrame]`: A pandas or polars dataframe.
+        # Returns
+            `DataFrame`: A pandas dataframe with the transformed data.
+        # Raises
+            `FeatureStoreException`: If any of the features mentioned in the transformation function is not present in the Feature View.
+        """
+        if len(hopsworks_udf.return_types) > 1:
+            dataframe[hopsworks_udf.output_column_names] = hopsworks_udf.get_udf(
+                online=False
+            )(
+                *(
+                    [
+                        dataframe[feature]
+                        for feature in hopsworks_udf.transformation_features
+                    ]
+                )
+            )
+        else:
+            dataframe[hopsworks_udf.output_column_names[0]] = hopsworks_udf.get_udf(
+                online=False
+            )(
+                *(
+                    [
+                        dataframe[feature]
+                        for feature in hopsworks_udf.transformation_features
+                    ]
+                )
+            )
+            if hopsworks_udf.output_column_names[0] in dataframe.columns:
+                # Overwriting features so reordering dataframe to move overwritten column to the end of the dataframe
+                cols = dataframe.columns.tolist()
+                cols.append(cols.pop(cols.index(hopsworks_udf.output_column_names[0])))
+                dataframe = dataframe[cols]
+        return dataframe

     @staticmethod
     def get_unique_values(
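The new _apply_python_udf above fans a scalar UDF out over a pandas DataFrame with DataFrame.apply(..., axis=1, result_type="expand"), which also spreads multi-output tuples across several new columns (the polars branch does the equivalent with map_rows). A small standalone pandas sketch of that pattern, with toy column names and no hsfs objects:

import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3], "b": [30, 20, 10]})

transformation_features = ["a", "b"]
output_column_names = ["min_ab", "max_ab"]

# A plain Python UDF returning two values per row.
def min_max(a: int, b: int) -> tuple:
    return min(a, b), max(a, b)

# axis=1 feeds one row at a time; result_type="expand" spreads the returned
# tuple across the two output columns.
df[output_column_names] = df.apply(
    lambda x: min_max(*x[transformation_features]),
    axis=1,
    result_type="expand",
)
print(df)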

python/hsfs/engine/spark.py

+7 −1
@@ -1382,7 +1382,13 @@ def _apply_transformation_function(
                     f"Features {missing_features} specified in the transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please specify the feature required correctly."
                 )
             if tf.hopsworks_udf.dropped_features:
-                dropped_features.update(tf.hopsworks_udf.dropped_features)
+                dropped_features.update(hopsworks_udf.dropped_features)
+
+            # Add to dropped features if the feature needs to be overwritten to avoid ambiguous columns.
+            if len(hopsworks_udf.return_types) == 1 and (
+                hopsworks_udf.function_name == hopsworks_udf.output_column_names[0]
+            ):
+                dropped_features.update(hopsworks_udf.output_column_names)

             pandas_udf = hopsworks_udf.get_udf()
             output_col_name = hopsworks_udf.output_column_names[0]
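The Spark change above targets the overwrite case: when a single-output UDF's output column carries the same name as an existing feature (function name equals output column name), the original column is added to dropped_features so the final projection does not end up with two columns of the same name. A minimal PySpark sketch of that idea, using toy names and a simple column expression in place of the real pandas UDF returned by hopsworks_udf.get_udf():

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1.0,), (2.0,)], ["amount"])

output_col_name = "amount"            # the UDF overwrites an existing feature
dropped_features = {output_col_name}  # schedule the original column for removal

# Compute the transformed value under a temporary name, drop the original
# column, then rename, so only one unambiguous "amount" column remains.
result = (
    df.withColumn(output_col_name + "_transformed", F.col(output_col_name) + 1)
    .drop(*dropped_features)
    .withColumnRenamed(output_col_name + "_transformed", output_col_name)
)
result.show()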
