Skip to content

Commit

Permalink
feat: method findFileByMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
gillardoLapp committed Feb 12, 2025
1 parent 2778630 commit d52d46d
Show file tree
Hide file tree
Showing 2 changed files with 346 additions and 0 deletions.
217 changes: 217 additions & 0 deletions src/DIRAC/Resources/Catalog/RucioFileCatalogClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class RucioFileCatalogClient(FileCatalogClientBase):
"resolveDataset",
"getLFNForPFN",
"getUserDirectory",
"getFileUserMetadata",
"findFilesByMetadata",

]

WRITE_METHODS = FileCatalogClientBase.WRITE_METHODS + [
Expand All @@ -78,13 +81,15 @@ class RucioFileCatalogClient(FileCatalogClientBase):
"createDataset",
"changePathOwner",
"changePathMode",
"setMetadata",
]

# Extends the list inherited from FileCatalogClientBase with the methods of
# this client that belong to the "no LFN" category.
# NOTE(review): presumably these are the methods whose first argument is not
# an LFN (findFilesByMetadata, for instance, takes a metadata query) — confirm
# against how the catalog dispatcher uses NO_LFN_METHODS.
NO_LFN_METHODS = FileCatalogClientBase.NO_LFN_METHODS + [
    "getUserDirectory",
    "createUserDirectory",
    "createUserMapping",
    "removeUserDirectory",
    "findFilesByMetadata",
]

