Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable remote pilot logging system [MISSING AUTH] #269

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions diracx-core/src/diracx/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,14 @@ class VOInfo(TypedDict):

class Metadata(TypedDict):
    """Typed dictionary with a single ``virtual_organizations`` mapping."""

    # Maps a string key (presumably the VO name -- TODO confirm) to its VOInfo.
    virtual_organizations: dict[str, VOInfo]


class LogLine(BaseModel):
    """One numbered line of log text."""

    # Number attached to the line (presumably its position in the log -- TODO confirm).
    line_no: int
    # Text content of the line.
    line: str


class LogMessage(BaseModel):
    """A batch of log lines together with a pilot stamp and a VO."""

    # Stamp of the pilot the lines are associated with.
    pilot_stamp: str
    # The log lines carried by this message.
    lines: list[LogLine]
    # Virtual organization name (presumably that of the pilot -- TODO confirm).
    vo: str
1 change: 1 addition & 0 deletions diracx-db/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ TaskQueueDB = "diracx.db.sql:TaskQueueDB"

[project.entry-points."diracx.dbs.os"]
JobParametersDB = "diracx.db.os:JobParametersDB"
PilotLogsDB = "diracx.db.os:PilotLogsDB"

[tool.setuptools.packages.find]
where = ["src"]
Expand Down
6 changes: 5 additions & 1 deletion diracx-db/src/diracx/db/os/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from __future__ import annotations

__all__ = ("JobParametersDB",)
__all__ = (
"JobParametersDB",
"PilotLogsDB",
)

from .job_parameters import JobParametersDB
from .pilot_logs import PilotLogsDB
35 changes: 35 additions & 0 deletions diracx-db/src/diracx/db/os/pilot_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from __future__ import annotations

from diracx.db.os.utils import BaseOSDB


class PilotLogsDB(BaseOSDB):
    """OpenSearch database definition for pilot log lines."""

    # Document field name -> OpenSearch field definition.
    fields = {
        "PilotStamp": {"type": "keyword"},
        "PilotID": {"type": "long"},
        "SubmissionTime": {"type": "date"},
        "LineNumber": {"type": "long"},
        "Message": {"type": "text"},
        "VO": {"type": "keyword"},
        "timestamp": {"type": "date"},
    }
    index_prefix = "pilot_logs"

    def index_name(self, doc_id: int) -> str:
        """Return the name of the index that holds documents for ``doc_id``.

        IDs are grouped in blocks of one million: the name is
        ``index_prefix`` followed by ``_`` and ``doc_id`` floor-divided by
        1 000 000.

        :param doc_id: integer identifier used to pick the index
        :return: the index name
        """
        # TODO decide how to define the index name
        # use pilot ID
        # Integer floor division keeps the result exact; dividing by the float
        # 1e6 would convert doc_id to float and lose precision for huge IDs.
        return f"{self.index_prefix}_{doc_id // 1_000_000}"


async def search_message(db: PilotLogsDB, search_params: list[dict]):
    """Search ``db`` for log messages matching ``search_params``.

    Only the ``Message`` field is requested and the search is sorted by
    ``LineNumber`` in ascending direction.

    :param db: database object whose ``search`` coroutine is awaited
    :param search_params: search specifications forwarded unchanged
    :return: whatever ``db.search`` returns
    """
    requested_fields = ["Message"]
    ordering = [{"parameter": "LineNumber", "direction": "asc"}]
    hits = await db.search(requested_fields, search_params, ordering)
    return hits


async def bulk_insert(db: PilotLogsDB, docs: list[dict], pilot_id: int):
    """Insert ``docs`` into the index that ``db`` assigns to ``pilot_id``.

    :param db: database object providing ``index_name`` and ``bulk_insert``
    :param docs: documents to insert
    :param pilot_id: identifier passed to ``db.index_name`` to pick the index
    """
    target_index = db.index_name(pilot_id)
    await db.bulk_insert(target_index, docs)
19 changes: 19 additions & 0 deletions diracx-db/src/diracx/db/os/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import Any, Self

from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_bulk

from diracx.core.exceptions import InvalidQueryError
from diracx.core.extensions import select_from_extension
Expand Down Expand Up @@ -192,6 +193,13 @@
)
print(f"{response=}")

