diff --git a/bin/rucio b/bin/rucio index 03aede6311..618add30ac 100755 --- a/bin/rucio +++ b/bin/rucio @@ -924,17 +924,10 @@ def upload(args): return FAILURE args.lifetime = (expiration_date - datetime.utcnow()).total_seconds() - if args.dirac and args.recursive: - logger.error("--recursive and --dirac cannot be specified at the same time.\ - As dirac will register the heirachy to rucio.") - return FAILURE - - dirac = False if args.dirac: if args.lifetime: logger.warning("Ignoring --lifetime as --dirac is set where the lifetime is taken from cfg file for dataset") - dirac = True - + dsscope = None dsname = None for arg in args.args: @@ -971,7 +964,12 @@ def upload(args): if len(items) < 1: raise InputValidationError('No files could be extracted from the given arguments') - + if args.dirac and args.recursive: + logger.error("--recursive and --dirac cannot be specified at the same time.") + logger.error("As dirac will register the hierarchy to rucio.") + if args.dirac and not (args.name or dsname): + logger.error("Either --name or dataset name must be specified for dirac") + raise InputValidationError('Invalid input argument composition') if len(items) > 1 and args.guid: logger.error("A single GUID was specified on the command line, but there are multiple files to upload.") logger.error("If GUID auto-detection is not used, only one file may be uploaded at a time") @@ -990,7 +988,7 @@ def upload(args): from rucio.client.uploadclient import UploadClient upload_client = UploadClient(client, logger=logger) summary_file_path = 'rucio_upload.json' if args.summary else None - upload_client.upload(items, summary_file_path, dirac=dirac) + upload_client.upload(items, summary_file_path, dirac=args.dirac) return SUCCESS @@ -2299,6 +2297,7 @@ You can filter by key/value, e.g.:: upload_parser.add_argument('--transfer-timeout', dest='transfer_timeout', type=float, action='store', default=config_get_float('upload', 'transfer_timeout', False, 360), help='Transfer timeout (in 
seconds).') upload_parser.add_argument(dest='args', action='store', nargs='+', help='files and datasets.') upload_parser.add_argument('--recursive', dest='recursive', action='store_true', default=False, help='Convert recursively the folder structure into collections') + upload_parser.add_argument('--dirac', dest='dirac', action='store_true', default=False, help='Does the registration to rucio in hierarchical namespace as dirac.') # The download and get subparser get_parser = subparsers.add_parser('get', help='Download method (synonym for download)') diff --git a/lib/rucio/client/uploadclient.py b/lib/rucio/client/uploadclient.py index 24a5c1815b..f8f494fe16 100644 --- a/lib/rucio/client/uploadclient.py +++ b/lib/rucio/client/uploadclient.py @@ -127,10 +127,10 @@ def _pick_random_rse(rse_expression): raise RSEWriteBlocked('%s is not available for writing. No actions have been taken' % rse) file['rse'] = rse - registered_file_dids.add('%s:%s' % (file['did_scope'], file['did_name'])) + dataset_scope = file.get('dataset_scope') + dataset_name = file.get('dataset_name') if not dirac: - dataset_scope = file.get('dataset_scope') - dataset_name = file.get('dataset_name') + registered_file_dids.add('%s:%s' % (file['did_scope'], file['did_name'])) if dataset_scope and dataset_name: dataset_did_str = ('%s:%s' % (dataset_scope, dataset_name)) file['dataset_did_str'] = dataset_did_str @@ -141,10 +141,12 @@ def _pick_random_rse(rse_expression): logger(logging.DEBUG, 'Input validation done.') if dirac: + if not dataset_name and not file['did_name']: + raise InputValidationError('Either dataset_name or did_name must be specified for dirac upload') scope, _ = extract_scope(file['did_name']) if file['did_scope']: if file['did_scope'] != scope: - logger(logging.WARNING, 'skipping scope %s provided for the file %s and using the proper scope %s from extract_scope algorith.\ + logger(logging.WARNING, 'replacing scope %s provided for the file %s and using the proper scope %s from 
extract_scope algorithm.\ ' % (str( file['did_scope']), str(file['did_name']), str(scope))) # clear this set again to ensure that we only try to register datasets once @@ -166,7 +168,6 @@ def _pick_random_rse(rse_expression): # appending trace to list reference, if the reference exists if traces_copy_out is not None: traces_copy_out.append(trace) - rse = file['rse'] trace['scope'] = file['did_scope'] trace['datasetScope'] = file.get('dataset_scope', '') @@ -371,8 +372,8 @@ def _register_file(self, file, registered_dataset_dids, ignore_availability=Fals logger(logging.WARNING, 'Scope {} not found for the account {}.'.format(file['did_scope'], self.client.account)) rse = file['rse'] + dataset_did_str = file.get('dataset_did_str') if not dirac: - dataset_did_str = file.get('dataset_did_str') # register a dataset if we need to if dataset_did_str and dataset_did_str not in registered_dataset_dids: registered_dataset_dids.add(dataset_did_str) @@ -395,7 +396,6 @@ def _register_file(self, file, registered_dataset_dids, ignore_availability=Fals file.get('lifetime'))) else: logger(logging.DEBUG, 'Skipping dataset registration') - file_scope = file['did_scope'] file_name = file['did_name'] file_did = {'scope': file_scope, 'name': file_name} @@ -423,7 +423,7 @@ def _register_file(self, file, registered_dataset_dids, ignore_availability=Fals if not dirac: self.client.add_replicas(rse=rse, files=[replica_for_api]) else: - self.client.add_files(replica_for_api, ignore_availability=ignore_availability) + self.client.add_files([replica_for_api], ignore_availability=ignore_availability) logger(logging.INFO, 'Successfully added replica in Rucio catalogue at %s' % rse) if not dirac: if not dataset_did_str: @@ -457,7 +457,7 @@ def _get_file_guid(self, file): guid = generate_uuid() return guid - def _collect_file_info(self, filepath, item): + def _collect_file_info(self, filepath, item, dirac: bool = False): """ Collects infos (e.g. size, checksums, etc.) 
about the file and returns them as a dictionary @@ -465,6 +465,7 @@ def _collect_file_info(self, filepath, item): :param filepath: path where the file is stored :param item: input options for the given file + :param dirac: True if the upload is dirac. (Default: False) :returns: a dictionary containing all collected info and the input options """ @@ -478,10 +479,18 @@ def _collect_file_info(self, filepath, item): new_item['md5'] = md5(filepath) new_item['meta'] = {'guid': self._get_file_guid(new_item)} new_item['state'] = 'C' - if not new_item.get('did_scope'): - new_item['did_scope'] = self.default_file_scope - if not new_item.get('did_name'): - new_item['did_name'] = new_item['basename'] + if dirac: + if not new_item.get('did_name'): + if new_item.get('dataset_name'): + new_item['did_name'] = new_item['dataset_name'] + "/" + new_item['basename'] + if not new_item.get('did_scope'): + scope, _ = extract_scope(new_item['did_name']) + new_item['did_scope'] = scope + else: + if not new_item.get('did_scope'): + new_item['did_scope'] = self.default_file_scope + if not new_item.get('did_name'): + new_item['did_name'] = new_item['basename'] return new_item @@ -492,6 +501,7 @@ def _collect_and_validate_file_info(self, items, dirac: bool = False): (This function is meant to be used as class internal only) :param filepath: list of dictionaries with all input files and options + :param dirac: True if the upload is dirac. 
(Default: False) :returns: a list of dictionaries containing all descriptions of the files to upload @@ -509,10 +519,6 @@ def _collect_and_validate_file_info(self, items, dirac: bool = False): if not item.get('rse'): logger(logging.WARNING, 'Skipping file %s because no rse was given' % path) continue - did_name = item.get('did_name') - if dirac and not did_name: - logger(logging.WARNING, 'Skipping file %s because no did_name was given as dirac option is true' % path) - continue if pfn: item['force_scheme'] = pfn.split(':')[0] if item.get('impl'): @@ -526,18 +532,18 @@ def _collect_and_validate_file_info(self, items, dirac: bool = False): if os.path.isdir(path) and not recursive: dname, subdirs, fnames = next(os.walk(path)) for fname in fnames: - file = self._collect_file_info(os.path.join(dname, fname), item) + file = self._collect_file_info(os.path.join(dname, fname), item, dirac=dirac) files.append(file) if not len(fnames) and not len(subdirs): logger(logging.WARNING, 'Skipping %s because it is empty.' % dname) elif not len(fnames): logger(logging.WARNING, 'Skipping %s because it has no files in it. Subdirectories are not supported.' % dname) elif dirac and recursive: - logger(logging.WARNING, '. Skipping %s because it dirac and recursive flag combined is not supported') + logger(logging.WARNING, '. 
Skipping %s because dirac and recursive flag combined is not supported') elif os.path.isdir(path) and recursive: files.extend(self._recursive(item)) elif os.path.isfile(path) and not recursive: - file = self._collect_file_info(path, item) + file = self._collect_file_info(path, item, dirac=dirac) files.append(file) elif os.path.isfile(path) and recursive: logger(logging.WARNING, 'Skipping %s because of --recursive flag' % path) diff --git a/tests/test_upload.py b/tests/test_upload.py index 4c450351bf..e0a9a20f43 100644 --- a/tests/test_upload.py +++ b/tests/test_upload.py @@ -405,7 +405,9 @@ def test_upload_file_ignore_availability(rse_factory, scope, upload_client, file status = upload_client.upload(item, ignore_availability=True) assert status == 0 +@skip_non_belleii def test_upload_file_with_dirac(rse, scope, upload_client, download_client, did_client, file_factory): + config_set('dirac', 'lifetime', '{"user.*": 2592400}') local_file1 = file_factory.file_generator(use_basedir=True) local_file2 = file_factory.file_generator(use_basedir=True) download_dir = file_factory.base_dir