Skip to content

Commit 8d52d52

Browse files
committed
adding soft fail for dataframes, and adding extended type
1 parent ce8d1fe commit 8d52d52

File tree

4 files changed

+66
-6
lines changed

4 files changed

+66
-6
lines changed

python/hopsworks_common/core/type_systems.py

+32-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import ast
2020
import datetime
2121
import decimal
22-
from typing import TYPE_CHECKING, Literal, Union
22+
from typing import TYPE_CHECKING, Literal, NewType, Union
2323

2424
import pytz
2525
from hopsworks_common.core.constants import (
@@ -132,6 +132,37 @@
132132
}
133133

134134

135+
def create_extended_type(base_type: type) -> "ExtendedType":
136+
"""
137+
This is a wrapper function to create a new class that extends the base_type class with a new attribute that can be used to store metadata.
138+
139+
Args:
140+
base_type : The base class to extend
141+
"""
142+
143+
class ExtendedType(base_type):
144+
"""
145+
This is a class that extends the base_type class with a new attribute `hopsworks_meta_data` that can be used to store metadata.
146+
"""
147+
148+
@property
149+
def hopsworks_meta_data(self):
150+
if not hasattr(self, "_hopsworks_meta_data"):
151+
return None
152+
return self._hopsworks_meta_data
153+
154+
@hopsworks_meta_data.setter
155+
def hopsworks_meta_data(self, meta_data: dict):
156+
self._hopsworks_meta_data = meta_data
157+
158+
return ExtendedType
159+
160+
161+
ExtendedType = NewType(
162+
"ExtendedType", create_extended_type(type)
163+
) # Adding new type for type hinting and static analysis.
164+
165+
135166
def convert_pandas_dtype_to_offline_type(arrow_type: str) -> str:
136167
# This is a simple type conversion between pandas dtypes and pyspark (hive) types,
137168
# using pyarrow types obtained from pandas dataframe to convert pandas typed fields,

python/hsfs/core/feature_view_engine.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class FeatureViewEngine:
5757

5858
_LOG_TD_VERSION = "td_version"
5959
_LOG_TIME = "log_time"
60-
_HSML_MODEL = "hsml_model"
60+
_HSML_MODEL = "hopsworks_model"
6161

6262
def __init__(self, feature_store_id):
6363
self._feature_store_id = feature_store_id

python/hsfs/core/vector_server.py

+9-4
Original file line numberDiff line numberDiff line change
@@ -1308,8 +1308,13 @@ def apply_transformation(
13081308
):
13091309
"""
13101310
Function that applies both on-demand and model dependent transformation to the input dictionary
1311+
1312+
Returns:
1313+
feature_vector: The untransformed feature vector with untransformed features.
1314+
encoded_feature_dict: The transformed feature vector with transformed features.
13111315
"""
1312-
encoded_feature_dict = row_dict
1316+
feature_dict = row_dict
1317+
encoded_feature_dict = None
13131318

13141319
if transform or on_demand_features:
13151320
# Check for any missing request parameters
@@ -1318,17 +1323,17 @@ def apply_transformation(
13181323
)
13191324

13201325
# Apply on-demand transformations
1321-
encoded_feature_dict = self.apply_on_demand_transformations(
1326+
feature_dict = self.apply_on_demand_transformations(
13221327
row_dict, request_parameter, transformation_context
13231328
)
13241329

13251330
if transform:
13261331
# Apply model dependent transformations
13271332
encoded_feature_dict = self.apply_model_dependent_transformations(
1328-
encoded_feature_dict, transformation_context
1333+
feature_dict, transformation_context
13291334
)
13301335

1331-
return encoded_feature_dict
1336+
return feature_dict, encoded_feature_dict
13321337

13331338
def apply_return_value_handlers(
13341339
self, row_dict: Dict[str, Any], client: Literal["rest", "sql"]

python/hsfs/engine/python.py

+24
Original file line numberDiff line numberDiff line change
@@ -1707,6 +1707,8 @@ def get_feature_logging_df(
17071707
predictions[f.name] = cast_column_to_offline_type(
17081708
predictions[f.name], f.type
17091709
)
1710+
# Adding the prefix predicted_ to the prediction columns
1711+
predictions = predictions.add_prefix("predicted_")
17101712
if not set(predictions.columns).intersection(set(features.columns)):
17111713
features = pd.concat([features, predictions], axis=1)
17121714

@@ -1723,6 +1725,28 @@ def get_feature_logging_df(
17231725
features[k] = pd.Series(v)
17241726
# _cast_column_to_offline_type cannot cast string type
17251727
features[model_col_name] = features[model_col_name].astype(pd.StringDtype())
1728+
1729+
logging_feature_group_features = [feat.name for feat in fg.features]
1730+
missing_logging_features = set(logging_feature_group_features).difference(
1731+
set(features.columns)
1732+
)
1733+
additional_logging_features = set(features.columns).difference(
1734+
set(logging_feature_group_features)
1735+
)
1736+
1737+
if additional_logging_features:
1738+
_logger.info(
1739+
f"The following columns : `{'`, `'.join(additional_logging_features)}` are additional columns in the logged {'untransformed' if 'untransformed' in fg.name else 'transformed'} dataframe and is not present in the logging feature groups. They will be ignored."
1740+
)
1741+
1742+
if missing_logging_features:
1743+
_logger.info(
1744+
f"The following columns : `{'`, `'.join(missing_logging_features)}` are missing in the logged {'untransformed' if 'untransformed' in fg.name else 'transformed'} dataframe. Setting them to None."
1745+
)
1746+
# Set missing columns to None
1747+
for col in missing_logging_features:
1748+
features[col] = None
1749+
17261750
return features[[feat.name for feat in fg.features]]
17271751

17281752
@staticmethod

0 commit comments

Comments
 (0)