Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1537] Managed feature group test #348

Merged
merged 39 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
11ed246
init
bubriks Sep 11, 2024
ec6aa89
add region
bubriks Sep 11, 2024
6bc8e79
fix ruff
bubriks Sep 11, 2024
14363dc
Merge branch 'main' into FSTORE-1537
bubriks Sep 16, 2024
6b66989
temp
bubriks Sep 18, 2024
b3fa7e3
s3 -> s3a
bubriks Sep 18, 2024
07f3a35
time_travel_format
bubriks Sep 18, 2024
5d7dff6
s3 -> s3a
bubriks Sep 18, 2024
23670c7
s3 issue continues
bubriks Sep 18, 2024
8922c81
prepare_spark
bubriks Sep 18, 2024
6b1a53c
path fix?
bubriks Sep 18, 2024
150f836
reconcile_hudi_schema
bubriks Sep 18, 2024
d7ea12f
small undo
bubriks Sep 18, 2024
6db0d1c
remove textFile
bubriks Sep 19, 2024
600b726
register_temporary_table
bubriks Sep 19, 2024
d0738d4
add uri to FG
bubriks Sep 19, 2024
f6da8a3
hive_sync
bubriks Sep 19, 2024
d54c370
fix s3 replace
bubriks Sep 19, 2024
da9d1a4
fix ruff
bubriks Sep 19, 2024
4ea0141
fix some tests
bubriks Sep 19, 2024
91bea60
get_uri()
bubriks Sep 19, 2024
3447281
setup_storage_connector
bubriks Sep 19, 2024
3f048fe
read write from managed FG
bubriks Sep 19, 2024
993649e
Merge branch 'main' into FSTORE-1537
bubriks Sep 19, 2024
700e2a8
fix merge
bubriks Sep 19, 2024
59f2c98
test fix
bubriks Sep 19, 2024
9ddd618
small fixes
bubriks Sep 23, 2024
44d2f6f
fix reconcile and prepare_spark
bubriks Sep 23, 2024
8aeeb03
fix add_cols_to_delta_table
bubriks Sep 23, 2024
d852a34
fix schema evolution hudi
bubriks Sep 24, 2024
c92a1d5
reduce number of get_uri calls
bubriks Sep 24, 2024
4447e3b
get_uri -> prepare_spark_location
bubriks Sep 25, 2024
1cb68ff
Merge branch 'main' into FSTORE-1537
bubriks Sep 25, 2024
0303ad9
add clean fg (#7)
bubriks Sep 30, 2024
2a22b56
Merge branch 'main' into FSTORE-1537
bubriks Oct 1, 2024
626fb12
add tests for clean
bubriks Oct 2, 2024
9ffa117
Merge branch 'managed_feature_group' into FSTORE-1537
bubriks Oct 2, 2024
de7e29a
test_prepare_spark_location
bubriks Oct 2, 2024
abec75e
ruff fix
bubriks Oct 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions python/tests/core/test_feature_group_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,54 @@ def test_commit_delete(self, mocker):
# Assert
assert mock_hudi_engine.return_value.delete_record.call_count == 1

def test_clean_delta(self, mocker):
# Arrange
feature_store_id = 99

mocker.patch("hsfs.engine.get_instance")
mock_hudi_engine = mocker.patch("hsfs.core.delta_engine.DeltaEngine")

fg_engine = feature_group_engine.FeatureGroupEngine(
feature_store_id=feature_store_id
)

fg = feature_group.FeatureGroup(
name="test",
version=1,
featurestore_id=feature_store_id,
primary_key=[],
partition_key=[],
id=10,
time_travel_format="DELTA",
)

# Act
fg_engine.clean(feature_group=fg, write_options={})

# Assert
assert mock_hudi_engine.return_value.vacuum.call_count == 1

def test_clean_hudi(self, mocker):
# Arrange
feature_store_id = 99

fg_engine = feature_group_engine.FeatureGroupEngine(
feature_store_id=feature_store_id
)

fg = feature_group.FeatureGroup(
name="test",
version=1,
featurestore_id=feature_store_id,
primary_key=[],
partition_key=[],
id=10,
time_travel_format="HUDI",
)

# Act
fg_engine.clean(feature_group=fg, write_options={})

def test_sql(self, mocker):
# Arrange
feature_store_id = 99
Expand Down
49 changes: 48 additions & 1 deletion python/tests/test_feature_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
)
from hsfs.client.exceptions import FeatureStoreException, RestAPIError
from hsfs.core.constants import HAS_GREAT_EXPECTATIONS
from hsfs.engine import python
from hsfs.engine import python, spark
from hsfs.transformation_function import TransformationType


Expand Down Expand Up @@ -908,3 +908,50 @@ def test_from_response_json_transformation_functions(self, backend_fixtures):
assert (
fg.expectation_suite.expectation_suite_name == "test_expectation_suite_name"
)

def test_prepare_spark_location(self, mocker, backend_fixtures):
# Arrange
engine = spark.Engine()
engine_instance = mocker.patch("hsfs.engine.get_instance", return_value=engine)
json = backend_fixtures["feature_group"]["get_basic_info"]["response"]
fg = feature_group.FeatureGroup.from_response_json(json)
fg._location = f"{fg.name}_{fg.version}"

# Act
path = fg.prepare_spark_location()

# Assert
assert fg.location == path
engine_instance.assert_not_called()

def test_prepare_spark_location_with_s3_connector(self, mocker, backend_fixtures):
# Arrange
engine = spark.Engine()
engine_instance = mocker.patch("hsfs.engine.get_instance", return_value=engine)
json = backend_fixtures["feature_group"]["get_basic_info"]["response"]
fg = feature_group.FeatureGroup.from_response_json(json)
fg._location = f"{fg.name}_{fg.version}"
fg._storage_connector = storage_connector.S3Connector(id=1, name="s3_conn", featurestore_id=fg.feature_store_id)

# Act
path = fg.prepare_spark_location()

# Assert
assert fg.location == path
engine_instance.assert_called_once()

def test_prepare_spark_location_with_s3_connector_python(self, mocker, backend_fixtures):
# Arrange
engine = python.Engine()
engine_instance = mocker.patch("hsfs.engine.get_instance", return_value=engine)
json = backend_fixtures["feature_group"]["get_basic_info"]["response"]
fg = feature_group.FeatureGroup.from_response_json(json)
fg._location = f"{fg.name}_{fg.version}"
fg._storage_connector = storage_connector.S3Connector(id=1, name="s3_conn", featurestore_id=fg.feature_store_id)

# Act
with pytest.raises(AttributeError):
fg.prepare_spark_location()

# Assert
engine_instance.assert_called_once()
Loading