diff --git a/python/tests/core/test_feature_group_engine.py b/python/tests/core/test_feature_group_engine.py
index 2847d5219..f93e44abd 100644
--- a/python/tests/core/test_feature_group_engine.py
+++ b/python/tests/core/test_feature_group_engine.py
@@ -525,6 +525,62 @@ def test_commit_delete(self, mocker):
         # Assert
         assert mock_hudi_engine.return_value.delete_record.call_count == 1
 
+    def test_clean_delta(self, mocker):
+        # Arrange
+        feature_store_id = 99
+
+        mocker.patch("hsfs.engine.get_instance")
+        mock_delta_engine = mocker.patch("hsfs.core.delta_engine.DeltaEngine")
+
+        fg_engine = feature_group_engine.FeatureGroupEngine(
+            feature_store_id=feature_store_id
+        )
+
+        fg = feature_group.FeatureGroup(
+            name="test",
+            version=1,
+            featurestore_id=feature_store_id,
+            primary_key=[],
+            partition_key=[],
+            id=10,
+            time_travel_format="DELTA",
+        )
+
+        # Act
+        fg_engine.clean(feature_group=fg, write_options={})
+
+        # Assert
+        assert mock_delta_engine.return_value.vacuum.call_count == 1
+
+    def test_clean_hudi(self, mocker):
+        # Arrange
+        feature_store_id = 99
+
+        mocker.patch("hsfs.engine.get_instance")
+        # Patch DeltaEngine as well so an unexpected vacuum call is observable.
+        mock_delta_engine = mocker.patch("hsfs.core.delta_engine.DeltaEngine")
+
+        fg_engine = feature_group_engine.FeatureGroupEngine(
+            feature_store_id=feature_store_id
+        )
+
+        fg = feature_group.FeatureGroup(
+            name="test",
+            version=1,
+            featurestore_id=feature_store_id,
+            primary_key=[],
+            partition_key=[],
+            id=10,
+            time_travel_format="HUDI",
+        )
+
+        # Act
+        fg_engine.clean(feature_group=fg, write_options={})
+
+        # Assert
+        # NOTE(review): assumes clean() never vacuums non-DELTA groups; confirm.
+        assert mock_delta_engine.return_value.vacuum.call_count == 0
+
     def test_sql(self, mocker):
         # Arrange
         feature_store_id = 99
diff --git a/python/tests/test_feature_group.py b/python/tests/test_feature_group.py
index bb2944f97..5e01b5a10 100644
--- a/python/tests/test_feature_group.py
+++ b/python/tests/test_feature_group.py
@@ -32,7 +32,7 @@
 )
 from hsfs.client.exceptions import FeatureStoreException, RestAPIError
 from hsfs.core.constants import HAS_GREAT_EXPECTATIONS
-from hsfs.engine import python
+from hsfs.engine import python, spark
 from hsfs.transformation_function import TransformationType
 
 
@@ -908,3 +908,50 @@ def
test_from_response_json_transformation_functions(self, backend_fixtures):
         assert (
             fg.expectation_suite.expectation_suite_name == "test_expectation_suite_name"
         )
+
+    def test_prepare_spark_location(self, mocker, backend_fixtures):
+        # Arrange: Spark engine; storage connector left as loaded from the fixture
+        engine = spark.Engine()
+        get_instance = mocker.patch("hsfs.engine.get_instance", return_value=engine)
+        payload = backend_fixtures["feature_group"]["get_basic_info"]["response"]
+        fg = feature_group.FeatureGroup.from_response_json(payload)
+        fg._location = f"{fg.name}_{fg.version}"
+
+        # Act
+        path = fg.prepare_spark_location()
+
+        # Assert: returned path equals fg.location and the engine is never looked up
+        assert path == fg.location
+        get_instance.assert_not_called()
+
+    def test_prepare_spark_location_with_s3_connector(self, mocker, backend_fixtures):
+        # Arrange: Spark engine plus an explicitly attached S3 connector
+        engine = spark.Engine()
+        get_instance = mocker.patch("hsfs.engine.get_instance", return_value=engine)
+        payload = backend_fixtures["feature_group"]["get_basic_info"]["response"]
+        fg = feature_group.FeatureGroup.from_response_json(payload)
+        fg._location = f"{fg.name}_{fg.version}"
+        fg._storage_connector = storage_connector.S3Connector(id=1, name="s3_conn", featurestore_id=fg.feature_store_id)
+
+        # Act
+        path = fg.prepare_spark_location()
+
+        # Assert: returned path equals fg.location and the engine is looked up once
+        assert path == fg.location
+        get_instance.assert_called_once()
+
+    def test_prepare_spark_location_with_s3_connector_python(self, mocker, backend_fixtures):
+        # Arrange: Python engine plus an explicitly attached S3 connector
+        engine = python.Engine()
+        get_instance = mocker.patch("hsfs.engine.get_instance", return_value=engine)
+        payload = backend_fixtures["feature_group"]["get_basic_info"]["response"]
+        fg = feature_group.FeatureGroup.from_response_json(payload)
+        fg._location = f"{fg.name}_{fg.version}"
+        fg._storage_connector = storage_connector.S3Connector(id=1, name="s3_conn", featurestore_id=fg.feature_store_id)
+
+        # Act: the call is expected to raise AttributeError
+        with pytest.raises(AttributeError):
+            fg.prepare_spark_location()
+
+        # Assert: the engine is still looked up exactly once before the failure
+        get_instance.assert_called_once()