
Commit 774d5d6

update table schema job
1 parent b859e16 commit 774d5d6

File tree

6 files changed (+66 -5)


python/hsfs/core/feature_group_api.py (+36 -1)

```diff
@@ -21,7 +21,12 @@
 from hopsworks_common import client
 from hsfs import feature_group as fg_mod
 from hsfs import feature_group_commit, util
-from hsfs.core import explicit_provenance, ingestion_job, ingestion_job_conf
+from hsfs.core import (
+    explicit_provenance,
+    ingestion_job,
+    ingestion_job_conf,
+    job,
+)


 class FeatureGroupApi:
@@ -416,6 +421,36 @@ def ingestion(
             ),
         )

+    def update_table_schema(
+        self,
+        feature_group_instance: fg_mod.FeatureGroup,
+    ) -> job.Job:
+        """
+        Set up a Hopsworks job to update the table schema of a feature group.
+
+        Args:
+            feature_group_instance: FeatureGroup, required
+                Metadata object of the feature group.
+        """
+
+        _client = client.get_instance()
+        path_params = [
+            "project",
+            _client._project_id,
+            "featurestores",
+            feature_group_instance.feature_store_id,
+            "featuregroups",
+            feature_group_instance.id,
+            "updatetableschema",
+        ]
+
+        headers = {"content-type": "application/json"}
+        return job.Job.from_response_json(
+            _client._send_request("POST", path_params, headers=headers),
+        )
+
     def get_parent_feature_groups(
         self,
         feature_group_instance: Union[
```
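The new client method maps onto a single REST call. A minimal sketch of exercising it, assuming an open Hopsworks session and an existing feature group handle (both placeholders here):

```python
# Sketch only: `fg` is assumed to be an hsfs FeatureGroup with
# `feature_store_id` and `id` populated, on an already-connected client.
from hsfs.core import feature_group_api

fg_api = feature_group_api.FeatureGroupApi()
# Issues POST /project/<project_id>/featurestores/<fs_id>/featuregroups/<fg_id>/updatetableschema
# and parses the response into a job.Job metadata object.
schema_job = fg_api.update_table_schema(fg)
```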

python/hsfs/core/feature_group_engine.py (+2)

```diff
@@ -249,6 +249,8 @@ def commit_delete(feature_group, delete_df, write_options):
     @staticmethod
     def delta_vacuum(feature_group, retention_hours):
         if feature_group.time_travel_format == "DELTA":
+            # TODO: This should change; DeltaEngine and HudiEngine always assume a Spark client!
+            # Cannot properly manage what should happen when using Python.
             delta_engine_instance = delta_engine.DeltaEngine(
                 feature_group.feature_store_id,
                 feature_group.feature_store_name,
```

python/hsfs/engine/python.py (+4 -2)

```diff
@@ -1213,8 +1213,10 @@ def save_stream_dataframe(
         )

     def update_table_schema(self, feature_group: Union[FeatureGroup, ExternalFeatureGroup]) -> None:
-        """Wrapper around update_table_schema in order to provide no-op."""
-        pass
+        _job = self._feature_group_api.update_table_schema(feature_group)
+        _job._wait_for_job(await_termination=True)

     def _get_app_options(
         self, user_write_options: Optional[Dict[str, Any]] = None
```
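With this change, the Python engine no longer treats schema updates as a no-op. A sketch of the new behaviour, assuming a client configured with the "python" engine and a feature group handle `fg` (placeholder):

```python
from hsfs import engine

# Sketch only: on the Python engine, update_table_schema now submits the
# backend job created by FeatureGroupApi.update_table_schema and blocks
# (await_termination=True) until it reaches a terminal state, rather than
# returning immediately as the old no-op did.
engine.get_instance().update_table_schema(fg)
```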

python/hsfs/engine/spark.py (+1 -1)

```diff
@@ -251,7 +251,7 @@ def reconcile_schema(
     ):
         if sorted(self._spark_session.table(fg_alias.alias).columns) != sorted(
             [feature.name for feature in fg_alias.feature_group._features] +
-            self.HUDI_SPEC_FEATURE_NAMES if fg_alias.feature_group.time_travel_format == "HUDI" else []
+            hudi_engine.HudiEngine.HUDI_SPEC_FEATURE_NAMES if fg_alias.feature_group.time_travel_format == "HUDI" else []
         ):
             full_fg = feature_group_api.FeatureGroupApi().get(
                 feature_store_id=fg_alias.feature_group._feature_store_id,
```
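The fix references HUDI_SPEC_FEATURE_NAMES through hudi_engine.HudiEngine, where the constant is defined, instead of through self, where it is not. For context, a self-contained sketch of the reconciliation check, assuming the constant holds the standard Hudi bookkeeping columns (the authoritative list lives on HudiEngine):

```python
# Sketch only: these column names are the usual Hudi metadata columns and are
# an assumption; see hudi_engine.HudiEngine.HUDI_SPEC_FEATURE_NAMES.
hudi_spec_feature_names = [
    "_hoodie_record_key",
    "_hoodie_partition_path",
    "_hoodie_commit_time",
    "_hoodie_file_name",
    "_hoodie_commit_seqno",
]

feature_names = ["cc_num", "amount"]   # hypothetical feature group features
table_columns = ["cc_num", "amount"]   # columns of the cached Spark table
time_travel_format = "HUDI"

expected = sorted(
    feature_names
    + (hudi_spec_feature_names if time_travel_format == "HUDI" else [])
)
# reconcile_schema re-fetches and re-registers the feature group whenever the
# cached table's columns drift from the expected set.
print(sorted(table_columns) != expected)  # True: Hudi columns are missing
```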

python/tests/engine/test_python.py (+8 -1)

```diff
@@ -2565,15 +2565,22 @@ def test_save_stream_dataframe(self):
             == "Stream ingestion is not available on Python environments, because it requires Spark as engine."
         )

-    def test_update_table_schema(self):
+    def test_update_table_schema(self, mocker):
         # Arrange
+        mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi")
+
         python_engine = python.Engine()

+        mock_fg_api.return_value.update_table_schema.return_value.job = job.Job(
+            1, "test_job", None, None, None, None
+        )
+
         # Act
         result = python_engine.update_table_schema(feature_group=None)

         # Assert
         assert result is None
+        assert mock_fg_api.return_value.update_table_schema.call_count == 1

     def test_get_app_options(self, mocker):
         # Arrange
```

utils/python/hsfs_utils.py (+15)

```diff
@@ -323,6 +323,18 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
     offset_df = spark.createDataFrame([offset_dict])
     offset_df.coalesce(1).write.mode("overwrite").json(offset_location)

+def update_table_schema_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> None:
+    """
+    Run table schema update job on a feature group.
+    """
+    feature_store = job_conf.pop("feature_store")
+    fs = get_feature_store_handle(feature_store)
+
+    entity = fs.get_feature_group(name=job_conf["name"], version=job_conf["version"])
+
+    entity.stream = False
+    engine.get_instance().update_table_schema(entity)
+
 def _build_starting_offsets(initial_check_point_string: str):
     if not initial_check_point_string:
         return ""
@@ -358,6 +370,7 @@ def _build_starting_offsets(initial_check_point_string: str):
             "run_feature_monitoring",
             "delta_vacuum_fg",
             "offline_fg_materialization",
+            "update_table_schema_fg",
         ],
         help="Operation type",
     )
@@ -406,6 +419,8 @@ def parse_isoformat_date(da: str) -> datetime:
         delta_vacuum_fg(spark, job_conf)
     elif args.op == "offline_fg_materialization":
         offline_fg_materialization(spark, job_conf, args.initialCheckPointString)
+    elif args.op == "update_table_schema_fg":
+        update_table_schema_fg(spark, job_conf)

     success = True
 except Exception:
```
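The new job entry point reads three keys from its configuration. A sketch of a conforming payload, with placeholder values:

```python
# Sketch only: values are hypothetical. update_table_schema_fg pops
# "feature_store" to resolve the store handle, then fetches the feature
# group by "name" and "version" before running the schema update.
job_conf = {
    "feature_store": "demo_featurestore",
    "name": "transactions_fg",
    "version": 1,
}
```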
