Skip to content

Commit

Permalink
upload: add option of hierarchical upload. rucio#6457
Browse files Browse the repository at this point in the history
  • Loading branch information
panta-123 committed Jan 24, 2024
1 parent 436ecc9 commit afe9f40
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 61 deletions.
13 changes: 12 additions & 1 deletion bin/rucio
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,17 @@ def upload(args):
return FAILURE
args.lifetime = (expiration_date - datetime.utcnow()).total_seconds()

if args.dirac and args.recursive:
logger.error("--recursive and --dirac cannot be specified at the same time.\
As dirac will register the heirachy to rucio.")
return FAILURE

dirac = False
if args.dirac:
if args.lifetime:
logger.warning("Ignoring --lifetime as --dirac is set where the lifetime is taken from cfg file for dataset")
dirac = True

dsscope = None
dsname = None
for arg in args.args:
Expand Down Expand Up @@ -979,7 +990,7 @@ def upload(args):
from rucio.client.uploadclient import UploadClient
upload_client = UploadClient(client, logger=logger)
summary_file_path = 'rucio_upload.json' if args.summary else None
upload_client.upload(items, summary_file_path)
upload_client.upload(items, summary_file_path, dirac=dirac)
return SUCCESS


Expand Down
164 changes: 105 additions & 59 deletions lib/rucio/client/uploadclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
ResourceTemporaryUnavailable, ServiceUnavailable, InputValidationError, RSEChecksumUnavailable,
ScopeNotFound)
from rucio.common.utils import (adler32, detect_client_location, execute, generate_uuid, make_valid_did, md5, send_trace,
retry, GLOBALLY_SUPPORTED_CHECKSUMS)
retry, GLOBALLY_SUPPORTED_CHECKSUMS, extract_scope)
from rucio.rse import rsemanager as rsemgr


Expand Down Expand Up @@ -67,29 +67,30 @@ def __init__(self, _client=None, logger=None, tracing=True):
self.trace['eventType'] = 'upload'
self.trace['eventVersion'] = version.RUCIO_VERSION[0]

