Skip to content

Commit

Permalink
upload: make necessary changes. rucio#6475
Browse files Browse the repository at this point in the history
  • Loading branch information
panta-123 committed Jan 28, 2024
1 parent d87f2a4 commit d84854b
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 30 deletions.
19 changes: 9 additions & 10 deletions bin/rucio
Original file line number Diff line number Diff line change
Expand Up @@ -924,17 +924,10 @@ def upload(args):
return FAILURE
args.lifetime = (expiration_date - datetime.utcnow()).total_seconds()

if args.dirac and args.recursive:
logger.error("--recursive and --dirac cannot be specified at the same time.\
As dirac will register the heirachy to rucio.")
return FAILURE

dirac = False
if args.dirac:
if args.lifetime:
logger.warning("Ignoring --lifetime as --dirac is set where the lifetime is taken from cfg file for dataset")
dirac = True


dsscope = None
dsname = None
for arg in args.args:
Expand Down Expand Up @@ -971,7 +964,12 @@ def upload(args):

if len(items) < 1:
raise InputValidationError('No files could be extracted from the given arguments')

if args.dirac and args.recursive:
logger.error("--recursive and --dirac cannot be specified at the same time.")
logger.error("As dirac will register the heirachy to rucio.")
if args.dirac and not (args.name or dsname):
logger.error("Either --name or dataset name must be specified for dirac")
raise InputValidationError('Invalid input argument composition')
if len(items) > 1 and args.guid:
logger.error("A single GUID was specified on the command line, but there are multiple files to upload.")
logger.error("If GUID auto-detection is not used, only one file may be uploaded at a time")
Expand All @@ -990,7 +988,7 @@ def upload(args):
from rucio.client.uploadclient import UploadClient
upload_client = UploadClient(client, logger=logger)
summary_file_path = 'rucio_upload.json' if args.summary else None
upload_client.upload(items, summary_file_path, dirac=dirac)
upload_client.upload(items, summary_file_path, dirac=args.dirac)
return SUCCESS


Expand Down Expand Up @@ -2299,6 +2297,7 @@ You can filter by key/value, e.g.::
upload_parser.add_argument('--transfer-timeout', dest='transfer_timeout', type=float, action='store', default=config_get_float('upload', 'transfer_timeout', False, 360), help='Transfer timeout (in seconds).')
upload_parser.add_argument(dest='args', action='store', nargs='+', help='files and datasets.')
upload_parser.add_argument('--recursive', dest='recursive', action='store_true', default=False, help='Convert recursively the folder structure into collections')
upload_parser.add_argument('--dirac', dest='dirac', action='store_true', default=False, help='Does the registriation to rucio in herirachial namespace as dirac.')

# The download and get subparser
get_parser = subparsers.add_parser('get', help='Download method (synonym for download)')
Expand Down
46 changes: 26 additions & 20 deletions lib/rucio/client/uploadclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ def _pick_random_rse(rse_expression):
raise RSEWriteBlocked('%s is not available for writing. No actions have been taken' % rse)

file['rse'] = rse
registered_file_dids.add('%s:%s' % (file['did_scope'], file['did_name']))
dataset_scope = file.get('dataset_scope')
dataset_name = file.get('dataset_name')
if not dirac:
dataset_scope = file.get('dataset_scope')
dataset_name = file.get('dataset_name')
registered_file_dids.add('%s:%s' % (file['did_scope'], file['did_name']))
if dataset_scope and dataset_name:
dataset_did_str = ('%s:%s' % (dataset_scope, dataset_name))
file['dataset_did_str'] = dataset_did_str
Expand All @@ -141,10 +141,12 @@ def _pick_random_rse(rse_expression):
logger(logging.DEBUG, 'Input validation done.')

