@@ -187,6 +187,8 @@ def _generate_merge_query(self, source_alias, updates_alias):
187
187
@staticmethod
188
188
def _get_last_commit_metadata (spark_context , base_path ):
189
189
fg_source_table = DeltaTable .forPath (spark_context , base_path )
190
+
191
+ # Get info about the latest commit
190
192
last_commit = fg_source_table .history (1 ).first ().asDict ()
191
193
version = last_commit ["version" ]
192
194
commit_timestamp = util .convert_event_time_to_timestamp (
@@ -195,6 +197,12 @@ def _get_last_commit_metadata(spark_context, base_path):
195
197
commit_date_string = util .get_hudi_datestr_from_timestamp (commit_timestamp )
196
198
operation_metrics = last_commit ["operationMetrics" ]
197
199
200
+ # Get info about the oldest remaining commit
201
+ oldest_commit = fg_source_table .history ().orderBy ("version" ).first ().asDict ()
202
+ oldest_commit_timestamp = util .convert_event_time_to_timestamp (
203
+ oldest_commit ["timestamp" ]
204
+ )
205
+
198
206
if version == 0 :
199
207
fg_commit = feature_group_commit .FeatureGroupCommit (
200
208
commitid = None ,
@@ -203,7 +211,7 @@ def _get_last_commit_metadata(spark_context, base_path):
203
211
rows_inserted = operation_metrics ["numOutputRows" ],
204
212
rows_updated = 0 ,
205
213
rows_deleted = 0 ,
206
- last_active_commit_time = commit_timestamp ,
214
+ last_active_commit_time = oldest_commit_timestamp ,
207
215
)
208
216
else :
209
217
fg_commit = feature_group_commit .FeatureGroupCommit (
@@ -213,7 +221,7 @@ def _get_last_commit_metadata(spark_context, base_path):
213
221
rows_inserted = operation_metrics ["numTargetRowsInserted" ],
214
222
rows_updated = operation_metrics ["numTargetRowsUpdated" ],
215
223
rows_deleted = operation_metrics ["numTargetRowsDeleted" ],
216
- last_active_commit_time = commit_timestamp ,
224
+ last_active_commit_time = oldest_commit_timestamp ,
217
225
)
218
226
219
227
return fg_commit
0 commit comments