Skip to content

Commit 383a37e

Browse files
committed
copy staging folder to output folder after job runs (SUCCESS or FAILURE)
1 parent f55e355 commit 383a37e

File tree

8 files changed

+149
-38
lines changed

8 files changed

+149
-38
lines changed

jupyter_scheduler/executors.py

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@
1111
import nbformat
1212
from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor
1313

14-
from jupyter_scheduler.models import DescribeJob, JobFeature, Status
14+
from jupyter_scheduler.models import DescribeJob, JobFeature, JobFile, Status
1515
from jupyter_scheduler.orm import Job, create_session
1616
from jupyter_scheduler.parameterize import add_parameters
17-
from jupyter_scheduler.utils import get_utc_timestamp
17+
from jupyter_scheduler.utils import copy_directory, get_utc_timestamp
1818

1919

2020
class ExecutionManager(ABC):
@@ -29,11 +29,19 @@ class ExecutionManager(ABC):
2929
_model = None
3030
_db_session = None
3131

32-
def __init__(self, job_id: str, root_dir: str, db_url: str, staging_paths: Dict[str, str]):
32+
def __init__(
33+
self,
34+
job_id: str,
35+
root_dir: str,
36+
db_url: str,
37+
staging_paths: Dict[str, str],
38+
output_dir: str,
39+
):
3340
self.job_id = job_id
3441
self.staging_paths = staging_paths
3542
self.root_dir = root_dir
3643
self.db_url = db_url
44+
self.output_dir = output_dir
3745

3846
@property
3947
def model(self):
@@ -131,13 +139,13 @@ def execute(self):
131139
if job.parameters:
132140
nb = add_parameters(nb, job.parameters)
133141

134-
notebook_dir = os.path.dirname(self.staging_paths["input"])
142+
staging_dir = os.path.dirname(self.staging_paths["input"])
135143
ep = ExecutePreprocessor(
136-
kernel_name=nb.metadata.kernelspec["name"], store_widget_state=True, cwd=notebook_dir
144+
kernel_name=nb.metadata.kernelspec["name"], store_widget_state=True, cwd=staging_dir
137145
)
138146

139147
try:
140-
ep.preprocess(nb, {"metadata": {"path": notebook_dir}})
148+
ep.preprocess(nb, {"metadata": {"path": staging_dir}})
141149
except CellExecutionError as e:
142150
raise e
143151
finally:
@@ -147,6 +155,22 @@ def execute(self):
147155
with fsspec.open(self.staging_paths[output_format], "w", encoding="utf-8") as f:
148156
f.write(output)
149157

158+
self.copy_staged_files_to_output()
159+
160+
def copy_staged_files_to_output(self):
    """Mirror the staging directory into the output directory and register the copies.

    The staging directory is the parent folder of ``self.staging_paths["input"]``.
    Its content is copied into ``self.output_dir`` with ``copy_directory``, which
    is asked for paths relative to ``self.root_dir``. Every returned path that is
    not already present in ``self.model.job_files`` is appended as a generic
    ``JobFile`` entry (display name and format both ``"File"``).
    """
    input_path = self.staging_paths["input"]
    new_paths = copy_directory(
        source_dir=os.path.dirname(input_path),
        destination_dir=self.output_dir,
        base_dir=self.root_dir,
    )

    # Nothing was copied: leave job_files untouched.
    if not new_paths:
        return

    for candidate in new_paths:
        already_listed = False
        for known in self.model.job_files:
            if known.file_path == candidate:
                already_listed = True
                break
        if already_listed:
            continue
        entry = JobFile(display_name="File", file_format="File", file_path=candidate)
        self.model.job_files.append(entry)
