Commit ace0ce6

allow insertion of features into a feature group without applying on-demand transformation functions
1 parent 3cd3c35 commit ace0ce6

File tree

3 files changed: +16 −2 lines changed

python/hsfs/core/feature_group_engine.py

+4 −1
@@ -148,6 +148,7 @@ def insert(
         write_options,
         validation_options: dict = None,
         transformation_context: Dict[str, Any] = None,
+        transform: bool = True,
     ):
         dataframe_features = engine.get_instance().parse_schema_feature_group(
             feature_dataframe,
@@ -159,6 +160,7 @@ def insert(
         if (
             not isinstance(feature_group, fg.ExternalFeatureGroup)
             and feature_group.transformation_functions
+            and transform
         ):
             feature_dataframe = engine.get_instance()._apply_transformation_function(
                 feature_group.transformation_functions,
@@ -368,6 +370,7 @@ def insert_stream(
         checkpoint_dir,
         write_options,
         transformation_context: Dict[str, Any] = None,
+        transform: bool = True,
     ):
         if not feature_group.online_enabled and not feature_group.stream:
             raise exceptions.FeatureStoreException(
@@ -384,7 +387,7 @@ def insert_stream(
                 )
             )
 
-        if feature_group.transformation_functions:
+        if feature_group.transformation_functions and transform:
             dataframe = engine.get_instance()._apply_transformation_function(
                 feature_group.transformation_functions,
                 dataframe,

python/hsfs/feature_group.py

+10 −1
@@ -2824,6 +2824,7 @@ def insert(
         validation_options: Optional[Dict[str, Any]] = None,
         wait: bool = False,
         transformation_context: Dict[str, Any] = None,
+        transform: bool = True,
     ) -> Tuple[Optional[Job], Optional[ValidationReport]]:
         """Persist the metadata and materialize the feature group to the feature store
         or insert data from a dataframe into the existing feature group.
@@ -2932,6 +2933,7 @@ def insert(
                 Shortcut for read_options `{"wait_for_job": False}`.
             transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime.
                 These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`.
+            transform: `bool`. When set to `False`, the dataframe is inserted without applying any on-demand transformations. In this case, all required on-demand features must already exist in the provided dataframe. Defaults to `True`.
 
         # Returns
             (`Job`, `ValidationReport`) A tuple with job information if python engine is used and the validation report if validation is enabled.
@@ -2968,6 +2970,7 @@ def insert(
             write_options=write_options,
             validation_options={"save_report": True, **validation_options},
             transformation_context=transformation_context,
+            transform=transform,
         )
 
         if engine.get_type().startswith("spark") and not self.stream:
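A minimal usage sketch of the new flag (illustrative only: the feature group fg, the dataframe, and the amount_log on-demand feature are assumptions, not part of this commit):

import math

import pandas as pd

# `fg` is assumed to be an existing feature group whose on-demand
# transformation derives `amount_log` from `amount`.
df = pd.DataFrame({"id": [1, 2], "amount": [10.0, 20.0]})

# Precompute the on-demand feature up front ...
df["amount_log"] = df["amount"].map(math.log)

# ... then insert without applying on-demand transformations. With
# transform=False, every on-demand feature must already be present in df.
fg.insert(df, transform=False)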
@@ -2998,6 +3001,7 @@ def multi_part_insert(
         write_options: Optional[Dict[str, Any]] = None,
         validation_options: Optional[Dict[str, Any]] = None,
         transformation_context: Dict[str, Any] = None,
+        transform: bool = True,
     ) -> Union[
         Tuple[Optional[Job], Optional[ValidationReport]],
         feature_group_writer.FeatureGroupWriter,
@@ -3098,6 +3102,7 @@ def multi_part_insert(
                 to control whether the expectation suite of the feature group should be fetched before every insert.
             transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime.
                 These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`.
+            transform: `bool`. When set to `False`, the dataframe is inserted without applying any on-demand transformations. In this case, all required on-demand features must already exist in the provided dataframe. Defaults to `True`.
 
         # Returns
             (`Job`, `ValidationReport`) A tuple with job information if python engine is used and the validation report if validation is enabled.
@@ -3117,6 +3122,7 @@ def multi_part_insert(
             write_options or {},
             validation_options or {},
             transformation_context,
+            transform=transform,
         )
 
     def finalize_multi_part_insert(self) -> None:
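The flag also flows through the multi-part writer; a sketch under the same assumptions as above (fg and dataframe_chunks are illustrative):

# Calling multi_part_insert() without a dataframe returns a
# FeatureGroupWriter usable as a context manager; each chunk is
# inserted with on-demand transformations skipped.
with fg.multi_part_insert() as writer:
    for chunk in dataframe_chunks:  # e.g. an iterable of pandas DataFrames
        writer.insert(chunk, transform=False)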
@@ -3160,6 +3166,7 @@ def insert_stream(
         checkpoint_dir: Optional[str] = None,
         write_options: Optional[Dict[str, Any]] = None,
         transformation_context: Dict[str, Any] = None,
+        transform: bool = True,
     ) -> TypeVar("StreamingQuery"):
         """Ingest a Spark Structured Streaming Dataframe to the online feature store.
@@ -3215,6 +3222,7 @@ def insert_stream(
                 Defaults to `{}`.
             transformation_context: `Dict[str, Any]` A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime.
                 These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. If no context variables are provided, this parameter defaults to `None`.
+            transform: `bool`. When set to `False`, the dataframe is inserted without applying any on-demand transformations. In this case, all required on-demand features must already exist in the provided dataframe. Defaults to `True`.
 
         # Returns
             `StreamingQuery`: Spark Structured Streaming Query object.
@@ -3249,7 +3257,8 @@ def insert_stream(
             timeout,
             checkpoint_dir,
             write_options or {},
-            transformation_context,
+            transformation_context=transformation_context,
+            transform=transform,
         )
 
     def commit_details(
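The same applies to streaming ingestion (Spark engine; streaming_df is an illustrative Spark Structured Streaming dataframe whose on-demand features are already materialized upstream):

# insert_stream returns a Spark StreamingQuery; transform=False skips the
# on-demand transformation functions for the stream.
query = fg.insert_stream(streaming_df, transform=False)
query.awaitTermination()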

python/hsfs/feature_group_writer.py

+2 −0
@@ -49,6 +49,7 @@ def insert(
         write_options: Optional[Dict[str, Any]] = None,
         validation_options: Optional[Dict[str, Any]] = None,
         transformation_context: Dict[str, Any] = None,
+        transform: bool = True,
     ) -> Tuple[Optional[Job], Optional[ValidationReport]]:
         if validation_options is None:
             validation_options = {}
@@ -62,6 +63,7 @@ def insert(
             write_options={"start_offline_materialization": False, **write_options},
             validation_options={"fetch_expectation_suite": False, **validation_options},
             transformation_context=transformation_context,
+            transform=transform,
         )
 
     def __exit__(self, exc_type, exc_value, exc_tb):
