Skip to content

Commit 7ae1176

Browse files
committed
Merge branch 'the-merge-python-hsfs' into the-merge-complete
2 parents 6280332 + 1848cd8 commit 7ae1176

Some content is hidden

Large commits have some of their content hidden by default. Use the search box below to find content that may be hidden.

44 files changed

+5019
-3594
lines changed

hsfs/utils/java/src/main/java/com/logicalclocks/utils/MainClass.java

+4
Original file line numberDiff line numberDiff line change
@@ -118,5 +118,9 @@ public static void main(String[] args) throws Exception {
118118
if (op.equals("offline_fg_materialization") || op.equals("offline_fg_backfill")) {
119119
SparkEngine.getInstance().streamToHudiTable(streamFeatureGroup, writeOptions);
120120
}
121+
122+
LOGGER.info("Closing spark session...");
123+
SparkEngine.getInstance().closeSparkSession();
124+
System.exit(0);
121125
}
122126
}

hsfs/utils/python/hsfs_utils.py

+3
Original file line numberDiff line numberDiff line change
@@ -292,3 +292,6 @@ def parse_isoformat_date(da: str) -> datetime:
292292
import_fg(job_conf)
293293
elif args.op == "run_feature_monitoring":
294294
run_feature_monitoring(job_conf)
295+
296+
if spark is not None:
297+
spark.stop()

java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -1097,5 +1097,10 @@ protected String makeQueryName(String queryName, FeatureGroupBase featureGroup)
10971097
}
10981098
return queryName;
10991099
}
1100-
1100+
1101+
public void closeSparkSession() {
1102+
if (getSparkSession() != null) {
1103+
getSparkSession().stop();
1104+
}
1105+
}
11011106
}
+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#
2+
# Copyright 2024 Hopsworks AB
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
import numpy as np
18+
import pandas as pd
19+
from hsfs.hopsworks_udf import udf
20+
from hsfs.transformation_statistics import TransformationStatistics
21+
22+
23+
feature_statistics = TransformationStatistics("feature")
24+
25+
26+
@udf(float)
def min_max_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
    """Scale the feature using the min and max taken from the feature's statistics."""
    feature_min = statistics.feature.min
    value_range = statistics.feature.max - feature_min
    return (feature - feature_min) / value_range
31+
32+
33+
@udf(float)
def standard_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
    """Center the feature on its mean and scale by its standard deviation (from statistics)."""
    mean = statistics.feature.mean
    stddev = statistics.feature.stddev
    return (feature - mean) / stddev
36+
37+
38+
@udf(float)
def robust_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
    """Center the feature on its median and scale by the interquartile range.

    NOTE(review): assumes ``percentiles`` is a 0-indexed list where index ``i``
    holds the (i+1)-th percentile, so [49] is the median and [74] - [24] the
    IQR — confirm against the statistics producer.
    """
    percentiles = statistics.feature.percentiles
    median = percentiles[49]
    interquartile_range = percentiles[74] - percentiles[24]
    return (feature - median) / interquartile_range
43+
44+
45+
@udf(int)
def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
    """Encode each categorical value as its index within the sorted unique values.

    Missing values (per ``pd.isna``) are mapped to ``np.nan`` rather than an index.
    """
    ordered_values = sorted(statistics.feature.extended_statistics["unique_values"])
    index_of = {value: position for position, value in enumerate(ordered_values)}
    encoded = [
        np.nan if pd.isna(item) else index_of[item] for item in feature
    ]
    return pd.Series(encoded)
54+
55+
56+
@udf(bool)
def one_hot_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
    """One-hot encode the feature, emitting one boolean column per known unique value.

    Values known from the statistics but absent from this batch still get an
    all-False column so the output schema is stable across batches.
    """
    known_values = list(statistics.feature.extended_statistics["unique_values"])
    one_hot = pd.get_dummies(feature, dtype="bool")
    for value in known_values:
        if value not in one_hot:
            one_hot[value] = False
    # Sorting by columns so as to maintain consistency in column order.
    return one_hot.reindex(sorted(one_hot.columns), axis=1)