173+
150174
def supported_features(cls) -> Dict[JobFeature, bool]:
151175
return {
152176
JobFeature.job_name: True,

jupyter_scheduler/job_files_manager.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals
2121
job = await ensure_async(self.scheduler.get_job(job_id, False))
2222
staging_paths = await ensure_async(self.scheduler.get_staging_paths(job))
2323
output_filenames = self.scheduler.get_job_filenames(job)
24-
output_dir = self.scheduler.get_local_output_path(job)
24+
output_dir = self.scheduler.get_local_output_path(
25+
input_filename=job.input_filename, job_id=job.job_id, root_dir_relative=True
26+
)
2527

2628
p = Process(
2729
target=Downloader(
@@ -30,6 +32,7 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals
3032
staging_paths=staging_paths,
3133
output_dir=output_dir,
3234
redownload=redownload,
35+
include_staging_files=job.package_input_folder,
3336
).download
3437
)
3538
p.start()
@@ -43,22 +46,34 @@ def __init__(
4346
staging_paths: Dict[str, str],
4447
output_dir: str,
4548
redownload: bool,
49+
include_staging_files: bool = False,
4650
):
4751
self.output_formats = output_formats
4852
self.output_filenames = output_filenames
4953
self.staging_paths = staging_paths
5054
self.output_dir = output_dir
5155
self.redownload = redownload
56+
self.include_staging_files = include_staging_files
5257

5358
def generate_filepaths(self):
5459
"""A generator that produces filepaths"""
55-
output_formats = self.output_formats + ["input"]
56-
57-
for output_format in output_formats:
58-
input_filepath = self.staging_paths[output_format]
59-
output_filepath = os.path.join(self.output_dir, self.output_filenames[output_format])
60-
if not os.path.exists(output_filepath) or self.redownload:
61-
yield input_filepath, output_filepath
60+
if self.include_staging_files:
61+
staging_dir = os.path.dirname(self.staging_paths["input"])
62+
for root, _, files in os.walk(staging_dir):
63+
for file in files:
64+
input_filepath = os.path.join(root, file)
65+
relative_path = os.path.relpath(input_filepath, staging_dir)
66+
output_filepath = os.path.join(self.output_dir, relative_path)
67+
yield input_filepath, output_filepath
68+
else:
69+
output_formats = self.output_formats + ["input"]
70+
for output_format in output_formats:
71+
input_filepath = self.staging_paths[output_format]
72+
output_filepath = os.path.join(
73+
self.output_dir, self.output_filenames[output_format]
74+
)
75+
if not os.path.exists(output_filepath) or self.redownload:
76+
yield input_filepath, output_filepath
6277

6378
def download_tar(self, archive_format: str = "tar"):
6479
archive_filepath = self.staging_paths[archive_format]

jupyter_scheduler/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ class DescribeJob(BaseModel):
147147
status_message: Optional[str] = None
148148
downloaded: bool = False
149149
package_input_folder: Optional[bool] = None
150+
output_folder: Optional[str] = None
150151

151152
class Config:
152153
orm_mode = True

jupyter_scheduler/scheduler.py

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
UpdateJobDefinition,
4040
)
4141
from jupyter_scheduler.orm import Job, JobDefinition, create_session
42-
from jupyter_scheduler.utils import create_output_directory, create_output_filename
42+
from jupyter_scheduler.utils import copy_directory, create_output_directory, create_output_filename
4343

4444

4545
class BaseScheduler(LoggingConfigurable):
@@ -289,7 +289,10 @@ def add_job_files(self, model: DescribeJob):
289289
mapping = self.environments_manager.output_formats_mapping()
290290
job_files = []
291291
output_filenames = self.get_job_filenames(model)
292-
output_dir = os.path.relpath(self.get_local_output_path(model), self.root_dir)
292+
output_dir = self.get_local_output_path(
293+
input_filename=model.input_filename, job_id=model.job_id, root_dir_relative=True
294+
)
295+
293296
for output_format in model.output_formats:
294297
filename = output_filenames[output_format]
295298
output_path = os.path.join(output_dir, filename)
@@ -316,13 +319,20 @@ def add_job_files(self, model: DescribeJob):
316319
model.job_files = job_files
317320
model.downloaded = all(job_file.file_path for job_file in job_files)
318321

