From 4e1e268fce6f94788977e7b7d2abfa79adcb4fa1 Mon Sep 17 00:00:00 2001 From: bklvsky Date: Fri, 5 Apr 2024 17:21:33 +0000 Subject: [PATCH] Integrate fastapi-sqla: bug fixes for dramatiq build tasks (AlmaLinux/build-system#230) --- alws/dramatiq/build.py | 14 +++++--------- alws/dramatiq/errata.py | 16 ++++++++++++++-- alws/dramatiq/products.py | 4 ++-- alws/dramatiq/releases.py | 6 +++--- alws/dramatiq/sign_task.py | 3 +++ alws/dramatiq/tests.py | 4 ++-- alws/dramatiq/user.py | 5 +++-- alws/utils/fastapi_sqla_setup.py | 19 +++++++++++-------- alws/utils/pulp_utils.py | 1 + 9 files changed, 44 insertions(+), 28 deletions(-) diff --git a/alws/dramatiq/build.py b/alws/dramatiq/build.py index 370b268f3..781c05378 100644 --- a/alws/dramatiq/build.py +++ b/alws/dramatiq/build.py @@ -39,11 +39,7 @@ move_issues, set_build_id_to_issues, ) -from alws.utils.fastapi_sqla_setup import ( - sync_setup, - async_setup, - DEFAULT_SESSION_KEY -) +from alws.utils.fastapi_sqla_setup import setup_all __all__ = ['start_build', 'build_done'] @@ -78,9 +74,10 @@ async def _start_build(build_id: int, build_request: build_schema.BuildCreate): )) module_build_index = {} + await setup_all() if has_modules: # with SyncSession() as db, db.begin(): - sync_setup(DEFAULT_SESSION_KEY) + with open_session() as db: platforms = ( db.execute( @@ -109,7 +106,6 @@ async def _start_build(build_id: int, build_request: build_schema.BuildCreate): # db.commit() # db.close() - await async_setup("async") async with open_async_session(key="async") as db: # async with db.begin(): build = await fetch_build(db, build_id) @@ -170,8 +166,8 @@ async def _start_build(build_id: int, build_request: build_schema.BuildCreate): async def _build_done(request: build_node_schema.BuildDone): # async for db in get_db(): - await async_setup("async") - async for db in open_async_session(key="async"): + await setup_all() + async with open_async_session(key="async") as db: try: await build_node_crud.safe_build_done(db, request) except Exception as e: diff --git a/alws/dramatiq/errata.py b/alws/dramatiq/errata.py index ed31d3865..4d64a8135 100644 --- a/alws/dramatiq/errata.py +++ b/alws/dramatiq/errata.py @@ -5,9 +5,21 @@ from alws.constants import DRAMATIQ_TASK_TIMEOUT from alws.crud.errata import bulk_errata_records_release, release_errata_record from alws.dramatiq import event_loop +from alws.utils.fastapi_sqla_setup import setup_all __all__ = ["release_errata"] +async def _release_errata_record(record_id: str, platform_id: int, force: bool): + await setup_all() + await release_errata_record( + record_id, + platform_id, + force, + ) + +async def _bulk_errata_records_release(records_ids: typing.List[str]): + await setup_all() + await bulk_errata_records_release(records_ids) @dramatiq.actor( max_retries=0, @@ -17,7 +29,7 @@ ) def release_errata(record_id: str, platform_id: int, force: bool): event_loop.run_until_complete( - release_errata_record( + _release_errata_record( record_id, platform_id, force, @@ -32,4 +44,4 @@ def release_errata(record_id: str, platform_id: int, force: bool): time_limit=DRAMATIQ_TASK_TIMEOUT, ) def bulk_errata_release(records_ids: typing.List[str]): - event_loop.run_until_complete(bulk_errata_records_release(records_ids)) + event_loop.run_until_complete(_bulk_errata_records_release(records_ids)) diff --git a/alws/dramatiq/products.py b/alws/dramatiq/products.py index eb4309095..82f34f9aa 100644 --- a/alws/dramatiq/products.py +++ b/alws/dramatiq/products.py @@ -16,7 +16,7 @@ from alws.dramatiq import event_loop from alws.utils.log_utils import setup_logger from alws.utils.pulp_client import PulpClient -from alws.utils.fastapi_sqla_setup import async_setup +from alws.utils.fastapi_sqla_setup import setup_all __all__ = ['perform_product_modification'] @@ -276,8 +276,8 @@ async def _perform_product_modification( build_id, product_id, ) + await setup_all() # async with Session() as db, db.begin(): - await async_setup("async") async with open_async_session(key="async") as db: db_product = ( ( diff --git a/alws/dramatiq/releases.py b/alws/dramatiq/releases.py index b12913aac..a3a4a7536 100644 --- a/alws/dramatiq/releases.py +++ b/alws/dramatiq/releases.py @@ -7,21 +7,21 @@ from alws.crud import release as r_crud from alws.dramatiq import event_loop # from alws.dependencies import get_db -from alws.utils.fastapi_sqla_setup import async_setup +from alws.utils.fastapi_sqla_setup import setup_all __all__ = ["execute_release_plan"] async def _commit_release(release_id, user_id): + await setup_all() # async with asynccontextmanager(get_db)() as db: - await async_setup("async") async with open_async_session(key="async") as db: await r_crud.commit_release(db, release_id, user_id) async def _revert_release(release_id, user_id): - await async_setup("async") + await setup_all() async with open_async_session(key="async") as db: await r_crud.revert_release(db, release_id, user_id) diff --git a/alws/dramatiq/sign_task.py b/alws/dramatiq/sign_task.py index bb387cca6..885fc3e54 100644 --- a/alws/dramatiq/sign_task.py +++ b/alws/dramatiq/sign_task.py @@ -7,12 +7,15 @@ from alws.dramatiq import event_loop from alws.schemas import sign_schema +from alws.utils.fastapi_sqla_setup import setup_all + __all__ = ['complete_sign_task'] async def _complete_sign_task( task_id: int, payload: typing.Dict[str, typing.Any] ): + await setup_all() await sign_task.complete_sign_task( task_id, sign_schema.SignTaskComplete(**payload) ) diff --git a/alws/dramatiq/tests.py b/alws/dramatiq/tests.py index f3dc9aa1a..2dff0e78f 100644 --- a/alws/dramatiq/tests.py +++ b/alws/dramatiq/tests.py @@ -9,14 +9,14 @@ # from alws.database import Session from alws.dramatiq import event_loop from alws.schemas.test_schema import TestTaskResult -from alws.utils.fastapi_sqla_setup import async_setup +from alws.utils.fastapi_sqla_setup import setup_all __all__ = ['complete_test_task'] async def _complete_test_task(task_id: int, task_result: TestTaskResult): - await async_setup("async") + await setup_all() async with open_async_session(key="async") as db: try: logging.info('Start processing test task %s', task_id) diff --git a/alws/dramatiq/user.py b/alws/dramatiq/user.py index 2c24458ad..b7640125f 100644 --- a/alws/dramatiq/user.py +++ b/alws/dramatiq/user.py @@ -9,13 +9,14 @@ from alws.crud import build as build_crud # from alws.dependencies import get_db from alws.dramatiq import event_loop +from alws.utils.fastapi_sqla_setup import setup_all __all__ = ['perform_user_removal'] async def _perform_user_removal(user_id: int): - await async_setup("async") - async for db in open_async_session(key="async"): + await setup_all() + async with open_async_session(key="async") as db: # Remove builds build_ids = (await db.execute( select(models.Build.id).where( diff --git a/alws/utils/fastapi_sqla_setup.py b/alws/utils/fastapi_sqla_setup.py index 8ffecd9cc..5da783528 100644 --- a/alws/utils/fastapi_sqla_setup.py +++ b/alws/utils/fastapi_sqla_setup.py @@ -8,14 +8,17 @@ app = FastAPI() setup(app) +sync_keys = ['pulp', DEFAULT_SESSION_KEY] +async_keys = ['async'] + +async def setup_all(): + sync_setup() + await async_setup() + async def async_setup(*args): - if not args: - return await async_startup(DEFAULT_SESSION_KEY) - for arg in args: - await async_startup(arg) + for key in async_keys: + await async_startup(key) def sync_setup(*args): - if not args: - return startup(DEFAULT_SESSION_KEY) - for arg in args: - startup(arg) \ No newline at end of file + for key in sync_keys: + startup(key) \ No newline at end of file diff --git a/alws/utils/pulp_utils.py b/alws/utils/pulp_utils.py index 9dfd49b62..24e314426 100644 --- a/alws/utils/pulp_utils.py +++ b/alws/utils/pulp_utils.py @@ -216,6 +216,7 @@ def get_rpm_packages_by_ids( ) -> typing.Dict[str, RpmPackage]: result = {} with open_session(key="pulp") as pulp_db: + pulp_db.expire_on_commit = False pulp_pkgs = ( pulp_db.execute( select(RpmPackage)