Skip to content

Commit

Permalink
Merge pull request #208 from caracal-pipeline/issue-189
Browse files Browse the repository at this point in the history
Issue 189
  • Loading branch information
o-smirnov authored Feb 2, 2024
2 parents 661e160 + 5c24f1f commit 9045b6d
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 76 deletions.
5 changes: 5 additions & 0 deletions scabha/cargo.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ class Parameter(object):
# if true, treat parameter as a path, and ensure that the parent directories it refers to exist
mkdir: bool = False

# if True, and parameter is a path, access to its parent directory is required
access_parent_dir: bool = False
# if True, and parameter is a path, access to its parent directory is required in writable mode
write_parent_dir: bool = False

# for file and dir-type parameters: if True, the file(s)/dir(s) must exist. If False, they can be missing.
# if None, then the default logic applies: inputs must exist, and outputs don't
must_exist: Optional[bool] = None
Expand Down
3 changes: 2 additions & 1 deletion stimela/backends/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def build(self, cab: 'stimela.kitchen.cab.Cab', log: logging.Logger, rebuild=Fal
command_wrapper=self.build_command_wrapper)


def validate_backend_settings(backend_opts: Dict[str, Any]) -> BackendWrapper:
def validate_backend_settings(backend_opts: Dict[str, Any], log: logging.Logger) -> BackendWrapper:
"""Checks that backend settings refer to a valid backend
Returns tuple of options, main, wrapper, where 'main' is the main backend, and 'wrapper' is an optional wrapper backend
Expand Down Expand Up @@ -63,6 +63,7 @@ def validate_backend_settings(backend_opts: Dict[str, Any]) -> BackendWrapper:
raise BackendError(f"can't combine slurm with {backend_name} backend")
is_remote = True
is_remote_fs = False
backend_opts.slurm.validate(log)
run_command_wrapper = backend_opts.slurm.run_command_wrapper
build_command_wrapper = backend_opts.slurm.build_command_wrapper
else:
Expand Down
39 changes: 14 additions & 25 deletions stimela/backends/singularity.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,26 +100,6 @@ def get_image_info(cab: 'stimela.kitchen.cab.Cab', backend: 'stimela.backend.Sti
return image_name, simg_path, True


def build_command_line(cab: 'stimela.kitchen.cab.Cab', backend: 'stimela.backend.StimelaBackendOptions',
params: Dict[str, Any],
binds: List[Any],
subst: Optional[Dict[str, Any]] = None,
binary: Optional[str] = None,
simg_path: Optional[str] = None):
args = cab.flavour.get_arguments(cab, params, subst, check_executable=False)

if simg_path is None:
_, simg_path = get_image_info(cab, backend)

cwd = os.getcwd()

return [binary or backend.singularity.executable or BINARY,
"exec",
"--containall",
"--bind", f"{cwd}:{cwd}",
"--pwd", cwd,
simg_path] + args

def build(cab: 'stimela.kitchen.cab.Cab', backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger,
command_wrapper: Optional[Callable]=None,
build=True, rebuild=False):
Expand Down Expand Up @@ -219,7 +199,7 @@ def build(cab: 'stimela.kitchen.cab.Cab', backend: 'stimela.backend.StimelaBacke
args = [BINARY, "build", simg_path, f"docker://{image_name}"]

if command_wrapper:
args = command_wrapper(args)
args = command_wrapper(args, log=log)

retcode = xrun(args[0], args[1:], shell=False, log=log,
return_errcode=True, command_name="(singularity build)",
Expand Down Expand Up @@ -254,6 +234,8 @@ def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str,
Returns:
Any: return value (e.g. exit code) of content
"""
from .utils import resolve_required_mounts

native.update_rlimits(backend.rlimits, log)

# get path to image, rebuilding if backend options allow this
Expand All @@ -264,9 +246,16 @@ def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str,
args = [backend.singularity.executable or BINARY,
"exec",
"--containall",
"--bind", f"{cwd}:{cwd}",
"--pwd", cwd,
simg_path]
"--pwd", cwd]

# initial set of mounts has cwd as read-write
mounts = {cwd: True}
# get extra required filesystem bindings
resolve_required_mounts(mounts, params, cab.inputs, cab.outputs)
for path, rw in mounts.items():
args += ["--bind", f"{path}:{path}:{'rw' if rw else 'ro'}"]

args += [simg_path]
args += cab.flavour.get_arguments(cab, params, subst, check_executable=False)
log.debug(f"command line is {args}")

Expand All @@ -283,7 +272,7 @@ def elapsed(since=None):
# log.info(f"argument lengths are {[len(a) for a in args]}")

if command_wrapper:
args = command_wrapper(args, fqname=fqname)
args = command_wrapper(args, fqname=fqname, log=log)