python/hsfs/constructor/join.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def __init__(
4747
self._on = util.parse_features(on)
4848
self._left_on = util.parse_features(left_on)
4949
self._right_on = util.parse_features(right_on)
50-
self._join_type = join_type or self.INNER
50+
self._join_type = join_type or self.LEFT
5151
self._prefix = prefix
5252

5353
def to_dict(self) -> Dict[str, Any]:

python/hsfs/constructor/query.py

+15-3
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def __init__(
5959
fg_mod.ExternalFeatureGroup,
6060
fg_mod.SpineGroup,
6161
],
62-
left_features: List[Union[str, "Feature"]],
62+
left_features: List[Union[str, "Feature", Dict]],
6363
feature_store_name: Optional[str] = None,
6464
feature_store_id: Optional[int] = None,
6565
left_feature_group_start_time: Optional[Union[str, int, date, datetime]] = None,
@@ -239,7 +239,7 @@ def join(
239239
on: Optional[List[str]] = None,
240240
left_on: Optional[List[str]] = None,
241241
right_on: Optional[List[str]] = None,
242-
join_type: Optional[str] = "inner",
242+
join_type: Optional[str] = "left",
243243
prefix: Optional[str] = None,
244244
) -> "Query":
245245
"""Join Query with another Query.
@@ -769,7 +769,7 @@ def featuregroups(
769769
"""List of feature groups used in the query"""
770770
featuregroups = {self._left_feature_group}
771771
for join_obj in self.joins:
772-
featuregroups.add(join_obj.query._left_feature_group)
772+
self._fg_rec_add(join_obj, featuregroups)
773773
return list(featuregroups)
774774

775775
@property
@@ -809,6 +809,18 @@ def get_feature(self, feature_name: str) -> "Feature":
809809
"""
810810
return self._get_feature_by_name(feature_name)[0]
811811

812+
def _fg_rec_add(self, join_object, featuregroups):
    """
    Recursively collect the feature groups of a (possibly nested) join and
    add them to the ``featuregroups`` set.

    # Arguments
        join_object: `Join object` whose query (and any nested joins) is walked.
        featuregroups: mutable set accumulating the feature groups found so far.
    """
    # Depth-first: descend into nested joins before recording this join's
    # own left feature group. Iterating an empty joins list is a no-op.
    for nested_join in join_object.query.joins:
        self._fg_rec_add(nested_join, featuregroups)
    featuregroups.add(join_object.query._left_feature_group)
823+
812824
def __getattr__(self, name: str) -> Any:
813825
try:
814826
return self.__getitem__(name)

python/hsfs/core/constants.py

+13
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,19 @@
11
import importlib.util
22

33

4+
# Avro: flags indicating which Avro implementation(s) are importable.
HAS_FAST_AVRO: bool = importlib.util.find_spec("fastavro") is not None
HAS_AVRO: bool = importlib.util.find_spec("avro") is not None

# Confluent Kafka: optional dependency; the message below is shown to users
# when Kafka functionality is requested but the package is missing.
HAS_CONFLUENT_KAFKA: bool = importlib.util.find_spec("confluent_kafka") is not None
confluent_kafka_not_installed_message = (
    "Confluent Kafka package not found. "
    "If you want to use Kafka with Hopsworks you can install the corresponding extras "
    """`pip install hopsworks[python]` or `pip install "hopsworks[python]"` if using zsh. """
    "You can also install confluent-kafka directly in your environment e.g `pip install confluent-kafka`. "
    "You will need to restart your kernel if applicable."
)
417
# Data Validation / Great Expectations
518
HAS_GREAT_EXPECTATIONS: bool = (
619
importlib.util.find_spec("great_expectations") is not None

python/hsfs/core/feature_view_api.py

+51-14
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,7 @@
1717

1818
from typing import List, Optional, Union
1919

20-
from hsfs import (
21-
client,
22-
feature_view,
23-
training_dataset,
24-
transformation_function_attached,
25-
)
20+
from hsfs import client, feature_view, training_dataset, transformation_function
2621
from hsfs.client.exceptions import RestAPIError
2722
from hsfs.constructor import query, serving_prepared_statement
2823
from hsfs.core import explicit_provenance, job, training_dataset_job_conf
@@ -86,13 +81,28 @@ def update(self, feature_view_obj: feature_view.FeatureView) -> None:
8681
data=feature_view_obj.json(),
8782
)
8883

89-
def get_by_name(self, name: str) -> feature_view.FeatureView:
84+
def get_by_name(self, name: str) -> List[feature_view.FeatureView]:
85+
"""
86+
Get a feature view from the backend using its name.
87+
88+
# Arguments
89+
name `str`: Name of the feature view.
90+
91+
# Returns
92+
`List[FeatureView]`: A list that contains all version of the feature view.
93+
94+
# Raises
95+
`RestAPIError`: If the feature view cannot be found from the backend.
96+
`ValueError`: If the feature group associated with the feature view cannot be found.
97+
"""
9098
path = self._base_path + [name]
9199
try:
92100
return [
93101
feature_view.FeatureView.from_response_json(fv)
94102
for fv in self._client._send_request(
95-
self._GET, path, {"expand": ["query", "features"]}
103+
self._GET,
104+
path,
105+
{"expand": ["query", "features", "transformationfunctions"]},
96106
)["items"]
97107
]
98108
except RestAPIError as e:
@@ -106,11 +116,27 @@ def get_by_name(self, name: str) -> feature_view.FeatureView:
106116
raise e
107117

108118
def get_by_name_version(self, name: str, version: int) -> feature_view.FeatureView:
119+
"""
120+
Get a feature view form the backend using both name and version
121+
122+
# Arguments
123+
name `str`: Name of feature view.
124+
version `version`: Version of the feature view.
125+
126+
# Returns
127+
`FeatureView`
128+
129+
# Raises
130+
`RestAPIError`: If the feature view cannot be found from the backend.
131+
`ValueError`: If the feature group associated with the feature view cannot be found.
132+
"""
109133
path = self._base_path + [name, self._VERSION, version]
110134
try:
111135
return feature_view.FeatureView.from_response_json(
112136
self._client._send_request(
113-
self._GET, path, {"expand": ["query", "features"]}
137+
self._GET,
138+
path,
139+
{"expand": ["query", "features", "transformationfunctions"]},
114140
)
115141
)
116142
except RestAPIError as e:
@@ -190,12 +216,23 @@ def get_serving_prepared_statement(
190216

191217
def get_attached_transformation_fn(
    self, name: str, version: int
) -> List["transformation_function.TransformationFunction"]:
    """
    Get transformation functions attached to a feature view from the backend.

    # Arguments
        name `str`: Name of feature view.
        version `int`: Version of feature view.

    # Returns
        `List[TransformationFunction]`: List of transformation functions attached to the feature view.

    # Raises
        `RestAPIError`: If the feature view cannot be found from the backend.
        `ValueError`: If the feature group associated with the feature view cannot be found.
    """
    path = self._base_path + [name, self._VERSION, version, self._TRANSFORMATION]
    response = self._client._send_request("GET", path)
    return transformation_function.TransformationFunction.from_response_json(response)
201238

0 commit comments

Comments
 (0)