319-
def get_local_output_path(self, model: DescribeJob) -> str:
322+
def get_local_output_path(
323+
self, input_filename: str, job_id: str, root_dir_relative: Optional[bool] = False
324+
) -> str:
320325
"""Returns the local output directory path
321326
where all the job files will be downloaded
322327
from the staging location.
323328
"""
324-
output_dir_name = create_output_directory(model.input_filename, model.job_id)
325-
return os.path.join(self.root_dir, self.output_directory, output_dir_name)
329+
output_dir_name = create_output_directory(input_filename, job_id)
330+
if root_dir_relative:
331+
return os.path.relpath(
332+
os.path.join(self.root_dir, self.output_directory, output_dir_name), self.root_dir
333+
)
334+
else:
335+
return os.path.join(self.root_dir, self.output_directory, output_dir_name)
326336

327337

328338
class Scheduler(BaseScheduler):
@@ -375,20 +385,10 @@ def copy_input_folder(self, input_uri: str, nb_copy_to_path: str):
375385
"""Copies the input file along with the input directory to the staging directory"""
376386
input_dir_path = os.path.dirname(os.path.join(self.root_dir, input_uri))
377387
staging_dir = os.path.dirname(nb_copy_to_path)
378-
379-
# Copy the input file
380-
self.copy_input_file(input_uri, nb_copy_to_path)
381-
382-
# Copy the rest of the input folder excluding the input file
383-
for item in os.listdir(input_dir_path):
384-
source = os.path.join(input_dir_path, item)
385-
destination = os.path.join(staging_dir, item)
386-
if os.path.isdir(source):
387-
shutil.copytree(source, destination)
388-
elif os.path.isfile(source) and item != os.path.basename(input_uri):
389-
with fsspec.open(source) as src_file:
390-
with fsspec.open(destination, "wb") as output_file:
391-
output_file.write(src_file.read())
388+
copy_directory(
389+
source_dir=input_dir_path,
390+
destination_dir=staging_dir,
391+
)
392392

393393
def create_job(self, model: CreateJob) -> str:
394394
if not model.job_definition_id and not self.file_exists(model.input_uri):
@@ -439,6 +439,10 @@ def create_job(self, model: CreateJob) -> str:
439439
staging_paths=staging_paths,
440440
root_dir=self.root_dir,
441441
db_url=self.db_url,
442+
output_dir=self.get_local_output_path(
443+
input_filename=model.input_filename,
444+
job_id=job.job_id,
445+
),
442446
).process
443447
)
444448
p.start()
@@ -489,6 +493,10 @@ def list_jobs(self, query: ListJobsQuery) -> ListJobsResponse:
489493
for job in jobs:
490494
model = DescribeJob.from_orm(job)
491495
self.add_job_files(model=model)
496+
if model.package_input_folder:
497+
model.output_folder = self.get_local_output_path(
498+
input_filename=model.input_filename, job_id=model.job_id, root_dir_relative=True
499+
)
492500
jobs_list.append(model)
493501

