Skip to content

Commit 5599291

Browse files
authored
[FSTORE-1672][4.1][APPEND] Allow multiple on-demand features to be returned from an on-demand transformation function and allow passing of local variables to a transformation function (logicalclocks#473)
* adding unknown type that can be used to used to initialize transformation types when a transformation function is not attached to a feature view or a fetaure group * initalizing output column names only if transformation type is not UNDEFINED * handling undefined transformation type during feature view and feature grouo creation * explicitly checking if features is a list or None to handle usecase where features is DataFrame * adding backwards compatiblity to 4.1.x versions of hopsworks * adding backwards compatiblity to 4.1.x versions of hopsworks * fixing tests by mockign connections * fixing unit tests * handling versions with SNAPSHOT, updating comments and removing prints * skipping test that requires Versions above 4.1.6 * addressing review comments * setting transformation function types to be send to the backend only on versions above 4.1.6 * reverting changes * fixing tests
1 parent 5838d82 commit 5599291

7 files changed

+119
-15
lines changed

python/hopsworks_common/connection.py

+21-3
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ def __init__(
142142
self._api_key_file = api_key_file
143143
self._api_key_value = api_key_value
144144
self._connected = False
145+
self._backend_version = None
145146

146147
self.connect()
147148

@@ -299,16 +300,16 @@ def _check_compatibility(self):
299300
regexMatcher = re.compile(versionPattern)
300301

301302
client_version = version.__version__
302-
backend_version = self._variable_api.get_version("hopsworks")
303+
self.backend_version = self._variable_api.get_version("hopsworks")
303304

304305
major_minor_client = regexMatcher.search(client_version).group(0)
305-
major_minor_backend = regexMatcher.search(backend_version).group(0)
306+
major_minor_backend = regexMatcher.search(self._backend_version).group(0)
306307

307308
if major_minor_backend != major_minor_client:
308309
print("\n", file=sys.stderr)
309310
warnings.warn(
310311
"The installed hopsworks client version {0} may not be compatible with the connected Hopsworks backend version {1}. \nTo ensure compatibility please install the latest bug fix release matching the minor version of your backend ({2}) by running 'pip install hopsworks=={2}.*'".format(
311-
client_version, backend_version, major_minor_backend
312+
client_version, self._backend_version, major_minor_backend
312313
),
313314
stacklevel=1,
314315
)
@@ -614,6 +615,23 @@ def api_key_file(self) -> Optional[str]:
614615
def api_key_value(self) -> Optional[str]:
615616
return self._api_key_value
616617

618+
@property
619+
def backend_version(self) -> Optional[str]:
620+
"""
621+
The version of the backend currently connected to hopsworks.
622+
"""
623+
return self._backend_version
624+
625+
@backend_version.setter
626+
def backend_version(self, backend_version: str) -> None:
627+
"""
628+
The version of the backend currently connected to hopsworks.
629+
"""
630+
self._backend_version = backend_version.split("-SNAPSHOT")[
631+
0
632+
].strip() # Strip off the -SNAPSHOT part of the version if it is present.
633+
return self._backend_version
634+
617635
@api_key_file.setter
618636
@not_connected
619637
def api_key_file(self, api_key_file: Optional[str]) -> None:

python/hsfs/feature_group.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -2379,7 +2379,11 @@ def __init__(
23792379
)
23802380
)
23812381
else:
2382-
if not transformation_function.transformation_type:
2382+
if (
2383+
not transformation_function.transformation_type
2384+
or transformation_function.transformation_type
2385+
== TransformationType.UNDEFINED
2386+
):
23832387
transformation_function.transformation_type = (
23842388
TransformationType.ON_DEMAND
23852389
)
@@ -2730,11 +2734,11 @@ def save(
27302734
if (
27312735
(features is None and len(self._features) > 0)
27322736
or (
2733-
isinstance(features, List)
2737+
isinstance(features, list)
27342738
and len(features) > 0
27352739
and all([isinstance(f, feature.Feature) for f in features])
27362740
)
2737-
or (not features and len(self.transformation_functions) > 0)
2741+
or (features is None and len(self.transformation_functions) > 0)
27382742
):
27392743
# This is done for compatibility. Users can specify the feature list in the
27402744
# (get_or_)create_feature_group. Users can also provide the feature list in the save().

python/hsfs/feature_view.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,11 @@ def __init__(
164164
)
165165
)
166166
else:
167-
if not transformation_function.transformation_type:
167+
if (
168+
not transformation_function.transformation_type
169+
or transformation_function.transformation_type
170+
== TransformationType.UNDEFINED
171+
):
168172
transformation_function.transformation_type = (
169173
TransformationType.MODEL_DEPENDENT
170174
)

python/hsfs/hopsworks_udf.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@
2626
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union
2727

2828
import humps
29+
from hopsworks_common import client
2930
from hopsworks_common.client.exceptions import FeatureStoreException
3031
from hopsworks_common.constants import FEATURES
3132
from hsfs import engine, util
3233
from hsfs.core.feature_descriptive_statistics import FeatureDescriptiveStatistics
3334
from hsfs.decorators import typechecked
3435
from hsfs.transformation_statistics import TransformationStatistics
36+
from packaging.version import Version
3537

3638

3739
class UDFExecutionMode(Enum):
@@ -857,6 +859,8 @@ def to_dict(self) -> Dict[str, Any]:
857859
# Returns
858860
`Dict`: Dictionary that contains all data required to json serialize the object.
859861
"""
862+
backend_version = client.get_connection().backend_version
863+
860864
return {
861865
"sourceCode": self._function_source,
862866
"outputTypes": self.return_types,
@@ -869,7 +873,11 @@ def to_dict(self) -> Dict[str, Any]:
869873
"name": self.function_name,
870874
"featureNamePrefix": self._feature_name_prefix,
871875
"executionMode": self.execution_mode.value.upper(),
872-
"outputColumnNames": self.output_column_names,
876+
**(
877+
{"outputColumnNames": self.output_column_names}
878+
if Version(backend_version) >= Version("4.1.6")
879+
else {}
880+
), # This check is added for backward compatibility with older versions of Hopsworks. The "outputColumnNames" field was added in Hopsworks 4.1.6 and versions below do not support unknown fields in the backend.
873881
}
874882

875883
def json(self) -> str:

python/hsfs/transformation_function.py

+21-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from typing import Any, Dict, List, Optional, Union
2222

2323
import humps
24+
from hopsworks_common import client
2425
from hopsworks_common.client.exceptions import FeatureStoreException
2526
from hopsworks_common.constants import FEATURES
2627
from hsfs import util
@@ -29,6 +30,7 @@
2930
from hsfs.decorators import typechecked
3031
from hsfs.hopsworks_udf import HopsworksUdf
3132
from hsfs.transformation_statistics import TransformationStatistics
33+
from packaging.version import Version
3234

3335

3436
_logger = logging.getLogger(__name__)
@@ -41,6 +43,7 @@ class TransformationType(Enum):
4143

4244
MODEL_DEPENDENT = "model_dependent"
4345
ON_DEMAND = "on_demand"
46+
UNDEFINED = "undefined" # This type is used when the UDF created is not attached to a feature view / feature group. Hence the transformation function is neither model dependent nor on-demand.
4447

4548

4649
@typechecked
@@ -87,7 +90,12 @@ def __init__(
8790
TransformationFunction._validate_transformation_type(
8891
transformation_type=transformation_type, hopsworks_udf=hopsworks_udf
8992
)
90-
self.transformation_type = transformation_type
93+
94+
# Setting transformation type as unknown when the transformation function is not attached to a feature view / feature group.
95+
# This happens for example when the transformation function is fetched from the backend.
96+
self.transformation_type = (
97+
transformation_type if transformation_type else TransformationType.UNDEFINED
98+
)
9199

92100
if self.__hopsworks_udf._generate_output_col_name:
93101
# Reset output column names so that they would be regenerated.
@@ -230,12 +238,18 @@ def to_dict(self) -> Dict[str, Any]:
230238
# Returns
231239
`Dict`: Dictionary that contains all data required to json serialize the object.
232240
"""
241+
backend_version = client.get_connection().backend_version
242+
233243
return {
234244
"id": self._id,
235245
"version": self._version,
236246
"featurestoreId": self._featurestore_id,
237247
"hopsworksUdf": self.hopsworks_udf.to_dict(),
238-
"transformationType": self.transformation_type.value,
248+
**(
249+
{"transformationType": self.transformation_type.value}
250+
if Version(backend_version) > Version("4.1.6")
251+
else {}
252+
), # This check is added for backward compatibility with older versions of Hopsworks. The "transformationType" field was added for equality checking of transformation functions and versions below 4.1.6 do not support unknown fields in the backend.
239253
}
240254

241255
def alias(self, *args: str):
@@ -349,7 +363,11 @@ def version(self, version: int) -> None:
349363
def hopsworks_udf(self) -> HopsworksUdf:
350364
"""Meta data class for the user defined transformation function."""
351365
# Make sure that the output column names for a model-dependent or on-demand transformation function, when accessed externally from the class.
352-
if self.transformation_type and not self.__hopsworks_udf.output_column_names:
366+
if (
367+
self.transformation_type
368+
and self.transformation_type != TransformationType.UNDEFINED
369+
and not self.__hopsworks_udf.output_column_names
370+
):
353371
self.__hopsworks_udf.output_column_names = self._get_output_column_names()
354372
return self.__hopsworks_udf
355373

python/tests/test_feature_view.py

+6
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#
1616
import warnings
1717

18+
from hopsworks_common import version
1819
from hsfs import feature_view, training_dataset_feature
1920
from hsfs.constructor import fs_query, query
2021
from hsfs.feature_store import FeatureStore
@@ -148,6 +149,11 @@ def test_from_response_json_basic_info_deprecated(self, mocker, backend_fixtures
148149
def test_transformation_function_instances(self, mocker, backend_fixtures):
149150
# Arrange
150151
feature_store_id = 99
152+
mocked_connection = mocker.MagicMock()
153+
mocked_connection.backend_version = version.__version__
154+
mocked_connection = mocker.patch(
155+
"hopsworks_common.client.get_connection", return_value=mocked_connection
156+
)
151157
mocker.patch("hsfs.core.feature_view_engine.FeatureViewEngine")
152158
json = backend_fixtures["fs_query"]["get"]["response"]
153159

python/tests/test_transformation_function.py

+50-4
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919

2020
import pandas as pd
2121
import pytest
22+
from hopsworks_common import version
2223
from hsfs.client.exceptions import FeatureStoreException
2324
from hsfs.hopsworks_udf import udf
2425
from hsfs.transformation_function import TransformationFunction, TransformationType
26+
from packaging.version import Version
2527

2628

2729
class TestTransformationFunction:
@@ -228,7 +230,15 @@ def test(col1):
228230
== "Please use the hopsworks_udf decorator when defining transformation functions."
229231
)
230232

231-
def test_transformation_function_definition_with_hopworks_udf(self):
233+
def test_transformation_function_definition_with_hopworks_udf(self, mocker):
234+
mocked_connection = mocker.MagicMock()
235+
mocked_connection.backend_version = (
236+
version.__version__
237+
) # Mocking backend version to be the same as the current version
238+
mocked_connection = mocker.patch(
239+
"hopsworks_common.client.get_connection", return_value=mocked_connection
240+
)
241+
232242
@udf(int)
233243
def test2(col1):
234244
return col1 + 1
@@ -956,7 +966,19 @@ def really_long_function_name_that_exceed_63_characters_causing_invalid_name_for
956966
"really_long_function_name_that_exceed_63_characters_causing_inv"
957967
]
958968

