From fc0389b4e87618f7e6b74dd22017c263a4ff66b1 Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 25 Sep 2024 12:06:48 +0300 Subject: [PATCH] init --- python/hsfs/core/delta_engine.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/python/hsfs/core/delta_engine.py b/python/hsfs/core/delta_engine.py index 56cce3a30..95ea7f37b 100644 --- a/python/hsfs/core/delta_engine.py +++ b/python/hsfs/core/delta_engine.py @@ -172,6 +172,8 @@ def _generate_merge_query(self, source_alias, updates_alias): @staticmethod def _get_last_commit_metadata(spark_context, base_path): fg_source_table = DeltaTable.forPath(spark_context, base_path) + + # Get info about the latest commit last_commit = fg_source_table.history(1).first().asDict() version = last_commit["version"] commit_timestamp = util.convert_event_time_to_timestamp( @@ -180,6 +182,12 @@ def _get_last_commit_metadata(spark_context, base_path): commit_date_string = util.get_hudi_datestr_from_timestamp(commit_timestamp) operation_metrics = last_commit["operationMetrics"] + # Get info about the oldest remaining commit + oldest_commit = fg_source_table.history().orderBy("version").first().asDict() + oldest_commit_timestamp = util.convert_event_time_to_timestamp( + oldest_commit["timestamp"] + ) + if version == 0: fg_commit = feature_group_commit.FeatureGroupCommit( commitid=None, @@ -188,7 +196,7 @@ def _get_last_commit_metadata(spark_context, base_path): rows_inserted=operation_metrics["numOutputRows"], rows_updated=0, rows_deleted=0, - last_active_commit_time=commit_timestamp, + last_active_commit_time=oldest_commit_timestamp, ) else: fg_commit = feature_group_commit.FeatureGroupCommit( @@ -198,7 +206,7 @@ def _get_last_commit_metadata(spark_context, base_path): rows_inserted=operation_metrics["numTargetRowsInserted"], rows_updated=operation_metrics["numTargetRowsUpdated"], rows_deleted=operation_metrics["numTargetRowsDeleted"], - last_active_commit_time=commit_timestamp, + last_active_commit_time=oldest_commit_timestamp, ) return fg_commit