@@ -12,6 +12,7 @@
import subprocess
import time
from concurrent.futures import ProcessPoolExecutor
+from copy import deepcopy
from datetime import datetime, timezone
from functools import partial
from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Tuple, Union
@@ -268,7 +269,8 @@ def prepare_src_dump(self):
        self.src_dump = get_src_dump()
        self.src_doc = self.src_dump.find_one({'_id': self.src_name}) or {}

-    def register_status(self, status, transient=False, **extra):
+    def register_status(self, status, transient=False, dry_run=False, **extra):
+        src_doc = deepcopy(self.src_doc)
        try:
            # if status is "failed" and depending on where it failed,
            # we may not be able to get the new_data_folder (if dumper didn't reach
@@ -281,20 +283,18 @@ def register_status(self, status, transient=False, **extra):
            # it has not been set by the dumper before while exploring
            # remote site. maybe we're just running post step ?
            # back-compatibility; use "release" at root level if not found under "download"
-            release = self.src_doc.get("download", {}).get("release") or self.src_doc.get(
-                "release"
-            )
+            release = src_doc.get("download", {}).get("release") or src_doc.get("release")
            self.logger.error(
                "No release set, assuming: data_folder: %s, release: %s" % (data_folder, release)
            )
        # make sure to remove old "release" field to get back on track
        for field in ["release", "data_folder"]:
-            if self.src_doc.get(field):
+            if src_doc.get(field):
                self.logger.warning(
                    "Found '%s'='%s' at root level, convert to new format"
-                    % (field, self.src_doc[field])
+                    % (field, src_doc[field])
                )
-                self.src_doc.pop(field)
+                src_doc.pop(field)

        current_download_info = {
            '_id': self.src_name,
@@ -312,7 +312,7 @@ def register_status(self, status, transient=False, **extra):
            last_success = current_download_info["download"]["started_at"]
        else:
            # If failed, we will get the last_success from the last download instead.
-            last_download_info = self.src_doc.setdefault("download", {})
+            last_download_info = src_doc.setdefault("download", {})
            last_success = last_download_info.get("last_success", None)
            if not last_success and last_download_info.get("status") == 'success':
                # If last_success from the last download doesn't exist or is None, and last
@@ -321,18 +321,22 @@ def register_status(self, status, transient=False, **extra):
        if last_success:
            current_download_info["download"]["last_success"] = last_success

-        self.src_doc.update(current_download_info)
+        src_doc.update(current_download_info)

        # only register time when it's a final state
        if transient:
-            self.src_doc["download"]["pid"] = os.getpid()
+            src_doc["download"]["pid"] = os.getpid()
        else:
-            self.src_doc["download"]["time"] = timesofar(self.t0)
+            src_doc["download"]["time"] = timesofar(self.t0)
        if "download" in extra:
-            self.src_doc["download"].update(extra["download"])
+            src_doc["download"].update(extra["download"])
        else:
-            self.src_doc.update(extra)
-        self.src_dump.save(self.src_doc)
+            src_doc.update(extra)
+
+        # when dry run, we should not change self.src_doc or save to src_dump
+        if not dry_run:
+            self.src_doc = deepcopy(src_doc)
+            self.src_dump.save(src_doc)

    async def dump(self, steps=None, force=False, job_manager=None, check_only=False, **kwargs):
        '''
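
The change above routes all mutations through a deepcopy of self.src_doc, so register_status() can now be called with dry_run=True to compute the would-be status document without touching self.src_doc or persisting anything via src_dump.save(). A minimal sketch of the difference, assuming a concrete dumper subclass (MyDumper here is hypothetical, not part of this diff):

    dumper = MyDumper()
    dumper.prepare_src_dump()
    # dry run: the status document is built on a copy and discarded;
    # self.src_doc and the backend collection stay unchanged
    dumper.register_status("success", dry_run=True)
    # real run: self.src_doc is replaced and the document is saved
    dumper.register_status("success")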
@@ -423,6 +427,19 @@ def postdumped(f):
        if self.client:
            self.release_client()

+    def mark_success(self, dry_run=True):
+        '''
+        Mark the datasource as successfully dumped.
+        Useful when the datasource is unstable and needs to be downloaded manually.
+        '''
+        self.register_status("success", dry_run=dry_run)
+        self.logger.info("Done!")
+        result = {
+            "_id": self.src_doc["_id"],
+            "download": self.src_doc["download"],
+        }
+        return result
+
    def get_predicates(self):
        """
        Return a list of predicates (functions returning true/false, as in math logic)
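
mark_success() defaults to dry_run=True, so a bare call only exercises register_status() without persisting anything; pass dry_run=False to actually record the "success" status. Note that on a dry run self.src_doc is left untouched, so the returned dict reflects the currently stored state rather than the would-be update. A usage sketch (the dumper instance is assumed to exist and to already have a "download" entry):

    preview = dumper.mark_success()             # dry run, nothing saved
    saved = dumper.mark_success(dry_run=False)  # persists "success" to src_dump
    print(saved["_id"], saved["download"].get("status"))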
@@ -1444,6 +1461,19 @@ def dump_src(
            logging.error("Error while dumping '%s': %s" % (src, e))
            raise

+    def mark_success(self, src, dry_run=True):
+        result = []
+        if src in self.register:
+            klasses = self.register[src]
+        else:
+            raise DumperException(
+                "Can't find '%s' in registered sources (whether as main or sub-source)" % src
+            )
+        for _, klass in enumerate(klasses):
+            inst = self.create_instance(klass)
+            result.append(inst.mark_success(dry_run=dry_run))
+        return result
+
    def call(self, src, method_name, *args, **kwargs):
        """
        Create a dumper for datasource "src" and call method "method_name" on it,
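
At the manager level, mark_success(src) looks up every dumper class registered under the source name, instantiates each one, and collects the per-dumper result dicts into a list. A usage sketch (the manager variable and the "clinvar" source name are illustrative assumptions):

    results = dumper_manager.mark_success("clinvar", dry_run=False)
    for doc in results:
        print(doc["_id"], doc["download"].get("status"))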