Skip to content

Commit

Permalink
Integrate fastapi-sqla: bug fixes for dramatiq build tasks (AlmaLinux…
Browse files Browse the repository at this point in the history
  • Loading branch information
bklvsky committed Apr 10, 2024
1 parent b5a522c commit 4e1e268
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 28 deletions.
14 changes: 5 additions & 9 deletions alws/dramatiq/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@
move_issues,
set_build_id_to_issues,
)
from alws.utils.fastapi_sqla_setup import (
sync_setup,
async_setup,
DEFAULT_SESSION_KEY
)
from alws.utils.fastapi_sqla_setup import setup_all

__all__ = ['start_build', 'build_done']

Expand Down Expand Up @@ -78,9 +74,10 @@ async def _start_build(build_id: int, build_request: build_schema.BuildCreate):
))
module_build_index = {}

await setup_all()
if has_modules:
# with SyncSession() as db, db.begin():
sync_setup(DEFAULT_SESSION_KEY)

with open_session() as db:
platforms = (
db.execute(
Expand Down Expand Up @@ -109,7 +106,6 @@ async def _start_build(build_id: int, build_request: build_schema.BuildCreate):
# db.commit()
# db.close()

await async_setup("async")
async with open_async_session(key="async") as db:
# async with db.begin():
build = await fetch_build(db, build_id)
Expand Down Expand Up @@ -170,8 +166,8 @@ async def _start_build(build_id: int, build_request: build_schema.BuildCreate):

async def _build_done(request: build_node_schema.BuildDone):
# async for db in get_db():
await async_setup("async")
async for db in open_async_session(key="async"):
await setup_all()
async with open_async_session(key="async") as db:
try:
await build_node_crud.safe_build_done(db, request)
except Exception as e:
Expand Down
16 changes: 14 additions & 2 deletions alws/dramatiq/errata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,21 @@
from alws.constants import DRAMATIQ_TASK_TIMEOUT
from alws.crud.errata import bulk_errata_records_release, release_errata_record
from alws.dramatiq import event_loop
from alws.utils.fastapi_sqla_setup import setup_all

__all__ = ["release_errata"]

async def _release_errata_record(record_id: str, platform_id: int, force: bool):
await setup_all()
await release_errata_record(
record_id,
platform_id,
force,
)

async def _bulk_errata_records_release(records_ids: typing.List[str]):
await setup_all()
await bulk_errata_records_release(records_ids)

@dramatiq.actor(
max_retries=0,
Expand All @@ -17,7 +29,7 @@
)
def release_errata(record_id: str, platform_id: int, force: bool):
event_loop.run_until_complete(
release_errata_record(
_release_errata_record(
record_id,
platform_id,
force,
Expand All @@ -32,4 +44,4 @@ def release_errata(record_id: str, platform_id: int, force: bool):
time_limit=DRAMATIQ_TASK_TIMEOUT,
)
def bulk_errata_release(records_ids: typing.List[str]):
event_loop.run_until_complete(bulk_errata_records_release(records_ids))
event_loop.run_until_complete(_bulk_errata_records_release(records_ids))
4 changes: 2 additions & 2 deletions alws/dramatiq/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from alws.dramatiq import event_loop
from alws.utils.log_utils import setup_logger
from alws.utils.pulp_client import PulpClient
from alws.utils.fastapi_sqla_setup import async_setup
from alws.utils.fastapi_sqla_setup import setup_all

__all__ = ['perform_product_modification']

Expand Down Expand Up @@ -276,8 +276,8 @@ async def _perform_product_modification(
build_id,
product_id,
)
await setup_all()
# async with Session() as db, db.begin():
await async_setup("async")
async with open_async_session(key="async") as db:
db_product = (
(
Expand Down
6 changes: 3 additions & 3 deletions alws/dramatiq/releases.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@
from alws.crud import release as r_crud
from alws.dramatiq import event_loop
# from alws.dependencies import get_db
from alws.utils.fastapi_sqla_setup import async_setup
from alws.utils.fastapi_sqla_setup import setup_all


__all__ = ["execute_release_plan"]


async def _commit_release(release_id, user_id):
await setup_all()
# async with asynccontextmanager(get_db)() as db:
await async_setup("async")
async with open_async_session(key="async") as db:
await r_crud.commit_release(db, release_id, user_id)


async def _revert_release(release_id, user_id):
await async_setup("async")
await setup_all()
async with open_async_session(key="async") as db:
await r_crud.revert_release(db, release_id, user_id)

Expand Down
3 changes: 3 additions & 0 deletions alws/dramatiq/sign_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
from alws.dramatiq import event_loop
from alws.schemas import sign_schema

from alws.utils.fastapi_sqla_setup import setup_all

__all__ = ['complete_sign_task']


async def _complete_sign_task(
task_id: int, payload: typing.Dict[str, typing.Any]
):
await setup_all()
await sign_task.complete_sign_task(
task_id, sign_schema.SignTaskComplete(**payload)
)
Expand Down
4 changes: 2 additions & 2 deletions alws/dramatiq/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
# from alws.database import Session
from alws.dramatiq import event_loop
from alws.schemas.test_schema import TestTaskResult
from alws.utils.fastapi_sqla_setup import async_setup
from alws.utils.fastapi_sqla_setup import setup_all


__all__ = ['complete_test_task']


async def _complete_test_task(task_id: int, task_result: TestTaskResult):
await async_setup("async")
await setup_all()
async with open_async_session(key="async") as db:
try:
logging.info('Start processing test task %s', task_id)
Expand Down
5 changes: 3 additions & 2 deletions alws/dramatiq/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@
from alws.crud import build as build_crud
# from alws.dependencies import get_db
from alws.dramatiq import event_loop
from alws.utils.fastapi_sqla_setup import setup_all

__all__ = ['perform_user_removal']


async def _perform_user_removal(user_id: int):
await async_setup("async")
async for db in open_async_session(key="async"):
await setup_all()
async with open_async_session(key="async") as db:
# Remove builds
build_ids = (await db.execute(
select(models.Build.id).where(
Expand Down
19 changes: 11 additions & 8 deletions alws/utils/fastapi_sqla_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@
app = FastAPI()
setup(app)

sync_keys = ['pulp', DEFAULT_SESSION_KEY]
async_keys = ['async']

async def setup_all():
sync_setup()
await async_setup()

async def async_setup(*args):
if not args:
return await async_startup(DEFAULT_SESSION_KEY)
for arg in args:
await async_startup(arg)
for key in async_keys:
await async_startup(key)

def sync_setup(*args):
if not args:
return startup(DEFAULT_SESSION_KEY)
for arg in args:
startup(arg)
for key in sync_keys:
startup(key)
1 change: 1 addition & 0 deletions alws/utils/pulp_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ def get_rpm_packages_by_ids(
) -> typing.Dict[str, RpmPackage]:
result = {}
with open_session(key="pulp") as pulp_db:
pulp_db.expire_on_commit = False
pulp_pkgs = (
pulp_db.execute(
select(RpmPackage)
Expand Down

0 comments on commit 4e1e268

Please sign in to comment.