Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(db): assigning non existing sandbox or multiple times the same sandbox #393

Merged
merged 1 commit into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions diracx-core/src/diracx/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ def __init__(self, pfn: str, se_name: str, detail: str | None = None):
)


class SandboxAlreadyAssignedError(Exception):
def __init__(self, pfn: str, se_name: str, detail: str | None = None):
self.pfn: str = pfn
self.se_name: str = se_name
super().__init__(
f"Sandbox with {pfn} and {se_name} already assigned"
+ (" ({detail})" if detail else "")
)


class JobError(Exception):
def __init__(self, job_id, detail: str | None = None):
self.job_id: int = job_id
Expand Down
12 changes: 10 additions & 2 deletions diracx-db/src/diracx/db/sql/sandbox_metadata/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqlalchemy import Executable, delete, insert, literal, select, update
from sqlalchemy.exc import IntegrityError, NoResultFound

from diracx.core.exceptions import SandboxNotFoundError
from diracx.core.exceptions import SandboxAlreadyAssignedError, SandboxNotFoundError
from diracx.core.models import SandboxInfo, SandboxType, UserInfo
from diracx.db.sql.utils import BaseSQLDB, utcnow

Expand Down Expand Up @@ -135,10 +135,18 @@
stmt = insert(SBEntityMapping).from_select(
["SBId", "EntityId", "Type"], select_sb_id
)
await self.conn.execute(stmt)
try:
await self.conn.execute(stmt)
except IntegrityError as e:
raise SandboxAlreadyAssignedError(pfn, se_name) from e

stmt = update(SandBoxes).where(SandBoxes.SEPFN == pfn).values(Assigned=True)
result = await self.conn.execute(stmt)
if result.rowcount == 0:
# If the update didn't affect any row, the sandbox doesn't exist
# It means the previous insert didn't have any effect
raise SandboxNotFoundError(pfn, se_name)

Check warning on line 148 in diracx-db/src/diracx/db/sql/sandbox_metadata/db.py

View check run for this annotation

Codecov / codecov/patch

diracx-db/src/diracx/db/sql/sandbox_metadata/db.py#L148

Added line #L148 was not covered by tests

assert result.rowcount == 1

async def unassign_sandboxes_to_jobs(self, jobs_ids: list[int]) -> None:
Expand Down
23 changes: 16 additions & 7 deletions diracx-routers/src/diracx/routers/jobs/sandboxes.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pydantic_settings import SettingsConfigDict
from pyparsing import Any

from diracx.core.exceptions import SandboxNotFoundError
from diracx.core.exceptions import SandboxAlreadyAssignedError, SandboxNotFoundError
from diracx.core.models import (
SandboxInfo,
SandboxType,
Expand Down Expand Up @@ -267,12 +267,21 @@
"""Map the pfn as output sandbox to job."""
await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=[job_id])
short_pfn = pfn.split("|", 1)[-1]
await sandbox_metadata_db.assign_sandbox_to_jobs(
jobs_ids=[job_id],
pfn=short_pfn,
sb_type=SandboxType.Output,
se_name=settings.se_name,
)
try:
await sandbox_metadata_db.assign_sandbox_to_jobs(
jobs_ids=[job_id],
pfn=short_pfn,
sb_type=SandboxType.Output,
se_name=settings.se_name,
)
except SandboxNotFoundError as e:
raise HTTPException(

Check warning on line 278 in diracx-routers/src/diracx/routers/jobs/sandboxes.py

View check run for this annotation

Codecov / codecov/patch

diracx-routers/src/diracx/routers/jobs/sandboxes.py#L278

Added line #L278 was not covered by tests
status_code=HTTPStatus.BAD_REQUEST, detail="Sandbox not found"
) from e
except (SandboxAlreadyAssignedError, AssertionError) as e:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST, detail="Sandbox already assigned"
) from e


@router.delete("/{job_id}/sandbox")
Expand Down
61 changes: 61 additions & 0 deletions diracx-routers/tests/jobs/test_sandboxes.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,64 @@ def test_get_empty_job_sandboxes(normal_user_client: TestClient):
r = normal_user_client.get(f"/api/jobs/{job_id}/sandbox")
assert r.status_code == 200
assert r.json() == {"Input": [None], "Output": [None]}


def test_assign_nonexisting_sb_to_job(normal_user_client: TestClient):
"""Test that we cannot assign a non-existing sandbox to a job."""
# Submit a job:
job_definitions = [TEST_JDL]
r = normal_user_client.post("/api/jobs/jdl", json=job_definitions)
assert r.status_code == 200, r.json()
assert len(r.json()) == len(job_definitions)
job_id = r.json()[0]["JobID"]

# Malformed request:
r = normal_user_client.patch(
f"/api/jobs/{job_id}/sandbox/output",
json="/S3/pathto/vo/vo_group/user/sha256:55967b0c430058c3105472b1edae6c8987c65bcf01ef58f10a3f5e93948782d8.tar.bz2",
)
assert r.status_code == 400


def test_assign_sb_to_job_twice(normal_user_client: TestClient):
"""Test that we cannot assign a sandbox to a job twice."""
data = secrets.token_bytes(512)
checksum = hashlib.sha256(data).hexdigest()

# Upload Sandbox:
r = normal_user_client.post(
"/api/jobs/sandbox",
json={
"checksum_algorithm": "sha256",
"checksum": checksum,
"size": len(data),
"format": "tar.bz2",
},
)

assert r.status_code == 200, r.text
upload_info = r.json()
assert upload_info["url"]
sandbox_pfn = upload_info["pfn"]
assert sandbox_pfn.startswith("SB:SandboxSE|/S3/")

# Submit a job:
job_definitions = [TEST_JDL]
r = normal_user_client.post("/api/jobs/jdl", json=job_definitions)
assert r.status_code == 200, r.json()
assert len(r.json()) == len(job_definitions)
job_id = r.json()[0]["JobID"]

# Assign sandbox to the job: first attempt should be successful
r = normal_user_client.patch(
f"/api/jobs/{job_id}/sandbox/output",
json=sandbox_pfn,
)
assert r.status_code == 200

# Assign sandbox to the job: second attempt should fail
r = normal_user_client.patch(
f"/api/jobs/{job_id}/sandbox/output",
json=sandbox_pfn,
)
assert r.status_code == 400
Loading