Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 189 #208

Merged
merged 6 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions scabha/cargo.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ class Parameter(object):
# if true, treat parameter as a path, and ensure that the parent directories it refers to exist
mkdir: bool = False

# if True, and parameter is a path, access to its parent directory is required
access_parent_dir: bool = False
# if True, and parameter is a path, access to its parent directory is required in writable mode
write_parent_dir: bool = False

# for file and dir-type parameters: if True, the file(s)/dir(s) must exist. If False, they can be missing.
# if None, then the default logic applies: inputs must exist, and outputs don't
must_exist: Optional[bool] = None
Expand Down
3 changes: 2 additions & 1 deletion stimela/backends/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def build(self, cab: 'stimela.kitchen.cab.Cab', log: logging.Logger, rebuild=Fal
command_wrapper=self.build_command_wrapper)


def validate_backend_settings(backend_opts: Dict[str, Any]) -> BackendWrapper:
def validate_backend_settings(backend_opts: Dict[str, Any], log: logging.Logger) -> BackendWrapper:
"""Checks that backend settings refer to a valid backend

Returs tuple of options, main, wrapper, where 'main' the the main backend, and 'wrapper' is an optional wrapper backend
Expand Down Expand Up @@ -63,6 +63,7 @@ def validate_backend_settings(backend_opts: Dict[str, Any]) -> BackendWrapper:
raise BackendError(f"can't combine slurm with {backend_name} backend")
is_remote = True
is_remote_fs = False
backend_opts.slurm.validate(log)
run_command_wrapper = backend_opts.slurm.run_command_wrapper
build_command_wrapper = backend_opts.slurm.build_command_wrapper
else:
Expand Down
39 changes: 14 additions & 25 deletions stimela/backends/singularity.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,26 +100,6 @@ def get_image_info(cab: 'stimela.kitchen.cab.Cab', backend: 'stimela.backend.Sti
return image_name, simg_path, True


def build_command_line(cab: 'stimela.kitchen.cab.Cab', backend: 'stimela.backend.StimelaBackendOptions',
params: Dict[str, Any],
binds: List[Any],
subst: Optional[Dict[str, Any]] = None,
binary: Optional[str] = None,
simg_path: Optional[str] = None):
args = cab.flavour.get_arguments(cab, params, subst, check_executable=False)

if simg_path is None:
_, simg_path = get_image_info(cab, backend)

cwd = os.getcwd()

return [binary or backend.singularity.executable or BINARY,
"exec",
"--containall",
"--bind", f"{cwd}:{cwd}",
"--pwd", cwd,
simg_path] + args

def build(cab: 'stimela.kitchen.cab.Cab', backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger,
command_wrapper: Optional[Callable]=None,
build=True, rebuild=False):
Expand Down Expand Up @@ -219,7 +199,7 @@ def build(cab: 'stimela.kitchen.cab.Cab', backend: 'stimela.backend.StimelaBacke
args = [BINARY, "build", simg_path, f"docker://{image_name}"]

if command_wrapper:
args = command_wrapper(args)
args = command_wrapper(args, log=log)

retcode = xrun(args[0], args[1:], shell=False, log=log,
return_errcode=True, command_name="(singularity build)",
Expand Down Expand Up @@ -254,6 +234,8 @@ def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str,
Returns:
Any: return value (e.g. exit code) of content
"""
from .utils import resolve_required_mounts

native.update_rlimits(backend.rlimits, log)

# get path to image, rebuilding if backend options allow this
Expand All @@ -264,9 +246,16 @@ def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str,
args = [backend.singularity.executable or BINARY,
"exec",
"--containall",
"--bind", f"{cwd}:{cwd}",
"--pwd", cwd,
simg_path]
"--pwd", cwd]

# initial set of mounts has cwd as read-write
mounts = {cwd: True}
# get extra required filesystem bindings
resolve_required_mounts(mounts, params, cab.inputs, cab.outputs)
for path, rw in mounts.items():
args += ["--bind", f"{path}:{path}:{'rw' if rw else 'ro'}"]

args += [simg_path]
args += cab.flavour.get_arguments(cab, params, subst, check_executable=False)
log.debug(f"command line is {args}")

