diff --git a/diracx-core/src/diracx/core/exceptions.py b/diracx-core/src/diracx/core/exceptions.py index 7ac591e3..04b70192 100644 --- a/diracx-core/src/diracx/core/exceptions.py +++ b/diracx-core/src/diracx/core/exceptions.py @@ -56,6 +56,16 @@ def __init__(self, pfn: str, se_name: str, detail: str | None = None): ) +class SandboxAlreadyAssignedError(Exception): + def __init__(self, pfn: str, se_name: str, detail: str | None = None): + self.pfn: str = pfn + self.se_name: str = se_name + super().__init__( + f"Sandbox with {pfn} and {se_name} already assigned" + + (" ({detail})" if detail else "") + ) + + class JobError(Exception): def __init__(self, job_id, detail: str | None = None): self.job_id: int = job_id diff --git a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py index acb8971e..68ed181a 100644 --- a/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py +++ b/diracx-db/src/diracx/db/sql/sandbox_metadata/db.py @@ -5,7 +5,7 @@ from sqlalchemy import Executable, delete, insert, literal, select, update from sqlalchemy.exc import IntegrityError, NoResultFound -from diracx.core.exceptions import SandboxNotFoundError +from diracx.core.exceptions import SandboxAlreadyAssignedError, SandboxNotFoundError from diracx.core.models import SandboxInfo, SandboxType, UserInfo from diracx.db.sql.utils import BaseSQLDB, utcnow @@ -135,10 +135,18 @@ async def assign_sandbox_to_jobs( stmt = insert(SBEntityMapping).from_select( ["SBId", "EntityId", "Type"], select_sb_id ) - await self.conn.execute(stmt) + try: + await self.conn.execute(stmt) + except IntegrityError as e: + raise SandboxAlreadyAssignedError(pfn, se_name) from e stmt = update(SandBoxes).where(SandBoxes.SEPFN == pfn).values(Assigned=True) result = await self.conn.execute(stmt) + if result.rowcount == 0: + # If the update didn't affect any row, the sandbox doesn't exist + # It means the previous insert didn't have any effect + raise SandboxNotFoundError(pfn, se_name) + assert result.rowcount == 1 async def unassign_sandboxes_to_jobs(self, jobs_ids: list[int]) -> None: diff --git a/diracx-routers/src/diracx/routers/jobs/sandboxes.py b/diracx-routers/src/diracx/routers/jobs/sandboxes.py index 731b00c0..01441032 100644 --- a/diracx-routers/src/diracx/routers/jobs/sandboxes.py +++ b/diracx-routers/src/diracx/routers/jobs/sandboxes.py @@ -13,7 +13,7 @@ from pydantic_settings import SettingsConfigDict from pyparsing import Any -from diracx.core.exceptions import SandboxNotFoundError +from diracx.core.exceptions import SandboxAlreadyAssignedError, SandboxNotFoundError from diracx.core.models import ( SandboxInfo, SandboxType, @@ -267,12 +267,21 @@ async def assign_sandbox_to_job( """Map the pfn as output sandbox to job.""" await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=[job_id]) short_pfn = pfn.split("|", 1)[-1] - await sandbox_metadata_db.assign_sandbox_to_jobs( - jobs_ids=[job_id], - pfn=short_pfn, - sb_type=SandboxType.Output, - se_name=settings.se_name, - ) + try: + await sandbox_metadata_db.assign_sandbox_to_jobs( + jobs_ids=[job_id], + pfn=short_pfn, + sb_type=SandboxType.Output, + se_name=settings.se_name, + ) + except SandboxNotFoundError as e: + raise HTTPException( + status_code=HTTPStatus.BAD_REQUEST, detail="Sandbox not found" + ) from e + except (SandboxAlreadyAssignedError, AssertionError) as e: + raise HTTPException( + status_code=HTTPStatus.BAD_REQUEST, detail="Sandbox already assigned" + ) from e @router.delete("/{job_id}/sandbox") diff --git a/diracx-routers/tests/jobs/test_sandboxes.py b/diracx-routers/tests/jobs/test_sandboxes.py index 4dce9bb0..36763ea1 100644 --- a/diracx-routers/tests/jobs/test_sandboxes.py +++ b/diracx-routers/tests/jobs/test_sandboxes.py @@ -249,3 +249,64 @@ def test_get_empty_job_sandboxes(normal_user_client: TestClient): r = normal_user_client.get(f"/api/jobs/{job_id}/sandbox") assert r.status_code == 200 assert r.json() == {"Input": [None], "Output": [None]} + + +def test_assign_nonexisting_sb_to_job(normal_user_client: TestClient): + """Test that we cannot assign a non-existing sandbox to a job.""" + # Submit a job: + job_definitions = [TEST_JDL] + r = normal_user_client.post("/api/jobs/jdl", json=job_definitions) + assert r.status_code == 200, r.json() + assert len(r.json()) == len(job_definitions) + job_id = r.json()[0]["JobID"] + + # Malformed request: + r = normal_user_client.patch( + f"/api/jobs/{job_id}/sandbox/output", + json="/S3/pathto/vo/vo_group/user/sha256:55967b0c430058c3105472b1edae6c8987c65bcf01ef58f10a3f5e93948782d8.tar.bz2", + ) + assert r.status_code == 400 + + +def test_assign_sb_to_job_twice(normal_user_client: TestClient): + """Test that we cannot assign a sandbox to a job twice.""" + data = secrets.token_bytes(512) + checksum = hashlib.sha256(data).hexdigest() + + # Upload Sandbox: + r = normal_user_client.post( + "/api/jobs/sandbox", + json={ + "checksum_algorithm": "sha256", + "checksum": checksum, + "size": len(data), + "format": "tar.bz2", + }, + ) + + assert r.status_code == 200, r.text + upload_info = r.json() + assert upload_info["url"] + sandbox_pfn = upload_info["pfn"] + assert sandbox_pfn.startswith("SB:SandboxSE|/S3/") + + # Submit a job: + job_definitions = [TEST_JDL] + r = normal_user_client.post("/api/jobs/jdl", json=job_definitions) + assert r.status_code == 200, r.json() + assert len(r.json()) == len(job_definitions) + job_id = r.json()[0]["JobID"] + + # Assign sandbox to the job: first attempt should be successful + r = normal_user_client.patch( + f"/api/jobs/{job_id}/sandbox/output", + json=sandbox_pfn, + ) + assert r.status_code == 200 + + # Assign sandbox to the job: second attempt should fail + r = normal_user_client.patch( + f"/api/jobs/{job_id}/sandbox/output", + json=sandbox_pfn, + ) + assert r.status_code == 400