Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.0] fix: make the setting of inputDataBulk extendable #8026

Merged
merged 2 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 28 additions & 20 deletions src/DIRAC/TransformationSystem/Client/WorkflowTasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ def __init__(

self.outputDataModule_o = None

self.parametricSequencedKeys = ["JOB_ID", "PRODUCTION_ID", "InputData"]

def prepareTransformationTasks(
self, transBody, taskDict, owner="", ownerGroup="", ownerDN="", bulkSubmissionFlag=False
):
Expand Down Expand Up @@ -187,9 +189,8 @@ def _prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup, ownerDN):
if not sites:
self._logError("Could not get a list a sites", transID=transID, method=method)
return S_ERROR(ETSUKN, "Can not evaluate destination site")
else:
self._logVerbose("Setting Site: ", str(sites), transID=transID, method=method)
seqDict["Site"] = sites
self._logVerbose("Setting Site: ", str(sites), transID=transID, method=method)
seqDict["Site"] = sites

seqDict["JobName"] = self._transTaskName(transID, taskID)
seqDict["JOB_ID"] = str(taskID).zfill(8)
Expand All @@ -200,22 +201,7 @@ def _prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup, ownerDN):
method=method,
)

# Handle Input Data
inputData = paramsDict.get("InputData")
if inputData:
if isinstance(inputData, str):
inputData = inputData.replace(" ", "").split(";")
self._logVerbose(f"Setting input data to {inputData}", transID=transID, method=method)
seqDict["InputData"] = inputData
elif paramSeqDict.get("InputData") is not None:
self._logError("Invalid mixture of jobs with and without input data")
return S_ERROR(ETSDATA, "Invalid mixture of jobs with and without input data")

for paramName, paramValue in paramsDict.items():
if paramName not in ("InputData", "Site", "TargetSE"):
if paramValue:
self._logVerbose(f"Setting {paramName} to {paramValue}", transID=transID, method=method)
seqDict[paramName] = paramValue
inputData = self._handleInputsBulk(seqDict, paramsDict, transID)

outputParameterList = []
if self.outputDataModule:
Expand Down Expand Up @@ -244,7 +230,7 @@ def _prepareTasksBulk(self, transBody, taskDict, owner, ownerGroup, ownerDN):
paramSeqDict.setdefault(pName, []).append(seq)

for paramName, paramSeq in paramSeqDict.items():
if paramName in ["JOB_ID", "PRODUCTION_ID", "InputData"] + outputParameterList:
if paramName in self.parametricSequencedKeys + outputParameterList:
res = oJob.setParameterSequence(paramName, paramSeq, addToWorkflow=paramName)
else:
res = oJob.setParameterSequence(paramName, paramSeq)
Expand Down Expand Up @@ -412,6 +398,28 @@ def _handleInputs(self, oJob, paramsDict):
if not res["OK"]:
self._logError(f"Could not set the inputs: {res['Message']}", transID=transID, method="_handleInputs")

def _handleInputsBulk(self, seqDict, paramsDict, transID):
"""set job inputs (+ metadata)"""
method = "_handleInputsBulk"
if seqDict:
self._logVerbose(f"Setting job input data to {seqDict}", transID=transID, method=method)

# Handle Input Data
inputData = paramsDict.get("InputData")
if inputData:
if isinstance(inputData, str):
inputData = inputData.replace(" ", "").split(";")
self._logVerbose(f"Setting input data {inputData} to {seqDict}", transID=transID, method=method)
seqDict["InputData"] = inputData

for paramName, paramValue in paramsDict.items():
if paramName not in ("InputData", "Site", "TargetSE"):
if paramValue:
self._logVerbose(f"Setting {paramName} to {paramValue}", transID=transID, method=method)
seqDict[paramName] = paramValue

return inputData

def _handleRest(self, oJob, paramsDict):
"""add as JDL parameters all the other parameters that are not for inputs or destination"""
transID = paramsDict["TransformationID"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# pylint: disable=protected-access,missing-docstring,invalid-name

from unittest.mock import MagicMock

import pytest

from DIRAC import gLogger
Expand Down Expand Up @@ -136,3 +137,28 @@ def test__handleDestination(mocker, paramsDict, expected):
mocker.patch("DIRAC.TransformationSystem.Client.TaskManagerPlugin.getSitesForSE", side_effect=ourgetSitesForSE)
res = wfTasks._handleDestination(paramsDict)
assert sorted(res) == sorted(expected)


@pytest.mark.parametrize(
    "seqDict, paramsDict, expected",
    [
        # nothing to set
        ({}, {}, None),
        ({"Site": "Site1", "JobName": "Job1", "JOB_ID": "00000001"}, {}, None),
        # parameters without input data
        (
            {"Site": "Site1", "JobName": "Job1", "JOB_ID": "00000001"},
            {"Site": "Site1", "JobType": "Sprucing", "TransformationID": 1},
            None,
        ),
        # input data given as a list
        (
            {"Site": "Site1", "JobName": "Job1", "JOB_ID": "00000001"},
            {"Site": "Site1", "JobType": "Sprucing", "TransformationID": 1, "InputData": ["a1", "a2"]},
            ["a1", "a2"],
        ),
        # input data given as a ';'-separated string
        (
            {"Site": "Site1", "JobName": "Job1", "JOB_ID": "00000001"},
            {"Site": "Site1", "JobType": "Sprucing", "TransformationID": 1, "InputData": "a1; a2"},
            ["a1", "a2"],
        ),
    ],
)
def test__handleInputsBulk(mocker, seqDict, paramsDict, expected):
    """Check the return value of WorkflowTasks._handleInputsBulk and its in-place update of seqDict"""
    mocker.patch("DIRAC.TransformationSystem.Client.TaskManagerPlugin.getSitesForSE", side_effect=ourgetSitesForSE)
    res = wfTasks._handleInputsBulk(seqDict, paramsDict, transID=1)
    assert res == expected
    # The sequence dictionary is filled in place: InputData must be there only when some was given
    if expected is None:
        assert "InputData" not in seqDict
    else:
        assert seqDict["InputData"] == expected
Loading