async def bulk_insert(self, index_name: str, docs: list[dict[str, Any]]) -> None:
    """Insert ``docs`` into ``index_name`` using the bulk helper.

    :param index_name: index every document is written to
    :param docs: documents to insert; each one is copied with an ``_index``
        key set to ``index_name`` before being sent
    """
    # async_bulk returns a (success_count, errors) tuple, so it must be
    # unpacked before the count can be logged with a %d placeholder.
    n_inserted, _errors = await async_bulk(
        self.client, actions=[doc | {"_index": index_name} for doc in docs]
    )
    logger.info("Inserted %d documents to %r", n_inserted, index_name)

Check warning on line 201 in diracx-db/src/diracx/db/os/utils.py

View check run for this annotation

Codecov / codecov/patch

diracx-db/src/diracx/db/os/utils.py#L201

Added line #L201 was not covered by tests

async def search(
self, parameters, search, sorts, *, per_page: int = 100, page: int | None = None
) -> list[dict[str, Any]]:
Expand Down Expand Up @@ -233,6 +241,17 @@

return hits

async def delete(self, query: list[dict[str, Any]]) -> dict:
    """Delete multiple documents by query.

    :param query: search specifications selecting the documents to delete
    :return: the response of ``delete_by_query``, or an empty dict when
        ``query`` is empty (no request is sent in that case)
    """
    if not query:
        return {}

    request_body = {"query": apply_search_filters(self.fields, query)}
    # The request targets every index that starts with this database's prefix.
    return await self.client.delete_by_query(
        body=request_body, index=f"{self.index_prefix}*"
    )

Check warning on line 253 in diracx-db/src/diracx/db/os/utils.py

View check run for this annotation

Codecov / codecov/patch

diracx-db/src/diracx/db/os/utils.py#L253

Added line #L253 was not covered by tests


def require_type(operator, field_name, field_type, allowed_types):
if field_type not in allowed_types:
Expand Down
19 changes: 4 additions & 15 deletions diracx-db/src/diracx/db/sql/job/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
SortSpec,
)

from ..utils import BaseSQLDB, apply_search_filters, apply_sort_constraints
from ..utils import BaseSQLDB, apply_search_filters, apply_sort_constraints, get_columns
from .schema import (
InputData,
JobCommands,
Expand All @@ -24,17 +24,6 @@
)


def _get_columns(table, parameters):
    """Return the columns of ``table``, optionally restricted to ``parameters``.

    :param table: object exposing a ``columns`` collection
    :param parameters: column names to keep; a falsy value selects every column
    :raises InvalidQueryError: if a requested name is not among the table's
        column keys
    :return: list of columns in the table's own order
    """
    every_column = list(table.columns)
    if not parameters:
        return every_column

    unrecognised_parameters = set(parameters) - set(table.columns.keys())
    if unrecognised_parameters:
        raise InvalidQueryError(
            f"Unrecognised parameters requested {unrecognised_parameters}"
        )
    return [column for column in every_column if column.name in parameters]


class JobDB(BaseSQLDB):
metadata = JobDBBase.metadata

Expand All @@ -45,7 +34,7 @@

async def summary(self, group_by, search) -> list[dict[str, str | int]]:
"""Get a summary of the jobs."""
columns = _get_columns(Jobs.__table__, group_by)
columns = get_columns(Jobs.__table__, group_by)

stmt = select(*columns, func.count(Jobs.job_id).label("count"))
stmt = apply_search_filters(Jobs.__table__.columns.__getitem__, stmt, search)
Expand All @@ -70,7 +59,7 @@
) -> tuple[int, list[dict[Any, Any]]]:
"""Search for jobs in the database."""
# Find which columns to select
columns = _get_columns(Jobs.__table__, parameters)
columns = get_columns(Jobs.__table__, parameters)

stmt = select(*columns)

Expand Down Expand Up @@ -235,7 +224,7 @@
required_parameters = list(required_parameters_set)[0]
update_parameters = [{"job_id": k, **v} for k, v in properties.items()]

columns = _get_columns(Jobs.__table__, required_parameters)
columns = get_columns(Jobs.__table__, required_parameters)

Check warning on line 227 in diracx-db/src/diracx/db/sql/job/db.py

View check run for this annotation

Codecov / codecov/patch

diracx-db/src/diracx/db/sql/job/db.py#L227

Added line #L227 was not covered by tests
values: dict[str, BindParameter[Any] | datetime] = {
c.name: bindparam(c.name) for c in columns
}
Expand Down
54 changes: 52 additions & 2 deletions diracx-db/src/diracx/db/sql/pilot_agents/db.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any

from sqlalchemy import insert
from sqlalchemy import func, insert, select

