From 9bf328185475c5d04435704722d545c3d8d077c1 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Tue, 4 Feb 2025 09:39:39 +0100 Subject: [PATCH 1/2] fix: make the setting of inputDataBulk extendable --- .../Client/WorkflowTasks.py | 44 +++++++++++-------- .../Client/test/Test_Client_WorkflowTasks.py | 26 +++++++++++ 2 files changed, 51 insertions(+), 19 deletions(-) diff --git a/src/DIRAC/TransformationSystem/Client/WorkflowTasks.py b/src/DIRAC/TransformationSystem/Client/WorkflowTasks.py index 7517e24810a..a3878e55ffa 100644 --- a/src/DIRAC/TransformationSystem/Client/WorkflowTasks.py +++ b/src/DIRAC/TransformationSystem/Client/WorkflowTasks.py @@ -187,9 +187,8 @@ def _prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup, ownerDN): if not sites: self._logError("Could not get a list a sites", transID=transID, method=method) return S_ERROR(ETSUKN, "Can not evaluate destination site") - else: - self._logVerbose("Setting Site: ", str(sites), transID=transID, method=method) - seqDict["Site"] = sites + self._logVerbose("Setting Site: ", str(sites), transID=transID, method=method) + seqDict["Site"] = sites seqDict["JobName"] = self._transTaskName(transID, taskID) seqDict["JOB_ID"] = str(taskID).zfill(8) @@ -200,22 +199,7 @@ def _prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup, ownerDN): method=method, ) - # Handle Input Data - inputData = paramsDict.get("InputData") - if inputData: - if isinstance(inputData, str): - inputData = inputData.replace(" ", "").split(";") - self._logVerbose(f"Setting input data to {inputData}", transID=transID, method=method) - seqDict["InputData"] = inputData - elif paramSeqDict.get("InputData") is not None: - self._logError("Invalid mixture of jobs with and without input data") - return S_ERROR(ETSDATA, "Invalid mixture of jobs with and without input data") - - for paramName, paramValue in paramsDict.items(): - if paramName not in ("InputData", "Site", "TargetSE"): - if paramValue: - self._logVerbose(f"Setting {paramName} to {paramValue}", transID=transID, method=method) - seqDict[paramName] = paramValue + inputData = self._handleInputsBulk(seqDict, paramsDict, transID) outputParameterList = [] if self.outputDataModule: @@ -412,6 +396,28 @@ def _handleInputs(self, oJob, paramsDict): if not res["OK"]: self._logError(f"Could not set the inputs: {res['Message']}", transID=transID, method="_handleInputs") + def _handleInputsBulk(self, seqDict, paramsDict, transID): + """set job inputs (+ metadata)""" + method = "_handleInputsBulk" + if seqDict: + self._logVerbose(f"Setting job input data to {seqDict}", transID=transID, method=method) + + # Handle Input Data + inputData = paramsDict.get("InputData") + if inputData: + if isinstance(inputData, str): + inputData = inputData.replace(" ", "").split(";") + self._logVerbose(f"Setting input data {inputData} to {seqDict}", transID=transID, method=method) + seqDict["InputData"] = inputData + + for paramName, paramValue in paramsDict.items(): + if paramName not in ("InputData", "Site", "TargetSE"): + if paramValue: + self._logVerbose(f"Setting {paramName} to {paramValue}", transID=transID, method=method) + seqDict[paramName] = paramValue + + return inputData + def _handleRest(self, oJob, paramsDict): """add as JDL parameters all the other parameters that are not for inputs or destination""" transID = paramsDict["TransformationID"] diff --git a/src/DIRAC/TransformationSystem/Client/test/Test_Client_WorkflowTasks.py b/src/DIRAC/TransformationSystem/Client/test/Test_Client_WorkflowTasks.py index f29eea42e01..fdcb674014b 100644 --- a/src/DIRAC/TransformationSystem/Client/test/Test_Client_WorkflowTasks.py +++ b/src/DIRAC/TransformationSystem/Client/test/Test_Client_WorkflowTasks.py @@ -3,6 +3,7 @@ # pylint: disable=protected-access,missing-docstring,invalid-name from unittest.mock import MagicMock + import pytest from DIRAC import gLogger @@ -136,3 +137,28 @@ def test__handleDestination(mocker, paramsDict, expected): mocker.patch("DIRAC.TransformationSystem.Client.TaskManagerPlugin.getSitesForSE", side_effect=ourgetSitesForSE) res = wfTasks._handleDestination(paramsDict) assert sorted(res) == sorted(expected) + + +@pytest.mark.parametrize( + "seqDict, paramsDict, expected", + [ + ({}, {}, None), + ({"Site": "Site1", "JobName": "Job1", "JOB_ID": "00000001"}, {}, None), + ( + {"Site": "Site1", "JobName": "Job1", "JOB_ID": "00000001"}, + {"Site": "Site1", "JobType": "Sprucing", "TransformationID": 1}, + None, + ), + ( + {"Site": "Site1", "JobName": "Job1", "JOB_ID": "00000001"}, + {"Site": "Site1", "JobType": "Sprucing", "TransformationID": 1, "InputData": ["a1", "a2"]}, + ["a1", "a2"], + ), + # ({"a1": "aa1", "a2": "aa2", "a3": "aa3"}, {"b1": "bb1", "b2": "bb2", "b3": "bb3"}, {"b1": "bb1", "b2": "bb2"}, ["a1", "a2"]), + ], +) +def test__handleInputsBulk(mocker, seqDict, paramsDict, expected): + """Test the _handleInputsBulk method WorkflowTasks""" + mocker.patch("DIRAC.TransformationSystem.Client.TaskManagerPlugin.getSitesForSE", side_effect=ourgetSitesForSE) + res = wfTasks._handleInputsBulk(seqDict, paramsDict, transID=1) + assert res == expected From 35eed1dc3431f3600f822939fd4ee6e89be2b13d Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Tue, 4 Feb 2025 11:18:26 +0100 Subject: [PATCH 2/2] fix: make self.parametricSequencedKeys a variable --- src/DIRAC/TransformationSystem/Client/WorkflowTasks.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/DIRAC/TransformationSystem/Client/WorkflowTasks.py b/src/DIRAC/TransformationSystem/Client/WorkflowTasks.py index a3878e55ffa..0acd7f730c3 100644 --- a/src/DIRAC/TransformationSystem/Client/WorkflowTasks.py +++ b/src/DIRAC/TransformationSystem/Client/WorkflowTasks.py @@ -80,6 +80,8 @@ def __init__( self.outputDataModule_o = None + self.parametricSequencedKeys = ["JOB_ID", "PRODUCTION_ID", "InputData"] + def prepareTransformationTasks( self, transBody, taskDict, owner="", ownerGroup="", ownerDN="", bulkSubmissionFlag=False ): @@ -228,7 +230,7 @@ def _prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup, ownerDN): paramSeqDict.setdefault(pName, []).append(seq) for paramName, paramSeq in paramSeqDict.items(): - if paramName in ["JOB_ID", "PRODUCTION_ID", "InputData"] + outputParameterList: + if paramName in self.parametricSequencedKeys + outputParameterList: res = oJob.setParameterSequence(paramName, paramSeq, addToWorkflow=paramName) else: res = oJob.setParameterSequence(paramName, paramSeq)