def upload(self, items, summary_file_path=None, traces_copy_out=None, ignore_availability=False, activity=None):
def upload(self, items, summary_file_path=None, traces_copy_out=None, ignore_availability=False, activity=None, dirac: bool = False):
"""
:param items: List of dictionaries. Each dictionary describing a file to upload. Keys:
path - path of the file that will be uploaded
rse - rse expression/name (e.g. 'CERN-PROD_DATADISK') where to upload the file
did_scope - Optional: custom did scope (Default: user.<account>)
did_name - Optional: custom did name (Default: name of the file)
dataset_scope - Optional: custom dataset scope
dataset_name - Optional: custom dataset name
dataset_meta - Optional: custom metadata for dataset
did_name - Optional: custom did name (Default: name of the file) (required if dirac=True)
dataset_scope - Optional: custom dataset scope (ignored if dirac=True)
dataset_name - Optional: custom dataset name (ignored if dirac=True)
dataset_meta - Optional: custom metadata for dataset (ignored if dirac=True)
impl - Optional: name of the protocol implementation to be used to upload this item.
force_scheme - Optional: force a specific scheme (if PFN upload this will be overwritten) (Default: None)
pfn - Optional: use a given PFN (this sets no_register to True, and no_register becomes mandatory)
no_register - Optional: if True, the file will not be registered in the rucio catalogue
register_after_upload - Optional: if True, the file will be registered after successful upload
lifetime - Optional: the lifetime of the file after it was uploaded
lifetime - Optional: the lifetime of the file after it was uploaded. (ignored if dirac=True as lifetime is created from cfg dirac section.)
transfer_timeout - Optional: time after the upload will be aborted
guid - Optional: guid of the file
recursive - Optional: if set, parses the folder structure recursively into collections
:param summary_file_path: Optional: a path where a summary in form of a json file will be stored
:param traces_copy_out: reference to an external list, where the traces should be uploaded
:param ignore_availability: ignore the availability of a RSE
:param activity: the activity set to the rule if no dataset is specified
:param dirac: boolean to trigger register using dirac add_files method for hierarchical namespace. (Default: False)
:returns: 0 on success
Expand Down Expand Up @@ -125,20 +126,27 @@ def _pick_random_rse(rse_expression):
if not ignore_availability and rse_settings['availability_write'] != 1:
raise RSEWriteBlocked('%s is not available for writing. No actions have been taken' % rse)

dataset_scope = file.get('dataset_scope')
dataset_name = file.get('dataset_name')
file['rse'] = rse
if dataset_scope and dataset_name:
dataset_did_str = ('%s:%s' % (dataset_scope, dataset_name))
file['dataset_did_str'] = dataset_did_str
registered_dataset_dids.add(dataset_did_str)

registered_file_dids.add('%s:%s' % (file['did_scope'], file['did_name']))
wrong_dids = registered_file_dids.intersection(registered_dataset_dids)
if len(wrong_dids):
raise InputValidationError('DIDs used to address both files and datasets: %s' % str(wrong_dids))
logger(logging.DEBUG, 'Input validation done.')

if not dirac:
dataset_scope = file.get('dataset_scope')
dataset_name = file.get('dataset_name')
if dataset_scope and dataset_name:
dataset_did_str = ('%s:%s' % (dataset_scope, dataset_name))
file['dataset_did_str'] = dataset_did_str
registered_dataset_dids.add(dataset_did_str)
wrong_dids = registered_file_dids.intersection(registered_dataset_dids)
if len(wrong_dids):
raise InputValidationError('DIDs used to address both files and datasets: %s' % str(wrong_dids))
logger(logging.DEBUG, 'Input validation done.')

if dirac:
scope, _ = extract_scope(file['did_name'])
if file['did_scope']:
if file['did_scope'] != scope:
logger(logging.WARNING, 'skipping scope %s provided for the file %s and using the proper scope %s from extract_scope algorith.\
' % (str( file['did_scope']), str(file['did_name']), str(scope)))

# clear this set again to ensure that we only try to register datasets once
registered_dataset_dids = set()
num_succeeded = 0
Expand Down Expand Up @@ -197,7 +205,7 @@ def _pick_random_rse(rse_expression):
# impl = self.preferred_impl(rse_settings, domain)

if not no_register and not register_after_upload:
self._register_file(file, registered_dataset_dids, ignore_availability=ignore_availability, activity=activity)
self._register_file(file, registered_dataset_dids, ignore_availability=ignore_availability, activity=activity, dirac=dirac)

# if register_after_upload, file should be overwritten if it is not registered
# otherwise if file already exists on RSE we're done
Expand Down Expand Up @@ -287,22 +295,22 @@ def _pick_random_rse(rse_expression):

if not no_register:
if register_after_upload:
self._register_file(file, registered_dataset_dids, ignore_availability=ignore_availability, activity=activity)
self._register_file(file, registered_dataset_dids, ignore_availability=ignore_availability, activity=activity, dirac=dirac)
else:
replica_for_api = self._convert_file_for_api(file)
try:
self.client.update_replicas_states(rse, files=[replica_for_api])
except Exception as error:
logger(logging.ERROR, 'Failed to update replica state for file {}'.format(basename))
logger(logging.DEBUG, 'Details: {}'.format(str(error)))

# add file to dataset if needed
if dataset_did_str and not no_register:
try:
self.client.attach_dids(file['dataset_scope'], file['dataset_name'], [file_did])
except Exception as error:
logger(logging.WARNING, 'Failed to attach file to the dataset')
logger(logging.DEBUG, 'Attaching to dataset {}'.format(str(error)))
if not dirac:
# add file to dataset if needed
if dataset_did_str and not no_register:
try:
self.client.attach_dids(file['dataset_scope'], file['dataset_name'], [file_did])
except Exception as error:
logger(logging.WARNING, 'Failed to attach file to the dataset')
logger(logging.DEBUG, 'Attaching to dataset {}'.format(str(error)))
else:
trace['clientState'] = 'FAILED'
trace['stateReason'] = state_reason
Expand Down Expand Up @@ -336,7 +344,7 @@ def _pick_random_rse(rse_expression):
raise NotAllFilesUploaded()
return 0

def _register_file(self, file, registered_dataset_dids, ignore_availability=False, activity=None):
def _register_file(self, file, registered_dataset_dids, ignore_availability=False, activity=None, dirac:bool = False):
"""
Registers the given file in Rucio. Creates a dataset if
needed. Registers the file DID and creates the replication
Expand All @@ -363,34 +371,37 @@ def _register_file(self, file, registered_dataset_dids, ignore_availability=Fals
logger(logging.WARNING, 'Scope {} not found for the account {}.'.format(file['did_scope'], self.client.account))

rse = file['rse']
dataset_did_str = file.get('dataset_did_str')
# register a dataset if we need to
if dataset_did_str and dataset_did_str not in registered_dataset_dids:
registered_dataset_dids.add(dataset_did_str)
try:
logger(logging.DEBUG, 'Trying to create dataset: %s' % dataset_did_str)
self.client.add_dataset(scope=file['dataset_scope'],
name=file['dataset_name'],
meta=file.get('dataset_meta'),
rules=[{'account': self.client.account,
'copies': 1,
'rse_expression': rse,
'grouping': 'DATASET',
'lifetime': file.get('lifetime')}])
logger(logging.INFO, 'Successfully created dataset %s' % dataset_did_str)
except DataIdentifierAlreadyExists:
logger(logging.INFO, 'Dataset %s already exists - no rule will be created' % dataset_did_str)

if file.get('lifetime') is not None:
raise InputValidationError('Dataset %s exists and lifetime %s given. Prohibited to modify parent dataset lifetime.' % (dataset_did_str,
file.get('lifetime')))
else:
logger(logging.DEBUG, 'Skipping dataset registration')
if not dirac:
dataset_did_str = file.get('dataset_did_str')
# register a dataset if we need to
if dataset_did_str and dataset_did_str not in registered_dataset_dids:
registered_dataset_dids.add(dataset_did_str)
try:
logger(logging.DEBUG, 'Trying to create dataset: %s' % dataset_did_str)
self.client.add_dataset(scope=file['dataset_scope'],
name=file['dataset_name'],
meta=file.get('dataset_meta'),
rules=[{'account': self.client.account,
'copies': 1,
'rse_expression': rse,
'grouping': 'DATASET',
'lifetime': file.get('lifetime')}])
logger(logging.INFO, 'Successfully created dataset %s' % dataset_did_str)
except DataIdentifierAlreadyExists:
logger(logging.INFO, 'Dataset %s already exists - no rule will be created' % dataset_did_str)

if file.get('lifetime') is not None:
raise InputValidationError('Dataset %s exists and lifetime %s given. Prohibited to modify parent dataset lifetime.' % (dataset_did_str,
file.get('lifetime')))
else:
logger(logging.DEBUG, 'Skipping dataset registration')

file_scope = file['did_scope']
file_name = file['did_name']
file_did = {'scope': file_scope, 'name': file_name}
replica_for_api = self._convert_file_for_api(file)
if dirac:
replica_for_api = self._convert_file_for_dirac(file)
try:
# if the remote checksum is different this did must not be used
meta = self.client.get_metadata(file_scope, file_name)
Expand All @@ -409,12 +420,16 @@ def _register_file(self, file, registered_dataset_dids, ignore_availability=Fals
logger(logging.INFO, 'Successfully added replica in Rucio catalogue at %s' % rse)
except DataIdentifierNotFound:
logger(logging.DEBUG, 'File DID does not exist')
self.client.add_replicas(rse=rse, files=[replica_for_api])
if not dirac:
self.client.add_replicas(rse=rse, files=[replica_for_api])
else:
self.client.add_files(replica_for_api, ignore_availability=ignore_availability)
logger(logging.INFO, 'Successfully added replica in Rucio catalogue at %s' % rse)
if not dataset_did_str:
# only need to add rules for files if no dataset is given
self.client.add_replication_rule([file_did], copies=1, rse_expression=rse, lifetime=file.get('lifetime'), ignore_availability=ignore_availability, activity=activity)
logger(logging.INFO, 'Successfully added replication rule at %s' % rse)
if not dirac:
if not dataset_did_str:
# only need to add rules for files if no dataset is given
self.client.add_replication_rule([file_did], copies=1, rse_expression=rse, lifetime=file.get('lifetime'), ignore_availability=ignore_availability, activity=activity)
logger(logging.INFO, 'Successfully added replication rule at %s' % rse)

def _get_file_guid(self, file):
"""
Expand Down Expand Up @@ -470,7 +485,7 @@ def _collect_file_info(self, filepath, item):

return new_item

def _collect_and_validate_file_info(self, items):
def _collect_and_validate_file_info(self, items, dirac: bool = False):
"""
Checks if there are any inconsistencies within the given input
options and stores the output of _collect_file_info for every file
Expand All @@ -494,6 +509,10 @@ def _collect_and_validate_file_info(self, items):
if not item.get('rse'):
logger(logging.WARNING, 'Skipping file %s because no rse was given' % path)
continue
did_name = item.get('did_name')
if dirac and not did_name:
logger(logging.WARNING, 'Skipping file %s because no did_name was given as dirac option is true' % path)
continue
if pfn:
item['force_scheme'] = pfn.split(':')[0]
if item.get('impl'):
Expand All @@ -513,6 +532,8 @@ def _collect_and_validate_file_info(self, items):
logger(logging.WARNING, 'Skipping %s because it is empty.' % dname)
elif not len(fnames):
logger(logging.WARNING, 'Skipping %s because it has no files in it. Subdirectories are not supported.' % dname)
elif dirac and recursive:
logger(logging.WARNING, '. Skipping %s because it dirac and recursive flag combined is not supported')
elif os.path.isdir(path) and recursive:
files.extend(self._recursive(item))
elif os.path.isfile(path) and not recursive:
Expand Down Expand Up @@ -550,6 +571,31 @@ def _convert_file_for_api(self, file):
if pfn:
replica['pfn'] = pfn
return replica

def _convert_file_for_dirac(self, file: dict) -> dict:
"""
Creates a new dictionary that contains only the values
that are needed for the upload with the correct keys for dirac client.
(This function is meant to be used as class internal only)
:param file: dictionary describing a file to upload
:returns: dictionary containing not more then the needed values for the upload
"""
replica = {}
replica['lfn'] = file['did_name']
replica['bytes'] = file['bytes']
replica['adler32'] = file['adler32']
replica['rse'] = file['rse']
pfn = file.get('pfn')
replica['state'] = file.get('state')
if pfn:
replica['pfn'] = pfn
guid = file.get('guid')
if guid:
replica['guid'] = guid

return replica

def _upload_item(self, rse_settings, rse_attributes, lfn, source_dir=None, domain='wan', impl=None, force_pfn=None, force_scheme=None, transfer_timeout=None, delete_existing=False, sign_service=None):
"""
Expand Down
Loading

0 comments on commit afe9f40

Please sign in to comment.