retcode = xrun(args[0], args[1:], shell=False, log=log,
output_wrangler=cabstat.apply_wranglers,
Expand Down
19 changes: 14 additions & 5 deletions stimela/backends/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from omegaconf import OmegaConf
from typing import Dict, List, Any, Optional, Tuple
from contextlib import ExitStack
from scabha.basetypes import EmptyListDefault, EmptyDictDefault
from scabha.basetypes import EmptyListDefault, EmptyDictDefault, ListDefault
import datetime
from stimela.utils.xrun_asyncio import xrun

Expand All @@ -25,6 +25,8 @@ class SlurmOptions(object):
srun_path: Optional[str] = None # path to srun executable
srun_opts: Dict[str, str] = EmptyDictDefault() # extra options passed to srun. "--" prepended, and "_" replaced by "-"
build_local = True # if True, images will be built locally (i.e. on the head node) even when slurm is enabled
# validate() requires srun_opts to set at least one of these memory options (set to None or empty to disable the check)
required_mem_opts: Optional[List[str]] = ListDefault("mem", "mem-per-cpu", "mem-per-gpu")

def get_executable(self):
global _default_srun_path
Expand All @@ -41,23 +43,30 @@ def get_executable(self):
raise BackendError(f"slurm.srun_path '{self.srun}' is not an executable")
return self.srun

def run_command_wrapper(self, args: List[str], fqname: Optional[str]=None) -> List[str]:
def run_command_wrapper(self, args: List[str], fqname: Optional[str]=None, log: Optional[logging.Logger]=None) -> List[str]:
output_args = [self.get_executable()]

if fqname is not None:
output_args += ["-J", fqname]

# add all base options that have been specified
for name, value in self.srun_opts.items():
output_args += ["--" + name.replace("_", "-"), value]
output_args += ["--" + name, value]

output_args += args
return output_args

def build_command_wrapper(self, args: List[str], fqname: Optional[str]=None) -> List[str]:
def build_command_wrapper(self, args: List[str], fqname: Optional[str]=None, log: Optional[logging.Logger]=None) -> List[str]:
if self.build_local:
return args
return self.run_command_wrapper(args, fqname=fqname)
return self.run_command_wrapper(args, fqname=fqname, log=log)

def validate(self, log: logging.Logger):
if self.required_mem_opts:
if not set(self.srun_opts.keys()).intersection(self.required_mem_opts):
raise BackendError(f"slurm.srun_opts must set one of the following: {', '.join(self.required_mem_opts)}")




SlurmOptionsSchema = OmegaConf.structured(SlurmOptions)
Expand Down
71 changes: 34 additions & 37 deletions stimela/backends/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,20 @@

## commenting out for now -- will need to fix when we reactivate the kube backend (and have tests for it)

def resolve_required_mounts(params: Dict[str, Any],
def resolve_required_mounts(mounts: Dict[str, bool],
params: Dict[str, Any],
inputs: Dict[str, Parameter],
outputs: Dict[str, Parameter],
prior_mounts: Dict[str, bool]):
):

mkdirs = {}
targets = {}

# helper function to accumulate list of target paths to be mounted
def add_target(path, must_exist, readwrite):
def add_target(param_name, path, must_exist, readwrite):
if must_exist and not os.path.exists(path):
raise SchemaError(f"{path} does not exist.")

raise SchemaError(f"parameter '{param_name}': path '{path}' does not exist")
path = os.path.abspath(path)

# if path doesn't exist, mount parent dir as read/write (file will be created in there)
if not os.path.lexists(path):
add_target(os.path.dirname(path), must_exist=True, readwrite=True)
# else path is real
else:
# already mounted? Make sure readwrite is updated
if path in targets:
targets[path] = targets[path] or readwrite
else:
# not mounted, but is a link
if os.path.islink(path):
# add destination as target
add_target(os.path.realpath(path), must_exist=must_exist, readwrite=readwrite)
# add parent dir as readonly target (to resolve the symlink)
add_target(os.path.dirname(path), must_exist=True, readwrite=False)
# add to mounts
else:
targets[path] = readwrite
mounts[path] = mounts.get(path) or readwrite

# go through parameters and accumulate target paths
for name, value in params.items():
Expand All @@ -56,33 +37,49 @@ def add_target(path, must_exist, readwrite):
else:
continue

must_exist = schema.must_exist
if must_exist is None:
must_exist = name in inputs
must_exist = schema.must_exist and name in inputs
readwrite = schema.writable or name in outputs

# for symlink targets, we need to mount the parent directory
for path in files:
add_target(path, must_exist=must_exist, readwrite=readwrite)
# check for s3:// MS references and skip them
if path.startswith("s3://") or path.startswith("S3://"):
continue
path = os.path.abspath(path).rstrip("/")
realpath = os.path.abspath(os.path.realpath(path))
# check if parent directory access is required
if schema.access_parent_dir or schema.write_parent_dir:
add_target(name, os.path.dirname(path), must_exist=True, readwrite=schema.write_parent_dir)
add_target(name, os.path.dirname(realpath), must_exist=True, readwrite=schema.write_parent_dir)
# for symlink targets, we need to mount the parent directory of the link too
if os.path.islink(path):
# if target is a real directory, mount it directly
if os.path.isdir(realpath):
add_target(name, realpath, must_exist=True, readwrite=readwrite)
# otherwise mount its parent to allow creation of symlink target
else:
add_target(name, os.path.dirname(realpath), must_exist=True, readwrite=readwrite)
# for actual targets, mount the parent, for dirs, mount the dir
else:
if os.path.isdir(path):
add_target(name, path, must_exist=must_exist, readwrite=readwrite)
else:
add_target(name, os.path.dirname(path), must_exist=True, readwrite=readwrite)


# now eliminate unnecessary targets (those that have a parent mount with the same read/write property)
# now eliminate unnecessary mounts (those that have a parent mount with no lower read/write privileges)
skip_targets = set()

for path, readwrite in targets.items():
for path, readwrite in mounts.items():
parent = os.path.dirname(path)
while parent != "/":
# if parent already mounted, and is as writeable as us, skip us
if (parent in targets and targets[parent] >= readwrite) or \
(parent in prior_mounts and prior_mounts[parent] >= readwrite):
if parent in mounts and mounts[parent] >= readwrite:
skip_targets.add(path)
break
parent = os.path.dirname(parent)

for path in skip_targets:
targets.pop(path)

return targets
mounts.pop(path)


def resolve_remote_mounts(params: Dict[str, Any],
Expand Down
6 changes: 3 additions & 3 deletions stimela/kitchen/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def finalize(self, config=None, log=None, fqname=None, backend=None, nesting=0):
backend or {},
self.cargo.backend or {},
self.backend or {}))
runner.validate_backend_settings(backend_opts)
runner.validate_backend_settings(backend_opts, log=log)


