Skip to content

Commit 034b269

Browse files
committed
allowing multiple on-demand features from a single feature group and also supporting passing of context variables in transformation functions
1 parent 587d8fb commit 034b269

11 files changed

+394
-117
lines changed

python/hsfs/core/feature_group_engine.py

+21-9
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from __future__ import annotations
1616

1717
import warnings
18-
from typing import List, Union
18+
from typing import Any, Dict, List, Union
1919

2020
from hsfs import engine, feature, util
2121
from hsfs import feature_group as fg
@@ -49,12 +49,18 @@ def _update_feature_group_schema_on_demand_transformations(
4949
transformed_features = []
5050
dropped_features = []
5151
for tf in feature_group.transformation_functions:
52-
transformed_features.append(
53-
feature.Feature(
54-
tf.hopsworks_udf.output_column_names[0],
55-
tf.hopsworks_udf.return_types[0],
56-
on_demand=True,
57-
)
52+
transformed_features.extend(
53+
[
54+
feature.Feature(
55+
output_column_name,
56+
return_type,
57+
on_demand=True,
58+
)
59+
for output_column_name, return_type in zip(
60+
tf.hopsworks_udf.output_column_names,
61+
tf.hopsworks_udf.return_types,
62+
)
63+
]
5864
)
5965
if tf.hopsworks_udf.dropped_features:
6066
dropped_features.extend(tf.hopsworks_udf.dropped_features)
@@ -141,6 +147,7 @@ def insert(
141147
storage,
142148
write_options,
143149
validation_options: dict = None,
150+
transformation_context: Dict[str, Any] = None,
144151
):
145152
dataframe_features = engine.get_instance().parse_schema_feature_group(
146153
feature_dataframe,
@@ -154,7 +161,9 @@ def insert(
154161
and feature_group.transformation_functions
155162
):
156163
feature_dataframe = engine.get_instance()._apply_transformation_function(
157-
feature_group.transformation_functions, feature_dataframe
164+
feature_group.transformation_functions,
165+
feature_dataframe,
166+
transformation_context=transformation_context,
158167
)
159168

160169
dataframe_features = (
@@ -358,6 +367,7 @@ def insert_stream(
358367
timeout,
359368
checkpoint_dir,
360369
write_options,
370+
transformation_context: Dict[str, Any] = None,
361371
):
362372
if not feature_group.online_enabled and not feature_group.stream:
363373
raise exceptions.FeatureStoreException(
@@ -376,7 +386,9 @@ def insert_stream(
376386

377387
if feature_group.transformation_functions:
378388
dataframe = engine.get_instance()._apply_transformation_function(
379-
feature_group.transformation_functions, dataframe
389+
feature_group.transformation_functions,
390+
dataframe,
391+
transformation_context=transformation_context,
380392
)
381393

382394
util.validate_embedding_feature_type(

python/hsfs/core/feature_view_engine.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -913,6 +913,7 @@ def get_batch_data(
913913
inference_helper_columns=False,
914914
dataframe_type="default",
915915
transformed=True,
916+
transformation_context: Dict[str, Any] = None,
916917
):
917918
self._check_feature_group_accessibility(feature_view_obj)
918919

@@ -936,7 +937,9 @@ def get_batch_data(
936937
).read(read_options=read_options, dataframe_type=dataframe_type)
937938
if transformation_functions and transformed:
938939
return engine.get_instance()._apply_transformation_function(
939-
transformation_functions, dataset=feature_dataframe
940+
transformation_functions,
941+
dataset=feature_dataframe,
942+
transformation_context=transformation_context,
940943
)
941944
else:
942945
return feature_dataframe

0 commit comments

Comments
 (0)