from ..utils import BaseSQLDB
from diracx.core.exceptions import InvalidQueryError
from diracx.core.models import (
SearchSpec,
SortSpec,
)

from ..utils import BaseSQLDB, apply_search_filters, apply_sort_constraints, get_columns
from .schema import PilotAgents, PilotAgentsDBBase


Expand Down Expand Up @@ -44,3 +51,46 @@
stmt = insert(PilotAgents).values(values)
await self.conn.execute(stmt)
return

async def search(
    self,
    parameters: list[str] | None,
    search: list[SearchSpec],
    sorts: list[SortSpec],
    *,
    distinct: bool = False,
    per_page: int = 100,
    page: int | None = None,
) -> tuple[int, list[dict[Any, Any]]]:
    """Search the PilotAgents table.

    :param parameters: column names to select; a falsy value selects all
    :param search: search specifications applied as filters
    :param sorts: sort specifications applied to the statement
    :param distinct: when true, the statement is made ``DISTINCT``
    :param per_page: page size, only used when ``page`` is given
    :param page: 1-based page number; ``None`` disables pagination
    :raises InvalidQueryError: if ``page`` is given and ``page`` or
        ``per_page`` is smaller than 1
    :return: ``(total, rows)`` where ``total`` is counted before pagination
        and ``rows`` is a list of dictionaries, one per row
    """
    table = PilotAgents.__table__
    column_by_name = table.columns.__getitem__

    # Build the statement: selected columns, then filters, then ordering.
    query = select(*get_columns(table, parameters))
    query = apply_search_filters(column_by_name, query, search)
    query = apply_sort_constraints(column_by_name, query, sorts)
    if distinct:
        query = query.distinct()

    # Calculate total count before applying pagination
    count_query = select(func.count()).select_from(query.alias())
    total = (await self.conn.execute(count_query)).scalar_one()

    # Apply pagination
    if page is not None:
        if page < 1:
            raise InvalidQueryError("Page must be a positive integer")
        if per_page < 1:
            raise InvalidQueryError("Per page must be a positive integer")
        query = query.offset((page - 1) * per_page).limit(per_page)

    # Execute the query
    streamed = await self.conn.stream(query)
    rows = [dict(row._mapping) async for row in streamed]
    return total, rows
2 changes: 2 additions & 0 deletions diracx-db/src/diracx/db/sql/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
SQLDBUnavailableError,
apply_search_filters,
apply_sort_constraints,
get_columns,
)
from .functions import hash, substract_date, utcnow
from .types import Column, DateNowColumn, EnumBackedBool, EnumColumn, NullColumn
Expand All @@ -19,6 +20,7 @@
"EnumColumn",
"apply_search_filters",
"apply_sort_constraints",
"get_columns",
"substract_date",
"hash",
"SQLDBUnavailableError",
Expand Down
11 changes: 11 additions & 0 deletions diracx-db/src/diracx/db/sql/utils/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,17 @@ def find_time_resolution(value):
raise InvalidQueryError(f"Cannot parse {value=}")


def get_columns(table, parameters):
    """Return the columns of ``table``, optionally restricted to ``parameters``.

    :param table: object exposing a ``columns`` collection
    :param parameters: column names to keep; a falsy value selects every column
    :raises InvalidQueryError: if a requested name is not among the table's
        column keys
    :return: list of columns in the table's own order
    """
    available = list(table.columns)
    if not parameters:
        return available

    unrecognised_parameters = set(parameters) - set(table.columns.keys())
    if unrecognised_parameters:
        raise InvalidQueryError(
            f"Unrecognised parameters requested {unrecognised_parameters}"
        )
    # Filtering the table's list keeps the table's ordering, not the caller's.
    return [column for column in available if column.name in parameters]


def apply_search_filters(column_mapping, stmt, search):
for query in search:
try:
Expand Down
4 changes: 2 additions & 2 deletions diracx-logic/src/diracx/logic/jobs/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
VectorSearchSpec,
)
from diracx.db.os.job_parameters import JobParametersDB
from diracx.db.sql.job.db import JobDB, _get_columns
from diracx.db.sql.job.db import JobDB, get_columns
from diracx.db.sql.job.schema import Jobs
from diracx.db.sql.job_logging.db import JobLoggingDB
from diracx.db.sql.sandbox_metadata.db import SandboxMetadataDB
Expand Down Expand Up @@ -485,7 +485,7 @@ async def set_job_parameters_or_attributes(
):
"""Set job parameters or attributes for a list of jobs."""
attribute_columns: list[str] = [
col.name for col in _get_columns(Jobs.__table__, None)
col.name for col in get_columns(Jobs.__table__, None)
]
attribute_columns_lower: list[str] = [col.lower() for col in attribute_columns]