494502
list_jobs_response = ListJobsResponse(

jupyter_scheduler/utils.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import json
22
import os
3-
import pathlib
43
from datetime import datetime, timezone
5-
from typing import Optional
4+
import shutil
5+
from typing import List, Optional
66
from uuid import UUID
77

8+
import fsspec
89
import pytz
910
from croniter import croniter
1011
from nbformat import NotebookNode
@@ -84,3 +85,34 @@ def get_localized_timestamp(timezone) -> int:
8485
tz = pytz.timezone(timezone)
8586
local_date = datetime.now(tz=tz)
8687
return int(local_date.timestamp() * 1000)
88+
89+
90+
def copy_directory(
    source_dir: str,
    destination_dir: str,
    base_dir: Optional[str] = None,
    exclude_files: Optional[List[str]] = None,
) -> List[str]:
    """Copy the content of ``source_dir`` into ``destination_dir``.

    Top-level sub-directories are copied recursively with ``shutil.copytree``;
    top-level regular files are copied through ``fsspec``.

    Args:
        source_dir: Directory whose entries are copied.
        destination_dir: Directory that receives the copies.
        base_dir: If given (and non-empty), returned paths are relative to it;
            otherwise they are relative to ``destination_dir``.
        exclude_files: Names to leave out. ``None`` (the default) or an empty
            list excludes nothing. For top-level files the name must match
            exactly; inside sub-directories the names are used as
            ``shutil.ignore_patterns`` glob patterns. A top-level directory is
            always copied, even if its own name is listed.

    Returns:
        Paths of all files now present under the copied locations, relative to
        ``base_dir`` if provided, else to ``destination_dir``.

    Raises:
        FileExistsError: Propagated from ``shutil.copytree`` when a
            sub-directory already exists at the destination.
    """
    # A None default avoids sharing one list object across calls and lets
    # callers pass None explicitly, as the Optional annotation promises.
    excluded = list(exclude_files) if exclude_files else []
    # Loop-invariant: the directory every returned path is relative to.
    relative_to = base_dir if base_dir else destination_dir

    copied_files = []
    for item in os.listdir(source_dir):
        source = os.path.join(source_dir, item)
        destination = os.path.join(destination_dir, item)
        if os.path.isdir(source):
            shutil.copytree(source, destination, ignore=shutil.ignore_patterns(*excluded))
            # Walk the copy (not the source) so ignored entries are not reported.
            for dirpath, _, filenames in os.walk(destination):
                copied_files.extend(
                    os.path.relpath(os.path.join(dirpath, filename), relative_to)
                    for filename in filenames
                )
        elif os.path.isfile(source) and item not in excluded:
            with fsspec.open(source, "rb") as source_file:
                with fsspec.open(destination, "wb") as output_file:
                    # Stream in chunks instead of loading the whole file in memory.
                    shutil.copyfileobj(source_file, output_file)
            copied_files.append(os.path.relpath(destination, relative_to))

    return copied_files

src/components/job-row.tsx

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,33 @@ function JobFiles(props: {
7575
);
7676
}
7777

78+
/**
 * Renders a "Files" link that opens the job's output directory in the
 * file browser.
 *
 * Returns null when the job was not created with `package_input_folder`
 * or when it has no `output_folder`.
 */
function FilesDirectoryLink(props: {
  job: Scheduler.IDescribeJob;
  app: JupyterFrontEnd;
}): JSX.Element | null {
  // Hooks must be called unconditionally and in the same order on every
  // render, so the translator is resolved before any early return.
  const trans = useTranslator('jupyterlab');

  if (!props.job.package_input_folder || !props.job.output_folder) {
    return null;
  }

  return (
    <Link
      href={`/lab/tree/${props.job.output_folder}`}
      title={trans.__(
        'Open output directory with files for "%1"',
        props.job.name
      )}
      onClick={e => {
        // Navigate inside the running JupyterLab app instead of following
        // the href.
        e.preventDefault();
        props.app.commands.execute('filebrowser:open-path', {
          path: props.job.output_folder
        });
      }}
    >
      {trans.__('Files')}
    </Link>
  );
}
104+
78105
type DownloadFilesButtonProps = {
79106
app: JupyterFrontEnd;
80107
job: Scheduler.IDescribeJob;
@@ -90,7 +117,7 @@ function DownloadFilesButton(props: DownloadFilesButtonProps) {
90117
return (
91118
<IconButton
92119
aria-label="download"
93-
title={trans.__('Download job files for "%1"', props.job.name)}
120+
title={trans.__('Download output files for "%1"', props.job.name)}
94121
disabled={downloading}
95122
onClick={async () => {
96123
setDownloading(true);
@@ -167,6 +194,9 @@ export function buildJobRow(
167194
/>
168195
)}
169196
<JobFiles job={job} app={app} />
197+
{(job.status === 'COMPLETED' || job.status === 'FAILED') && (
198+
<FilesDirectoryLink job={job} app={app} />
199+
)}
170200
</>,
171201
<Timestamp job={job} />,
172202
translateStatus(job.status),

src/handler.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,8 @@ export namespace Scheduler {
466466
start_time?: number;
467467
end_time?: number;
468468
downloaded: boolean;
469-
packageInputFolder?: boolean;
469+
package_input_folder?: boolean;
470+
output_folder?: string;
470471
}
471472

472473
export interface ICreateJobResponse {

src/model.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ export function convertDescribeJobtoJobDetail(
388388
startTime: describeJob.start_time,
389389
endTime: describeJob.end_time,
390390
downloaded: describeJob.downloaded,
391-
packageInputFolder: describeJob.packageInputFolder
391+
packageInputFolder: describeJob.package_input_folder
392392
};
393393
}
394394

0 commit comments

Comments
 (0)