Expand All @@ -283,7 +272,7 @@ def elapsed(since=None):
# log.info(f"argument lengths are {[len(a) for a in args]}")

if command_wrapper:
args = command_wrapper(args, fqname=fqname)
args = command_wrapper(args, fqname=fqname, log=log)

retcode = xrun(args[0], args[1:], shell=False, log=log,
output_wrangler=cabstat.apply_wranglers,
Expand Down
19 changes: 14 additions & 5 deletions stimela/backends/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from omegaconf import OmegaConf
from typing import Dict, List, Any, Optional, Tuple
from contextlib import ExitStack
from scabha.basetypes import EmptyListDefault, EmptyDictDefault
from scabha.basetypes import EmptyListDefault, EmptyDictDefault, ListDefault
import datetime
from stimela.utils.xrun_asyncio import xrun

Expand All @@ -25,6 +25,8 @@ class SlurmOptions(object):
srun_path: Optional[str] = None # path to srun executable
srun_opts: Dict[str, str] = EmptyDictDefault() # extra options passed to srun. "--" prepended, and "_" replaced by "-"
build_local = True # if True, images will be built locally (i.e. on the head node) even when slurm is enabled
# these will be checked for
required_mem_opts: Optional[List[str]] = ListDefault("mem", "mem-per-cpu", "mem-per-gpu")

def get_executable(self):
global _default_srun_path
Expand All @@ -41,23 +43,30 @@ def get_executable(self):
raise BackendError(f"slurm.srun_path '{self.srun}' is not an executable")
return self.srun

def run_command_wrapper(self, args: List[str], fqname: Optional[str]=None) -> List[str]:
def run_command_wrapper(self, args: List[str], fqname: Optional[str]=None, log: Optional[logging.Logger]=None) -> List[str]:
output_args = [self.get_executable()]

if fqname is not None:
output_args += ["-J", fqname]

# add all base options that have been specified
for name, value in self.srun_opts.items():
output_args += ["--" + name.replace("_", "-"), value]
output_args += ["--" + name, value]

output_args += args
return output_args

def build_command_wrapper(self, args: List[str], fqname: Optional[str]=None) -> List[str]:
def build_command_wrapper(self, args: List[str], fqname: Optional[str]=None, log: Optional[logging.Logger]=None) -> List[str]:
if self.build_local:
return args
return self.run_command_wrapper(args, fqname=fqname)
return self.run_command_wrapper(args, fqname=fqname, log=log)

def validate(self, log: logging.Logger):
if self.required_mem_opts:
if not set(self.srun_opts.keys()).intersection(self.required_mem_opts):
raise BackendError(f"slurm.srun_opts must set one of the following: {', '.join(self.required_mem_opts)}")




SlurmOptionsSchema = OmegaConf.structured(SlurmOptions)
Expand Down
71 changes: 34 additions & 37 deletions stimela/backends/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,20 @@

## commenting out for now -- will need to fix when we reactive the kube backend (and have tests for it)

def resolve_required_mounts(params: Dict[str, Any],
def resolve_required_mounts(mounts: Dict[str, bool],
params: Dict[str, Any],
inputs: Dict[str, Parameter],
outputs: Dict[str, Parameter],
prior_mounts: Dict[str, bool]):
):

mkdirs = {}
targets = {}

# helper function to accumulate list of target paths to be mounted
def add_target(path, must_exist, readwrite):
def add_target(param_name, path, must_exist, readwrite):
if must_exist and not os.path.exists(path):
raise SchemaError(f"{path} does not exist.")

raise SchemaError(f"parameter '{param_name}': path '{path}' does not exist")
path = os.path.abspath(path)

# if path doesn't exist, mount parent dir as read/write (file will be created in there)
if not os.path.lexists(path):
add_target(os.path.dirname(path), must_exist=True, readwrite=True)
# else path is real
else:
# already mounted? Make sure readwrite is updated
if path in targets:
targets[path] = targets[path] or readwrite
else:
# not mounted, but is a link
if os.path.islink(path):
# add destination as target
add_target(os.path.realpath(path), must_exist=must_exist, readwrite=readwrite)
# add parent dir as readonly target (to resolve the symlink)
add_target(os.path.dirname(path), must_exist=True, readwrite=False)
# add to mounts
else:
targets[path] = readwrite
mounts[path] = mounts.get(path) or readwrite

# go through parameters and accumulate target paths
for name, value in params.items():
Expand All @@ -56,33 +37,49 @@ def add_target(path, must_exist, readwrite):
else:
continue

must_exist = schema.must_exist
if must_exist is None:
must_exist = name in inputs
must_exist = schema.must_exist and name in inputs
readwrite = schema.writable or name in outputs

# for symlink targets, we need to mount the parent directory
for path in files:
add_target(path, must_exist=must_exist, readwrite=readwrite)
# check for s3:// MS references and skip them
if path.startswith("s3://") or path.startswith("S3://"):
continue
path = os.path.abspath(path).rstrip("/")
realpath = os.path.abspath(os.path.realpath(path))
# check if parent directory access is required
if schema.access_parent_dir or schema.write_parent_dir:
add_target(name, os.path.dirname(path), must_exist=True, readwrite=schema.write_parent_dir)
add_target(name, os.path.dirname(realpath), must_exist=True, readwrite=schema.write_parent_dir)
# for symlink targets, we need to mount the parent directory of the link too
if os.path.islink(path):
# if target is a real directory, mount it directly
if os.path.isdir(realpath):
add_target(name, realpath, must_exist=True, readwrite=readwrite)
# otherwise mount its parent to allow creation of symlink target
else:
add_target(name, os.path.dirname(realpath), must_exist=True, readwrite=readwrite)
# for actual targets, mount the parent, for dirs, mount the dir
else:
if os.path.isdir(path):
add_target(name, path, must_exist=must_exist, readwrite=readwrite)
else:
add_target(name, os.path.dirname(path), must_exist=True, readwrite=readwrite)


# now eliminate unnecessary targets (those that have a parent mount with the same read/write property)
# now eliminate unnecessary mounts (those that have a parent mount with no lower read/write privileges)
skip_targets = set()

for path, readwrite in targets.items():
for path, readwrite in mounts.items():
parent = os.path.dirname(path)
while parent != "/":
# if parent already mounted, and is as writeable as us, skip us
if (parent in targets and targets[parent] >= readwrite) or \
(parent in prior_mounts and prior_mounts[parent] >= readwrite):
if parent in mounts and mounts[parent] >= readwrite:
skip_targets.add(path)
break
parent = os.path.dirname(parent)

for path in skip_targets:
targets.pop(path)

return targets
mounts.pop(path)


def resolve_remote_mounts(params: Dict[str, Any],
Expand Down
20 changes: 9 additions & 11 deletions stimela/commands/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,27 @@
""",
no_args_is_help=True)
@click.option("-r", "--rebuild", is_flag=True,
help="""rebuilds all images from scratch.""")
help="""rebuilds all images from scratch. Default builds missing images only.""")
@click.option("-a", "--all-steps", is_flag=True,
help="""builds images for all steps. Default is to omit explicitly skipped steps.""")
@click.option("-s", "--step", "step_ranges", metavar="STEP(s)", multiple=True,
help="""only look at specific step(s) from the recipe. Use commas, or give multiple times to cherry-pick steps.
help="""only build images for specific step(s) from the recipe. Use commas, or give multiple times to cherry-pick steps.
Use [BEGIN]:[END] to specify a range of steps. Note that cherry-picking an individual step via this option
also impies --enable-step.""")
@click.option("-t", "--tags", "tags", metavar="TAG(s)", multiple=True,
help="""only look at steps wth the given tags (and also steps tagged as "always").
help="""only build images for steps wth the given tags (and also steps tagged as "always").
Use commas, or give multiple times for multiple tags.""")
@click.option("--skip-tags", "skip_tags", metavar="TAG(s)", multiple=True,
help="""explicitly skips steps wth the given tags.
Use commas, or give multiple times for multiple tags.""")
@click.option("-e", "--enable-step", "enable_steps", metavar="STEP(s)", multiple=True,
help="""Force-enable steps even if the recipe marks them as skipped. Use commas, or give multiple times
help="""Build image for step(s) even if the recipe marks them as skipped. Use commas, or give multiple times
for multiple steps.""")
@click.option("-a", "--assign", metavar="PARAM VALUE", nargs=2, multiple=True,
help="""assigns values to parameters: equivalent to PARAM=VALUE, but plays nicer with the shell's
tab completion.""")
@click.option("-l", "--last-recipe", is_flag=True,
help="""if multiple recipes are defined, selects the last one for building.""")
@click.argument("what", metavar="filename.yml|cab name")
def build(what: str, last_recipe: bool = False, rebuild: bool = False,
assign: List[Tuple[str, str]] = [],
def build(what: str, last_recipe: bool = False, rebuild: bool = False, all_steps: bool=False,
step_ranges: List[str] = [], tags: List[str] = [], skip_tags: List[str] = [], enable_steps: List[str] = []):
return run.callback(what, last_recipe=last_recipe, assign=assign, step_ranges=step_ranges,
return run.callback(what, last_recipe=last_recipe, step_ranges=step_ranges,
tags=tags, skip_tags=skip_tags, enable_steps=enable_steps,
build=True, rebuild=rebuild)
build=True, rebuild=rebuild, build_skips=all_steps)
25 changes: 13 additions & 12 deletions stimela/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def load_recipe_file(filename: str):
def run(what: str, parameters: List[str] = [], dry_run: bool = False, last_recipe: bool = False, profile: Optional[int] = None,
assign: List[Tuple[str, str]] = [],
step_ranges: List[str] = [], tags: List[str] = [], skip_tags: List[str] = [], enable_steps: List[str] = [],
build=False, rebuild=False):
build=False, rebuild=False, build_skips=False):

log = logger()
params = OrderedDict()
Expand Down Expand Up @@ -256,16 +256,17 @@ def convert_value(value):
sys.exit(1)

# select recipe substeps based on command line, and exit if nothing to run
selection_options = []
for opts in (tags, skip_tags, step_ranges, enable_steps):
selection_options.append(set(itertools.chain(*(opt.split(",") for opt in opts))))

try:
if not recipe.restrict_steps(*selection_options):
sys.exit(0)
except StepSelectionError as exc:
log_exception(exc)
sys.exit(2)
if not build_skips:
selection_options = []
for opts in (tags, skip_tags, step_ranges, enable_steps):
selection_options.append(set(itertools.chain(*(opt.split(",") for opt in opts))))

try:
if not recipe.restrict_steps(*selection_options):
sys.exit(0)
except StepSelectionError as exc:
log_exception(exc)
sys.exit(2)

logdir = stimelogging.get_logfile_dir(recipe.log) or '.'
log.info(f"recipe logs will be saved under {logdir}")
Expand All @@ -292,7 +293,7 @@ def elapsed():
# build the images
if build:
try:
outer_step.build(backend=stimela.CONFIG.opts.backend, rebuild=rebuild, log=log)
outer_step.build(backend=stimela.CONFIG.opts.backend, rebuild=rebuild, build_skips=build_skips, log=log)
except Exception as exc:
stimela.backends.close_backends(log)

Expand Down
4 changes: 2 additions & 2 deletions stimela/kitchen/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1128,14 +1128,14 @@ def _iterate_loop_worker(self, params, info, subst, backend, count, iter_var, ra

return task_stats.collect_stats(), outputs, exception, tb

def build(self, backend={}, rebuild=False, log: Optional[logging.Logger] = None):
def build(self, backend={}, rebuild=False, build_skips=False, log: Optional[logging.Logger] = None):
# set up backend
backend = OmegaConf.merge(backend, self.backend or {})
# build recursively
log = log or self.log
log.info(f"building image(s) for recipe '{self.fqname}'")
for step in self.steps.values():
step.build(backend, rebuild=rebuild, log=log)
step.build(backend, rebuild=rebuild, build_skips=build_skips, log=log)


def _run(self, params, subst=None, backend={}) -> Dict[str, Any]:
Expand Down
Loading
Loading