Expand Down
Empty file.
69 changes: 69 additions & 0 deletions diracx-logic/src/diracx/logic/pilots/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from __future__ import annotations

import logging

from diracx.core.models import LogMessage, ScalarSearchOperator, ScalarSearchSpec
from diracx.db.os.pilot_logs import PilotLogsDB, bulk_insert, search_message
from diracx.db.sql.pilot_agents.db import PilotAgentsDB

logger = logging.getLogger(__name__)


async def send_message(
    data: LogMessage,
    pilot_logs_db: PilotLogsDB,
    pilot_agents_db: PilotAgentsDB,
) -> int:
    """Store the log lines carried by ``data`` in the pilot logs database.

    The pilot is looked up in ``pilot_agents_db`` by ``data.pilot_stamp``; its
    PilotID, VO and SubmissionTime are copied into every stored document.

    NOTE(review): ``data.vo`` is never read here -- the stored VO is the one
    returned by the pilot agents database. Confirm that this is intended.

    :param data: message holding the pilot stamp and the log lines
    :param pilot_logs_db: database the documents are bulk-inserted into
    :param pilot_agents_db: database used to resolve the pilot stamp
    :raises LookupError: if the stamp does not match exactly one pilot
    :return: the PilotID the lines were stored under
    """
    # get the pilot ID corresponding to a given pilot stamp, expecting exactly one row:
    search_params = ScalarSearchSpec(
        parameter="PilotStamp",
        operator=ScalarSearchOperator.EQUAL,
        value=data.pilot_stamp,
    )

    total, result = await pilot_agents_db.search(
        ["PilotID", "VO", "SubmissionTime"], [search_params], []
    )
    if total != 1:
        logger.error(
            "Cannot determine PilotID for requested PilotStamp: %r, (%d candidates)",
            data.pilot_stamp,
            total,
        )
        # LookupError is still caught by ``except Exception`` but lets callers
        # tell a failed pilot lookup apart from unrelated errors.
        raise LookupError(f"Number of rows !=1 {total}")

    pilot = result[0]
    pilot_id = pilot["PilotID"]
    vo = pilot["VO"]
    submission_time = pilot["SubmissionTime"]

    docs = [
        {
            "PilotStamp": data.pilot_stamp,
            "PilotID": pilot_id,
            "SubmissionTime": submission_time,
            "VO": vo,
            "LineNumber": line.line_no,
            "Message": line.line,
        }
        for line in data.lines
    ]
    # bulk insert pilot logs to OpenSearch DB:
    await bulk_insert(pilot_logs_db, docs, pilot_id)
    return pilot_id


async def get_logs(
    pilot_id: int,
    db: PilotLogsDB,
) -> list[dict]:
    """Return the log messages stored for ``pilot_id``.

    :param pilot_id: value the ``PilotID`` field must equal
    :param db: pilot logs database passed on to ``search_message``
    :return: the search result, or a one-element list holding a placeholder
        ``Message`` entry when the search result is empty
    """
    pilot_filter = {"parameter": "PilotID", "operator": "eq", "value": pilot_id}
    messages = await search_message(db, [pilot_filter])

    # An empty result is replaced by a single explanatory entry.
    return messages or [{"Message": f"No logs for pilot ID = {pilot_id}"}]
2 changes: 2 additions & 0 deletions diracx-routers/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ types = [
]

[project.entry-points."diracx.services"]
pilots = "diracx.routers.pilots:router"
jobs = "diracx.routers.jobs:router"
config = "diracx.routers.configuration:router"
auth = "diracx.routers.auth:router"
Expand All @@ -49,6 +50,7 @@ auth = "diracx.routers.auth:router"
[project.entry-points."diracx.access_policies"]
WMSAccessPolicy = "diracx.routers.jobs.access_policies:WMSAccessPolicy"
SandboxAccessPolicy = "diracx.routers.jobs.access_policies:SandboxAccessPolicy"
PilotLogsAccessPolicy = "diracx.routers.pilots.access_policies:PilotLogsAccessPolicy"

# Minimum version of the client supported
[project.entry-points."diracx.min_client_version"]
Expand Down
Loading
Loading