From 25e17a53c59ba8dce9f1c53ad1cdc2cff7048474 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Tue, 9 Apr 2024 14:31:54 -0700 Subject: [PATCH 01/15] use download queue --- jupyter_scheduler/download_manager.py | 129 ++++++++++++++++++++++++++ jupyter_scheduler/download_runner.py | 72 ++++++++++++++ jupyter_scheduler/executors.py | 42 ++++++++- jupyter_scheduler/extension.py | 14 +++ jupyter_scheduler/handlers.py | 23 +++-- jupyter_scheduler/orm.py | 8 +- jupyter_scheduler/scheduler.py | 4 + 7 files changed, 283 insertions(+), 9 deletions(-) create mode 100644 jupyter_scheduler/download_manager.py create mode 100644 jupyter_scheduler/download_runner.py diff --git a/jupyter_scheduler/download_manager.py b/jupyter_scheduler/download_manager.py new file mode 100644 index 000000000..f70552135 --- /dev/null +++ b/jupyter_scheduler/download_manager.py @@ -0,0 +1,129 @@ +from dataclasses import dataclass +from datetime import datetime +from multiprocessing import Queue +from typing import List, Optional + +from jupyter_scheduler.orm import DownloadCacheRecord, create_session, generate_uuid +from jupyter_scheduler.utils import get_utc_timestamp +from jupyter_scheduler.pydantic_v1 import BaseModel + + +class DescribeDownloadCache(BaseModel): + job_id: str + download_id: str + download_initiated_time: int + + class Config: + orm_mode = True + + +@dataclass +class DownloadTask: + job_id: str + download_id: str + download_initiated_time: int + + def __lt__(self, other): + return self.download_initiated_time < other.download_initiated_time + + def __str__(self): + download_initiated_time = datetime.fromtimestamp(self.download_initiated_time / 1e3) + return f"Id: {self.job_id}, Download initiated: {download_initiated_time}" + + +class MultiprocessQueue: + """A multiprocess-safe queue using multiprocessing.Queue()""" + + def __init__(self): + self.queue = Queue() + + def put(self, download: DownloadTask): + self.queue.put(download) + + def get(self) -> 
Optional[DownloadTask]: + return self.queue.get() if not self.queue.empty() else None + + def isempty(self) -> bool: + return self.queue.empty() + + +class DownloadCache: + def __init__(self, db_url): + self.session = create_session(db_url) + + def put(self, download: DescribeDownloadCache): + with self.session() as session: + new_download = DownloadCacheRecord(**download.dict()) + session.add(new_download) + session.commit() + + def get(self, job_id: str) -> Optional[DescribeDownloadCache]: + with self.session() as session: + download = ( + session.query(DownloadCacheRecord) + .filter(DownloadCacheRecord.job_id == job_id) + .first() + ) + + if download: + return DescribeDownloadCache.from_orm(download) + else: + return None + + def get_tasks(self) -> List[DescribeDownloadCache]: + with self.session() as session: + return ( + session.query(DownloadCacheRecord) + .order_by(DownloadCacheRecord.download_initiated_time) + .all() + ) + + def delete_download(self, download_id: str): + with self.session() as session: + session.query(DownloadCacheRecord).filter( + DownloadCacheRecord.download_id == download_id + ).delete() + session.commit() + + def delete_job_downloads(self, job_id: str): + with self.session() as session: + session.query(DownloadCacheRecord).filter(DownloadCacheRecord.job_id == job_id).delete() + session.commit() + + +class DownloadManager: + def __init__(self, db_url: str): + self.cache = DownloadCache(db_url=db_url) + self.queue = MultiprocessQueue() + + def download_from_staging(self, job_id: str): + download_initiated_time = get_utc_timestamp() + download_id = generate_uuid() + download_cache = DescribeDownloadCache( + job_id=job_id, + download_id=download_id, + download_initiated_time=download_initiated_time, + ) + self.cache.put(download_cache) + download_task = DownloadTask( + job_id=job_id, + download_id=download_id, + download_initiated_time=download_initiated_time, + ) + self.queue.put(download_task) + + def delete_download(self, download_id: 
str): + self.cache.delete_download(download_id) + + def delete_job_downloads(self, job_id: str): + self.cache.delete_job_downloads(job_id) + + def populate_queue(self): + tasks = self.cache.get_tasks() + for task in tasks: + download_task = DownloadTask( + job_id=task.job_id, + download_id=task.download_id, + download_initiated_time=task.download_initiated_time, + ) + self.queue.put(download_task) diff --git a/jupyter_scheduler/download_runner.py b/jupyter_scheduler/download_runner.py new file mode 100644 index 000000000..97029ea27 --- /dev/null +++ b/jupyter_scheduler/download_runner.py @@ -0,0 +1,72 @@ +import asyncio + +from jupyter_scheduler.download_manager import DownloadManager +from jupyter_scheduler.job_files_manager import JobFilesManager + + +class BaseDownloadRunner: + def start(self): + raise NotImplementedError("Must be implemented by subclass") + + +class DownloadRunner(BaseDownloadRunner): + def __init__( + self, + download_manager: DownloadManager, + job_files_manager: JobFilesManager, + poll_interval: int = 10, + ): + self.download_manager = download_manager + self.job_files_manager = job_files_manager + self.poll_interval = poll_interval + + # def add_download(self, job_id: str): + # download_initiated_time = get_utc_timestamp() + # download_id = generate_uuid() + # download_cache = DescribeDownloadCache( + # job_id=job_id, + # download_id=download_id, + # download_initiated_time=download_initiated_time, + # ) + # self.download_cache.put(download_cache) + # download_task = DownloadTask( + # job_id=job_id, + # download_id=download_id, + # download_initiated_time=download_initiated_time, + # ) + # self.download_queue.put(download_task) + + # def delete_download(self, download_id: str): + # self.download_cache.delete_download(download_id) + + # def delete_job_downloads(self, job_id: str): + # self.download_cache.delete_job_downloads(job_id) + + async def process_download_queue(self): + print("\n\n***\nDownloadRunner.process_download_queue isempty") 
+ print(self.download_manager.queue.isempty()) + while not self.download_manager.queue.isempty(): + download = self.download_manager.queue.get() + print(download) + cache = self.download_manager.cache.get(download.job_id) + print(cache) + if not cache or not download: + continue + await self.job_files_manager.copy_from_staging(cache.job_id) + self.download_manager.cache.delete_download(cache.download_id) + + # def populate_queue(self): + # tasks = self.download_manager.cache.get_tasks() + # for task in tasks: + # download_task = DownloadTask( + # job_id=task.job_id, + # download_id=task.download_id, + # download_initiated_time=task.download_initiated_time, + # ) + # self.download_manager.queue.put(download_task) + + async def start(self): + self.download_manager.populate_queue() + while True: + await self.process_download_queue() + await asyncio.sleep(self.poll_interval) diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py index 7e1a9974e..b21ff9e09 100644 --- a/jupyter_scheduler/executors.py +++ b/jupyter_scheduler/executors.py @@ -1,4 +1,5 @@ import io +import multiprocessing import os import shutil import tarfile @@ -13,6 +14,13 @@ from jupyter_scheduler.models import DescribeJob, JobFeature, Status from jupyter_scheduler.orm import Job, create_session +from jupyter_scheduler.download_manager import ( + DescribeDownloadCache, + DownloadCacheRecord, + DownloadTask, +) +from jupyter_scheduler.models import DescribeJob, JobFeature, Status +from jupyter_scheduler.orm import Job, create_session, generate_uuid from jupyter_scheduler.parameterize import add_parameters from jupyter_scheduler.utils import get_utc_timestamp @@ -29,11 +37,19 @@ class ExecutionManager(ABC): _model = None _db_session = None - def __init__(self, job_id: str, root_dir: str, db_url: str, staging_paths: Dict[str, str]): + def __init__( + self, + job_id: str, + root_dir: str, + db_url: str, + staging_paths: Dict[str, str], + download_queue, + ): self.job_id = job_id 
self.staging_paths = staging_paths self.root_dir = root_dir self.db_url = db_url + self.download_queue = download_queue @property def model(self): @@ -144,6 +160,30 @@ def execute(self): self.add_side_effects_files(staging_dir) self.create_output_files(job, nb) + def download_from_staging(self, job_id: str): + download_initiated_time = get_utc_timestamp() + download_id = generate_uuid() + download_cache = DescribeDownloadCache( + job_id=job_id, + download_id=download_id, + download_initiated_time=download_initiated_time, + ) + with self.db_session() as session: + new_download = DownloadCacheRecord(**download_cache.dict()) + session.add(new_download) + session.commit() + download_task = DownloadTask( + job_id=job_id, + download_id=download_id, + download_initiated_time=download_initiated_time, + ) + self.download_queue.put(download_task) + # print( + # "\n\n***\n ExecutionManager.download_from_staging uuid and task being put on a qeueue" + # ) + # print(download_id) + # print(download_task) + def add_side_effects_files(self, staging_dir: str): """Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files""" input_notebook = os.path.relpath(self.staging_paths["input"]) diff --git a/jupyter_scheduler/extension.py b/jupyter_scheduler/extension.py index 1a4ba3736..a87d05d4c 100644 --- a/jupyter_scheduler/extension.py +++ b/jupyter_scheduler/extension.py @@ -5,6 +5,8 @@ from jupyter_server.transutils import _i18n from traitlets import Bool, Type, Unicode, default +from jupyter_scheduler.download_manager import DownloadManager +from jupyter_scheduler.download_runner import DownloadRunner from jupyter_scheduler.orm import create_tables from .handlers import ( @@ -73,21 +75,33 @@ def initialize_settings(self): environments_manager = self.environment_manager_class() + download_manager = DownloadManager(db_url=self.db_url) + scheduler = self.scheduler_class( root_dir=self.serverapp.root_dir, 
environments_manager=environments_manager, db_url=self.db_url, config=self.config, + download_queue=download_manager.queue, ) job_files_manager = self.job_files_manager_class(scheduler=scheduler) + download_runner = DownloadRunner( + download_manager=download_manager, job_files_manager=job_files_manager + ) + self.settings.update( environments_manager=environments_manager, scheduler=scheduler, job_files_manager=job_files_manager, + download_from_staging=download_manager.download_from_staging, ) if scheduler.task_runner: loop = asyncio.get_event_loop() loop.create_task(scheduler.task_runner.start()) + + if download_runner: + loop = asyncio.get_event_loop() + loop.create_task(download_runner.start()) diff --git a/jupyter_scheduler/handlers.py b/jupyter_scheduler/handlers.py index 8e773b750..30d16c7ee 100644 --- a/jupyter_scheduler/handlers.py +++ b/jupyter_scheduler/handlers.py @@ -395,20 +395,29 @@ def get(self): class FilesDownloadHandler(ExtensionHandlerMixin, APIHandler): - _job_files_manager = None + # _job_files_manager = None + _download_from_staging = None + + # @property + # def job_files_manager(self): + # if not self._job_files_manager: + # self._job_files_manager = self.settings.get("job_files_manager", None) + + # return self._job_files_manager @property - def job_files_manager(self): - if not self._job_files_manager: - self._job_files_manager = self.settings.get("job_files_manager", None) + def download_from_staging(self): + if not self._download_from_staging: + self._download_from_staging = self.settings.get("download_from_staging", None) - return self._job_files_manager + return self._download_from_staging @authenticated async def get(self, job_id): - redownload = self.get_query_argument("redownload", False) + # redownload = self.get_query_argument("redownload", False) try: - await self.job_files_manager.copy_from_staging(job_id=job_id, redownload=redownload) + # await self.job_files_manager.copy_from_staging(job_id=job_id, redownload=redownload) + 
self.download_from_staging(job_id) except Exception as e: self.log.exception(e) raise HTTPError(500, str(e)) from e diff --git a/jupyter_scheduler/orm.py b/jupyter_scheduler/orm.py index 24a915b31..2ed1868e0 100644 --- a/jupyter_scheduler/orm.py +++ b/jupyter_scheduler/orm.py @@ -1,5 +1,4 @@ import json -import os from sqlite3 import OperationalError from uuid import uuid4 @@ -112,6 +111,13 @@ class JobDefinition(CommonColumns, Base): active = Column(Boolean, default=True) +class DownloadCacheRecord(Base): + __tablename__ = "download_cache" + job_id = Column(String(36), primary_key=True) + download_id = Column(String(36), primary_key=True) + download_initiated_time = Column(Integer) + + def create_tables(db_url, drop_tables=False): engine = create_engine(db_url) try: diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index 867034c60..ee57e2ed0 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -15,6 +15,7 @@ from traitlets import Unicode, default from traitlets.config import LoggingConfigurable +from jupyter_scheduler.download_manager import MultiprocessQueue from jupyter_scheduler.environments import EnvironmentManager from jupyter_scheduler.exceptions import ( IdempotencyTokenError, @@ -404,6 +405,7 @@ def __init__( root_dir: str, environments_manager: Type[EnvironmentManager], db_url: str, + download_queue: MultiprocessQueue, config=None, **kwargs, ): @@ -413,6 +415,7 @@ def __init__( self.db_url = db_url if self.task_runner_class: self.task_runner = self.task_runner_class(scheduler=self, config=config) + self.download_queue = download_queue @property def db_session(self): @@ -492,6 +495,7 @@ def create_job(self, model: CreateJob) -> str: staging_paths=staging_paths, root_dir=self.root_dir, db_url=self.db_url, + download_queue=self.download_queue.queue, ).process ) p.start() From 3fc2cc0d830e7263c14fa1d4385f559132627b31 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Tue, 9 Apr 2024 23:09:13 -0700 
Subject: [PATCH 02/15] rename Download tables, use mp.quque directly without a wrapper --- jupyter_scheduler/download_manager.py | 65 +++++++++------------------ jupyter_scheduler/download_runner.py | 44 ++---------------- jupyter_scheduler/executors.py | 13 ++---- jupyter_scheduler/orm.py | 4 +- jupyter_scheduler/scheduler.py | 5 +-- 5 files changed, 32 insertions(+), 99 deletions(-) diff --git a/jupyter_scheduler/download_manager.py b/jupyter_scheduler/download_manager.py index f70552135..4e484eadf 100644 --- a/jupyter_scheduler/download_manager.py +++ b/jupyter_scheduler/download_manager.py @@ -3,12 +3,13 @@ from multiprocessing import Queue from typing import List, Optional -from jupyter_scheduler.orm import DownloadCacheRecord, create_session, generate_uuid +from jupyter_scheduler.orm import Downloads, create_session, generate_uuid +from jupyter_scheduler.pydantic_v1 import BaseModel from jupyter_scheduler.utils import get_utc_timestamp from jupyter_scheduler.pydantic_v1 import BaseModel -class DescribeDownloadCache(BaseModel): +class DescribeDownload(BaseModel): job_id: str download_id: str download_initiated_time: int @@ -31,80 +32,54 @@ def __str__(self): return f"Id: {self.job_id}, Download initiated: {download_initiated_time}" -class MultiprocessQueue: - """A multiprocess-safe queue using multiprocessing.Queue()""" - - def __init__(self): - self.queue = Queue() - - def put(self, download: DownloadTask): - self.queue.put(download) - - def get(self) -> Optional[DownloadTask]: - return self.queue.get() if not self.queue.empty() else None - - def isempty(self) -> bool: - return self.queue.empty() - - -class DownloadCache: +class DownloadRecordManager: def __init__(self, db_url): self.session = create_session(db_url) - def put(self, download: DescribeDownloadCache): + def put(self, download: DescribeDownload): with self.session() as session: - new_download = DownloadCacheRecord(**download.dict()) + new_download = Downloads(**download.dict()) 
session.add(new_download) session.commit() - def get(self, job_id: str) -> Optional[DescribeDownloadCache]: + def get(self, job_id: str) -> Optional[DescribeDownload]: with self.session() as session: - download = ( - session.query(DownloadCacheRecord) - .filter(DownloadCacheRecord.job_id == job_id) - .first() - ) + download = session.query(Downloads).filter(Downloads.job_id == job_id).first() if download: - return DescribeDownloadCache.from_orm(download) + return DescribeDownload.from_orm(download) else: return None - def get_tasks(self) -> List[DescribeDownloadCache]: + def get_tasks(self) -> List[DescribeDownload]: with self.session() as session: - return ( - session.query(DownloadCacheRecord) - .order_by(DownloadCacheRecord.download_initiated_time) - .all() - ) + return session.query(Downloads).order_by(Downloads.download_initiated_time).all() def delete_download(self, download_id: str): with self.session() as session: - session.query(DownloadCacheRecord).filter( - DownloadCacheRecord.download_id == download_id - ).delete() + session.query(Downloads).filter(Downloads.download_id == download_id).delete() session.commit() def delete_job_downloads(self, job_id: str): with self.session() as session: - session.query(DownloadCacheRecord).filter(DownloadCacheRecord.job_id == job_id).delete() + session.query(Downloads).filter(Downloads.job_id == job_id).delete() session.commit() class DownloadManager: def __init__(self, db_url: str): - self.cache = DownloadCache(db_url=db_url) - self.queue = MultiprocessQueue() + self.record_manager = DownloadRecordManager(db_url=db_url) + self.queue = Queue() def download_from_staging(self, job_id: str): download_initiated_time = get_utc_timestamp() download_id = generate_uuid() - download_cache = DescribeDownloadCache( + download_cache = DescribeDownload( job_id=job_id, download_id=download_id, download_initiated_time=download_initiated_time, ) - self.cache.put(download_cache) + self.record_manager.put(download_cache) download_task = 
DownloadTask( job_id=job_id, download_id=download_id, @@ -113,13 +88,13 @@ def download_from_staging(self, job_id: str): self.queue.put(download_task) def delete_download(self, download_id: str): - self.cache.delete_download(download_id) + self.record_manager.delete_download(download_id) def delete_job_downloads(self, job_id: str): - self.cache.delete_job_downloads(job_id) + self.record_manager.delete_job_downloads(job_id) def populate_queue(self): - tasks = self.cache.get_tasks() + tasks = self.record_manager.get_tasks() for task in tasks: download_task = DownloadTask( job_id=task.job_id, diff --git a/jupyter_scheduler/download_runner.py b/jupyter_scheduler/download_runner.py index 97029ea27..b0da7e5fd 100644 --- a/jupyter_scheduler/download_runner.py +++ b/jupyter_scheduler/download_runner.py @@ -14,56 +14,20 @@ def __init__( self, download_manager: DownloadManager, job_files_manager: JobFilesManager, - poll_interval: int = 10, + poll_interval: int = 5, ): self.download_manager = download_manager self.job_files_manager = job_files_manager self.poll_interval = poll_interval - # def add_download(self, job_id: str): - # download_initiated_time = get_utc_timestamp() - # download_id = generate_uuid() - # download_cache = DescribeDownloadCache( - # job_id=job_id, - # download_id=download_id, - # download_initiated_time=download_initiated_time, - # ) - # self.download_cache.put(download_cache) - # download_task = DownloadTask( - # job_id=job_id, - # download_id=download_id, - # download_initiated_time=download_initiated_time, - # ) - # self.download_queue.put(download_task) - - # def delete_download(self, download_id: str): - # self.download_cache.delete_download(download_id) - - # def delete_job_downloads(self, job_id: str): - # self.download_cache.delete_job_downloads(job_id) - async def process_download_queue(self): - print("\n\n***\nDownloadRunner.process_download_queue isempty") - print(self.download_manager.queue.isempty()) - while not 
self.download_manager.queue.isempty(): + while not self.download_manager.queue.empty(): download = self.download_manager.queue.get() - print(download) - cache = self.download_manager.cache.get(download.job_id) - print(cache) + cache = self.download_manager.record_manager.get(download.job_id) if not cache or not download: continue await self.job_files_manager.copy_from_staging(cache.job_id) - self.download_manager.cache.delete_download(cache.download_id) - - # def populate_queue(self): - # tasks = self.download_manager.cache.get_tasks() - # for task in tasks: - # download_task = DownloadTask( - # job_id=task.job_id, - # download_id=task.download_id, - # download_initiated_time=task.download_initiated_time, - # ) - # self.download_manager.queue.put(download_task) + self.download_manager.record_manager.delete_download(cache.download_id) async def start(self): self.download_manager.populate_queue() diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py index b21ff9e09..ea9b4476c 100644 --- a/jupyter_scheduler/executors.py +++ b/jupyter_scheduler/executors.py @@ -15,8 +15,8 @@ from jupyter_scheduler.models import DescribeJob, JobFeature, Status from jupyter_scheduler.orm import Job, create_session from jupyter_scheduler.download_manager import ( - DescribeDownloadCache, - DownloadCacheRecord, + DescribeDownload, + Downloads, DownloadTask, ) from jupyter_scheduler.models import DescribeJob, JobFeature, Status @@ -163,13 +163,13 @@ def execute(self): def download_from_staging(self, job_id: str): download_initiated_time = get_utc_timestamp() download_id = generate_uuid() - download_cache = DescribeDownloadCache( + download_cache = DescribeDownload( job_id=job_id, download_id=download_id, download_initiated_time=download_initiated_time, ) with self.db_session() as session: - new_download = DownloadCacheRecord(**download_cache.dict()) + new_download = Downloads(**download_cache.dict()) session.add(new_download) session.commit() download_task = 
DownloadTask( @@ -178,11 +178,6 @@ def download_from_staging(self, job_id: str): download_initiated_time=download_initiated_time, ) self.download_queue.put(download_task) - # print( - # "\n\n***\n ExecutionManager.download_from_staging uuid and task being put on a qeueue" - # ) - # print(download_id) - # print(download_task) def add_side_effects_files(self, staging_dir: str): """Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files""" diff --git a/jupyter_scheduler/orm.py b/jupyter_scheduler/orm.py index 2ed1868e0..853c55d47 100644 --- a/jupyter_scheduler/orm.py +++ b/jupyter_scheduler/orm.py @@ -111,8 +111,8 @@ class JobDefinition(CommonColumns, Base): active = Column(Boolean, default=True) -class DownloadCacheRecord(Base): - __tablename__ = "download_cache" +class Downloads(Base): + __tablename__ = "downloads" job_id = Column(String(36), primary_key=True) download_id = Column(String(36), primary_key=True) download_initiated_time = Column(Integer) diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index ee57e2ed0..fbf3f4984 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -15,7 +15,6 @@ from traitlets import Unicode, default from traitlets.config import LoggingConfigurable -from jupyter_scheduler.download_manager import MultiprocessQueue from jupyter_scheduler.environments import EnvironmentManager from jupyter_scheduler.exceptions import ( IdempotencyTokenError, @@ -405,7 +404,7 @@ def __init__( root_dir: str, environments_manager: Type[EnvironmentManager], db_url: str, - download_queue: MultiprocessQueue, + download_queue: mp.Queue, config=None, **kwargs, ): @@ -495,7 +494,7 @@ def create_job(self, model: CreateJob) -> str: staging_paths=staging_paths, root_dir=self.root_dir, db_url=self.db_url, - download_queue=self.download_queue.queue, + download_queue=self.download_queue, ).process ) p.start() From 
7abdf27b7593a412a2dfd09c90612b8dc82350c1 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Tue, 9 Apr 2024 23:27:01 -0700 Subject: [PATCH 03/15] Remove DownloadTask data class, use DescribeDownload for both queue and db records --- jupyter_scheduler/download_manager.py | 65 +++++++-------------------- jupyter_scheduler/executors.py | 23 +++------- jupyter_scheduler/models.py | 12 +++++ jupyter_scheduler/orm.py | 2 +- 4 files changed, 35 insertions(+), 67 deletions(-) diff --git a/jupyter_scheduler/download_manager.py b/jupyter_scheduler/download_manager.py index 4e484eadf..3d9e571ed 100644 --- a/jupyter_scheduler/download_manager.py +++ b/jupyter_scheduler/download_manager.py @@ -1,68 +1,43 @@ -from dataclasses import dataclass -from datetime import datetime from multiprocessing import Queue from typing import List, Optional -from jupyter_scheduler.orm import Downloads, create_session, generate_uuid -from jupyter_scheduler.pydantic_v1 import BaseModel +from jupyter_scheduler.models import DescribeDownload +from jupyter_scheduler.orm import Download, create_session, generate_uuid from jupyter_scheduler.utils import get_utc_timestamp from jupyter_scheduler.pydantic_v1 import BaseModel -class DescribeDownload(BaseModel): - job_id: str - download_id: str - download_initiated_time: int - - class Config: - orm_mode = True - - -@dataclass -class DownloadTask: - job_id: str - download_id: str - download_initiated_time: int - - def __lt__(self, other): - return self.download_initiated_time < other.download_initiated_time - - def __str__(self): - download_initiated_time = datetime.fromtimestamp(self.download_initiated_time / 1e3) - return f"Id: {self.job_id}, Download initiated: {download_initiated_time}" - - class DownloadRecordManager: def __init__(self, db_url): self.session = create_session(db_url) def put(self, download: DescribeDownload): with self.session() as session: - new_download = Downloads(**download.dict()) - session.add(new_download) + download = 
Download(**download.dict()) + session.add(download) session.commit() def get(self, job_id: str) -> Optional[DescribeDownload]: with self.session() as session: - download = session.query(Downloads).filter(Downloads.job_id == job_id).first() + download = session.query(Download).filter(Download.job_id == job_id).first() if download: return DescribeDownload.from_orm(download) else: return None - def get_tasks(self) -> List[DescribeDownload]: + def get_downloads(self) -> List[DescribeDownload]: with self.session() as session: - return session.query(Downloads).order_by(Downloads.download_initiated_time).all() + return session.query(Download).order_by(Download.download_initiated_time).all() def delete_download(self, download_id: str): with self.session() as session: - session.query(Downloads).filter(Downloads.download_id == download_id).delete() + session.query(Download).filter(Download.download_id == download_id).delete() session.commit() def delete_job_downloads(self, job_id: str): with self.session() as session: - session.query(Downloads).filter(Downloads.job_id == job_id).delete() + session.query(Download).filter(Download.job_id == job_id).delete() session.commit() @@ -74,18 +49,13 @@ def __init__(self, db_url: str): def download_from_staging(self, job_id: str): download_initiated_time = get_utc_timestamp() download_id = generate_uuid() - download_cache = DescribeDownload( - job_id=job_id, - download_id=download_id, - download_initiated_time=download_initiated_time, - ) - self.record_manager.put(download_cache) - download_task = DownloadTask( + download = DescribeDownload( job_id=job_id, download_id=download_id, download_initiated_time=download_initiated_time, ) - self.queue.put(download_task) + self.record_manager.put(download) + self.queue.put(download) def delete_download(self, download_id: str): self.record_manager.delete_download(download_id) @@ -94,11 +64,6 @@ def delete_job_downloads(self, job_id: str): self.record_manager.delete_job_downloads(job_id) def 
populate_queue(self): - tasks = self.record_manager.get_tasks() - for task in tasks: - download_task = DownloadTask( - job_id=task.job_id, - download_id=task.download_id, - download_initiated_time=task.download_initiated_time, - ) - self.queue.put(download_task) + downloads = self.record_manager.get_downloads() + for download in downloads: + self.queue.put(download) diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py index ea9b4476c..91ef17465 100644 --- a/jupyter_scheduler/executors.py +++ b/jupyter_scheduler/executors.py @@ -1,5 +1,4 @@ import io -import multiprocessing import os import shutil import tarfile @@ -14,11 +13,7 @@ from jupyter_scheduler.models import DescribeJob, JobFeature, Status from jupyter_scheduler.orm import Job, create_session -from jupyter_scheduler.download_manager import ( - DescribeDownload, - Downloads, - DownloadTask, -) +from jupyter_scheduler.download_manager import DescribeDownload, Download from jupyter_scheduler.models import DescribeJob, JobFeature, Status from jupyter_scheduler.orm import Job, create_session, generate_uuid from jupyter_scheduler.parameterize import add_parameters @@ -159,25 +154,21 @@ def execute(self): finally: self.add_side_effects_files(staging_dir) self.create_output_files(job, nb) + self._download_from_staging(job.job_id) - def download_from_staging(self, job_id: str): + def _download_from_staging(self, job_id: str): download_initiated_time = get_utc_timestamp() download_id = generate_uuid() - download_cache = DescribeDownload( + download = DescribeDownload( job_id=job_id, download_id=download_id, download_initiated_time=download_initiated_time, ) with self.db_session() as session: - new_download = Downloads(**download_cache.dict()) - session.add(new_download) + download_record = Download(**download.dict()) + session.add(download_record) session.commit() - download_task = DownloadTask( - job_id=job_id, - download_id=download_id, - download_initiated_time=download_initiated_time, - ) 
- self.download_queue.put(download_task) + self.download_queue.put(download) def add_side_effects_files(self, staging_dir: str): """Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files""" diff --git a/jupyter_scheduler/models.py b/jupyter_scheduler/models.py index 38e240e0e..65f0ec4fe 100644 --- a/jupyter_scheduler/models.py +++ b/jupyter_scheduler/models.py @@ -295,3 +295,15 @@ class JobFeature(str, Enum): output_filename_template = "output_filename_template" stop_job = "stop_job" delete_job = "delete_job" + + +class DescribeDownload(BaseModel): + job_id: str + download_id: str + download_initiated_time: int + + class Config: + orm_mode = True + + def __str__(self) -> str: + return self.json() diff --git a/jupyter_scheduler/orm.py b/jupyter_scheduler/orm.py index 853c55d47..2c06d9f32 100644 --- a/jupyter_scheduler/orm.py +++ b/jupyter_scheduler/orm.py @@ -111,7 +111,7 @@ class JobDefinition(CommonColumns, Base): active = Column(Boolean, default=True) -class Downloads(Base): +class Download(Base): __tablename__ = "downloads" job_id = Column(String(36), primary_key=True) download_id = Column(String(36), primary_key=True) From 56f095d321150888816b7d5fcc713f92e5525833 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Tue, 9 Apr 2024 23:51:35 -0700 Subject: [PATCH 04/15] propagate redownload, use download_id in get download --- jupyter_scheduler/download_manager.py | 7 ++++--- jupyter_scheduler/download_runner.py | 8 ++++---- jupyter_scheduler/executors.py | 1 + jupyter_scheduler/handlers.py | 13 ++----------- jupyter_scheduler/models.py | 1 + jupyter_scheduler/orm.py | 1 + 6 files changed, 13 insertions(+), 18 deletions(-) diff --git a/jupyter_scheduler/download_manager.py b/jupyter_scheduler/download_manager.py index 3d9e571ed..f9623b80a 100644 --- a/jupyter_scheduler/download_manager.py +++ b/jupyter_scheduler/download_manager.py @@ -17,9 +17,9 @@ def put(self, download: 
DescribeDownload): session.add(download) session.commit() - def get(self, job_id: str) -> Optional[DescribeDownload]: + def get(self, download_id: str) -> Optional[DescribeDownload]: with self.session() as session: - download = session.query(Download).filter(Download.job_id == job_id).first() + download = session.query(Download).filter(Download.download_id == download_id).first() if download: return DescribeDownload.from_orm(download) @@ -46,13 +46,14 @@ def __init__(self, db_url: str): self.record_manager = DownloadRecordManager(db_url=db_url) self.queue = Queue() - def download_from_staging(self, job_id: str): + def download_from_staging(self, job_id: str, redownload: bool): download_initiated_time = get_utc_timestamp() download_id = generate_uuid() download = DescribeDownload( job_id=job_id, download_id=download_id, download_initiated_time=download_initiated_time, + redownload=redownload, ) self.record_manager.put(download) self.queue.put(download) diff --git a/jupyter_scheduler/download_runner.py b/jupyter_scheduler/download_runner.py index b0da7e5fd..448c417f7 100644 --- a/jupyter_scheduler/download_runner.py +++ b/jupyter_scheduler/download_runner.py @@ -23,11 +23,11 @@ def __init__( async def process_download_queue(self): while not self.download_manager.queue.empty(): download = self.download_manager.queue.get() - cache = self.download_manager.record_manager.get(download.job_id) - if not cache or not download: + download_record = self.download_manager.record_manager.get(download.download_id) + if not download_record: continue - await self.job_files_manager.copy_from_staging(cache.job_id) - self.download_manager.record_manager.delete_download(cache.download_id) + await self.job_files_manager.copy_from_staging(download.job_id, download.redownload) + self.download_manager.delete_download(download.download_id) async def start(self): self.download_manager.populate_queue() diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py index 
91ef17465..7cd9a35d2 100644 --- a/jupyter_scheduler/executors.py +++ b/jupyter_scheduler/executors.py @@ -163,6 +163,7 @@ def _download_from_staging(self, job_id: str): job_id=job_id, download_id=download_id, download_initiated_time=download_initiated_time, + redownload=True, ) with self.db_session() as session: download_record = Download(**download.dict()) diff --git a/jupyter_scheduler/handlers.py b/jupyter_scheduler/handlers.py index 30d16c7ee..82fb48249 100644 --- a/jupyter_scheduler/handlers.py +++ b/jupyter_scheduler/handlers.py @@ -395,16 +395,8 @@ def get(self): class FilesDownloadHandler(ExtensionHandlerMixin, APIHandler): - # _job_files_manager = None _download_from_staging = None - # @property - # def job_files_manager(self): - # if not self._job_files_manager: - # self._job_files_manager = self.settings.get("job_files_manager", None) - - # return self._job_files_manager - @property def download_from_staging(self): if not self._download_from_staging: @@ -414,10 +406,9 @@ def download_from_staging(self): @authenticated async def get(self, job_id): - # redownload = self.get_query_argument("redownload", False) + redownload = self.get_query_argument("redownload", False) try: - # await self.job_files_manager.copy_from_staging(job_id=job_id, redownload=redownload) - self.download_from_staging(job_id) + self.download_from_staging(job_id, redownload) except Exception as e: self.log.exception(e) raise HTTPError(500, str(e)) from e diff --git a/jupyter_scheduler/models.py b/jupyter_scheduler/models.py index 65f0ec4fe..b80c9556b 100644 --- a/jupyter_scheduler/models.py +++ b/jupyter_scheduler/models.py @@ -301,6 +301,7 @@ class DescribeDownload(BaseModel): job_id: str download_id: str download_initiated_time: int + redownload: bool class Config: orm_mode = True diff --git a/jupyter_scheduler/orm.py b/jupyter_scheduler/orm.py index 2c06d9f32..c3480edea 100644 --- a/jupyter_scheduler/orm.py +++ b/jupyter_scheduler/orm.py @@ -116,6 +116,7 @@ class Download(Base): 
job_id = Column(String(36), primary_key=True) download_id = Column(String(36), primary_key=True) download_initiated_time = Column(Integer) + redownload = Column(Boolean, default=False) def create_tables(db_url, drop_tables=False): From ba44f7ccc76b2f5a87b1816a34dc6553dadc5721 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Wed, 10 Apr 2024 14:14:46 -0700 Subject: [PATCH 05/15] def initiate_download_standalone function and use it in DownloadManager and ExecutionManager --- jupyter_scheduler/download_manager.py | 34 ++++++++++++++++++--------- jupyter_scheduler/executors.py | 27 +++++++-------------- jupyter_scheduler/extension.py | 4 ++-- jupyter_scheduler/handlers.py | 12 +++++----- 4 files changed, 39 insertions(+), 38 deletions(-) diff --git a/jupyter_scheduler/download_manager.py b/jupyter_scheduler/download_manager.py index f9623b80a..a55705d25 100644 --- a/jupyter_scheduler/download_manager.py +++ b/jupyter_scheduler/download_manager.py @@ -7,6 +7,24 @@ from jupyter_scheduler.pydantic_v1 import BaseModel +def initiate_download_standalone(job_id: str, queue: Queue, db_session, redownload: bool = False): + """ + This static method initiates a download in a standalone manner independent of the DownloadManager instance. It is suitable for use in multiprocessing environment where a direct reference to DownloadManager instance is not feasible.
+ """ + download_initiated_time = get_utc_timestamp() + download_id = generate_uuid() + download = DescribeDownload( + job_id=job_id, + download_id=download_id, + download_initiated_time=download_initiated_time, + redownload=redownload, + ) + download_record = Download(**download.dict()) + db_session.add(download_record) + db_session.commit() + queue.put(download) + + class DownloadRecordManager: def __init__(self, db_url): self.session = create_session(db_url) @@ -46,17 +64,11 @@ def __init__(self, db_url: str): self.record_manager = DownloadRecordManager(db_url=db_url) self.queue = Queue() - def download_from_staging(self, job_id: str, redownload: bool): - download_initiated_time = get_utc_timestamp() - download_id = generate_uuid() - download = DescribeDownload( - job_id=job_id, - download_id=download_id, - download_initiated_time=download_initiated_time, - redownload=redownload, - ) - self.record_manager.put(download) - self.queue.put(download) + def initiate_download(self, job_id: str, redownload: bool): + with self.record_manager.session() as session: + initiate_download_standalone( + job_id=job_id, queue=self.queue, db_session=session, redownload=redownload + ) def delete_download(self, download_id: str): self.record_manager.delete_download(download_id) diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py index 7cd9a35d2..1fd27c9f5 100644 --- a/jupyter_scheduler/executors.py +++ b/jupyter_scheduler/executors.py @@ -11,11 +11,9 @@ import nbformat from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor -from jupyter_scheduler.models import DescribeJob, JobFeature, Status from jupyter_scheduler.orm import Job, create_session -from jupyter_scheduler.download_manager import DescribeDownload, Download +from jupyter_scheduler.download_manager import initiate_download_standalone from jupyter_scheduler.models import DescribeJob, JobFeature, Status -from jupyter_scheduler.orm import Job, create_session, generate_uuid from 
jupyter_scheduler.parameterize import add_parameters from jupyter_scheduler.utils import get_utc_timestamp @@ -154,22 +152,13 @@ def execute(self): finally: self.add_side_effects_files(staging_dir) self.create_output_files(job, nb) - self._download_from_staging(job.job_id) - - def _download_from_staging(self, job_id: str): - download_initiated_time = get_utc_timestamp() - download_id = generate_uuid() - download = DescribeDownload( - job_id=job_id, - download_id=download_id, - download_initiated_time=download_initiated_time, - redownload=True, - ) - with self.db_session() as session: - download_record = Download(**download.dict()) - session.add(download_record) - session.commit() - self.download_queue.put(download) + with self.db_session() as session: + initiate_download_standalone( + job_id=job.job_id, + queue=self.download_queue, + db_session=session, + redownload=True, + ) def add_side_effects_files(self, staging_dir: str): """Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files""" diff --git a/jupyter_scheduler/extension.py b/jupyter_scheduler/extension.py index a87d05d4c..3d062fc36 100644 --- a/jupyter_scheduler/extension.py +++ b/jupyter_scheduler/extension.py @@ -81,8 +81,8 @@ def initialize_settings(self): root_dir=self.serverapp.root_dir, environments_manager=environments_manager, db_url=self.db_url, - config=self.config, download_queue=download_manager.queue, + config=self.config, ) job_files_manager = self.job_files_manager_class(scheduler=scheduler) @@ -95,7 +95,7 @@ def initialize_settings(self): environments_manager=environments_manager, scheduler=scheduler, job_files_manager=job_files_manager, - download_from_staging=download_manager.download_from_staging, + initiate_download=download_manager.initiate_download, ) if scheduler.task_runner: diff --git a/jupyter_scheduler/handlers.py b/jupyter_scheduler/handlers.py index 82fb48249..f0827f0aa 100644 --- 
a/jupyter_scheduler/handlers.py +++ b/jupyter_scheduler/handlers.py @@ -395,20 +395,20 @@ def get(self): class FilesDownloadHandler(ExtensionHandlerMixin, APIHandler): - _download_from_staging = None + _initiate_download = None @property - def download_from_staging(self): - if not self._download_from_staging: - self._download_from_staging = self.settings.get("download_from_staging", None) + def initiate_download(self): + if not self._initiate_download: + self._initiate_download = self.settings.get("initiate_download", None) - return self._download_from_staging + return self._initiate_download @authenticated async def get(self, job_id): redownload = self.get_query_argument("redownload", False) try: - self.download_from_staging(job_id, redownload) + self.initiate_download(job_id, redownload) except Exception as e: self.log.exception(e) raise HTTPError(500, str(e)) from e From 381dcae83ae5f23ea4fefd3cc139bc5cf13c6e10 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Wed, 10 Apr 2024 14:37:07 -0700 Subject: [PATCH 06/15] add traitlets-configurable downloads_poll_interval --- jupyter_scheduler/download_runner.py | 34 ++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/jupyter_scheduler/download_runner.py b/jupyter_scheduler/download_runner.py index 448c417f7..0b9302690 100644 --- a/jupyter_scheduler/download_runner.py +++ b/jupyter_scheduler/download_runner.py @@ -1,24 +1,44 @@ import asyncio +import traitlets +from traitlets.config import LoggingConfigurable +from jupyter_server.transutils import _i18n from jupyter_scheduler.download_manager import DownloadManager from jupyter_scheduler.job_files_manager import JobFilesManager -class BaseDownloadRunner: +class BaseDownloadRunner(LoggingConfigurable): + """Base download runner, this class's start method is called + at the start of jupyter server, and is responsible for + polling for downloads to download. 
+ """ + + def __init__(self, config=None, **kwargs): + super().__init__(config=config) + + downloads_poll_interval = traitlets.Integer( + default_value=3, + config=True, + help=_i18n( + "The interval in seconds that the download runner polls for downloads to download." + ), + ) + def start(self): raise NotImplementedError("Must be implemented by subclass") class DownloadRunner(BaseDownloadRunner): + """Default download runner that maintains a record and a queue of initiated downloads, and polls the queue every `downloads_poll_interval` seconds + for downloads to download. + """ + def __init__( - self, - download_manager: DownloadManager, - job_files_manager: JobFilesManager, - poll_interval: int = 5, + self, download_manager: DownloadManager, job_files_manager: JobFilesManager, config=None ): + super().__init__(config=config) self.download_manager = download_manager self.job_files_manager = job_files_manager - self.poll_interval = poll_interval async def process_download_queue(self): while not self.download_manager.queue.empty(): @@ -33,4 +53,4 @@ async def start(self): self.download_manager.populate_queue() while True: await self.process_download_queue() - await asyncio.sleep(self.poll_interval) + await asyncio.sleep(self.downloads_poll_interval) From a723337980f24fb2c368a837ec75bc28b349bef4 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Wed, 10 Apr 2024 15:05:34 -0700 Subject: [PATCH 07/15] refactor output files creation logic into a separate function for clarity --- jupyter_scheduler/executors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py index 1fd27c9f5..52c8c6e31 100644 --- a/jupyter_scheduler/executors.py +++ b/jupyter_scheduler/executors.py @@ -11,9 +11,9 @@ import nbformat from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor -from jupyter_scheduler.orm import Job, create_session from jupyter_scheduler.download_manager import
initiate_download_standalone from jupyter_scheduler.models import DescribeJob, JobFeature, Status +from jupyter_scheduler.orm import Job, create_session from jupyter_scheduler.parameterize import add_parameters from jupyter_scheduler.utils import get_utc_timestamp From 92a457432a78d778ad8131f55305caff874712e0 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Wed, 10 Apr 2024 16:00:36 -0700 Subject: [PATCH 08/15] remove download records associated with job on job delete --- jupyter_scheduler/extension.py | 2 +- jupyter_scheduler/scheduler.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/jupyter_scheduler/extension.py b/jupyter_scheduler/extension.py index 3d062fc36..d74ac203d 100644 --- a/jupyter_scheduler/extension.py +++ b/jupyter_scheduler/extension.py @@ -81,7 +81,7 @@ def initialize_settings(self): root_dir=self.serverapp.root_dir, environments_manager=environments_manager, db_url=self.db_url, - download_queue=download_manager.queue, + download_manager=download_manager, config=self.config, ) diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index fbf3f4984..a540c7f72 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -15,6 +15,7 @@ from traitlets import Unicode, default from traitlets.config import LoggingConfigurable +from jupyter_scheduler.download_manager import DownloadManager from jupyter_scheduler.environments import EnvironmentManager from jupyter_scheduler.exceptions import ( IdempotencyTokenError, @@ -404,7 +405,7 @@ def __init__( root_dir: str, environments_manager: Type[EnvironmentManager], db_url: str, - download_queue: mp.Queue, + download_manager: DownloadManager, config=None, **kwargs, ): @@ -414,7 +415,7 @@ def __init__( self.db_url = db_url if self.task_runner_class: self.task_runner = self.task_runner_class(scheduler=self, config=config) - self.download_queue = download_queue + self.download_manager = download_manager @property def db_session(self): 
@@ -494,7 +495,7 @@ def create_job(self, model: CreateJob) -> str: staging_paths=staging_paths, root_dir=self.root_dir, db_url=self.db_url, - download_queue=self.download_queue, + download_queue=self.download_manager.queue, ).process ) p.start() @@ -586,6 +587,7 @@ def delete_job(self, job_id: str): session.query(Job).filter(Job.job_id == job_id).delete() session.commit() + self.download_manager.delete_job_downloads(job_id) def stop_job(self, job_id): with self.db_session() as session: From bc4bc88deca8a58fe53e36996624e15a2e7da5fb Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Fri, 12 Apr 2024 10:17:22 -0700 Subject: [PATCH 09/15] clarify comments --- jupyter_scheduler/download_manager.py | 10 ++++++---- jupyter_scheduler/executors.py | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/jupyter_scheduler/download_manager.py b/jupyter_scheduler/download_manager.py index a55705d25..29f967b9f 100644 --- a/jupyter_scheduler/download_manager.py +++ b/jupyter_scheduler/download_manager.py @@ -7,9 +7,11 @@ from jupyter_scheduler.pydantic_v1 import BaseModel -def initiate_download_standalone(job_id: str, queue: Queue, db_session, redownload: bool = False): +def initiate_download_standalone( + job_id: str, download_queue: Queue, db_session, redownload: bool = False +): """ - This static method initiates a download in a standalone manner independent of the DownloadManager instance. It is suitable for use in multiprocessing environment where a direct reference to DownloadManager instance is not feasible. + This method initiates a download in a standalone manner independent of the DownloadManager instance. It is suitable for use in multiprocessing environment where a direct reference to DownloadManager instance is not feasible. 
""" download_initiated_time = get_utc_timestamp() download_id = generate_uuid() @@ -22,7 +24,7 @@ def initiate_download_standalone(job_id: str, queue: Queue, db_session, redownlo download_record = Download(**download.dict()) db_session.add(download_record) db_session.commit() - queue.put(download) + download_queue.put(download) class DownloadRecordManager: @@ -67,7 +69,7 @@ def __init__(self, db_url: str): def initiate_download(self, job_id: str, redownload: bool): with self.record_manager.session() as session: initiate_download_standalone( - job_id=job_id, queue=self.queue, db_session=session, redownload=redownload + job_id=job_id, download_queue=self.queue, db_session=session, redownload=redownload ) def delete_download(self, download_id: str): diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py index 52c8c6e31..97fd4423e 100644 --- a/jupyter_scheduler/executors.py +++ b/jupyter_scheduler/executors.py @@ -155,7 +155,7 @@ def execute(self): with self.db_session() as session: initiate_download_standalone( job_id=job.job_id, - queue=self.download_queue, + download_queue=self.download_queue, db_session=session, redownload=True, ) From 551561ea16d9dd5dcfeb10316027e6e976205131 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Fri, 12 Apr 2024 17:00:47 -0700 Subject: [PATCH 10/15] fix existing pytests --- conftest.py | 7 +++++-- jupyter_scheduler/tests/mocks.py | 6 ++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/conftest.py b/conftest.py index 71915074f..29296764c 100644 --- a/conftest.py +++ b/conftest.py @@ -5,7 +5,7 @@ from jupyter_scheduler.orm import create_session, create_tables from jupyter_scheduler.scheduler import Scheduler -from jupyter_scheduler.tests.mocks import MockEnvironmentManager +from jupyter_scheduler.tests.mocks import MockDownloadManager, MockEnvironmentManager pytest_plugins = ("jupyter_server.pytest_plugin",) @@ -48,5 +48,8 @@ def jp_scheduler_db(): @pytest.fixture def jp_scheduler(): return 
Scheduler( - db_url=DB_URL, root_dir=str(TEST_ROOT_DIR), environments_manager=MockEnvironmentManager() + db_url=DB_URL, + root_dir=str(TEST_ROOT_DIR), + environments_manager=MockEnvironmentManager(), + download_manager=MockDownloadManager(DB_URL), ) diff --git a/jupyter_scheduler/tests/mocks.py b/jupyter_scheduler/tests/mocks.py index 9a60e6b74..317e7523b 100644 --- a/jupyter_scheduler/tests/mocks.py +++ b/jupyter_scheduler/tests/mocks.py @@ -1,5 +1,6 @@ from typing import Dict, List +from jupyter_scheduler.download_manager import DownloadManager from jupyter_scheduler.environments import EnvironmentManager from jupyter_scheduler.executors import ExecutionManager from jupyter_scheduler.models import JobFeature, RuntimeEnvironment, UpdateJobDefinition @@ -73,3 +74,8 @@ def pause_jobs(self, job_definition_id: str): def resume_jobs(self, job_definition_id: str): pass + + +class MockDownloadManager(DownloadManager): + def __init__(self, db_url: str): + pass From 1cb9c8447f7a194e7f8ef51dbbc9e9e0b5361e3e Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Wed, 17 Apr 2024 17:26:29 -0700 Subject: [PATCH 11/15] Force Process spawn, not fork in JobFilesManager --- jupyter_scheduler/job_files_manager.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/jupyter_scheduler/job_files_manager.py b/jupyter_scheduler/job_files_manager.py index 0e39c2b76..4072ff5ba 100644 --- a/jupyter_scheduler/job_files_manager.py +++ b/jupyter_scheduler/job_files_manager.py @@ -1,7 +1,7 @@ import os import random import tarfile -from multiprocessing import Process +import multiprocessing as mp from typing import Dict, List, Optional, Type import fsspec @@ -23,7 +23,15 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals output_filenames = self.scheduler.get_job_filenames(job) output_dir = self.scheduler.get_local_output_path(model=job, root_dir_relative=True) - p = Process( + # The MP context forces new processes to not be forked on
Linux. + # This is necessary because `asyncio.get_event_loop()` is bugged in + # forked processes in Python versions below 3.12. This method is + # called by `jupyter_core` by `nbconvert` in the default executor. + # + # See: https://github.com/python/cpython/issues/66285 + # See also: https://github.com/jupyter/jupyter_core/pull/362 + mp_ctx = mp.get_context("spawn") + p = mp_ctx.Process( target=Downloader( output_formats=job.output_formats, output_filenames=output_filenames, From 4c2b558d2df63507c507a7e2bdb44820b1a9cb4d Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Thu, 18 Apr 2024 17:11:02 -0700 Subject: [PATCH 12/15] force multiprocessing start method to be "spawn" at the start of extension, --- jupyter_scheduler/extension.py | 10 ++++++++++ jupyter_scheduler/job_files_manager.py | 13 +++---------- jupyter_scheduler/scheduler.py | 12 ++---------- 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/jupyter_scheduler/extension.py b/jupyter_scheduler/extension.py index d74ac203d..d2a5e0f87 100644 --- a/jupyter_scheduler/extension.py +++ b/jupyter_scheduler/extension.py @@ -1,4 +1,5 @@ import asyncio +import multiprocessing from jupyter_core.paths import jupyter_data_dir from jupyter_server.extension.application import ExtensionApp @@ -69,6 +70,15 @@ def _db_url_default(self): ) def initialize_settings(self): + # Forces new processes to not be forked on Linux. + # This is necessary because `asyncio.get_event_loop()` is bugged in + # forked processes in Python versions below 3.12. This method is + # called by `jupyter_core` by `nbconvert` in the default executor. 
+ + # See: https://github.com/python/cpython/issues/66285 + # See also: https://github.com/jupyter/jupyter_core/pull/362 + multiprocessing.set_start_method("spawn", force=True) + super().initialize_settings() create_tables(self.db_url, self.drop_tables) diff --git a/jupyter_scheduler/job_files_manager.py b/jupyter_scheduler/job_files_manager.py index 4072ff5ba..0c3434ef9 100644 --- a/jupyter_scheduler/job_files_manager.py +++ b/jupyter_scheduler/job_files_manager.py @@ -1,7 +1,8 @@ +from multiprocessing import Process import os import random import tarfile -import multiprocessing as mp + from typing import Dict, List, Optional, Type import fsspec @@ -23,15 +24,7 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals output_filenames = self.scheduler.get_job_filenames(job) output_dir = self.scheduler.get_local_output_path(model=job, root_dir_relative=True) - # The MP context forces new processes to not be forked on Linux. - # This is necessary because `asyncio.get_event_loop()` is bugged in - # forked processes in Python versions below 3.12. This method is - # called by `jupyter_core` by `nbconvert` in the default executor. - # - # See: https://github.com/python/cpython/issues/66285 - # See also: https://github.com/jupyter/jupyter_core/pull/362 - mp_ctx = mp.get_context("spawn") - p = mp_ctx.Process( + p = Process( target=Downloader( output_formats=job.output_formats, output_filenames=output_filenames, diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index a540c7f72..4ecb314f5 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -1,4 +1,4 @@ -import multiprocessing as mp +from multiprocessing import Process import os import random import shutil @@ -481,15 +481,7 @@ def create_job(self, model: CreateJob) -> str: else: self.copy_input_file(model.input_uri, staging_paths["input"]) - # The MP context forces new processes to not be forked on Linux. 
- # This is necessary because `asyncio.get_event_loop()` is bugged in - # forked processes in Python versions below 3.12. This method is - # called by `jupyter_core` by `nbconvert` in the default executor. - # - # See: https://github.com/python/cpython/issues/66285 - # See also: https://github.com/jupyter/jupyter_core/pull/362 - mp_ctx = mp.get_context("spawn") - p = mp_ctx.Process( + p = Process( target=self.execution_manager_class( job_id=job.job_id, staging_paths=staging_paths, From a6b75ed884ab3556b03a6543db8dd545dcd35d82 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Thu, 18 Apr 2024 17:11:51 -0700 Subject: [PATCH 13/15] test that Process is called only once --- jupyter_scheduler/tests/test_job_files_manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/jupyter_scheduler/tests/test_job_files_manager.py b/jupyter_scheduler/tests/test_job_files_manager.py index e6fcb5d6d..a95876f21 100644 --- a/jupyter_scheduler/tests/test_job_files_manager.py +++ b/jupyter_scheduler/tests/test_job_files_manager.py @@ -58,6 +58,7 @@ async def test_copy_from_staging(): redownload=False, include_staging_files=None, ) + mock_process.assert_called_once() HERE = Path(__file__).parent.resolve() From 9d4110342312598b7f62d87d29faa04015cfff43 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Thu, 9 May 2024 13:45:53 -0700 Subject: [PATCH 14/15] fix pytest tests --- jupyter_scheduler/tests/mocks.py | 4 +++- jupyter_scheduler/tests/test_execution_manager.py | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/jupyter_scheduler/tests/mocks.py b/jupyter_scheduler/tests/mocks.py index 317e7523b..304915dd4 100644 --- a/jupyter_scheduler/tests/mocks.py +++ b/jupyter_scheduler/tests/mocks.py @@ -1,4 +1,6 @@ +from multiprocessing import Queue from typing import Dict, List +from unittest.mock import Mock from jupyter_scheduler.download_manager import DownloadManager from jupyter_scheduler.environments import EnvironmentManager @@ -78,4 +80,4 @@ def 
resume_jobs(self, job_definition_id: str): class MockDownloadManager(DownloadManager): def __init__(self, db_url: str): - pass + self.queue = Queue() diff --git a/jupyter_scheduler/tests/test_execution_manager.py b/jupyter_scheduler/tests/test_execution_manager.py index a9393eb9c..fad3f3fe9 100644 --- a/jupyter_scheduler/tests/test_execution_manager.py +++ b/jupyter_scheduler/tests/test_execution_manager.py @@ -7,6 +7,7 @@ from conftest import DB_URL from jupyter_scheduler.executors import DefaultExecutionManager from jupyter_scheduler.orm import Job +from jupyter_scheduler.tests.mocks import MockDownloadManager JOB_ID = "69856f4e-ce94-45fd-8f60-3a587457fce7" NOTEBOOK_NAME = "side_effects.ipynb" @@ -30,11 +31,13 @@ def load_job(jp_scheduler_db): def test_add_side_effects_files(jp_scheduler_db, load_job): + download_manager = MockDownloadManager(DB_URL) manager = DefaultExecutionManager( job_id=JOB_ID, root_dir=str(NOTEBOOK_DIR), db_url=DB_URL, staging_paths={"input": str(NOTEBOOK_PATH)}, + download_queue=download_manager.queue, ) manager.add_side_effects_files(str(NOTEBOOK_DIR)) From 88e7b0b28112035f246a1b9f22dc709f7ebd4663 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 9 May 2024 22:04:32 +0000 Subject: [PATCH 15/15] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- jupyter_scheduler/download_manager.py | 2 +- jupyter_scheduler/download_runner.py | 3 ++- jupyter_scheduler/job_files_manager.py | 3 +-- jupyter_scheduler/scheduler.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/jupyter_scheduler/download_manager.py b/jupyter_scheduler/download_manager.py index 29f967b9f..1cb3fefb4 100644 --- a/jupyter_scheduler/download_manager.py +++ b/jupyter_scheduler/download_manager.py @@ -3,8 +3,8 @@ from jupyter_scheduler.models import DescribeDownload from jupyter_scheduler.orm import Download, create_session, generate_uuid 
-from jupyter_scheduler.utils import get_utc_timestamp from jupyter_scheduler.pydantic_v1 import BaseModel +from jupyter_scheduler.utils import get_utc_timestamp def initiate_download_standalone( diff --git a/jupyter_scheduler/download_runner.py b/jupyter_scheduler/download_runner.py index 0b9302690..0e284e3ce 100644 --- a/jupyter_scheduler/download_runner.py +++ b/jupyter_scheduler/download_runner.py @@ -1,8 +1,9 @@ import asyncio + import traitlets +from jupyter_server.transutils import _i18n from traitlets.config import LoggingConfigurable -from jupyter_server.transutils import _i18n from jupyter_scheduler.download_manager import DownloadManager from jupyter_scheduler.job_files_manager import JobFilesManager diff --git a/jupyter_scheduler/job_files_manager.py b/jupyter_scheduler/job_files_manager.py index 0c3434ef9..0e39c2b76 100644 --- a/jupyter_scheduler/job_files_manager.py +++ b/jupyter_scheduler/job_files_manager.py @@ -1,8 +1,7 @@ -from multiprocessing import Process import os import random import tarfile - +from multiprocessing import Process from typing import Dict, List, Optional, Type import fsspec diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index 4ecb314f5..a3861c73b 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -1,7 +1,7 @@ -from multiprocessing import Process import os import random import shutil +from multiprocessing import Process from typing import Dict, List, Optional, Type, Union import fsspec