diff --git a/diracx-routers/src/diracx/routers/jobs/status.py b/diracx-routers/src/diracx/routers/jobs/status.py index 9f9519bf..8b9b55ac 100644 --- a/diracx-routers/src/diracx/routers/jobs/status.py +++ b/diracx-routers/src/diracx/routers/jobs/status.py @@ -5,14 +5,14 @@ from http import HTTPStatus from typing import Annotated, Any -from diracx.db.sql.job.db import _get_columns -from diracx.db.sql.job.schema import Jobs from fastapi import BackgroundTasks, HTTPException, Query from diracx.core.models import ( JobStatusUpdate, SetJobStatusReturn, ) +from diracx.db.sql.job.db import _get_columns +from diracx.db.sql.job.schema import Jobs from diracx.db.sql.utils.job import ( remove_jobs, reschedule_jobs_bulk, @@ -23,9 +23,9 @@ Config, JobDB, JobLoggingDB, + JobParametersDB, SandboxMetadataDB, TaskQueueDB, - JobParametersDB, ) from ..fastapi_classes import DiracxRouter from .access_policies import ActionType, CheckWMSPolicyCallable @@ -146,7 +146,9 @@ def set_job_parameters_or_attributes( job_db: JobDB, job_parameters_db: JobParametersDB, ): - possible_attribute_columns = [name.lower() for name in _get_columns(Jobs.__table__, None)] + possible_attribute_columns = [ + name.lower() for name in _get_columns(Jobs.__table__, None) + ] attr_updates = {} param_updates = {} @@ -154,16 +156,20 @@ def set_job_parameters_or_attributes( for job_id, metadata in updates.items(): # check if this is setting an attribute in the JobDB attr_updates[job_id] = { - pname: pvalue for pname, pvalue in metadata.items() if pname.lower() in possible_attribute_columns + pname: pvalue + for pname, pvalue in metadata.items() + if pname.lower() in possible_attribute_columns } # else set elastic parameters DB param_updates[job_id] = [ - (pname, pvalue) for pname, pvalue in metadata.items() if pname.lower() not in possible_attribute_columns + (pname, pvalue) + for pname, pvalue in metadata.items() + if pname.lower() not in possible_attribute_columns ] - + job_db.set_job_attributes_bulk(attr_updates) - + # TODO: can we upsert multiple documents? for job_id, updates in param_updates.items(): job_parameters_db.upsert(