Add 'assign sb to job' and 'get job sb' routes
natthan-pigoux committed Nov 20, 2023
1 parent d3b1652 commit 290043e
Showing 4 changed files with 209 additions and 7 deletions.
40 changes: 36 additions & 4 deletions src/diracx/db/sql/sandbox_metadata/db.py
@@ -1,5 +1,7 @@
from __future__ import annotations

from typing import Any

import sqlalchemy
from sqlalchemy import delete

@@ -83,12 +85,42 @@ async def sandbox_is_assigned(self, se_name: str, pfn: str) -> bool:
result = await self.conn.execute(stmt)
is_assigned = result.scalar_one()
return is_assigned
return True

async def get_sandbox_assigned_to_job(self, job_id: int, sb_type: str) -> list[Any]:
"""Get the sandbox assign to job"""
stmt = (
sqlalchemy.select(sb_SandBoxes.SEPFN)
.where(sb_SandBoxes.SBId == sb_EntityMapping.SBId)
.where(
sb_EntityMapping.EntityId == job_id, sb_EntityMapping.Type == sb_type
)
)
result = await self.conn.execute(stmt)
return [result.scalar()]

async def assign_sandbox_to_job(
self, job_id: int, pfn: str, sb_type: str, se_name: str
):
"""Mapp sandbox and job"""
select_sbid = sqlalchemy.select(
sb_SandBoxes.SBId,
sqlalchemy.literal(job_id).label("EntityId"),
sqlalchemy.literal(sb_type).label("Type"),
).where(sb_SandBoxes.SEName == se_name, sb_SandBoxes.SEPFN == pfn)
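# INSERT ... FROM SELECT: the SELECT above resolves the sandbox's SBId from its
# SE name and PFN, and the two literals supply the job id and sandbox type, so
# the mapping row is written in a single statement without a separate lookup.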
stmt = sqlalchemy.insert(sb_EntityMapping).from_select(
["SBId", "EntityId", "Type"], select_sbid
)
await self.conn.execute(stmt)

stmt = (
sqlalchemy.update(sb_SandBoxes)
.where(sb_SandBoxes.SEPFN == pfn)
.values(Assigned=True)
)
await self.conn.execute(stmt)

async def unassign_sandbox_from_jobs(self, job_ids: list[int]):
"""
Unassign sandbox from jobs
"""
"""Delete sandbox and job mapping"""
stmt = delete(sb_EntityMapping).where(
sb_EntityMapping.EntityId.in_(f"Job:{job_id}" for job_id in job_ids)
)
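For orientation, a minimal sketch of how the two new SandboxMetadataDB methods fit together, mirroring the usage in the tests below; the helper name and the hard-coded "SandboxSE" storage element are illustrative assumptions, not part of this commit:

from diracx.db.sql.sandbox_metadata.db import SandboxMetadataDB


async def attach_and_read_back(db: SandboxMetadataDB, job_id: int, pfn: str) -> list:
    """Hypothetical helper: map an already-uploaded sandbox to a job, then read it back."""
    async with db:
        # Writes the sb_EntityMapping row and flags the sandbox as Assigned.
        await db.assign_sandbox_to_job(job_id, pfn, "Output", "SandboxSE")
    async with db:
        # Returns a one-element list with the SEPFN, or [None] if no sandbox is mapped.
        return await db.get_sandbox_assigned_to_job(job_id, "Output")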
46 changes: 44 additions & 2 deletions src/diracx/routers/job_manager/sandboxes.py
@@ -2,13 +2,14 @@

import contextlib
from http import HTTPStatus
from typing import TYPE_CHECKING, Annotated, AsyncIterator
from typing import TYPE_CHECKING, Annotated, Any, AsyncIterator, Literal

from aiobotocore.session import get_session
from botocore.config import Config
from botocore.errorfactory import ClientError
from fastapi import Depends, HTTPException, Query
from fastapi import Body, Depends, HTTPException, Query
from pydantic import BaseModel, PrivateAttr
from sqlalchemy.exc import NoResultFound

from diracx.core.models import (
@@ -192,3 +193,44 @@ async def get_sandbox_file(
return SandboxDownloadResponse(
url=presigned_url, expires_in=settings.url_validity_seconds
)


@router.get("/{job_id}/sandbox")
async def get_job_sandboxes(
job_id: int,
sandbox_metadata_db: SandboxMetadataDB,
) -> dict[str, list[Any]]:
"""Get input and output sandboxes of given job id at the same time."""
# TODO: check that user as created the job or is admin
input_sb = await sandbox_metadata_db.get_sandbox_assigned_to_job(job_id, "Input")
output_sb = await sandbox_metadata_db.get_sandbox_assigned_to_job(job_id, "Output")
return {"Input": input_sb, "Output": output_sb}


@router.get("/{job_id}/sandbox/{sandbox_type}")
async def get_job_sandbox(
job_id: int,
sandbox_metadata_db: SandboxMetadataDB,
sandbox_type: Literal["input", "output"],
) -> list[Any]:
"""Get input or output sandbox from given job"""
# TODO: check that user has created the job or is admin
job_sb_pfns = await sandbox_metadata_db.get_sandbox_assigned_to_job(
job_id, sandbox_type.capitalize()
)

return job_sb_pfns


@router.patch("/{job_id}/sandbox/output")
async def assign_sandbox(
job_id: int,
pfn: Annotated[str, Body(max_length=256, pattern=SANDBOX_PFN_REGEX)],
sandbox_metadata_db: SandboxMetadataDB,
settings: SandboxStoreSettings,
):
# TODO: check that user has created the job or is admin
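# The PFN arrives in the "SB:<se_name>|<se_pfn>" form used by the sandbox upload
# response; keep only the part after the separator before storing the mapping.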
short_pfn = pfn.split("|", 1)[-1]
await sandbox_metadata_db.assign_sandbox_to_job(
job_id, short_pfn, "Output", settings.se_name
)
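A rough client-side sketch of the three routes added above, using httpx; the base URL, token, and job id are placeholders, and the PFN is assumed to come from the existing sandbox upload endpoint:

import httpx

API_URL = "https://diracx.example.invalid"  # placeholder deployment URL
headers = {"Authorization": "Bearer <access token>"}  # placeholder credentials

with httpx.Client(base_url=API_URL, headers=headers) as client:
    job_id = 123  # placeholder job id
    # PFN of the form "SB:SandboxSE|/S3/..." returned by POST /api/jobs/sandbox.
    pfn = "..."

    # Attach the uploaded sandbox to the job as its output sandbox.
    client.patch(f"/api/jobs/{job_id}/sandbox/output", json=pfn).raise_for_status()

    # Read back either one sandbox type or both at once.
    output_pfns = client.get(f"/api/jobs/{job_id}/sandbox/output").json()
    both = client.get(f"/api/jobs/{job_id}/sandbox").json()  # {"Input": [...], "Output": [...]}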
57 changes: 56 additions & 1 deletion tests/db/test_sandbox_metadata.py
@@ -9,7 +9,7 @@

from diracx.core.models import SandboxInfo, UserInfo
from diracx.db.sql.sandbox_metadata.db import SandboxMetadataDB
from diracx.db.sql.sandbox_metadata.schema import sb_SandBoxes
from diracx.db.sql.sandbox_metadata.schema import sb_EntityMapping, sb_SandBoxes


@pytest.fixture
@@ -90,3 +90,58 @@ async def _dump_db(
)
res = await sandbox_metadata_db.conn.execute(stmt)
return {row.SEPFN: (row.OwnerId, row.LastAccessTime) for row in res}


async def test_assign_sandbox_to_job(sandbox_metadata_db: SandboxMetadataDB):
pfn = secrets.token_hex()
user_info = UserInfo(
sub="vo:sub", preferred_username="user1", dirac_group="group1", vo="vo"
)
dummy_jobid = 666
# Insert the sandbox
async with sandbox_metadata_db:
await sandbox_metadata_db.insert_sandbox("SandboxSE", user_info, pfn, 100)

async with sandbox_metadata_db:
stmt = sqlalchemy.select(sb_SandBoxes.SBId, sb_SandBoxes.SEPFN)
res = await sandbox_metadata_db.conn.execute(stmt)
db_contents = {row.SEPFN: row.SBId for row in res}
SBId1 = db_contents[pfn]
# The sandbox still hasn't been assigned
async with sandbox_metadata_db:
assert not await sandbox_metadata_db.sandbox_is_assigned("SandboxSE", pfn)

# Check there is no mapping
async with sandbox_metadata_db:
stmt = sqlalchemy.select(
sb_EntityMapping.SBId, sb_EntityMapping.EntityId, sb_EntityMapping.Type
)
res = await sandbox_metadata_db.conn.execute(stmt)
db_contents = {row.SBId: (row.EntityId, row.Type) for row in res}
assert db_contents == {}

# Assign sandbox with dummy jobid
async with sandbox_metadata_db:
await sandbox_metadata_db.assign_sandbox_to_job(
dummy_jobid, pfn, "Output", "SandboxSE"
)
# Check if sandbox and job are mapped
async with sandbox_metadata_db:
stmt = sqlalchemy.select(
sb_EntityMapping.SBId, sb_EntityMapping.EntityId, sb_EntityMapping.Type
)
res = await sandbox_metadata_db.conn.execute(stmt)
db_contents = {row.SBId: (row.EntityId, row.Type) for row in res}

EntityID1, Type = db_contents[SBId1]
assert int(EntityID1) == dummy_jobid
assert Type == "Output"

async with sandbox_metadata_db:
stmt = sqlalchemy.select(sb_SandBoxes.SBId, sb_SandBoxes.SEPFN)
res = await sandbox_metadata_db.conn.execute(stmt)
db_contents = {row.SEPFN: row.SBId for row in res}
SBId1 = db_contents[pfn]
# The sandbox should be assigned
async with sandbox_metadata_db:
assert await sandbox_metadata_db.sandbox_is_assigned("SandboxSE", pfn)
73 changes: 73 additions & 0 deletions tests/routers/jobs/test_sandboxes.py
@@ -80,3 +80,76 @@ def test_upload_oversized(normal_user_client: TestClient):
)
assert r.status_code == 400, r.text
assert "Sandbox too large" in r.json()["detail"], r.text


TEST_JDL = """
Arguments = "jobDescription.xml -o LogLevel=INFO";
Executable = "dirac-jobexec";
JobGroup = jobGroup;
JobName = jobName;
JobType = User;
LogLevel = INFO;
OutputSandbox =
{
Script1_CodeOutput.log,
std.err,
std.out
};
Priority = 1;
Site = ANY;
StdError = std.err;
StdOutput = std.out;
"""


def test_assign_sandbox_to_job(normal_user_client: TestClient):
data = secrets.token_bytes(512)
checksum = hashlib.sha256(data).hexdigest()

# Upload Sandbox:
r = normal_user_client.post(
"/api/jobs/sandbox",
json={
"checksum_algorithm": "sha256",
"checksum": checksum,
"size": len(data),
"format": "tar.bz2",
},
)
assert r.status_code == 200, r.text
upload_info = r.json()
assert upload_info["url"]
sandbox_pfn = upload_info["pfn"]
assert sandbox_pfn.startswith("SB:SandboxSE|/S3/")

# Submit a job:
job_definition = [TEST_JDL]
r = normal_user_client.post("/api/jobs/", json=job_definition)
assert r.status_code == 200, r.json()[0]["JobID"]
job_id = r.json()[0]["JobID"]

# Getting job sb:
r = normal_user_client.get(f"/api/jobs/{job_id}/sandbox/output")
assert r.status_code == 200
# Should be empty
# assert r.json()[0] is None

# Assign sb to job:
r = normal_user_client.patch(
f"/api/jobs/{job_id}/sandbox/output",
json=sandbox_pfn,
)
assert r.status_code == 200

# Get the sb again:
short_pfn = sandbox_pfn.split("|", 1)[-1]
r = normal_user_client.get(f"/api/jobs/{job_id}/sandbox/")
assert r.status_code == 200
assert r.json()["Input"] == [None]
assert r.json()["Output"] == [short_pfn]

r = normal_user_client.get(f"/api/jobs/{job_id}/sandbox/output")
assert r.status_code == 200
assert r.json()[0] == short_pfn

# TODO: unassign sb from job
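As a pointer for that TODO, one possible shape for an unassign route, reusing the unassign_sandbox_from_jobs method that already exists in SandboxMetadataDB; the path, function name, and wiring are assumptions, not part of this commit:

@router.delete("/{job_id}/sandbox")
async def unassign_job_sandboxes(
    job_id: int,
    sandbox_metadata_db: SandboxMetadataDB,
):
    """Hypothetical route: drop all sandbox mappings recorded for the given job."""
    # Note: unassign_sandbox_from_jobs currently filters on EntityId values of the
    # form f"Job:{job_id}", while assign_sandbox_to_job stores the bare job id, so
    # the two representations would need to be reconciled for this to take effect.
    await sandbox_metadata_db.unassign_sandbox_from_jobs([job_id])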