def prevalidate(self, subst: Optional[SubstitutionNS]=None, root=False):
Expand Down Expand Up @@ -332,7 +332,7 @@ def build(self, backend={}, rebuild=False, build_skips=False, log: Optional[logg
try:
backend = OmegaConf.merge(backend, self.cargo.backend or {}, self.backend or {})
backend = OmegaConf.to_object(OmegaConf.merge(StimelaBackendSchema, backend))
backend_wrapper = runner.validate_backend_settings(backend)
backend_wrapper = runner.validate_backend_settings(backend, log=log)
except Exception as exc:
newexc = BackendError("error validating backend settings", exc)
raise newexc from None
Expand All @@ -354,7 +354,7 @@ def run(self, backend={}, subst=None, parent_log=None):
backend = OmegaConf.merge(backend, self.cargo.backend or {}, self.backend or {})
backend_opts = evaluate_and_substitute_object(backend, subst, recursion_level=-1, location=[self.fqname, "backend"])
backend_opts = OmegaConf.to_object(OmegaConf.merge(StimelaBackendSchema, backend_opts))
backend_wrapper = runner.validate_backend_settings(backend_opts)
backend_wrapper = runner.validate_backend_settings(backend_opts, log=self.log)
except Exception as exc:
newexc = BackendError("error validating backend settings", exc)
raise newexc from None
Expand Down
16 changes: 11 additions & 5 deletions stimela/stimelogging.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
from rich.padding import Padding

from . import task_stats
from .task_stats import declare_subtask, declare_subtask_attributes, \
declare_subcommand, update_process_status, \
run_process_status_update
from .task_stats import declare_subcommand, declare_subtask, declare_subtask_attributes, run_process_status_update

class FunkyMessage(object):
"""Class representing a message with two versions: funky (with markup), and boring (no markup)"""
Expand Down Expand Up @@ -184,6 +182,9 @@ def logger(name="STIMELA", propagate=False, boring=False, loglevel="INFO"):
_logger_file_handlers = {}
_logger_console_handlers = {}

# keep track of all log files opened
_previous_logfiles = set()


def has_file_logger(log: logging.Logger):
return log.name in _logger_file_handlers
Expand Down Expand Up @@ -247,9 +248,14 @@ def setup_file_logger(log: logging.Logger, logfile: str, level: Optional[Union[i
if fh is not None:
fh.close()
log.removeHandler(fh)

# if file was previously open, append, else overwrite
if logfile in _previous_logfiles:
mode = 'a'
else:
mode = 'w'
_previous_logfiles.add(logfile)
# create new FH
fh = DelayedFileHandler(logfile, symlink, 'w')
fh = DelayedFileHandler(logfile, symlink, mode)
fh.setFormatter(log_boring_formatter)
log.addHandler(fh)

Expand Down

0 comments on commit 9045b6d

Please sign in to comment.