Skip to content

Commit 3a7b6d7

Browse files
authored
[FSTORE-1537][APPEND] Managed feature group delta vacuum (#354)
1 parent 06425e9 commit 3a7b6d7

File tree

5 files changed: +20 −23 lines changed

python/hsfs/core/delta_engine.py

+3-7
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,10 @@ def _write_delta_dataset(self, dataset, write_options):
158158
self._spark_session, location
159159
)
160160

161-
def vacuum(self, retention_hours):
161+
def vacuum(self, retention_hours: int):
162162
location = self._feature_group.prepare_spark_location()
163-
164-
delta_table = DeltaTable.forPath(self._spark_session, location)
165-
166-
# Vacuum the table
167-
# https://docs.delta.io/1.0.1/api/python/index.html#delta.tables.DeltaTable.vacuum
168-
delta_table.vacuum(retention_hours)
163+
retention = f"RETAIN {retention_hours} HOURS" if retention_hours is not None else ""
164+
self._spark_session.sql(f"VACUUM '{location}' {retention}")
169165

170166
def _generate_merge_query(self, source_alias, updates_alias):
171167
merge_query_list = []

python/hsfs/core/feature_group_engine.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ def commit_delete(feature_group, delete_df, write_options):
247247
return hudi_engine_instance.delete_record(delete_df, write_options)
248248

249249
@staticmethod
250-
def clean(feature_group, write_options):
250+
def delta_vacuum(feature_group, retention_hours):
251251
if feature_group.time_travel_format == "DELTA":
252252
delta_engine_instance = delta_engine.DeltaEngine(
253253
feature_group.feature_store_id,
@@ -256,7 +256,7 @@ def clean(feature_group, write_options):
256256
engine.get_instance()._spark_session,
257257
engine.get_instance()._spark_context,
258258
)
259-
return delta_engine_instance.vacuum(write_options.get("retention_hours", None))
259+
return delta_engine_instance.vacuum(retention_hours)
260260
else:
261261
return None
262262

python/hsfs/feature_group.py

+7-6
Original file line numberDiff line numberDiff line change
@@ -3276,11 +3276,12 @@ def commit_delete_record(
32763276
"""
32773277
self._feature_group_engine.commit_delete(self, delete_df, write_options or {})
32783278

3279-
def clean(
3279+
def delta_vacuum(
32803280
self,
3281-
write_options: Optional[Dict[Any, Any]] = None,
3281+
retention_hours: int = None,
32823282
) -> None:
3283-
""" Clean up old files. This method can only be used on feature groups stored as DELTA.
3283+
""" Vacuum files that are no longer referenced by a Delta table and are older than the retention threshold.
3284+
This method can only be used on feature groups stored as DELTA.
32843285
32853286
!!! example
32863287
```python
@@ -3290,15 +3291,15 @@ def clean(
32903291
# get the Feature Group instance
32913292
fg = fs.get_or_create_feature_group(...)
32923293
3293-
commit_details = fg.clean(write_options = {"retention_hours": 100})
3294+
commit_details = fg.delta_vacuum(retention_hours = 168)
32943295
32953296
# Arguments
3296-
write_options: User provided write options. Defaults to `{}`.
3297+
retention_hours: User provided retention period. The default retention threshold for the files is 7 days.
32973298
32983299
# Raises
32993300
`hsfs.client.exceptions.RestAPIError`.
33003301
"""
3301-
self._feature_group_engine.clean(self, write_options or {})
3302+
self._feature_group_engine.delta_vacuum(self, retention_hours)
33023303

33033304
def as_of(
33043305
self,

python/tests/core/test_feature_group_engine.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,7 @@ def test_clean_delta(self, mocker):
547547
)
548548

549549
# Act
550-
fg_engine.clean(feature_group=fg, write_options={})
550+
fg_engine.delta_vacuum(feature_group=fg, retention_hours=200)
551551

552552
# Assert
553553
assert mock_hudi_engine.return_value.vacuum.call_count == 1
@@ -571,7 +571,7 @@ def test_clean_hudi(self, mocker):
571571
)
572572

573573
# Act
574-
fg_engine.clean(feature_group=fg, write_options={})
574+
fg_engine.delta_vacuum(feature_group=fg, retention_hours=200)
575575

576576
def test_sql(self, mocker):
577577
# Arrange

utils/python/hsfs_utils.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -247,16 +247,16 @@ def run_feature_monitoring(job_conf: Dict[str, str]) -> None:
247247
raise e
248248

249249

250-
def clean_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> None:
250+
def delta_vacuum_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> None:
251251
"""
252-
Run clean on a feature group.
252+
Run delta vacuum on a feature group.
253253
"""
254254
feature_store = job_conf.pop("feature_store")
255255
fs = get_feature_store_handle(feature_store)
256256

257257
entity = fs.get_feature_group(name=job_conf["name"], version=job_conf["version"])
258258

259-
entity.clean()
259+
entity.delta_vacuum()
260260

261261

262262
if __name__ == "__main__":
@@ -277,7 +277,7 @@ def clean_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> None:
277277
"ge_validate",
278278
"import_fg",
279279
"run_feature_monitoring",
280-
"clean_fg",
280+
"delta_vacuum_fg",
281281
],
282282
help="Operation type",
283283
)
@@ -316,8 +316,8 @@ def parse_isoformat_date(da: str) -> datetime:
316316
import_fg(job_conf)
317317
elif args.op == "run_feature_monitoring":
318318
run_feature_monitoring(job_conf)
319-
elif args.op == "clean_fg":
320-
clean_fg(spark, job_conf)
319+
elif args.op == "delta_vacuum_fg":
320+
delta_vacuum_fg(spark, job_conf)
321321

322322
success = True
323323
except Exception:

Comments (0)