
Commit faa17a5

[FSTORE-1424] Feature logging for spark (logicalclocks#242)
* fix untransformed features; fix feature type; use transformed features; feature logging spark
* run materialization job
* fix
* refactor feature logging property
* fix feature logging getter
* fix feature logging getter
* fix fv loggingEnabled
* fix fv log
* ruff fix
* fix test
* fix parse_schema_feature_group
* fix copy polars df
* fix copy polars df
* address comments
* style
1 parent b510e30 commit faa17a5

7 files changed: +226 -99 lines

python/hsfs/core/feature_group_engine.py

+1 -1
```diff
@@ -128,7 +128,7 @@ def insert(
         validation_options: dict = None,
     ):
         dataframe_features = engine.get_instance().parse_schema_feature_group(
-            feature_dataframe, feature_group.time_travel_format
+            feature_dataframe, feature_group.time_travel_format, features=feature_group.features
         )
         dataframe_features = (
             self._update_feature_group_schema_on_demand_transformations(
```
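Why thread `features=feature_group.features` through: when a logged column is entirely `None`, Arrow infers the `null` type and the offline type cannot be derived from the data alone. A minimal sketch of that failure mode, using plain pandas/pyarrow with no Hopsworks client:

```python
import pandas as pd
import pyarrow as pa

# A column that is all None arrives with Arrow type "null".
df = pd.DataFrame({"id": [1, 2], "score": [None, None]})
schema = pa.Schema.from_pandas(df, preserve_index=False)

print(schema.field("score").type)                    # null
print(pa.types.is_null(schema.field("score").type))  # True
# With features=feature_group.features passed along, parse_schema_feature_group
# can fall back to the type declared on the feature group instead of failing here.
```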

python/hsfs/core/feature_logging.py

+15 -4
```diff
@@ -21,8 +21,8 @@ def from_response_json(cls, json_dict: Dict[str, Any]) -> "FeatureLogging":
         from hsfs.feature_group import FeatureGroup  # avoid circular import
 
         json_decamelized = humps.decamelize(json_dict)
-        transformed_features = json_decamelized.get("transformed_log")
-        untransformed_features = json_decamelized.get("untransformed_log")
+        transformed_features = json_decamelized.get("transformed_log_fg")
+        untransformed_features = json_decamelized.get("untransformed_log_fg")
         if transformed_features:
             transformed_features = FeatureGroup.from_response_json(transformed_features)
         if untransformed_features:
@@ -33,6 +33,11 @@ def from_response_json(cls, json_dict: Dict[str, Any]) -> "FeatureLogging":
             json_decamelized.get("id"), transformed_features, untransformed_features
         )
 
+    def update(self, others):
+        self._transformed_features = others.transformed_features
+        self._untransformed_features = others.untransformed_features
+        return self
+
     @property
     def transformed_features(self) -> "feature_group.FeatureGroup":
         return self._transformed_features
@@ -41,15 +46,21 @@ def transformed_features(self) -> "feature_group.FeatureGroup":
     def untransformed_features(self) -> "feature_group.FeatureGroup":
         return self._untransformed_features
 
+    def get_feature_group(self, transformed):
+        if transformed:
+            return self._transformed_features
+        else:
+            return self._untransformed_features
+
     @property
     def id(self) -> str:
         return self._id
 
     def to_dict(self):
         return {
             "id": self._id,
-            "transformed_log": self._transformed_features,
-            "untransformed_log": self._untransformed_features,
+            "transformed_log_fg": self._transformed_features,
+            "untransformed_log_fg": self._untransformed_features,
         }
 
     def json(self) -> Dict[str, Any]:
```
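For context, a small sketch of the payload shape `from_response_json` now expects after the key rename, assuming the `pyhumps` package; the nested feature-group JSON is stubbed with `None` here:

```python
import humps

# Hypothetical backend response; real payloads nest the full feature-group
# JSON where the None placeholders are.
response = {"id": 7, "transformedLogFg": None, "untransformedLogFg": None}
print(humps.decamelize(response))
# {'id': 7, 'transformed_log_fg': None, 'untransformed_log_fg': None}

# The new get_feature_group(transformed) helper then simply dispatches:
#   feature_logging.get_feature_group(True)   -> transformed-log feature group
#   feature_logging.get_feature_group(False)  -> untransformed-log feature group
```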

python/hsfs/core/feature_view_engine.py

+45 -33
```diff
@@ -17,8 +17,10 @@
 
 import datetime
 import warnings
-from typing import Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, TypeVar, Union
 
+import numpy as np
+import pandas as pd
 from hsfs import (
     client,
     engine,
@@ -40,6 +42,7 @@
     training_dataset_engine,
 )
 from hsfs.core.feature_logging import FeatureLogging
+from hsfs.feature_view import FeatureView
 from hsfs.training_dataset_split import TrainingDatasetSplit
 
 
@@ -967,39 +970,46 @@ def get_feature_logging(self, fv):
         )
 
     def _get_logging_fg(self, fv, transformed):
-        feature_logging = self.get_feature_logging(fv)
-        if transformed:
-            return feature_logging.transformed_features
-        else:
-            return feature_logging.untransformed_features
+        return self.get_feature_logging(fv).get_feature_group(transformed)
 
     def log_features(
         self,
-        fv,
-        features,
-        prediction=None,
-        transformed=False,
-        write_options=None,
-        training_dataset_version=None,
+        fv: FeatureView,
+        feature_logging: FeatureLogging,
+        features_rows: Union[
+            pd.DataFrame, list[list], np.ndarray, TypeVar("pyspark.sql.DataFrame")
+        ],
+        predictions: Optional[Union[pd.DataFrame, list[list], np.ndarray]] = None,
+        transformed: Optional[bool] = False,
+        write_options: Optional[Dict[str, Any]] = None,
+        training_dataset_version: Optional[int] = None,
         hsml_model=None,
     ):
         default_write_options = {
             "start_offline_materialization": False,
         }
         if write_options:
             default_write_options.update(write_options)
-        fg = self._get_logging_fg(fv, transformed)
+        fg = feature_logging.get_feature_group(transformed)
+        td_predictions = [feature for feature in fv.features if feature.label]
+        td_predictions_names = set([feature.name for feature in td_predictions])
+        if transformed:
+            td_features = [feature_name for feature_name in fv.transformed_features if feature_name not in td_predictions_names]
+        else:
+            td_features = [feature.name for feature in
+                           fv.features if
+                           feature.name not in td_predictions_names]
         df = engine.get_instance().get_feature_logging_df(
-            fg,
-            features,
-            [feature for feature in fv.features if not feature.label],
-            [feature for feature in fv.features if feature.label],
-            FeatureViewEngine._LOG_TD_VERSION,
-            FeatureViewEngine._LOG_TIME,
-            FeatureViewEngine._HSML_MODEL,
-            prediction,
-            training_dataset_version,
-            hsml_model,
+            features_rows,
+            fg=fg,
+            td_features=td_features,
+            td_predictions=td_predictions,
+            td_col_name=FeatureViewEngine._LOG_TD_VERSION,
+            time_col_name=FeatureViewEngine._LOG_TIME,
+            model_col_name=FeatureViewEngine._HSML_MODEL,
+            predictions=predictions,
+            training_dataset_version=training_dataset_version,
+            hsml_model=hsml_model,
        )
         return fg.insert(df, write_options=default_write_options)
 
@@ -1038,9 +1048,7 @@ def read_feature_logs(
             query = query.filter(
                 self._convert_to_log_fg_filter(fg, fv, filter, fv_feat_name_map)
             )
-        df = query.read()
-        df = df.drop(["log_id", FeatureViewEngine._LOG_TIME], axis=1)
-        return df
+        return engine.get_instance().read_feature_log(query)
 
     @staticmethod
     def get_hsml_model_value(hsml_model):
@@ -1099,15 +1107,19 @@ def pause_logging(self, fv):
     def resume_logging(self, fv):
         self._feature_view_api.resume_feature_logging(fv.name, fv.version)
 
-    def materialize_feature_logs(self, fv, wait):
-        jobs = self._feature_view_api.materialize_feature_logging(fv.name, fv.version)
+    def materialize_feature_logs(self, fv, wait, transform):
+        if transform is None:
+            jobs = [self._get_logging_fg(fv, True).materialization_job,
+                    self._get_logging_fg(fv, False).materialization_job]
+        else:
+            jobs = [self._get_logging_fg(fv, transform).materialization_job]
+        for job in jobs:
+            job.run(await_termination=False)
         if wait:
             for job in jobs:
-                try:
-                    job._wait_for_job(wait)
-                except Exception:
-                    pass
+                job._wait_for_job(wait)
         return jobs
 
-    def delete_feature_logs(self, fv, transformed):
+    def delete_feature_logs(self, fv, feature_logging, transformed):
         self._feature_view_api.delete_feature_logs(fv.name, fv.version, transformed)
+        feature_logging.update(self.get_feature_logging(fv))
```
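A hedged usage sketch of the reworked `materialize_feature_logs(fv, wait, transform)` contract; `feature_view_engine` and `fv` stand in for objects obtained from a real Hopsworks session:

```python
# transform=None  -> run the materialization jobs of BOTH log feature groups
# transform=True  -> only the transformed-log feature group's job
# transform=False -> only the untransformed-log feature group's job
jobs = feature_view_engine.materialize_feature_logs(fv, wait=120, transform=None)

# Each job is started with run(await_termination=False); when wait is set,
# _wait_for_job(wait) may now raise instead of being silently swallowed.
```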

python/hsfs/engine/python.py

+43 -32
```diff
@@ -734,20 +734,28 @@ def parse_schema_feature_group(
         self,
         dataframe: Union[pd.DataFrame, pl.DataFrame],
         time_travel_format: Optional[str] = None,
+        features: Optional[List[feature.Feature]] = None,
     ) -> List[feature.Feature]:
+        feature_type_map = {}
+        if features:
+            for _feature in features:
+                feature_type_map[_feature.name] = _feature.type
         if isinstance(dataframe, pd.DataFrame):
             arrow_schema = pa.Schema.from_pandas(dataframe, preserve_index=False)
         elif isinstance(dataframe, pl.DataFrame) or isinstance(
             dataframe, pl.dataframe.frame.DataFrame
         ):
             arrow_schema = dataframe.to_arrow().schema
         features = []
-        for feat_name in arrow_schema.names:
+        for i in range(len(arrow_schema.names)):
+            feat_name = arrow_schema.names[i]
             name = util.autofix_feature_name(feat_name)
             try:
-                converted_type = convert_pandas_dtype_to_offline_type(
-                    arrow_schema.field(feat_name).type
-                )
+                pd_type = arrow_schema.field(feat_name).type
+                if pa.types.is_null(pd_type) and feature_type_map.get(name):
+                    converted_type = feature_type_map.get(name)
+                else:
+                    converted_type = convert_pandas_dtype_to_offline_type(pd_type)
             except ValueError as e:
                 raise FeatureStoreException(f"Feature '{name}': {str(e)}") from e
             features.append(feature.Feature(name, converted_type))
@@ -1422,7 +1430,7 @@ def _start_offline_materialization(offline_write_options: Dict[str, Any]) -> boo
         return True
 
     @staticmethod
-    def _convert_feature_log_to_df(feature_log, cols):
+    def _convert_feature_log_to_df(feature_log, cols) -> pd.DataFrame:
         if feature_log is None and cols:
             return pd.DataFrame(columns=cols)
         if not (
@@ -1442,40 +1450,40 @@ def _convert_feature_log_to_df(feature_log, cols):
 
             return pd.DataFrame(feature_log, columns=cols)
         else:
-            return feature_log.copy(deep=False)
+            if isinstance(feature_log, pl.DataFrame):
+                return feature_log.clone().to_pandas()
+            elif isinstance(feature_log, pd.DataFrame):
+                return feature_log.copy(deep=False)
 
     @staticmethod
     def get_feature_logging_df(
-        fg,
-        features,
-        fg_features: List[TrainingDatasetFeature],
-        td_predictions: List[TrainingDatasetFeature],
-        td_col_name,
-        time_col_name,
-        model_col_name,
-        prediction=None,
-        training_dataset_version=None,
+        features: Union[pd.DataFrame, list[list], np.ndarray],
+        fg: FeatureGroup = None,
+        td_features: List[str] = None,
+        td_predictions: List[TrainingDatasetFeature] = None,
+        td_col_name: Optional[str] = None,
+        time_col_name: Optional[str] = None,
+        model_col_name: Optional[str] = None,
+        predictions: Optional[Union[pd.DataFrame, list[list], np.ndarray]] = None,
+        training_dataset_version: Optional[int] = None,
         hsml_model=None,
     ) -> pd.DataFrame:
-        import uuid
-
         features = Engine._convert_feature_log_to_df(
-            features, [f.name for f in fg_features]
+            features, td_features
         )
         if td_predictions:
-            prediction = Engine._convert_feature_log_to_df(
-                prediction, [f.name for f in td_predictions]
+            predictions = Engine._convert_feature_log_to_df(
+                predictions, [f.name for f in td_predictions]
             )
             for f in td_predictions:
-                prediction[f.name] = Engine._cast_column_to_offline_type(
-                    prediction[f.name], f.type
+                predictions[f.name] = cast_column_to_offline_type(
+                    predictions[f.name], f.type
                 )
-            if not set(prediction.columns).intersection(set(features.columns)):
-                features = pd.concat([features, prediction], axis=1)
-        # need to case the column type as if it is None, type cannot be inferred.
-        features[td_col_name] = Engine._cast_column_to_offline_type(
-            pd.Series([training_dataset_version for _ in range(len(features))]),
-            fg.get_feature(td_col_name).type,
+            if not set(predictions.columns).intersection(set(features.columns)):
+                features = pd.concat([features, predictions], axis=1)
+
+        features[td_col_name] = pd.Series(
+            [training_dataset_version for _ in range(len(features))]
         )
         # _cast_column_to_offline_type cannot cast string type
         features[model_col_name] = pd.Series(
@@ -1488,9 +1496,12 @@ def get_feature_logging_df(
             dtype=pd.StringDtype(),
         )
         now = datetime.now()
-        features[time_col_name] = Engine._cast_column_to_offline_type(
-            pd.Series([now for _ in range(len(features))]),
-            fg.get_feature(time_col_name).type,
-        )
+
+        features[time_col_name] = pd.Series([now for _ in range(len(features))])
         features["log_id"] = [str(uuid.uuid4()) for _ in range(len(features))]
         return features[[feat.name for feat in fg.features]]
+
+    @staticmethod
+    def read_feature_log(query):
+        df = query.read()
+        return df.drop(["log_id", FeatureViewEngine._LOG_TIME], axis=1)
```
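The normalization `_convert_feature_log_to_df` performs can be sketched standalone. This is an illustrative re-implementation, not the hsfs code itself: list-of-lists and ndarrays become pandas frames with the supplied column names, polars frames are cloned then converted (fixing the earlier shared-copy bug), and pandas frames are shallow-copied.

```python
import numpy as np
import pandas as pd
import polars as pl

def to_pandas(feature_log, cols):
    # None with known columns -> empty frame with that schema.
    if feature_log is None and cols:
        return pd.DataFrame(columns=cols)
    # Raw rows (list of lists or ndarray) -> frame with the given column names.
    if isinstance(feature_log, (list, np.ndarray)):
        return pd.DataFrame(feature_log, columns=cols)
    # Polars -> copy first, then convert, so the caller's frame is untouched.
    if isinstance(feature_log, pl.DataFrame):
        return feature_log.clone().to_pandas()
    # Pandas -> shallow copy is enough; columns are only added, not mutated.
    if isinstance(feature_log, pd.DataFrame):
        return feature_log.copy(deep=False)
    raise ValueError(f"Unsupported feature log type: {type(feature_log)}")

print(to_pandas([[1, 0.5]], ["id", "score"]))
print(to_pandas(pl.DataFrame({"id": [1], "score": [0.5]}), ["id", "score"]))
```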

python/hsfs/engine/spark.py

+52 -3
```diff
@@ -21,10 +21,14 @@
 import os
 import re
 import shutil
+import uuid
 import warnings
 from datetime import date, datetime, timezone
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeVar, Union
 
+from hsfs.core.feature_view_engine import FeatureViewEngine
+from hsfs.training_dataset_feature import TrainingDatasetFeature
+
 
 if TYPE_CHECKING:
     import great_expectations
@@ -1150,6 +1154,7 @@ def parse_schema_feature_group(
         self,
         dataframe,
         time_travel_format=None,
+        **kwargs,
     ):
         features = []
 
@@ -1477,11 +1482,55 @@ def cast_columns(df, schema, online=False):
     def is_connector_type_supported(type):
         return True
 
-    @staticmethod
-    def get_feature_logging_df(features, prediction=None):
+    def get_feature_logging_df(
+        self,
+        features: Union[
+            pd.DataFrame, list[list], np.ndarray, TypeVar("pyspark.sql.DataFrame")
+        ],
+        fg: fg_mod.FeatureGroup = None,
+        td_features: List[str] = None,
+        td_predictions: List[TrainingDatasetFeature] = None,
+        td_col_name: Optional[str] = None,
+        time_col_name: Optional[str] = None,
+        model_col_name: Optional[str] = None,
+        predictions: Optional[Union[pd.DataFrame, list[list], np.ndarray]] = None,
+        training_dataset_version: Optional[int] = None,
+        hsml_model=None,
+        **kwargs,
+    ):
         # do not take prediction separately because spark ml framework usually return feature together with the prediction
         # and it is costly to join them back
-        return features
+        df = self.convert_to_default_dataframe(features)
+        if td_predictions:
+            for f in td_predictions:
+                if f.name not in df.columns:
+                    df = df.withColumn(
+                        f.name,
+                        lit(None).cast(
+                            Engine._convert_offline_type_to_spark_type(f.type)
+                        ),
+                    )
+
+        uuid_udf = udf(lambda: str(uuid.uuid4()), StringType())
+
+        # Add new columns to the DataFrame
+        df = df.withColumn(td_col_name, lit(training_dataset_version).cast(LongType()))
+        if hsml_model is not None:
+            hsml_str = FeatureViewEngine.get_hsml_model_value(hsml_model)
+        else:
+            hsml_str = None
+        df = df.withColumn(model_col_name, lit(hsml_str).cast(StringType()))
+        now = datetime.now()
+        df = df.withColumn(time_col_name, lit(now).cast(TimestampType()))
+        df = df.withColumn("log_id", uuid_udf())
+
+        # Select the required columns
+        return df.select(*[feat.name for feat in fg.features])
+
+    @staticmethod
+    def read_feature_log(query):
+        df = query.read()
+        return df.drop("log_id", FeatureViewEngine._LOG_TIME)
 
 
 class SchemaError(Exception):
```
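The Spark path boils down to a familiar pattern: constant metadata columns via `lit()` plus a per-row UUID from a Python UDF, then a `select` that aligns the frame to the log feature group's schema. A self-contained PySpark sketch of that pattern; the column names `td_version`, `hsml_model`, and `log_time` are illustrative stand-ins:

```python
import uuid
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, udf
from pyspark.sql.types import LongType, StringType, TimestampType

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1, 0.3), (2, 0.7)], ["id", "score"])

# Non-deterministic UDF: every row gets its own UUID string.
uuid_udf = udf(lambda: str(uuid.uuid4()), StringType())

df = (
    df.withColumn("td_version", lit(1).cast(LongType()))
      .withColumn("hsml_model", lit(None).cast(StringType()))
      .withColumn("log_time", lit(datetime.now()).cast(TimestampType()))
      .withColumn("log_id", uuid_udf())
)
df.show(truncate=False)
```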