ADMIN_METHODS = FileCatalogClientBase.ADMIN_METHODS + [
Expand Down Expand Up @@ -697,3 +702,215 @@ def getDirectorySize(self, lfns, longOutput=False, rawFiles=False):
except Exception as err:
return S_ERROR(str(err))
return S_OK(resDict)

@checkCatalogArguments
def getFileUserMetadata(self, path):
    """Get the metadata attached to one or more files, including the metadata
    inherited from their parents.

    :param path: a single LFN (str) or an iterable of LFNs. When a dict is
        given, its keys are used as the LFNs.
        NOTE(review): ``checkCatalogArguments`` presumably normalises the
        argument to a dict keyed by LFN — confirm.
    :return: ``S_OK({"Successful": {lfn: metadata}, "Failed": {lfn: reason}})``,
        or ``S_ERROR`` if resolving the DID or querying Rucio raises an
        unexpected exception.
    """
    # Process every LFN that was passed, not only the first one.
    lfns = [path] if isinstance(path, str) else list(path)

    resDict = {"Successful": {}, "Failed": {}}
    for lfn in lfns:
        try:
            did = self.__getDidsFromLfn(lfn)
            # A default of None avoids leaking StopIteration (and the empty
            # error message it would produce) when Rucio returns no entry.
            meta = next(
                iter(self.client.get_metadata_bulk(dids=[did], inherit=True, plugin="ALL")),
                None,
            )
            if meta is None:
                resDict["Failed"][lfn] = "No such file or directory"
            elif meta["did_type"] == "FILE":  # Should we also return the metadata for the directories ?
                resDict["Successful"][lfn] = meta
            else:
                resDict["Failed"][lfn] = "Not a file"
        except DataIdentifierNotFound:
            resDict["Failed"][lfn] = "No such file or directory"
        except Exception as err:
            return S_ERROR(str(err))
    return S_OK(resDict)

@checkCatalogArguments
def getFileUserMetadataBulk(self, lfns):
    """Fetch the metadata (own and inherited) of many files in bulk.

    The LFNs are handled in chunks of 1000. Every entry returned by Rucio is
    stored under its ``name`` field in ``Successful``; each LFN of the chunk
    that is absent from ``Successful`` afterwards is reported as
    "No such file or directory" in ``Failed``.

    :param lfns: collection of LFNs to query
    :return: ``S_OK({"Successful": {...}, "Failed": {...}})``, or ``S_ERROR``
        as soon as a DID resolution or a Rucio call raises.
    """
    outcome = {"Successful": {}, "Failed": {}}
    found = outcome["Successful"]
    missing = outcome["Failed"]

    for chunk in breakListIntoChunks(lfns, 1000):
        try:
            chunkDids = list(map(self.__getDidsFromLfn, chunk))
        except Exception as err:
            return S_ERROR(str(err))

        try:
            for record in self.client.get_metadata_bulk(dids=chunkDids, inherit=True):
                found[record["name"]] = record
            for lfn in chunk:
                if lfn not in found:
                    missing[lfn] = "No such file or directory"
        except Exception as err:
            return S_ERROR(str(err))

    return S_OK(outcome)

@checkCatalogArguments
def setMetadataBulk(self, pathMetadataDict):
    """Add metadata for the given paths.

    :param dict pathMetadataDict: maps each path to the dict of metadata
        (key -> value) to attach to it.
    :return: ``S_OK({"Successful": {path: True, ...}, "Failed": {}})`` when
        the single bulk call to Rucio succeeds; ``S_ERROR`` if a path cannot
        be converted to a DID or if the bulk call raises.
    """
    resDict = {"Successful": {}, "Failed": {}}
    dids = []
    for path, metadataDict in pathMetadataDict.items():
        try:
            did = self.__getDidsFromLfn(path)
            did["meta"] = metadataDict
            dids.append(did)
        except Exception as err:
            return S_ERROR(str(err))
    try:
        self.client.set_dids_metadata_bulk(dids=dids, recursive=False)
    except Exception as err:
        return S_ERROR(str(err))
    # All paths go through one bulk call, so reaching this point means every
    # one of them was accepted. Report them, otherwise callers inspecting
    # "Successful" would see nothing even though the metadata was set.
    for path in pathMetadataDict:
        resDict["Successful"][path] = True
    return S_OK(resDict)

@checkCatalogArguments
def setMetadata(self, path, metadataDict):
    """Add the same metadata to the given path(s).

    :param path: a single path (str) or an iterable of paths. When a dict is
        given, its keys are used as the paths.
        NOTE(review): ``checkCatalogArguments`` presumably normalises the
        argument to a dict keyed by LFN — confirm.
    :param dict metadataDict: metadata (key -> value) to attach to each path
    :return: whatever :meth:`setMetadataBulk` returns
    """
    # Apply the metadata to every path that was passed, not only the first.
    paths = [path] if isinstance(path, str) else list(path)
    return self.setMetadataBulk({lfn: metadataDict for lfn in paths})

@checkCatalogArguments
def removeMetadata(self, path, metadata):
    """Remove the specified metadata for the given file.

    :param path: file whose metadata must be removed; passed unchanged to the
        LFN -> DID conversion helper.
        NOTE(review): sibling methods extract the first key of ``path``
        before converting it — confirm which form the helper expects here.
    :param metadata: iterable of metadata keys to delete
    :return: ``S_OK()`` if every key was deleted. ``S_ERROR`` if the file is
        unknown to Rucio, if an unexpected exception occurs, or if at least
        one key could not be deleted; in the last case the result also
        carries ``"FailedMetadata"``: ``{key: error message}``.
    """
    try:
        did = self.__getDidsFromLfn(path)
        failedMeta = {}
        # TODO : Implement bulk delete_metadata method in Rucio
        for meta in metadata:
            try:
                self.client.delete_metadata(scope=did["scope"], name=did["name"], key=meta)
            except DataIdentifierNotFound:
                return S_ERROR(f"File {path} not found")
            except Exception as err:
                # Keep going: collect every failing key and report them together.
                failedMeta[meta] = str(err)

        if failedMeta:
            metaExample = next(iter(failedMeta))
            result = S_ERROR(f"Failed to remove {len(failedMeta)} metadata, e.g. {failedMeta[metaExample]}")
            result["FailedMetadata"] = failedMeta
            # The error must actually be returned, otherwise partial failures
            # are reported to the caller as a success.
            return result
    except Exception as err:
        return S_ERROR(str(err))
    return S_OK()

# Deliberately not wrapped in @checkCatalogArguments: the first argument is a
# metadata query, not an LFN (the method is listed in NO_LFN_METHODS).
def findFilesByMetadata(self, metadataFilterDict, path="/", timeout=120):
    """Find the DIDs matching the given DIRAC metadata query.

    The query is converted to a list of Rucio filter dictionaries and
    ``list_dids`` is called once per scope in ``self.scopes`` with
    ``did_type="all"``.

    :param dict metadataFilterDict: DIRAC metadata query, e.g.
        ``{"particle": {"in": ["proton", "electron"]}, "run": {">": 10}}``
    :param str path: not used by this implementation
    :param int timeout: not used by this implementation
    :return: ``S_OK(list)`` with everything yielded by ``list_dids`` over all
        scopes, or ``S_ERROR`` if the query cannot be converted or a Rucio
        call fails.

    NOTE(review): ``did_type="all"`` does not restrict the result to files —
    confirm this is intended given the method name.
    """
    try:
        rucioFilters = self.__transform_DIRAC_filter_dict_to_Rucio_filter_dict([metadataFilterDict])
    except Exception as err:
        # Report a malformed query the same way as any other failure
        return S_ERROR(str(err))

    if not rucioFilters:
        # The expansion yields no filter at all only when an "in" operator
        # has an empty list of values: nothing can match such a query.
        return S_OK([])

    dids = []
    for scope in self.scopes:
        try:
            dids.extend(self.client.list_dids(scope=scope, filters=rucioFilters, did_type="all"))
        except Exception as err:
            return S_ERROR(str(err))
    return S_OK(dids)

def __transform_DIRAC_operator_to_Rucio(self, DIRAC_dict):
    """
    Transform one DIRAC metadata query dictionary into a Rucio filter dictionary.

    A plain value is copied unchanged. A value given as ``{operator: operand}``
    becomes ``key + suffix`` -> ``operand``, where the suffix is looked up in
    the operator table below ('=' maps to no suffix).

    for example :
    input_dict={'key1': 'value1', 'key2': {'>': 10}, 'key3': {'=': 10}}
    return = {'key1': 'value1', 'key2.gt': 10, 'key3': 10}

    :param dict DIRAC_dict: DIRAC query; 'in' operators must already have
        been expanded, as they have no entry in the operator table.
    :return: dict usable as a Rucio filter
    :raises ValueError: if an operator has no Rucio equivalent. Dropping the
        condition silently would broaden the query and return wrong results.
    """
    operator_mapping = {
        '>': '.gt',
        '<': '.lt',
        '>=': '.gte',
        '<=': '.lte',
        '=<': '.lte',
        '!=': '.ne',
        '=': '',
    }

    rucio_dict = {}
    for key, value in DIRAC_dict.items():
        if not isinstance(value, dict):
            rucio_dict[key] = value
            continue
        for operator, operand in value.items():
            if operator not in operator_mapping:
                raise ValueError(f"Unsupported metadata operator {operator!r} for key {key!r}")
            rucio_dict[f"{key}{operator_mapping[operator]}"] = operand

    return rucio_dict

def __transform_dict_with_in_operateur(self, DIRAC_dict_with_in_operator_list):
    """
    Expand, in every dictionary of the list, the first 'in' operator found
    into one dictionary per listed value, preserving all other keys.

    Only ONE 'in' operator per dictionary is expanded per call; call the
    method again on its own output while the returned flag is True to expand
    the remaining ones.

    example (two successive calls)
    input_dict_list = [{'particle': {'in': ['proton','electron']},'site': {'in': [ "LaPalma", 'paranal']},'configuration_id': {'=': 14} } ]
    1st call = [{'particle': 'proton', 'site': {'in': ['LaPalma', 'paranal']}, 'configuration_id': {'=': 14}}, {'particle': 'electron', 'site': {'in': ['LaPalma', 'paranal']}, 'configuration_id': {'=': 14}}]
    2nd call = [{'particle': 'proton', 'site': 'LaPalma', 'configuration_id': {'=': 14} }, {'particle': 'proton', 'site': 'paranal', 'configuration_id': {'=': 14} }, {'particle': 'electron', 'site': 'LaPalma', 'configuration_id': {'=': 14} }, {'particle': 'electron', 'site': 'paranal', 'configuration_id': {'=': 14} }]

    :param list DIRAC_dict_with_in_operator_list: list of DIRAC query dicts
    :return: tuple (list of dicts, bool); the bool is True when at least one
        'in' operator was expanded during this call
    :raises TypeError: if the argument is not a list of dictionaries
    """
    if not isinstance(DIRAC_dict_with_in_operator_list, list):
        raise TypeError("DIRAC_dict_with_in_operator_list must be a list of dictionaries")

    combined_dict_list = []  # Final list of transformed dictionaries
    break_reached = False  # True as soon as an 'in' was expanded in any dictionary

    for DIRAC_dict_with_in_operator in DIRAC_dict_with_in_operator_list:
        if not isinstance(DIRAC_dict_with_in_operator, dict):
            raise TypeError("Each element in DIRAC_dict_with_in_operator_list must be a dictionary")

        for key, value in DIRAC_dict_with_in_operator.items():
            if isinstance(value, dict) and 'in' in value:
                in_values = value['in']
                if isinstance(in_values, str):
                    # A bare string is a single value, not a sequence of characters
                    in_values = [in_values]
                for val in in_values:
                    # Copy the original dictionary and replace the 'in' clause
                    new_dict = DIRAC_dict_with_in_operator.copy()
                    new_dict[key] = val
                    combined_dict_list.append(new_dict)
                break_reached = True
                break
        else:
            # No 'in' operator in this dictionary: keep it as-is. Using
            # for/else (rather than testing the truthiness of the key found)
            # guarantees that a falsy key such as '' is still expanded, so a
            # caller looping on the flag always terminates.
            combined_dict_list.append(DIRAC_dict_with_in_operator)

    return combined_dict_list, break_reached

def __transform_DIRAC_filter_dict_to_Rucio_filter_dict(self, DIRAC_filter_dict_list):
    """
    Convert a list of DIRAC filter dictionaries into Rucio filter dictionaries.

    Works in two stages: first every 'in' operator is expanded, one pass at a
    time, until a pass reports that nothing was left to expand; then the
    remaining operators of each resulting dictionary are translated to their
    Rucio form.

    example:
    input_dict_list = [{'particle': {'in': ['proton','electron']},'site': {'in': [ "LaPalma", 'paranal']},'configuration_id': {'=': 14} } ]
    return = [{'particle': 'proton', 'site': 'LaPalma', 'configuration_id': 14}, {'particle': 'proton', 'site': 'paranal', 'configuration_id': 14}, {'particle': 'electron', 'site': 'LaPalma', 'configuration_id': 14}, {'particle': 'electron', 'site': 'paranal', 'configuration_id': 14}]
    """
    expanded = DIRAC_filter_dict_list
    while True:
        # Each pass expands at most one 'in' operator per dictionary
        expanded, expanded_something = self.__transform_dict_with_in_operateur(expanded)
        if not expanded_something:
            break
    return [self.__transform_DIRAC_operator_to_Rucio(query) for query in expanded]
129 changes: 129 additions & 0 deletions src/DIRAC/Resources/Catalog/test/Test_RucioFileCatalogClient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import unittest
from unittest.mock import MagicMock, patch
from DIRAC.Resources.Catalog.RucioFileCatalogClient import RucioFileCatalogClient

class TestRucioFileCatalogClient(unittest.TestCase):
    """Unit tests for the metadata query helpers of RucioFileCatalogClient.

    The ``client`` attribute of the class is replaced by a MagicMock for the
    duration of each test. The private helpers are reached through their
    name-mangled form (``_RucioFileCatalogClient__...``).
    """

    def setUp(self):
        # Start the patch before building the object under test so the mock
        # is already in place for everything the test touches.
        self.patcher = patch.object(RucioFileCatalogClient, 'client', new_callable=MagicMock)
        self.patcher.start()
        # addCleanup also runs when the rest of setUp fails, unlike tearDown.
        self.addCleanup(self.patcher.stop)
        self.client = RucioFileCatalogClient()
        self.client.scopes = ['test_scope']

    def test_transform_DIRAC_operator_to_Rucio(self):
        DIRAC_dict = {
            'key1': 'value1',
            'key2': {'>': 10},
            'key3': {'=': 10},
        }
        expected_output = {
            'key1': 'value1',
            'key2.gt': 10,
            'key3': 10,
        }
        result = self.client._RucioFileCatalogClient__transform_DIRAC_operator_to_Rucio(DIRAC_dict)
        self.assertEqual(result, expected_output)

    def test_transform_dict_with_in_operateur_2steps(self):
        # One call expands a single 'in' operator per dictionary, so two
        # calls are needed to expand both 'particle' and 'site'.
        DIRAC_dict_with_in_operator_list = [
            {'particle': {'in': ['proton', 'electron']}, 'site': {'in': ['LaPalma', 'paranal']}, 'configuration_id': {'=': 14}}
        ]
        expected_intermediate_output = [
            {'particle': 'proton', 'site': {'in': ['LaPalma', 'paranal']}, 'configuration_id': {'=': 14}},
            {'particle': 'electron', 'site': {'in': ['LaPalma', 'paranal']}, 'configuration_id': {'=': 14}},
        ]
        expected_final_output = [
            {'particle': 'proton', 'site': 'LaPalma', 'configuration_id': {'=': 14}},
            {'particle': 'proton', 'site': 'paranal', 'configuration_id': {'=': 14}},
            {'particle': 'electron', 'site': 'LaPalma', 'configuration_id': {'=': 14}},
            {'particle': 'electron', 'site': 'paranal', 'configuration_id': {'=': 14}},
        ]
        result_intermediate, _ = self.client._RucioFileCatalogClient__transform_dict_with_in_operateur(
            DIRAC_dict_with_in_operator_list
        )
        self.assertEqual(result_intermediate, expected_intermediate_output)
        result_final, _ = self.client._RucioFileCatalogClient__transform_dict_with_in_operateur(result_intermediate)
        self.assertEqual(result_final, expected_final_output)

    def test_transform_DIRAC_operator_to_Rucio_simple_key_value(self):
        input_dict = {'key1': 'value1', 'key2': 'value2'}
        expected_output = {'key1': 'value1', 'key2': 'value2'}
        result = self.client._RucioFileCatalogClient__transform_DIRAC_operator_to_Rucio(input_dict)
        self.assertEqual(result, expected_output)

    def test_transform_DIRAC_operator_to_Rucio_nested_dict_with_operators_gl(self):
        input_dict = {
            'start': {'>=': 10},
            'end': {'>': 5},
            'pointingZ': {'>=': 0.1},
            'organization': 'ViaCorp',
            'data_levels': 'DL3',
        }
        expected_output = {
            'start.gte': 10,
            'end.gt': 5,
            'pointingZ.gte': 0.1,
            'organization': 'ViaCorp',
            'data_levels': 'DL3',
        }
        result = self.client._RucioFileCatalogClient__transform_DIRAC_operator_to_Rucio(input_dict)
        self.assertEqual(result, expected_output)

    def test_transform_DIRAC_operator_to_Rucio_nested_dict_with_operators_equals(self):
        input_dict = {'start': {'=': 10}, 'pointingZ': {'=': 0.1}, 'organization': 'ViaCorp', 'data_levels': 'DL3'}
        expected_output = {'start': 10, 'pointingZ': 0.1, 'organization': 'ViaCorp', 'data_levels': 'DL3'}
        result = self.client._RucioFileCatalogClient__transform_DIRAC_operator_to_Rucio(input_dict)
        self.assertEqual(result, expected_output)

    def test_transform_DIRAC_operator_to_Rucio_mixed_dict(self):
        input_dict = {'key1': 'value1', 'key2': {'>': 10}, 'key3': {'=': 10}}
        expected_output = {'key1': 'value1', 'key2.gt': 10, 'key3': 10}
        result = self.client._RucioFileCatalogClient__transform_DIRAC_operator_to_Rucio(input_dict)
        self.assertEqual(result, expected_output)

    def test_transform_DIRAC_operator_to_Rucio_in_operator(self):
        input_dict = [
            {
                'analysis_prog': {'in': ['ctapipe-merge', 'ctapipe-process', 'ctapipe-apply-models']},
                'key1': 'value1',
                'key3': {'=': 10},
                'key4': {'<': 5},
            }
        ]
        expected_intermediate = [
            {'key1': 'value1', 'key3': 10, 'key4.lt': 5, 'analysis_prog': 'ctapipe-merge'},
            {'key1': 'value1', 'key3': 10, 'key4.lt': 5, 'analysis_prog': 'ctapipe-process'},
            {'key1': 'value1', 'key3': 10, 'key4.lt': 5, 'analysis_prog': 'ctapipe-apply-models'},
        ]
        result_interm = self.client._RucioFileCatalogClient__transform_DIRAC_filter_dict_to_Rucio_filter_dict(input_dict)
        self.assertEqual(result_interm, expected_intermediate)

    def test_transform_DIRAC_operator_to_Rucio_2timesin_operator(self):
        input_dict = [{'particle': {'in': ['proton', 'electron']}, 'site': {'in': ["LaPalma", 'paranal']}}]
        expected = [
            {'particle': 'proton', 'site': 'LaPalma'},
            {'particle': 'proton', 'site': 'paranal'},
            {'particle': 'electron', 'site': 'LaPalma'},
            {'particle': 'electron', 'site': 'paranal'},
        ]
        result = self.client._RucioFileCatalogClient__transform_DIRAC_filter_dict_to_Rucio_filter_dict(input_dict)
        self.assertEqual(result, expected)

    def test_2timesin_mix_operator(self):
        # 'in' operators followed by a plain operator
        input_dict = [
            {
                'particle': {'in': ['proton', 'electron']},
                'site': {'in': ["LaPalma", 'paranal']},
                'configuration_id': {'=': 14},
            }
        ]
        expected = [
            {'particle': 'proton', 'site': 'LaPalma', 'configuration_id': 14},
            {'particle': 'proton', 'site': 'paranal', 'configuration_id': 14},
            {'particle': 'electron', 'site': 'LaPalma', 'configuration_id': 14},
            {'particle': 'electron', 'site': 'paranal', 'configuration_id': 14},
        ]
        result = self.client._RucioFileCatalogClient__transform_DIRAC_filter_dict_to_Rucio_filter_dict(input_dict)
        self.assertEqual(result, expected)

        # Same query with the plain operator placed between the two 'in'
        input_dict = [
            {
                'particle': {'in': ['proton', 'electron']},
                'configuration_id': {'=': 14},
                'site': {'in': ["LaPalma", 'paranal']},
            }
        ]
        expected = [
            {'particle': 'proton', 'configuration_id': 14, 'site': 'LaPalma'},
            {'particle': 'proton', 'configuration_id': 14, 'site': 'paranal'},
            {'particle': 'electron', 'configuration_id': 14, 'site': 'LaPalma'},
            {'particle': 'electron', 'configuration_id': 14, 'site': 'paranal'},
        ]
        result = self.client._RucioFileCatalogClient__transform_DIRAC_filter_dict_to_Rucio_filter_dict(input_dict)
        self.assertEqual(result, expected)

    def test_transform_DIRAC_filter_dict_to_Rucio_filter_dict(self):
        DIRAC_filter_dict_list = [
            {'particle': {'in': ['proton', 'electron']}, 'configuration_id': {'=': 14}, 'site': {'in': ['LaPalma', 'paranal']}}
        ]
        expected_output = [
            {'particle': 'proton', 'configuration_id': 14, 'site': 'LaPalma'},
            {'particle': 'proton', 'configuration_id': 14, 'site': 'paranal'},
            {'particle': 'electron', 'configuration_id': 14, 'site': 'LaPalma'},
            {'particle': 'electron', 'configuration_id': 14, 'site': 'paranal'},
        ]
        result = self.client._RucioFileCatalogClient__transform_DIRAC_filter_dict_to_Rucio_filter_dict(DIRAC_filter_dict_list)
        self.assertEqual(result, expected_output)

    def test_findFilesByMetadata(self):
        self.client.client.list_dids.return_value = ['did1', 'did2']
        metadataFilterDict = {'key1': 'value1'}
        result = self.client.findFilesByMetadata(metadataFilterDict)
        self.assertTrue(result['OK'])
        self.assertEqual(result['Value'], ['did1', 'did2'])

    def test_findFilesByMetadata_with_error(self):
        self.client.client.list_dids.side_effect = Exception('Test error')
        metadataFilterDict = {'key1': 'value1'}
        result = self.client.findFilesByMetadata(metadataFilterDict)
        self.assertFalse(result['OK'])
        self.assertIn('Test error', result['Message'])

# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()

0 comments on commit d52d46d

Please sign in to comment.