@@ -54,6 +54,7 @@ class Backup(TypedDict):
     basebackup_info: BaseBackup
     closed_at: Optional[float]
     completed_at: Optional[float]
+    broken_at: Optional[float]
     recovery_site: bool
     stream_id: str
     resumable: bool
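For orientation, here is a minimal, self-contained sketch of the Backup record after this hunk. BaseBackup is stubbed as a plain dict, the field set is abbreviated to what the diff shows, and the literal values are made up.

```python
from typing import Optional, TypedDict

# Abbreviated sketch of the Backup record; BaseBackup is stubbed as dict here.
class Backup(TypedDict):
    basebackup_info: dict
    closed_at: Optional[float]
    completed_at: Optional[float]
    broken_at: Optional[float]  # new field: None means the backup is usable
    recovery_site: bool
    stream_id: str
    resumable: bool

example: Backup = {
    "basebackup_info": {"end_ts": 1700000000.0},
    "closed_at": 1700000100.0,
    "completed_at": 1700000050.0,
    "broken_at": None,
    "recovery_site": False,
    "stream_id": "1700000000_0",
    "resumable": True,
}
```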
@@ -287,6 +288,10 @@ def restore_backup(
                    continue
                if not backup["basebackup_info"]:
                    raise ValueError(f"Backup {backup!r} cannot be restored")
+
+                if backup.get("broken_at"):
+                    raise ValueError(f"Cannot restore a broken backup: {backup!r}")
+
                if target_time:
                    if target_time < backup["basebackup_info"]["end_ts"]:
                        raise ValueError(f"Requested target time {target_time} predates backup completion: {backup!r}")
@@ -559,6 +564,7 @@ def get_backup_list(backup_sites: Dict[str, BackupSiteInfo], *, seen_basebackup_
     for site_and_stream_id in streams:
         basebackup_compressed_size = None
         basebackup_info = {}
+        broken_info = {}
         closed_info = {}
         completed_info = {}
         for info in file_storage.list_iter(site_and_stream_id):
@@ -573,6 +579,8 @@ def get_backup_list(backup_sites: Dict[str, BackupSiteInfo], *, seen_basebackup_
                 info_str, _ = file_storage.get_contents_to_string(info["name"])
                 basebackup_info = json.loads(info_str.decode("utf-8"))
                 seen_basebackup_infos[site_and_stream_id] = basebackup_info
+            elif file_name == "broken.json":
+                broken_info = parse_fs_metadata(info["metadata"])
             elif file_name == "closed.json":
                 closed_info = parse_fs_metadata(info["metadata"])
             elif file_name == "completed.json":
@@ -586,6 +594,7 @@ def get_backup_list(backup_sites: Dict[str, BackupSiteInfo], *, seen_basebackup_
         backups.append(
             {
                 "basebackup_info": basebackup_info,
+                "broken_at": broken_info.get("broken_at"),
                 "closed_at": closed_info["closed_at"] if closed else None,
                 "completed_at": completed_info["completed_at"] if completed else None,
                 "recovery_site": site_config.get("recovery_only", False),
@@ -1140,6 +1149,7 @@ def _handle_mode_restore(self):
         self._process_local_binlog_updates()
         self._extend_binlog_stream_list()
         if self.restore_coordinator.phase == RestoreCoordinator.Phase.failed_basebackup:
+            self._mark_failed_restore_backup_as_broken()
             self._switch_basebackup_if_possible()
         if self.state["promote_on_restore_completion"] and self.restore_coordinator.is_complete():
             self.state_manager.update_state(
@@ -1151,6 +1161,20 @@ def _handle_mode_restore(self):
                 restore_options={},
             )
 
+    def _mark_failed_restore_backup_as_broken(self) -> None:
+        broken_backup = None
+        failed_stream_id = self.state["restore_options"]["stream_id"]
+        backups = self.state["backups"]
+        for backup in backups:
+            if backup["stream_id"] == failed_stream_id:
+                broken_backup = backup
+                break
+
+        if not broken_backup:
+            raise Exception(f"Stream {failed_stream_id} to be marked as broken not found in completed backups: {backups}")
+
+        self._build_backup_stream(broken_backup).mark_as_broken()
+
     def _mark_periodic_backup_requested_if_interval_exceeded(self):
         normalized_backup_time = self._current_normalized_backup_timestamp()
         most_recent_scheduled = None
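`_build_backup_stream(...).mark_as_broken()` is the one part of this flow the diff does not show. A plausible sketch under an assumption: that the method uploads the `broken.json` marker which `get_backup_list` reads back above. Both classes and the storage method here are hypothetical stand-ins.

```python
import json
import time

class MemoryStorage:
    # Hypothetical in-memory stand-in for the file storage backend.
    def __init__(self) -> None:
        self.files: dict = {}

    def store_file_from_memory(self, key: str, data: bytes, metadata=None) -> None:
        self.files[key] = (data, metadata or {})

class FakeBackupStream:
    # Hypothetical stand-in for BackupStream; mark_as_broken is assumed to
    # persist a broken.json whose metadata carries the failure timestamp.
    def __init__(self, file_storage, site_and_stream_id: str) -> None:
        self.file_storage = file_storage
        self.site_and_stream_id = site_and_stream_id

    def mark_as_broken(self) -> None:
        metadata = {"broken_at": time.time()}
        self.file_storage.store_file_from_memory(
            f"{self.site_and_stream_id}/broken.json",
            json.dumps(metadata).encode("utf-8"),
            metadata=metadata,
        )

storage = MemoryStorage()
FakeBackupStream(storage, "default/1700000000_0").mark_as_broken()
print(storage.files["default/1700000000_0/broken.json"][1])  # {'broken_at': ...}
```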
@@ -1224,21 +1248,37 @@ def _process_removed_binlogs(self, binlogs):
 
     def _purge_old_backups(self):
         purgeable = [backup for backup in self.state["backups"] if backup["completed_at"]]
-        if len(purgeable) <= self.backup_settings["backup_count_min"]:
+        broken_backups_count = sum(backup["broken_at"] is not None for backup in purgeable)
+        # Broken backups are not considered for the count; they will still be purged,
+        # but we only purge once the count of non-broken backups exceeds the limit.
+        non_broken_backups_count = len(purgeable) - broken_backups_count
+
+        if non_broken_backups_count <= self.backup_settings["backup_count_max"] < len(purgeable):
+            self.log.info(
+                "Backup count %s is above max allowed, but %s are broken, not dropping",
+                len(purgeable),
+                broken_backups_count,
+            )
+            return
+
+        if non_broken_backups_count <= self.backup_settings["backup_count_min"]:
             return
 
         # For simplicity only ever drop one backup here. This function
         # is called repeatedly so if there are for any reason more backups
         # to drop they will be dropped soon enough
         purgeable = sort_completed_backups(purgeable)
         backup = purgeable[0]
+
         if not backup["closed_at"]:
             return
 
         if time.time() > backup["closed_at"] + self.backup_settings["backup_age_days_max"] * 24 * 60 * 60:
             self.log.info("Backup %r is older than max backup age, dropping it", backup["stream_id"])
-        elif len(purgeable) > self.backup_settings["backup_count_max"]:
-            self.log.info("Backup count %s is above max allowed, dropping %r", len(purgeable), backup["stream_id"])
+        elif non_broken_backups_count > self.backup_settings["backup_count_max"]:
+            self.log.info(
+                "Non-broken backup count %s is above max allowed, dropping %r", non_broken_backups_count, backup["stream_id"]
+            )
         else:
             return
 
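The chained comparison `non_broken_backups_count <= backup_count_max < len(purgeable)` is easy to misread: it is true exactly when the total is over the max but only because broken backups inflate it. A worked example with hypothetical counts, modeling only the counting gate and not the age check:

```python
def purge_decision(purgeable, backup_count_max: int, backup_count_min: int) -> str:
    broken = sum(b["broken_at"] is not None for b in purgeable)
    non_broken = len(purgeable) - broken
    if non_broken <= backup_count_max < len(purgeable):
        return "skip: over max only because of broken backups"
    if non_broken <= backup_count_min:
        return "skip: at or below minimum"
    return "purge oldest"

backups = [{"broken_at": None}] * 3 + [{"broken_at": 123.0}] * 2
print(purge_decision(backups, backup_count_max=4, backup_count_min=1))
# 5 total, 3 non-broken -> "skip: over max only because of broken backups"
print(purge_decision(backups, backup_count_max=2, backup_count_min=1))
# 3 non-broken > max 2 -> "purge oldest"
```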
@@ -1619,7 +1659,9 @@ def _switch_basebackup_if_possible(self):
         for backup in backups:
             if backup["stream_id"] == current_stream_id:
                 break
-            earlier_backup = backup
+
+            if not backup["broken_at"]:
+                earlier_backup = backup
         else:
             raise Exception(f"Stream {current_stream_id} being restored not found in completed backups: {backups}")
 
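Finally, the change to this for/else loop means a broken backup is never remembered as the fallback basebackup. A small sketch of the selection logic with made-up stream ids:

```python
def pick_earlier_backup(backups, current_stream_id: str):
    earlier_backup = None
    for backup in backups:
        if backup["stream_id"] == current_stream_id:
            break
        if not backup["broken_at"]:
            earlier_backup = backup
    else:
        raise Exception(f"Stream {current_stream_id} being restored not found in completed backups: {backups}")
    return earlier_backup

backups = [
    {"stream_id": "s1", "broken_at": None},
    {"stream_id": "s2", "broken_at": 1700000200.0},  # broken: skipped as fallback
    {"stream_id": "s3", "broken_at": None},
]
print(pick_earlier_backup(backups, "s3")["stream_id"])  # s1, not the broken s2
```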