if dirac:
if not dataset_name and not file['did_name']:
raise InputValidationError('Either dataset_name or did_name must be specified for dirac upload')
scope, _ = extract_scope(file['did_name'])
if file['did_scope']:
if file['did_scope'] != scope:
logger(logging.WARNING, 'skipping scope %s provided for the file %s and using the proper scope %s from extract_scope algorith.\
logger(logging.WARNING, 'replacing scope %s provided for the file %s and using the proper scope %s from extract_scope algorithm.\
' % (str( file['did_scope']), str(file['did_name']), str(scope)))

# clear this set again to ensure that we only try to register datasets once
Expand All @@ -166,7 +168,6 @@ def _pick_random_rse(rse_expression):
# appending trace to list reference, if the reference exists
if traces_copy_out is not None:
traces_copy_out.append(trace)

rse = file['rse']
trace['scope'] = file['did_scope']
trace['datasetScope'] = file.get('dataset_scope', '')
Expand Down Expand Up @@ -371,8 +372,8 @@ def _register_file(self, file, registered_dataset_dids, ignore_availability=Fals
logger(logging.WARNING, 'Scope {} not found for the account {}.'.format(file['did_scope'], self.client.account))

rse = file['rse']
dataset_did_str = file.get('dataset_did_str')
if not dirac:
dataset_did_str = file.get('dataset_did_str')
# register a dataset if we need to
if dataset_did_str and dataset_did_str not in registered_dataset_dids:
registered_dataset_dids.add(dataset_did_str)
Expand All @@ -395,7 +396,6 @@ def _register_file(self, file, registered_dataset_dids, ignore_availability=Fals
file.get('lifetime')))
else:
logger(logging.DEBUG, 'Skipping dataset registration')

file_scope = file['did_scope']
file_name = file['did_name']
file_did = {'scope': file_scope, 'name': file_name}
Expand Down Expand Up @@ -423,7 +423,7 @@ def _register_file(self, file, registered_dataset_dids, ignore_availability=Fals
if not dirac:
self.client.add_replicas(rse=rse, files=[replica_for_api])
else:
self.client.add_files(replica_for_api, ignore_availability=ignore_availability)
self.client.add_files([replica_for_api], ignore_availability=ignore_availability)
logger(logging.INFO, 'Successfully added replica in Rucio catalogue at %s' % rse)
if not dirac:
if not dataset_did_str:
Expand Down Expand Up @@ -457,14 +457,15 @@ def _get_file_guid(self, file):
guid = generate_uuid()
return guid

def _collect_file_info(self, filepath, item):
def _collect_file_info(self, filepath, item, dirac: bool = False):
"""
Collects infos (e.g. size, checksums, etc.) about the file and
returns them as a dictionary
(This function is meant to be used as class internal only)
:param filepath: path where the file is stored
:param item: input options for the given file
:param dirac: True if the upload is dirac. (Default: False)
:returns: a dictionary containing all collected info and the input options
"""
Expand All @@ -478,10 +479,18 @@ def _collect_file_info(self, filepath, item):
new_item['md5'] = md5(filepath)
new_item['meta'] = {'guid': self._get_file_guid(new_item)}
new_item['state'] = 'C'
if not new_item.get('did_scope'):
new_item['did_scope'] = self.default_file_scope
if not new_item.get('did_name'):
new_item['did_name'] = new_item['basename']
if dirac:
if not new_item.get('did_name'):
if new_item.get('dataset_name'):
new_item['did_name'] = new_item['dataset_name'] + "/" + new_item['basename']
if not new_item.get('did_scope'):
scope, _ = extract_scope(new_item['did_name'])
new_item['did_scope'] = scope
else:
if not new_item.get('did_scope'):
new_item['did_scope'] = self.default_file_scope
if not new_item.get('did_name'):
new_item['did_name'] = new_item['basename']

return new_item

Expand All @@ -492,6 +501,7 @@ def _collect_and_validate_file_info(self, items, dirac: bool = False):
(This function is meant to be used as class internal only)
:param filepath: list of dictionaries with all input files and options
:param dirac: True if the upload is dirac. (Default: False)
:returns: a list of dictionaries containing all descriptions of the files to upload
Expand All @@ -509,10 +519,6 @@ def _collect_and_validate_file_info(self, items, dirac: bool = False):
if not item.get('rse'):
logger(logging.WARNING, 'Skipping file %s because no rse was given' % path)
continue
did_name = item.get('did_name')
if dirac and not did_name:
logger(logging.WARNING, 'Skipping file %s because no did_name was given as dirac option is true' % path)
continue
if pfn:
item['force_scheme'] = pfn.split(':')[0]
if item.get('impl'):
Expand All @@ -526,18 +532,18 @@ def _collect_and_validate_file_info(self, items, dirac: bool = False):
if os.path.isdir(path) and not recursive:
dname, subdirs, fnames = next(os.walk(path))
for fname in fnames:
file = self._collect_file_info(os.path.join(dname, fname), item)
file = self._collect_file_info(os.path.join(dname, fname), item, dirac=dirac)
files.append(file)
if not len(fnames) and not len(subdirs):
logger(logging.WARNING, 'Skipping %s because it is empty.' % dname)
elif not len(fnames):
logger(logging.WARNING, 'Skipping %s because it has no files in it. Subdirectories are not supported.' % dname)
elif dirac and recursive:
logger(logging.WARNING, '. Skipping %s because it dirac and recursive flag combined is not supported')
logger(logging.WARNING, '. Skipping %s because dirac and recursive flag combined is not supported')
elif os.path.isdir(path) and recursive:
files.extend(self._recursive(item))
elif os.path.isfile(path) and not recursive:
file = self._collect_file_info(path, item)
file = self._collect_file_info(path, item, dirac=dirac)
files.append(file)
elif os.path.isfile(path) and recursive:
logger(logging.WARNING, 'Skipping %s because of --recursive flag' % path)
Expand Down
2 changes: 2 additions & 0 deletions tests/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,9 @@ def test_upload_file_ignore_availability(rse_factory, scope, upload_client, file
status = upload_client.upload(item, ignore_availability=True)
assert status == 0

@skip_non_belleii
def test_upload_file_with_dirac(rse, scope, upload_client, download_client, did_client, file_factory):
config_set('dirac', 'lifetime', '{"user.*": 2592400}')
local_file1 = file_factory.file_generator(use_basedir=True)
local_file2 = file_factory.file_generator(use_basedir=True)
download_dir = file_factory.base_dir
Expand Down

0 comments on commit d84854b

Please sign in to comment.