
Commit 0303ad9

add clean fg (#7)
1 parent 1cb68ff commit 0303ad9

5 files changed: +68 -0 lines changed

python/hsfs/core/delta_engine.py (+9)
@@ -158,6 +158,15 @@ def _write_delta_dataset(self, dataset, write_options):
             self._spark_session, location
         )
 
+    def vacuum(self, retention_hours):
+        location = self._feature_group.prepare_spark_location()
+
+        delta_table = DeltaTable.forPath(self._spark_session, location)
+
+        # Vacuum the table
+        # https://docs.delta.io/1.0.1/api/python/index.html#delta.tables.DeltaTable.vacuum
+        delta_table.vacuum(retention_hours)
+
     def _generate_merge_query(self, source_alias, updates_alias):
         merge_query_list = []
         primary_key = self._feature_group.primary_key
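The new `vacuum` method is a thin wrapper around Delta Lake's `DeltaTable.vacuum`, which deletes data files that are no longer referenced by the table and are older than the retention threshold. A minimal standalone sketch of the same call, assuming a Spark session configured with the Delta Lake extensions (the table path is a placeholder):

```python
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

# A Spark session with the Delta Lake extensions enabled (assumption:
# the delta-spark package matching your Spark version is on the classpath).
spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

# Placeholder path; in hsfs this comes from prepare_spark_location().
delta_table = DeltaTable.forPath(spark, "/path/to/feature_group")

# Delete unreferenced files older than 100 hours. Note that vacuuming
# below the default 168-hour threshold additionally requires
# spark.databricks.delta.retentionDurationCheck.enabled=false.
delta_table.vacuum(100)
```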

python/hsfs/core/feature_group_engine.py (+14)
@@ -246,6 +246,20 @@ def commit_delete(feature_group, delete_df, write_options):
         )
         return hudi_engine_instance.delete_record(delete_df, write_options)
 
+    @staticmethod
+    def clean(feature_group, write_options):
+        if feature_group.time_travel_format == "DELTA":
+            delta_engine_instance = delta_engine.DeltaEngine(
+                feature_group.feature_store_id,
+                feature_group.feature_store_name,
+                feature_group,
+                engine.get_instance()._spark_session,
+                engine.get_instance()._spark_context,
+            )
+            return delta_engine_instance.vacuum(write_options.get("retention_hours", None))
+        else:
+            return None
+
     def sql(self, query, feature_store_name, dataframe_type, online, read_options):
         if online and self._online_conn is None:
             self._online_conn = self._storage_connector_api.get_online_connector(
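Note that `clean` only acts when `time_travel_format == "DELTA"`; for Hudi or any other format it returns `None` without touching storage. When `write_options` carries no `retention_hours` key, `vacuum` receives `None`, which the Delta Python API treats as "use the table's default retention threshold". A hypothetical pair of call sites (the feature group objects are placeholders):

```python
from hsfs.core.feature_group_engine import FeatureGroupEngine

# delta_fg / hudi_fg are placeholder feature group objects of each format.
FeatureGroupEngine.clean(delta_fg, {"retention_hours": 100})  # runs the vacuum
FeatureGroupEngine.clean(hudi_fg, {"retention_hours": 100})   # no-op, returns None
```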

python/hsfs/engine/python.py (+6)
@@ -481,6 +481,12 @@ def register_external_temporary_table(
         # No op to avoid query failure
         pass
 
+    def register_delta_temporary_table(
+        self, delta_fg_alias, feature_store_id, feature_store_name, read_options
+    ):
+        # No op to avoid query failure
+        pass
+
     def register_hudi_temporary_table(
         self,
         hudi_fg_alias: "hsfs.constructor.hudi_feature_group_alias.HudiFeatureGroupAlias",
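Presumably this mirrors the `register_external_temporary_table` no-op directly above it: the Python engine has no Spark session to register a temporary table against, so the silent pass keeps query construction from failing when a Delta-backed feature group appears in a query.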

python/hsfs/feature_group.py (+24)
@@ -3277,6 +3277,30 @@ def commit_delete_record(
         """
         self._feature_group_engine.commit_delete(self, delete_df, write_options or {})
 
+    def clean(
+        self,
+        write_options: Optional[Dict[Any, Any]] = None,
+    ) -> None:
+        """Clean up old files. This method can only be used on feature groups stored as DELTA.
+
+        !!! example
+            ```python
+            # connect to the Feature Store
+            fs = ...
+
+            # get the Feature Group instance
+            fg = fs.get_or_create_feature_group(...)
+
+            fg.clean(write_options={"retention_hours": 100})
+            ```
+        # Arguments
+            write_options: User provided write options. Defaults to `{}`.
+
+        # Raises
+            `hsfs.client.exceptions.RestAPIError`.
+        """
+        self._feature_group_engine.clean(self, write_options or {})
+
     def as_of(
         self,
         wallclock_time: Optional[Union[str, int, datetime, date]] = None,
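Since `clean()` returns `None`, there is nothing useful to capture from the call (the docstring's original `commit_details = ...` assignment was misleading and is dropped above). A short sketch of the two call shapes, assuming `fg` is a DELTA-format feature group already retrieved from the feature store:

```python
# Explicit retention window: drop unreferenced files older than 100 hours.
fg.clean(write_options={"retention_hours": 100})

# No options: the engine passes retention_hours=None through to vacuum,
# which falls back to Delta's default retention threshold.
fg.clean()
```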

utils/python/hsfs_utils.py (+15)
@@ -247,6 +247,18 @@ def run_feature_monitoring(job_conf: Dict[str, str]) -> None:
         raise e
 
 
+def clean_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> None:
+    """
+    Run clean on a feature group.
+    """
+    feature_store = job_conf.pop("feature_store")
+    fs = get_feature_store_handle(feature_store)
+
+    entity = fs.get_feature_group(name=job_conf["name"], version=job_conf["version"])
+
+    entity.clean()
+
+
 if __name__ == "__main__":
     # Setup spark first so it fails faster in case of args errors
     # Otherwise the resource manager will wait until the spark application master
@@ -265,6 +277,7 @@ def run_feature_monitoring(job_conf: Dict[str, str]) -> None:
             "ge_validate",
             "import_fg",
             "run_feature_monitoring",
+            "clean_fg",
         ],
         help="Operation type",
     )
@@ -303,6 +316,8 @@ def parse_isoformat_date(da: str) -> datetime:
             import_fg(job_conf)
         elif args.op == "run_feature_monitoring":
             run_feature_monitoring(job_conf)
+        elif args.op == "clean_fg":
+            clean_fg(spark, job_conf)
 
         success = True
     except Exception:
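For completeness, the job configuration `clean_fg` expects can be read off its body; a hypothetical example (all values are placeholders):

```python
# Hypothetical job_conf for the new "clean_fg" op; keys inferred from
# clean_fg above, values are placeholders.
job_conf = {
    "feature_store": "my_project_featurestore",  # popped to resolve the handle
    "name": "transactions_fg",                   # feature group name
    "version": 1,                                # feature group version
}
```

Note that `clean_fg` takes `spark` only for symmetry with the other ops dispatched in `__main__`; the session is not used inside the function.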