959-
def test_equality_mdt(self):
969+
@pytest.mark.skipif(
970+
Version("4.1.6") >= Version(version.__version__),
971+
reason="Requires Hopsworks 4.1.7 or higher to be working.",
972+
)
973+
def test_equality_mdt(self, mocker):
974+
mocked_connection = mocker.MagicMock()
975+
mocked_connection.backend_version = (
976+
version.__version__
977+
) # Mocking backend version to be the same as the current version
978+
mocked_connection = mocker.patch(
979+
"hopsworks_common.client.get_connection", return_value=mocked_connection
980+
)
981+
960982
@udf([int])
961983
def add_one(feature):
962984
return feature + 1
@@ -975,7 +997,19 @@ def add_one(feature):
975997

976998
assert mdt1 == mdt2
977999

978-
def test_equality_odt(self):
1000+
@pytest.mark.skipif(
1001+
Version("4.1.6") >= Version(version.__version__),
1002+
reason="Requires Hopsworks 4.1.7 or higher to be working.",
1003+
)
1004+
def test_equality_odt(self, mocker):
1005+
mocked_connection = mocker.MagicMock()
1006+
mocked_connection.backend_version = (
1007+
version.__version__
1008+
) # Mocking backend version to be the same as the current version
1009+
mocked_connection = mocker.patch(
1010+
"hopsworks_common.client.get_connection", return_value=mocked_connection
1011+
)
1012+
9791013
@udf([int])
9801014
def add_one(feature):
9811015
return feature + 1
@@ -994,7 +1028,19 @@ def add_one(feature):
9941028

9951029
assert odt1 == odt2
9961030

997-
def test_inequality(self):
1031+
@pytest.mark.skipif(
1032+
Version("4.1.6") >= Version(version.__version__),
1033+
reason="Requires Hopsworks 4.1.7 or higher to be working.",
1034+
)
1035+
def test_inequality(self, mocker):
1036+
mocked_connection = mocker.MagicMock()
1037+
mocked_connection.backend_version = (
1038+
version.__version__
1039+
) # Mocking backend version to be the same as the current version
1040+
mocked_connection = mocker.patch(
1041+
"hopsworks_common.client.get_connection", return_value=mocked_connection
1042+
)
1043+
9981044
@udf([int])
9991045
def add_one(feature):
10001046
return feature + 1

0 commit comments

Comments
 (0)