@@ -172,6 +172,8 @@ def _generate_merge_query(self, source_alias, updates_alias):
     @staticmethod
     def _get_last_commit_metadata(spark_context, base_path):
         fg_source_table = DeltaTable.forPath(spark_context, base_path)
+
+        # Get info about the latest commit
         last_commit = fg_source_table.history(1).first().asDict()
         version = last_commit["version"]
         commit_timestamp = util.convert_event_time_to_timestamp(
@@ -180,6 +182,12 @@ def _get_last_commit_metadata(spark_context, base_path):
         commit_date_string = util.get_hudi_datestr_from_timestamp(commit_timestamp)
         operation_metrics = last_commit["operationMetrics"]
 
+        # Get info about the oldest remaining commit
+        oldest_commit = fg_source_table.history().orderBy("version").first().asDict()
+        oldest_commit_timestamp = util.convert_event_time_to_timestamp(
+            oldest_commit["timestamp"]
+        )
+
         if version == 0:
             fg_commit = feature_group_commit.FeatureGroupCommit(
                 commitid=None,
@@ -188,7 +196,7 @@ def _get_last_commit_metadata(spark_context, base_path):
                 rows_inserted=operation_metrics["numOutputRows"],
                 rows_updated=0,
                 rows_deleted=0,
-                last_active_commit_time=commit_timestamp,
+                last_active_commit_time=oldest_commit_timestamp,
             )
         else:
             fg_commit = feature_group_commit.FeatureGroupCommit(
@@ -198,7 +206,7 @@ def _get_last_commit_metadata(spark_context, base_path):
                 rows_inserted=operation_metrics["numTargetRowsInserted"],
                 rows_updated=operation_metrics["numTargetRowsUpdated"],
                 rows_deleted=operation_metrics["numTargetRowsDeleted"],
-                last_active_commit_time=commit_timestamp,
+                last_active_commit_time=oldest_commit_timestamp,
             )
 
         return fg_commit
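
A note on the Delta log ordering this change leans on: DeltaTable.history() returns one row per commit still present in the transaction log, newest first, so history(1) picks the latest commit while an ascending sort on version surfaces the oldest retained one (earlier entries may already have been expired by log retention). Below is a minimal standalone sketch of the two lookups, assuming a SparkSession configured for delta-spark and an existing Delta table at a hypothetical path:

from pyspark.sql import SparkSession
from delta.tables import DeltaTable

# Assumes the delta-spark package is installed; the table path is hypothetical.
spark = (
    SparkSession.builder.appName("delta-history-demo")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

table = DeltaTable.forPath(spark, "/tmp/example_feature_group")

# Latest commit: history(1) limits the result to the most recent entry.
latest = table.history(1).first().asDict()

# Oldest retained commit: ascending sort on version puts it first.
oldest = table.history().orderBy("version").first().asDict()

print(latest["version"], latest["operationMetrics"])
print(oldest["version"], oldest["timestamp"])

With that ordering, pointing last_active_commit_time at the oldest retained commit rather than the latest one makes the field track the earliest commit still reachable in the table history, which matches the "# Get info about the oldest remaining commit" comment added above.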