From 3d110ee49316bf281c2a5e3823c0925695dc2a83 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Thu, 22 Feb 2024 09:02:52 -0800 Subject: [PATCH 01/13] start mlflow server --- jupyter_scheduler/scheduler.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index 867034c6..cb4acb62 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -3,6 +3,7 @@ import random import shutil from typing import Dict, List, Optional, Type, Union +import subprocess import fsspec import psutil @@ -399,6 +400,22 @@ class Scheduler(BaseScheduler): task_runner = Instance(allow_none=True, klass="jupyter_scheduler.task_runner.BaseTaskRunner") + def start_mlflow_server(self): + subprocess.Popen( + [ + "mlflow", + "server", + "--backend-store-uri", + "./mlruns", + "--default-artifact-root", + "./mlartifacts", + "--host", + "0.0.0.0", + "--port", + "5000", + ] + ) + def __init__( self, root_dir: str, @@ -414,6 +431,8 @@ def __init__( if self.task_runner_class: self.task_runner = self.task_runner_class(scheduler=self, config=config) + self.start_mlflow_server() + @property def db_session(self): if not self._db_session: From 05df7a7aaf615bb9e85cee64331fee317302949b Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Thu, 22 Feb 2024 14:46:08 -0800 Subject: [PATCH 02/13] Rename "Input filename" -> "Input file" in Job Def Table to be consistent --- src/mainviews/list-jobs.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mainviews/list-jobs.tsx b/src/mainviews/list-jobs.tsx index 1821d501..769371bf 100644 --- a/src/mainviews/list-jobs.tsx +++ b/src/mainviews/list-jobs.tsx @@ -226,7 +226,7 @@ function ListJobDefinitionsTable(props: ListJobDefinitionsTableProps) { }, { sortField: 'input_filename', - name: trans.__('Input filename') + name: trans.__('Input file') }, { sortField: 'create_time', From 47c4dafdf73736d8d4e2e71ebcc48b3374336567 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Fri, 23 Feb 2024 14:54:24 -0800 Subject: [PATCH 03/13] add "Log with MLFlow" checkbox --- src/components/mlflow-checkbox.tsx | 18 ++++++++++++++++++ src/mainviews/create-job.tsx | 2 ++ 2 files changed, 20 insertions(+) create mode 100644 src/components/mlflow-checkbox.tsx diff --git a/src/components/mlflow-checkbox.tsx b/src/components/mlflow-checkbox.tsx new file mode 100644 index 00000000..81b39c09 --- /dev/null +++ b/src/components/mlflow-checkbox.tsx @@ -0,0 +1,18 @@ +import React, { ChangeEvent } from 'react'; + +import { Checkbox, FormControlLabel, FormGroup } from '@mui/material'; + +export type MLFlowCheckboxProps = { + onChange: (event: ChangeEvent) => void; +}; + +export function MLFlowCheckbox(props: MLFlowCheckboxProps): JSX.Element { + return ( + + } + label="Log with MLFlow" + /> + + ); +} diff --git a/src/mainviews/create-job.tsx b/src/mainviews/create-job.tsx index 98fee5fc..7526cc87 100644 --- a/src/mainviews/create-job.tsx +++ b/src/mainviews/create-job.tsx @@ -43,6 +43,7 @@ import { import { Box, Stack } from '@mui/system'; import { getErrorMessage } from '../util/errors'; import { PackageInputFolderControl } from '../components/input-folder-checkbox'; +import { MLFlowCheckbox } from '../components/mlflow-checkbox'; export interface ICreateJobProps { model: ICreateJobModel; @@ -511,6 +512,7 @@ export function CreateJob(props: ICreateJobProps): JSX.Element { onChange={handleInputChange} inputFile={props.model.inputFile} /> + Date: Tue, 27 Feb 2024 00:58:11 -0800 Subject: 
[PATCH 04/13] log jobs and job defs in mlflow --- jupyter_scheduler/executors.py | 24 +++++++++----- jupyter_scheduler/models.py | 10 ++++++ jupyter_scheduler/orm.py | 3 ++ jupyter_scheduler/scheduler.py | 35 ++++++++++++++++---- src/components/mlflow-checkbox.tsx | 8 ++--- src/handler.ts | 10 ++++++ src/mainviews/create-job.tsx | 4 +-- src/mainviews/detail-view/job-definition.tsx | 23 +++++++++++++ src/mainviews/detail-view/job-detail.tsx | 34 +++++++++++++++++++ src/model.ts | 17 ++++++++-- 10 files changed, 145 insertions(+), 23 deletions(-) diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py index 7e1a9974..f43d9e75 100644 --- a/jupyter_scheduler/executors.py +++ b/jupyter_scheduler/executors.py @@ -7,6 +7,7 @@ from typing import Dict import fsspec +import mlflow import nbconvert import nbformat from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor @@ -14,6 +15,7 @@ from jupyter_scheduler.models import DescribeJob, JobFeature, Status from jupyter_scheduler.orm import Job, create_session from jupyter_scheduler.parameterize import add_parameters +from jupyter_scheduler.scheduler import MLFLOW_SERVER_URI from jupyter_scheduler.utils import get_utc_timestamp @@ -136,13 +138,17 @@ def execute(self): kernel_name=nb.metadata.kernelspec["name"], store_widget_state=True, cwd=staging_dir ) - try: - ep.preprocess(nb, {"metadata": {"path": staging_dir}}) - except CellExecutionError as e: - raise e - finally: - self.add_side_effects_files(staging_dir) - self.create_output_files(job, nb) + mlflow.set_tracking_uri(MLFLOW_SERVER_URI) + with mlflow.start_run(run_id=job.mlflow_run_id): + try: + ep.preprocess(nb, {"metadata": {"path": staging_dir}}) + if job.parameters: + mlflow.log_params(job.parameters) + except CellExecutionError as e: + raise e + finally: + self.add_side_effects_files(staging_dir) + self.create_output_files(job, nb) def add_side_effects_files(self, staging_dir: str): """Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files""" @@ -170,8 +176,10 @@ def create_output_files(self, job: DescribeJob, notebook_node): for output_format in job.output_formats: cls = nbconvert.get_exporter(output_format) output, _ = cls().from_notebook_node(notebook_node) - with fsspec.open(self.staging_paths[output_format], "w", encoding="utf-8") as f: + output_path = self.staging_paths[output_format] + with fsspec.open(output_path, "w", encoding="utf-8") as f: f.write(output) + mlflow.log_artifact(output_path) def supported_features(cls) -> Dict[JobFeature, bool]: return { diff --git a/jupyter_scheduler/models.py b/jupyter_scheduler/models.py index 38e240e0..bd9b0b0c 100644 --- a/jupyter_scheduler/models.py +++ b/jupyter_scheduler/models.py @@ -86,6 +86,9 @@ class CreateJob(BaseModel): output_filename_template: Optional[str] = OUTPUT_FILENAME_TEMPLATE compute_type: Optional[str] = None package_input_folder: Optional[bool] = None + mlflow_logging: Optional[bool] = None + mlflow_experiment_id: Optional[str] = None + mlflow_run_id: Optional[str] = None @root_validator def compute_input_filename(cls, values) -> Dict: @@ -148,6 +151,9 @@ class DescribeJob(BaseModel): downloaded: bool = False package_input_folder: Optional[bool] = None packaged_files: Optional[List[str]] = [] + mlflow_logging: Optional[bool] = None + mlflow_experiment_id: Optional[str] = None + mlflow_run_id: Optional[str] = None class Config: orm_mode = True @@ -213,6 +219,8 @@ class CreateJobDefinition(BaseModel): schedule: 
Optional[str] = None timezone: Optional[str] = None package_input_folder: Optional[bool] = None + mlflow_logging: Optional[bool] = None + mlflow_experiment_id: Optional[str] = None @root_validator def compute_input_filename(cls, values) -> Dict: @@ -240,6 +248,8 @@ class DescribeJobDefinition(BaseModel): active: bool package_input_folder: Optional[bool] = None packaged_files: Optional[List[str]] = [] + mlflow_logging: Optional[bool] = None + mlflow_experiment_id: Optional[str] = None class Config: orm_mode = True diff --git a/jupyter_scheduler/orm.py b/jupyter_scheduler/orm.py index dbbbfad8..449d4fe1 100644 --- a/jupyter_scheduler/orm.py +++ b/jupyter_scheduler/orm.py @@ -89,6 +89,8 @@ class CommonColumns: # Any default values specified for new columns will be ignored during the migration process. package_input_folder = Column(Boolean) packaged_files = Column(JsonType, default=[]) + mlflow_logging = Column(Boolean) + mlflow_experiment_id = Column(String(256), nullable=True) class Job(CommonColumns, Base): @@ -105,6 +107,7 @@ class Job(CommonColumns, Base): idempotency_token = Column(String(256)) # All new columns added to this table must be nullable to ensure compatibility during database migrations. # Any default values specified for new columns will be ignored during the migration process. + mlflow_run_id = Column(String(256), nullable=True) class JobDefinition(CommonColumns, Base): diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index cb4acb62..b4b98381 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -4,8 +4,11 @@ import shutil from typing import Dict, List, Optional, Type, Union import subprocess +from typing import Dict, Optional, Type, Union +from uuid import uuid4 import fsspec +import mlflow import psutil from jupyter_core.paths import jupyter_data_dir from jupyter_server.transutils import _i18n @@ -46,6 +49,10 @@ create_output_filename, ) +MLFLOW_SERVER_HOST = "127.0.0.1" +MLFLOW_SERVER_PORT = "5000" +MLFLOW_SERVER_URI = f"http://{MLFLOW_SERVER_HOST}:{MLFLOW_SERVER_PORT}" + class BaseScheduler(LoggingConfigurable): """Base class for schedulers. 
A default implementation @@ -405,16 +412,13 @@ def start_mlflow_server(self): [ "mlflow", "server", - "--backend-store-uri", - "./mlruns", - "--default-artifact-root", - "./mlartifacts", "--host", - "0.0.0.0", + MLFLOW_SERVER_HOST, "--port", - "5000", + MLFLOW_SERVER_PORT, ] ) + mlflow.set_tracking_uri(MLFLOW_SERVER_URI) def __init__( self, @@ -481,6 +485,19 @@ def create_job(self, model: CreateJob) -> str: if not model.output_formats: model.output_formats = [] + mlflow_client = mlflow.MlflowClient() + + if model.job_definition_id and model.mlflow_experiment_id: + experiment_id = model.mlflow_experiment_id + else: + experiment_id = mlflow_client.create_experiment(f"{model.name}-{uuid4()}") + model.mlflow_experiment_id = experiment_id + input_file_path = os.path.join(self.root_dir, model.input_uri) + mlflow.log_artifact(input_file_path, "input") + + mlflow_run = mlflow_client.create_run(experiment_id=experiment_id, run_name=model.name) + model.mlflow_run_id = mlflow_run.info.run_id + job = Job(**model.dict(exclude_none=True, exclude={"input_uri"})) session.add(job) @@ -628,6 +645,12 @@ def create_job_definition(self, model: CreateJobDefinition) -> str: if not self.file_exists(model.input_uri): raise InputUriError(model.input_uri) + mlflow_client = mlflow.MlflowClient() + experiment_id = mlflow_client.create_experiment(f"{model.name}-{uuid4()}") + model.mlflow_experiment_id = experiment_id + input_file_path = os.path.join(self.root_dir, model.input_uri) + mlflow.log_artifact(input_file_path, "input") + job_definition = JobDefinition(**model.dict(exclude_none=True, exclude={"input_uri"})) session.add(job_definition) session.commit() diff --git a/src/components/mlflow-checkbox.tsx b/src/components/mlflow-checkbox.tsx index 81b39c09..8ef3db4c 100644 --- a/src/components/mlflow-checkbox.tsx +++ b/src/components/mlflow-checkbox.tsx @@ -2,15 +2,13 @@ import React, { ChangeEvent } from 'react'; import { Checkbox, FormControlLabel, FormGroup } from '@mui/material'; -export type MLFlowCheckboxProps = { +export function MLFlowLoggingControl(props: { onChange: (event: ChangeEvent) => void; -}; - -export function MLFlowCheckbox(props: MLFlowCheckboxProps): JSX.Element { +}): JSX.Element { return ( } + control={} label="Log with MLFlow" /> diff --git a/src/handler.ts b/src/handler.ts index 4381bbd3..7d8700d6 100644 --- a/src/handler.ts +++ b/src/handler.ts @@ -364,6 +364,8 @@ export namespace Scheduler { schedule?: string; timezone?: string; package_input_folder?: boolean; + mlflow_logging?: boolean; + mlflow_experiment_id?: string; } export interface IUpdateJobDefinition { @@ -391,6 +393,8 @@ export namespace Scheduler { update_time: number; active: boolean; package_input_folder?: boolean; + mlflow_logging: boolean; + mlflow_experiment_id?: string; } export interface IEmailNotifications { @@ -418,6 +422,9 @@ export namespace Scheduler { output_formats?: string[]; compute_type?: string; package_input_folder?: boolean; + mlflow_logging?: boolean; + mlflow_experiment_id?: string; + mlflow_run_id?: string; } export interface ICreateJobFromDefinition { @@ -467,6 +474,9 @@ export namespace Scheduler { end_time?: number; downloaded: boolean; package_input_folder?: boolean; + mlflow_logging?: boolean; + mlflow_experiment_id?: string; + mlflow_run_id?: string; } export interface ICreateJobResponse { diff --git a/src/mainviews/create-job.tsx b/src/mainviews/create-job.tsx index 7526cc87..3885b6ac 100644 --- a/src/mainviews/create-job.tsx +++ b/src/mainviews/create-job.tsx @@ -43,7 +43,7 @@ import { import { Box, 
Stack } from '@mui/system'; import { getErrorMessage } from '../util/errors'; import { PackageInputFolderControl } from '../components/input-folder-checkbox'; -import { MLFlowCheckbox } from '../components/mlflow-checkbox'; +import { MLFlowLoggingControl } from '../components/mlflow-checkbox'; export interface ICreateJobProps { model: ICreateJobModel; @@ -512,7 +512,7 @@ export function CreateJob(props: ICreateJobProps): JSX.Element { onChange={handleInputChange} inputFile={props.model.inputFile} /> - + {trans.__('Edit Job Definition')} + {model.mlflowLogging === true && ( + + )} { log('job-definition-detail.delete'); @@ -231,6 +244,16 @@ export function JobDefinition(props: IJobDefinitionProps): JSX.Element { label: trans.__('Time zone') } ], + [ + { + value: model.mlflowLogging ? trans.__('Yes') : trans.__('No'), + label: trans.__('MLFlow Logging') + }, + { + value: props.model.mlflowExperimentId, + label: trans.__('MLFLow Experiment Id') + } + ], [ { value: model.packageInputFolder ? trans.__('Yes') : trans.__('No'), diff --git a/src/mainviews/detail-view/job-detail.tsx b/src/mainviews/detail-view/job-detail.tsx index 7aaefd1d..c4b10965 100644 --- a/src/mainviews/detail-view/job-detail.tsx +++ b/src/mainviews/detail-view/job-detail.tsx @@ -39,6 +39,11 @@ import { LabeledValue } from '../../components/labeled-value'; import { getErrorMessage } from '../../util/errors'; +import { OpenInNew } from '@mui/icons-material'; + +const MLFLOW_SERVER_HOST = '127.0.0.1'; +const MLFLOW_SERVER_PORT = '5000'; +const MLFLOW_SERVER_URI = `http://${MLFLOW_SERVER_HOST}:${MLFLOW_SERVER_PORT}`; export interface IJobDetailProps { app: JupyterFrontEnd; @@ -167,6 +172,18 @@ export function JobDetail(props: IJobDetailProps): JSX.Element { {trans.__('Download Job Files')} )} + {props.model?.mlflowLogging === true && ( + + )} {props.model !== null && props.model.status === 'IN_PROGRESS' && ( Date: Tue, 27 Feb 2024 10:42:59 -0800 Subject: [PATCH 05/13] add mlflow python dependency --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index e3609293..6bfa53dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,8 @@ dependencies = [ "croniter~=1.4", "pytz==2023.3", "fsspec==2023.6.0", - "psutil~=5.9" + "psutil~=5.9", + "mlflow" ] [project.optional-dependencies] From aec32ef7b778b3ef6df38d487a42bb6dc49ceb19 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Tue, 27 Feb 2024 22:09:28 -0800 Subject: [PATCH 06/13] log cells tagged with "mlflow_log" as artifacts --- jupyter_scheduler/executors.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py index f43d9e75..a6a2b653 100644 --- a/jupyter_scheduler/executors.py +++ b/jupyter_scheduler/executors.py @@ -144,6 +144,17 @@ def execute(self): ep.preprocess(nb, {"metadata": {"path": staging_dir}}) if job.parameters: mlflow.log_params(job.parameters) + + for index, cell in enumerate(nb.cells): + if "tags" in cell.metadata and "mlflow_log" in cell.metadata["tags"]: + mlflow.log_text(cell.source, f"source_cell_{index}.txt") + if cell.cell_type == "code" and cell.outputs: + for output in cell.outputs: + if "text/plain" in output.data: + mlflow.log_text( + output.data["text/plain"], f"output_cell_{cell.cell_id}.txt" + ) + except CellExecutionError as e: raise e finally: From 7a930696054712b1aa23453ebd5361cff8570fe0 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Tue, 27 Feb 2024 22:57:20 -0800 Subject: [PATCH 07/13] use 
enumerate index instead of unavailable cell_id --- jupyter_scheduler/executors.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py index a6a2b653..a32718cc 100644 --- a/jupyter_scheduler/executors.py +++ b/jupyter_scheduler/executors.py @@ -145,14 +145,14 @@ def execute(self): if job.parameters: mlflow.log_params(job.parameters) - for index, cell in enumerate(nb.cells): + for idx, cell in enumerate(nb.cells): if "tags" in cell.metadata and "mlflow_log" in cell.metadata["tags"]: - mlflow.log_text(cell.source, f"source_cell_{index}.txt") + mlflow.log_text(cell.source, f"source_cell_{idx}.txt") if cell.cell_type == "code" and cell.outputs: for output in cell.outputs: if "text/plain" in output.data: mlflow.log_text( - output.data["text/plain"], f"output_cell_{cell.cell_id}.txt" + output.data["text/plain"], f"output_cell_{idx}.txt" ) except CellExecutionError as e: From ec248212e7cca443955d6af7186ed9e28291c1a5 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Tue, 5 Mar 2024 01:07:14 -0800 Subject: [PATCH 08/13] refactor and update in line with comments --- jupyter_scheduler/executors.py | 133 ++++++++++++++++++++++++++------- jupyter_scheduler/scheduler.py | 25 ++++++- 2 files changed, 129 insertions(+), 29 deletions(-) diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py index a32718cc..fc532ed8 100644 --- a/jupyter_scheduler/executors.py +++ b/jupyter_scheduler/executors.py @@ -1,3 +1,4 @@ +import base64 import io import os import shutil @@ -138,28 +139,15 @@ def execute(self): kernel_name=nb.metadata.kernelspec["name"], store_widget_state=True, cwd=staging_dir ) - mlflow.set_tracking_uri(MLFLOW_SERVER_URI) - with mlflow.start_run(run_id=job.mlflow_run_id): - try: - ep.preprocess(nb, {"metadata": {"path": staging_dir}}) - if job.parameters: - mlflow.log_params(job.parameters) - - for idx, cell in enumerate(nb.cells): - if "tags" in cell.metadata and "mlflow_log" in cell.metadata["tags"]: - mlflow.log_text(cell.source, f"source_cell_{idx}.txt") - if cell.cell_type == "code" and cell.outputs: - for output in cell.outputs: - if "text/plain" in output.data: - mlflow.log_text( - output.data["text/plain"], f"output_cell_{idx}.txt" - ) - - except CellExecutionError as e: - raise e - finally: - self.add_side_effects_files(staging_dir) - self.create_output_files(job, nb) + try: + ep.preprocess(nb, {"metadata": {"path": staging_dir}}) + except CellExecutionError as e: + raise e + finally: + self.add_side_effects_files(staging_dir) + self.create_output_files(job, nb) + if getattr(job, "mlflow_logging", False): + self.log_to_mlflow(job, nb) def add_side_effects_files(self, staging_dir: str): """Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files""" @@ -187,10 +175,105 @@ def create_output_files(self, job: DescribeJob, notebook_node): for output_format in job.output_formats: cls = nbconvert.get_exporter(output_format) output, _ = cls().from_notebook_node(notebook_node) - output_path = self.staging_paths[output_format] - with fsspec.open(output_path, "w", encoding="utf-8") as f: + with fsspec.open(self.staging_paths[output_format], "w", encoding="utf-8") as f: f.write(output) - mlflow.log_artifact(output_path) + + def log_to_mlflow(self, job, nb): + mlflow.set_tracking_uri(MLFLOW_SERVER_URI) + with mlflow.start_run(run_id=job.mlflow_run_id): + if job.parameters: + mlflow.log_params(job.parameters) + + for cell_idx, 
cell in enumerate(nb.cells): + if "tags" in cell.metadata: + if "mlflow_log" in cell.metadata["tags"]: + self.mlflow_log(cell, cell_idx) + elif "mlflow_log_input" in cell.metadata["tags"]: + self.mlflow_log_input(cell, cell_idx) + elif "mlflow_log_output" in cell.metadata["tags"]: + self.mlflow_log_output(cell, cell_idx) + + for output_format in job.output_formats: + output_path = self.staging_paths[output_format] + directory, file_name_with_extension = os.path.split(output_path) + file_name, file_extension = os.path.splitext(file_name_with_extension) + file_name_parts = file_name.split("-") + file_name_without_timestamp = "-".join(file_name_parts[:-7]) + file_name_final = f"{file_name_without_timestamp}{file_extension}" + new_output_path = os.path.join(directory, file_name_final) + shutil.copy(output_path, new_output_path) + timestamp = "-".join(file_name_parts[-7:]).split(".")[0] + mlflow.log_param("job_created", timestamp) + mlflow.log_artifact(new_output_path, "") + os.remove(new_output_path) + + def mlflow_log(self, cell, cell_idx): + self.mlflow_log_input(cell, cell_idx) + self.mlflow_log_output(cell, cell_idx) + + def mlflow_log_input(self, cell, cell_idx): + mlflow.log_text(cell.source, f"cell_{cell_idx}_input.txt") + + def mlflow_log_output(self, cell, cell_idx): + if cell.cell_type == "code" and hasattr(cell, "outputs"): + self._log_code_output(cell_idx, cell.outputs) + elif cell.cell_type == "markdown": + self._log_markdown_output(cell, cell_idx) + + def _log_code_output(self, cell_idx, outputs): + for output_idx, output in enumerate(outputs): + if output.output_type == "stream": + self._log_stream_output(cell_idx, output_idx, output) + elif hasattr(output, "data"): + for output_data_idx, output_data in enumerate(output.data): + if output_data == "text/plain": + mlflow.log_text( + output.data[output_data], + f"cell_{cell_idx}_output_{output_data_idx}.txt", + ) + elif output_data == "text/html": + self._log_html_output(output, cell_idx, output_data_idx) + elif output_data == "application/pdf": + self._log_pdf_output(output, cell_idx, output_data_idx) + elif output_data.startswith("image"): + self._log_image_output(output, cell_idx, output_data_idx, output_data) + + def _log_stream_output(self, cell_idx, output_idx, output): + mlflow.log_text("".join(output.text), f"cell_{cell_idx}_output_{output_idx}.txt") + + def _log_html_output(self, output, cell_idx, output_idx): + if "text/html" in output.data: + html_content = output.data["text/html"] + if isinstance(html_content, list): + html_content = "".join(html_content) + mlflow.log_text(html_content, f"cell_{cell_idx}_output_{output_idx}.html") + + def _log_pdf_output(self, output, cell_idx, output_idx): + pdf_data = base64.b64decode(output.data["application/pdf"].split(",")[1]) + with open(f"cell_{cell_idx}_output_{output_idx}.pdf", "wb") as pdf_file: + pdf_file.write(pdf_data) + mlflow.log_artifact(f"cell_{cell_idx}_output_{output_idx}.pdf") + + def _log_image_output(self, output, cell_idx, output_idx, mime_type): + image_data_str = output.data[mime_type] + if "," in image_data_str: + image_data_base64 = image_data_str.split(",")[1] + else: + image_data_base64 = image_data_str + + try: + image_data = base64.b64decode(image_data_base64) + image_extension = mime_type.split("/")[1] + filename = f"cell_{cell_idx}_output_{output_idx}.{image_extension}" + with open(filename, "wb") as image_file: + image_file.write(image_data) + mlflow.log_artifact(filename) + os.remove(filename) + except Exception as e: + print(f"Error logging image 
output in cell {cell_idx}, output {output_idx}: {e}") + + def _log_markdown_output(self, cell, cell_idx): + mlflow.log_text(cell.source, f"cell_{cell_idx}_output_0.md") def supported_features(cls) -> Dict[JobFeature, bool]: return { diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index b4b98381..39572ef9 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -3,8 +3,9 @@ import random import shutil from typing import Dict, List, Optional, Type, Union +import signal import subprocess -from typing import Dict, Optional, Type, Union +import sys from uuid import uuid4 import fsspec @@ -408,7 +409,7 @@ class Scheduler(BaseScheduler): task_runner = Instance(allow_none=True, klass="jupyter_scheduler.task_runner.BaseTaskRunner") def start_mlflow_server(self): - subprocess.Popen( + mlflow_process = subprocess.Popen( [ "mlflow", "server", @@ -416,9 +417,23 @@ def start_mlflow_server(self): MLFLOW_SERVER_HOST, "--port", MLFLOW_SERVER_PORT, - ] + ], + preexec_fn=os.setsid, ) mlflow.set_tracking_uri(MLFLOW_SERVER_URI) + return mlflow_process + + def stop_mlflow_server(self): + if self.mlflow_process is not None: + os.killpg(os.getpgid(self.mlflow_process.pid), signal.SIGTERM) + self.mlflow_process.wait() + self.mlflow_process = None + print("MLFlow server stopped") + + def mlflow_signal_handler(self, signum, frame): + print("Shutting down MLFlow server") + self.stop_mlflow_server() + sys.exit(0) def __init__( self, @@ -435,7 +450,9 @@ def __init__( if self.task_runner_class: self.task_runner = self.task_runner_class(scheduler=self, config=config) - self.start_mlflow_server() + self.mlflow_process = self.start_mlflow_server() + signal.signal(signal.SIGINT, self.mlflow_signal_handler) + signal.signal(signal.SIGTERM, self.mlflow_signal_handler) @property def db_session(self): From 296908917f9dd389a1da0055f3bc6149f299c3d4 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Tue, 5 Mar 2024 10:35:55 -0800 Subject: [PATCH 09/13] use input filename with extension in the name of the experiment and run --- jupyter_scheduler/scheduler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index 39572ef9..07125c44 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -507,12 +507,12 @@ def create_job(self, model: CreateJob) -> str: if model.job_definition_id and model.mlflow_experiment_id: experiment_id = model.mlflow_experiment_id else: - experiment_id = mlflow_client.create_experiment(f"{model.name}-{uuid4()}") + experiment_id = mlflow_client.create_experiment(f"{model.input_filename}-{uuid4()}") model.mlflow_experiment_id = experiment_id input_file_path = os.path.join(self.root_dir, model.input_uri) mlflow.log_artifact(input_file_path, "input") - mlflow_run = mlflow_client.create_run(experiment_id=experiment_id, run_name=model.name) + mlflow_run = mlflow_client.create_run(experiment_id=experiment_id, run_name=model.input_filename) model.mlflow_run_id = mlflow_run.info.run_id job = Job(**model.dict(exclude_none=True, exclude={"input_uri"})) @@ -663,7 +663,7 @@ def create_job_definition(self, model: CreateJobDefinition) -> str: raise InputUriError(model.input_uri) mlflow_client = mlflow.MlflowClient() - experiment_id = mlflow_client.create_experiment(f"{model.name}-{uuid4()}") + experiment_id = mlflow_client.create_experiment(f"{model.input_filename}-{uuid4()}") model.mlflow_experiment_id = experiment_id input_file_path = 
os.path.join(self.root_dir, model.input_uri) mlflow.log_artifact(input_file_path, "input") From 9c3a0f9fce52fbac62705607a3e4048b1632413e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 5 Mar 2024 18:36:10 +0000 Subject: [PATCH 10/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- jupyter_scheduler/scheduler.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index 07125c44..2a019f91 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -512,7 +512,9 @@ def create_job(self, model: CreateJob) -> str: input_file_path = os.path.join(self.root_dir, model.input_uri) mlflow.log_artifact(input_file_path, "input") - mlflow_run = mlflow_client.create_run(experiment_id=experiment_id, run_name=model.input_filename) + mlflow_run = mlflow_client.create_run( + experiment_id=experiment_id, run_name=model.input_filename + ) model.mlflow_run_id = mlflow_run.info.run_id job = Job(**model.dict(exclude_none=True, exclude={"input_uri"})) From 120718bb841a722242f1a0fe03c4f1b339a7be11 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Tue, 5 Mar 2024 10:38:23 -0800 Subject: [PATCH 11/13] add uuid4 to run name --- jupyter_scheduler/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index 2a019f91..5dbbe3d5 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -513,7 +513,7 @@ def create_job(self, model: CreateJob) -> str: mlflow.log_artifact(input_file_path, "input") mlflow_run = mlflow_client.create_run( - experiment_id=experiment_id, run_name=model.input_filename + experiment_id=experiment_id, run_name=f"{model.input_filename}-{uuid4()}" ) model.mlflow_run_id = mlflow_run.info.run_id From 2bdacc6b4cb6920dac3ebb0b03eebdf1a28e16b5 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Tue, 27 Feb 2024 00:58:11 -0800 Subject: [PATCH 12/13] fix CreateJob --- src/mainviews/create-job.tsx | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/mainviews/create-job.tsx b/src/mainviews/create-job.tsx index 3885b6ac..7b024258 100644 --- a/src/mainviews/create-job.tsx +++ b/src/mainviews/create-job.tsx @@ -176,7 +176,6 @@ export function CreateJob(props: ICreateJobProps): JSX.Element { const handleInputChange = (event: ChangeEvent) => { const target = event.target; - const parameterNameIdx = parameterNameMatch(target.name); const parameterValueIdx = parameterValueMatch(target.name); const newParams = props.model.parameters || []; @@ -323,7 +322,10 @@ export function CreateJob(props: ICreateJobProps): JSX.Element { idempotency_token: props.model.idempotencyToken, tags: props.model.tags, runtime_environment_parameters: props.model.runtimeEnvironmentParameters, - package_input_folder: props.model.packageInputFolder + package_input_folder: props.model.packageInputFolder, + mlflow_logging: props.model.mlflowLogging, + mlflow_experiment_id: props.model.mlflowExperimentId, + mlflow_run_id: props.model.mlflowRunId }; if (props.model.parameters !== undefined) { @@ -372,7 +374,9 @@ export function CreateJob(props: ICreateJobProps): JSX.Element { runtime_environment_parameters: props.model.runtimeEnvironmentParameters, schedule: props.model.schedule, timezone: props.model.timezone, - package_input_folder: props.model.packageInputFolder + 
package_input_folder: props.model.packageInputFolder, + mlflow_logging: props.model.mlflowLogging, + mlflow_experiment_id: props.model.mlflowExperimentId }; if (props.model.parameters !== undefined) { From 7ea2bcff4e8487f08a0a0e6c2d0ed60587ae2eaf Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 20 Aug 2024 18:23:57 +0000 Subject: [PATCH 13/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- jupyter_scheduler/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index 5dbbe3d5..cbf41fc1 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -2,10 +2,10 @@ import os import random import shutil -from typing import Dict, List, Optional, Type, Union import signal import subprocess import sys +from typing import Dict, List, Optional, Type, Union from uuid import uuid4 import fsspec
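
Usage note (illustrative, not part of the patch series above): with PATCH 06-08 applied, the executor only mirrors notebook cells into MLflow when they carry an "mlflow_log", "mlflow_log_input", or "mlflow_log_output" cell tag. A minimal sketch of tagging a cell programmatically with nbformat — the notebook path and cell index are assumptions for illustration:

    import nbformat

    # Load an existing notebook (path is illustrative).
    nb = nbformat.read("analysis.ipynb", as_version=4)

    # Tag the first cell. "mlflow_log" logs both the cell source and its
    # outputs; "mlflow_log_input" / "mlflow_log_output" log only one side.
    tags = nb.cells[0].metadata.setdefault("tags", [])
    if "mlflow_log" not in tags:
        tags.append("mlflow_log")

    nbformat.write(nb, "analysis.ipynb")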