diff --git a/stimela/backends/__init__.py b/stimela/backends/__init__.py index 44e0fa15..4030c96f 100644 --- a/stimela/backends/__init__.py +++ b/stimela/backends/__init__.py @@ -10,6 +10,7 @@ from .singularity import SingularityBackendOptions from .kube import KubeBackendOptions from .native import NativeBackendOptions +from .slurm import SlurmOptions import stimela @@ -20,11 +21,15 @@ SUPPORTED_BACKENDS = set(Backend.__members__) -def get_backend(name: str): +def get_backend(name: str, backend_opts: Optional[Dict] = None): + """ + Gets backend, given a name and an optional set of options for that backend. + Returns backend module, or None if it is not available. + """ if name not in SUPPORTED_BACKENDS: return None backend = __import__(f"stimela.backends.{name}", fromlist=[name]) - if backend.is_available(): + if backend.is_available(backend_opts): return backend return None @@ -48,7 +53,7 @@ class StimelaBackendOptions(object): kube: Optional[KubeBackendOptions] = None native: Optional[NativeBackendOptions] = None docker: Optional[Dict] = None # placeholder for future impl - slurm: Optional[Dict] = None # placeholder for future impl + slurm: Optional[SlurmOptions] = None ## Resource limits applied during run -- see resource module rlimits: Dict[str, Any] = EmptyDictDefault() @@ -71,6 +76,8 @@ def __post_init__(self): self.native = NativeBackendOptions() if self.kube is None and get_backend("kube"): self.kube = KubeBackendOptions() + if self.slurm is None: + self.slurm = SlurmOptions() StimelaBackendSchema = OmegaConf.structured(StimelaBackendOptions) @@ -106,7 +113,7 @@ def _call_backends(backend_opts: StimelaBackendOptions, log: logging.Logger, met # check that backend has not been disabled opts = getattr(backend_opts, engine, None) if not opts or opts.enable: - backend = get_backend(engine) + backend = get_backend(engine, opts) func = backend and getattr(backend, method, None) if func: try: diff --git a/stimela/backends/docker.py b/stimela/backends/docker.py 
index d31632d6..4202604d 100644 --- a/stimela/backends/docker.py +++ b/stimela/backends/docker.py @@ -25,7 +25,7 @@ STATUS = VERSION = BINARY = None -def is_available(): +def is_available(opts = None): global STATUS, VERSION, BINARY if STATUS is None: BINARY = which("docker") diff --git a/stimela/backends/kube/__init__.py b/stimela/backends/kube/__init__.py index 655f0c5c..dfade66f 100644 --- a/stimela/backends/kube/__init__.py +++ b/stimela/backends/kube/__init__.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional, Any +from typing import Dict, List, Optional, Any, Callable from enum import Enum from omegaconf import OmegaConf from dataclasses import dataclass @@ -36,52 +36,6 @@ def run(*args, **kw): raise RuntimeError(f"kubernetes backend {STATUS}") -def is_available(): - return AVAILABLE - -def get_status(): - return STATUS - -def is_remote(): - return True - -def init(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger): - from . import infrastructure - global AVAILABLE, STATUS - if not infrastructure.init(backend, log): - AVAILABLE = False - STATUS = "initialization error" - -def close(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger): - from . import infrastructure - if AVAILABLE: - infrastructure.close(backend, log) - -def cleanup(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger): - from . import infrastructure - infrastructure.cleanup(backend, log) - -def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str, - backend: 'stimela.backend.StimelaBackendOptions', - log: logging.Logger, subst: Optional[Dict[str, Any]] = None): - from . 
import run_kube - return run_kube.run(cab=cab, params=params, fqname=fqname, backend=backend, log=log, subst=subst) - -_kube_client = _kube_config = _kube_context = None - -def get_kube_api(context: Optional[str]=None): - global _kube_client - global _kube_config - global _kube_context - - if _kube_config is None: - _kube_config = True - _kube_context = context - kubernetes.config.load_kube_config(context=context) - elif context != _kube_context: - raise BackendError(f"k8s context has changed (was {_kube_context}, now {context}), this is not permitted") - - return core_v1_api.CoreV1Api(), CustomObjectsApi() # dict of methods for converting an object to text format @@ -110,8 +64,6 @@ class PodLimits(object): # arbitrary additional structure copied into the pod spec custom_pod_spec: Dict[str, Any] = EmptyDictDefault() - - @dataclass class KubeBackendOptions(object): """ @@ -268,6 +220,57 @@ class UserInfo(object): KubeBackendSchema = OmegaConf.structured(KubeBackendOptions) + +def is_available(opts: Optional[KubeBackendOptions]= None): + return AVAILABLE + +def get_status(): + return STATUS + +def is_remote(): + return True + +def init(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger): + from . import infrastructure + global AVAILABLE, STATUS + if not infrastructure.init(backend, log): + AVAILABLE = False + STATUS = "initialization error" + +def close(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger): + from . import infrastructure + if AVAILABLE: + infrastructure.close(backend, log) + +def cleanup(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger): + from . 
import infrastructure + infrastructure.cleanup(backend, log) + +def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str, + backend: 'stimela.backend.StimelaBackendOptions', + log: logging.Logger, subst: Optional[Dict[str, Any]] = None, + command_wrapper: Optional[Callable] = None): + # normally runner.py won't allow this, but check just in case + if command_wrapper: + raise BackendError(f"kube backend cannot be used with a command wrapper") + from . import run_kube + return run_kube.run(cab=cab, params=params, fqname=fqname, backend=backend, log=log, subst=subst) + +_kube_client = _kube_config = _kube_context = None + +def get_kube_api(context: Optional[str]=None): + global _kube_client + global _kube_config + global _kube_context + + if _kube_config is None: + _kube_config = True + _kube_context = context + kubernetes.config.load_kube_config(context=context) + elif context != _kube_context: + raise BackendError(f"k8s context has changed (was {_kube_context}, now {context}), this is not permitted") + + return core_v1_api.CoreV1Api(), CustomObjectsApi() _uid = os.getuid() _gid = os.getgid() diff --git a/stimela/backends/kube/run_kube.py b/stimela/backends/kube/run_kube.py index 8b0b0fc9..e1c6ed4c 100644 --- a/stimela/backends/kube/run_kube.py +++ b/stimela/backends/kube/run_kube.py @@ -1,5 +1,5 @@ import logging, time, json, datetime, os.path, pathlib, secrets, shlex -from typing import Dict, Optional, Any, List +from typing import Dict, Optional, Any, List, Callable from dataclasses import fields from datetime import timedelta from requests import ConnectionError @@ -44,13 +44,13 @@ def run(cab: Cab, params: Dict[str, Any], fqname: str, """ if not cab.image: - raise StimelaCabRuntimeError(f"kube runner requires cab.image to be set") + raise BackendError(f"kube backend requires cab.image to be set") kube = backend.kube namespace = kube.namespace if not namespace: - raise StimelaCabRuntimeError(f"runtime.kube.namespace must be set") + raise 
BackendError(f"runtime.kube.namespace must be set") args = cab.flavour.get_arguments(cab, params, subst, check_executable=False) log.debug(f"command line is {args}") diff --git a/stimela/backends/native/__init__.py b/stimela/backends/native/__init__.py index 194be51e..73144f92 100644 --- a/stimela/backends/native/__init__.py +++ b/stimela/backends/native/__init__.py @@ -3,7 +3,7 @@ import logging import stimela -def is_available(): +def is_available(opts = None): return True def get_status(): diff --git a/stimela/backends/native/run_native.py b/stimela/backends/native/run_native.py index 41f83083..cd2c0853 100644 --- a/stimela/backends/native/run_native.py +++ b/stimela/backends/native/run_native.py @@ -1,6 +1,6 @@ import logging, datetime, resource, os.path -from typing import Dict, Optional, Any, List +from typing import Dict, Optional, Any, List, Callable import stimela import stimela.kitchen @@ -37,14 +37,18 @@ def build_command_line(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], s def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str, backend: 'stimela.backend.StimelaBackendOptions', - log: logging.Logger, subst: Optional[Dict[str, Any]] = None): - """Runs cab contents + log: logging.Logger, subst: Optional[Dict[str, Any]] = None, + command_wrapper: Optional[Callable] = None): + """ + Runs cab contents Args: - cab (Cab): cab object + cab: cab object + params: cab parameters + backend: backed settings object log (logger): logger to use subst (Optional[Dict[str, Any]]): Substitution dict for commands etc., if any. - + command_wrapper (Callable): takes a list of args and returns modified list of args Returns: Any: return value (e.g. 
exit code) of content """ @@ -79,6 +83,9 @@ def elapsed(since=None): return str(datetime.datetime.now() - (since or start_time)).split('.', 1)[0] # log.info(f"argument lengths are {[len(a) for a in args]}") + + if command_wrapper: + args = command_wrapper(args) retcode = xrun(args[0], args[1:], shell=False, log=log, output_wrangler=cabstat.apply_wranglers, diff --git a/stimela/backends/podman.py b/stimela/backends/podman.py index b66f2705..45060aa0 100644 --- a/stimela/backends/podman.py +++ b/stimela/backends/podman.py @@ -8,7 +8,7 @@ import datetime import tempfile -def is_available(): +def is_available(opts = None): return False def get_status(): diff --git a/stimela/backends/runner.py b/stimela/backends/runner.py index ea82b9a3..35703310 100644 --- a/stimela/backends/runner.py +++ b/stimela/backends/runner.py @@ -1,12 +1,38 @@ -from typing import Dict, Optional, Any +import logging +from typing import Dict, Optional, Any, Callable +from dataclasses import dataclass from omegaconf import OmegaConf from omegaconf.errors import OmegaConfBaseException +import stimela from stimela.backends import StimelaBackendOptions, StimelaBackendSchema from stimela.exceptions import BackendError -from . import get_backend, get_backend_status +from . 
import get_backend, get_backend_status, slurm -def validate_backend_settings(backend_opts: Dict[str, Any]): + +@dataclass +class BackendWrapper(object): + opts: StimelaBackendOptions + is_remote: bool + is_remote_fs: bool + backend: Any + backend_name: str + run_command_wrapper: Optional[Callable] + build_command_wrapper: Optional[Callable] + + def run(self, cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str, + log: logging.Logger, subst: Optional[Dict[str, Any]] = None): + return self.backend.run(cab, params, fqname=fqname, backend=self.opts, log=log, subst=subst, + command_wrapper=self.run_command_wrapper) + + def build(self, cab: 'stimela.kitchen.cab.Cab', log: logging.Logger, rebuild=False): + if not hasattr(self.backend, 'build'): + raise BackendError(f"{self.backend_name} backend does not support the build command") + return self.backend.build(cab, backend=self.opts, log=log, rebuild=rebuild, + command_wrapper=self.build_command_wrapper) + + +def validate_backend_settings(backend_opts: Dict[str, Any]) -> BackendWrapper: """Checks that backend settings refer to a valid backend Returs tuple of options, main, wrapper, where 'main' the the main backend, and 'wrapper' is an optional wrapper backend @@ -15,26 +41,35 @@ def validate_backend_settings(backend_opts: Dict[str, Any]): if not isinstance(backend_opts, StimelaBackendOptions): backend_opts = OmegaConf.to_object(backend_opts) - main = main_backend = None + backend_name = backend = None selected = backend_opts.select or ['native'] # select containerization engine, if any - for engine in selected: + for name in selected: # check that backend has not been disabled - opts = getattr(backend_opts, engine, None) + opts = getattr(backend_opts, name, None) if not opts or opts.enable: - main_backend = get_backend(engine) - if main_backend is not None: - main = engine + backend = get_backend(name, opts) + if backend is not None: + backend_name = name break else: raise BackendError(f"selected backends 
({', '.join(selected)}) not available") - + + is_remote = is_remote_fs = backend.is_remote() + # check if slurm wrapper is to be applied - wrapper = None - if False: # placeholder -- should be: if backend.slurm and backed.slurm.enable - wrapper = get_backend('slurm') - if wrapper is None: - raise BackendError(f"backend 'slurm' not available ({get_backend_status('slurm')})") + if backend_opts.slurm.enable: + if is_remote: + raise BackendError(f"can't combine slurm with {backend_name} backend") + is_remote = True + is_remote_fs = False + run_command_wrapper = backend_opts.slurm.run_command_wrapper + build_command_wrapper = backend_opts.slurm.build_command_wrapper + else: + run_command_wrapper = build_command_wrapper = None - return backend_opts, main_backend, wrapper + return BackendWrapper(opts=backend_opts, is_remote=is_remote, is_remote_fs=is_remote_fs, + backend=backend, backend_name=backend_name, + run_command_wrapper=run_command_wrapper, + build_command_wrapper=build_command_wrapper) diff --git a/stimela/backends/singularity.py b/stimela/backends/singularity.py index 3a7fc18e..7789d4b0 100644 --- a/stimela/backends/singularity.py +++ b/stimela/backends/singularity.py @@ -7,7 +7,7 @@ from shutil import which from dataclasses import dataclass from omegaconf import OmegaConf -from typing import Dict, List, Any, Optional, Tuple +from typing import Dict, List, Any, Optional, Callable from contextlib import ExitStack from scabha.basetypes import EmptyListDefault import datetime @@ -23,8 +23,9 @@ class SingularityBackendOptions(object): image_dir: str = os.path.expanduser("~/.singularity") auto_build: bool = True rebuild: bool = False - auto_update: bool = False + auto_update: bool = False # currently unused executable: Optional[str] = None + remote_only: bool = False # if True, won't look for singularity on local system -- useful in combination with slurm wrapper # @dataclass # class EmptyVolume(object): @@ -37,22 +38,26 @@ class SingularityBackendOptions(object): 
STATUS = VERSION = BINARY = None -_auto_updated_images = set() +# images rebuilt in this run +_rebuilt_images = set() -def is_available(): +def is_available(opts: Optional[SingularityBackendOptions] = None): global STATUS, VERSION, BINARY if STATUS is None: - BINARY = which("singularity") - if BINARY: - __version_string = subprocess.check_output([BINARY, "--version"]).decode("utf8") - STATUS = VERSION = __version_string.strip().split()[-1] - # if VERSION < "3.0.0": - # suffix = ".img" - # else: - # suffix = ".sif" + if opts and opts.remote_only: + STATUS = VERSION = "remote" else: - STATUS = "not installed" - VERSION = None + BINARY = (opts and opts.executable) or which("singularity") + if BINARY: + __version_string = subprocess.check_output([BINARY, "--version"]).decode("utf8") + STATUS = VERSION = __version_string.strip().split()[-1] + # if VERSION < "3.0.0": + # suffix = ".img" + # else: + # suffix = ".sif" + else: + STATUS = "not installed" + VERSION = None return VERSION is not None def get_status(): @@ -115,89 +120,96 @@ def build_command_line(cab: 'stimela.kitchen.cab.Cab', backend: 'stimela.backend "--pwd", cwd, simg_path] + args +def build(cab: 'stimela.kitchen.cab.Cab', backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger, + command_wrapper: Optional[Callable]=None, + build=True, rebuild=False): + """Builds image for cab, if necessary. - -def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str, - backend: 'stimela.backend.StimelaBackendOptions', - log: logging.Logger, subst: Optional[Dict[str, Any]] = None): - """Runs cab contents - - Args: - cab (Cab): cab object - log (logger): logger to use - subst (Optional[Dict[str, Any]]): Substitution dict for commands etc., if any. + build: if True, build missing images regardless of backend settings + rebuild: if True, rebuild all images regardless of backend settings Returns: - Any: return value (e.g. 
exit code) of content + str: path to corresponding singularity image """ - native.update_rlimits(backend.rlimits, log) - image_name, simg_path, auto_update = get_image_info(cab, backend) + image_name, simg_path, auto_update_allowed = get_image_info(cab, backend) - rebuild = backend.singularity.rebuild + # this is True if we're allowed to build missing images + build = build or rebuild or backend.singularity.auto_build or backend.singularity.auto_update + # this is True if we're asked to force-rebuild images + rebuild = rebuild or backend.singularity.rebuild + cached_image_exists = os.path.exists(simg_path) # no image? Better have builds enabled then if not cached_image_exists: log.info(f"singularity image {simg_path} does not exist") - rebuild = rebuild or backend.singularity.auto_build or backend.singularity.auto_update - if not rebuild: + if not build: raise BackendError(f"no image, and singularity build options not enabled") - - # check if existing image needs to be rebuilt - if auto_update and backend.singularity.auto_update and not rebuild: - if image_name in _auto_updated_images: - log.info("image was used earlier in this run, not checking for auto-updates again") + # else we have an image + # if rebuild is enabled, delete it + elif rebuild: + if simg_path in _rebuilt_images: + log.info(f"singularity image {simg_path} was rebuilt earlier") else: - _auto_updated_images.add(image_name) - # force check of docker binary - docker.is_available() - if docker.BINARY is None: - log.warn("a docker runtime is required for auto-update of singularity images: forcing unconditional rebuild") - rebuild = True - else: - log.info("singularity auto-update: pulling and inspecting docker image") - # pull image from hub - retcode = xrun(docker.BINARY, ["pull", image_name], - shell=False, log=log, - return_errcode=True, command_name="(docker pull)", - log_command=True, - log_result=True) - if retcode != 0: - raise BackendError(f"docker pull failed with return code {retcode}") - if 
os.path.exists(simg_path): - # check timestamp - result = subprocess.run( - [docker.BINARY, "inspect", "-f", "{{ .Created }}", image_name], - capture_output=True) - if result.returncode != 0: - for line in result.stdout.split("\n"): - log.warn(f"docker inpect stdout: {line}") - for line in result.stderr.split("\n"): - log.error(f"docker inpect stderr: {line}") - raise BackendError(f"docker inspect failed with return code {result.returncode}") - timestamp = result.stdout.decode().strip() - log.info(f"docker inspect returns timestamp {timestamp}") - # parse timestamps like '2023-04-07T13:39:19.187572398Z' - # Pre-3.11 pythons don't do it natively so we mess around... - match = re.fullmatch("(.*)T([^.]*)(\.\d+)?Z?", timestamp) - if not match: - raise BackendError(f"docker inspect returned invalid timestamp '{timestamp}'") - try: - dt = datetime.datetime.fromisoformat(f'{match.group(1)} {match.group(2)} +00:00') - except ValueError as exc: - raise BackendError(f"docker inspect returned invalid timestamp '{timestamp}', exc") - - if dt.timestamp() > os.path.getmtime(simg_path): - log.warn("docker image is newer than cached singularity image, rebuilding") - rebuild = True - else: - log.info("cached singularity image appears to be up-to-date") - - # delete image if rebuild is being forced - if cached_image_exists and rebuild: - os.unlink(simg_path) - cached_image_exists = False + log.info(f"singularity image {simg_path} exists but a rebuild was specified") + os.unlink(simg_path) + cached_image_exists = False + else: + log.info(f"singularity image {simg_path} exists") + + ## OMS: taking this out for now, need some better auto-update logic, let's come back to it later + + # # else check if it need to be auto-updated + # elif auto_update_allowed and backend.singularity.auto_update: + # if image_name in _auto_updated_images: + # log.info("image was used earlier in this run, not checking for auto-updates again") + # else: + # _auto_updated_images.add(image_name) + # # force 
check of docker binary + # docker.is_available() + # if docker.BINARY is None: + # log.warn("a docker runtime is required for auto-update of singularity images: forcing unconditional rebuild") + # build = True + # else: + # log.info("singularity auto-update: pulling and inspecting docker image") + # # pull image from hub + # retcode = xrun(docker.BINARY, ["pull", image_name], + # shell=False, log=log, + # return_errcode=True, command_name="(docker pull)", + # log_command=True, + # log_result=True) + # if retcode != 0: + # raise BackendError(f"docker pull failed with return code {retcode}") + # if os.path.exists(simg_path): + # # check timestamp + # result = subprocess.run( + # [docker.BINARY, "inspect", "-f", "{{ .Created }}", image_name], + # capture_output=True) + # if result.returncode != 0: + # for line in result.stdout.split("\n"): + # log.warn(f"docker inpect stdout: {line}") + # for line in result.stderr.split("\n"): + # log.error(f"docker inpect stderr: {line}") + # raise BackendError(f"docker inspect failed with return code {result.returncode}") + # timestamp = result.stdout.decode().strip() + # log.info(f"docker inspect returns timestamp {timestamp}") + # # parse timestamps like '2023-04-07T13:39:19.187572398Z' + # # Pre-3.11 pythons don't do it natively so we mess around... + # match = re.fullmatch("(.*)T([^.]*)(\.\d+)?Z?", timestamp) + # if not match: + # raise BackendError(f"docker inspect returned invalid timestamp '{timestamp}'") + # try: + # dt = datetime.datetime.fromisoformat(f'{match.group(1)} {match.group(2)} +00:00') + # except ValueError as exc: + # raise BackendError(f"docker inspect returned invalid timestamp '{timestamp}', exc") + + # if dt.timestamp() > os.path.getmtime(simg_path): + # log.warn("docker image is newer than cached singularity image, rebuilding") + # os.unlink(simg_path) + # cached_image_exists = False + # else: + # log.info("cached singularity image appears to be up-to-date") # if image doesn't exist, build it. 
We will have already checked for build settings # being enabled above @@ -206,6 +218,9 @@ def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str, args = [BINARY, "build", simg_path, f"docker://{image_name}"] + if command_wrapper: + args = command_wrapper(args) + retcode = xrun(args[0], args[1:], shell=False, log=log, return_errcode=True, command_name="(singularity build)", gentle_ctrl_c=True, @@ -217,9 +232,42 @@ def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str, if not os.path.exists(simg_path): raise BackendError(f"singularity build did not return an error code, but the image did not appear") - - args = build_command_line(cab, backend, params, subst, simg_path=simg_path) + + _rebuilt_images.add(simg_path) + + return simg_path + + +def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str, + backend: 'stimela.backend.StimelaBackendOptions', + log: logging.Logger, subst: Optional[Dict[str, Any]] = None, + command_wrapper: Optional[Callable] = None): + + """Runs cab contents + + Args: + cab (Cab): cab object + log (logger): logger to use + subst (Optional[Dict[str, Any]]): Substitution dict for commands etc., if any. + + Returns: + Any: return value (e.g. 
exit code) of content + """ + native.update_rlimits(backend.rlimits, log) + + # get path to image, rebuilding if backend options allow this + simg_path = build(cab, backend=backend, log=log, build=False) + + # build up command line + cwd = os.getcwd() + args = [backend.singularity.executable or BINARY, + "exec", + "--containall", + "--bind", f"{cwd}:{cwd}", + "--pwd", cwd, + simg_path] + args += cab.flavour.get_arguments(cab, params, subst, check_executable=False) log.debug(f"command line is {args}") cabstat = cab.reset_status() @@ -234,6 +282,9 @@ def elapsed(since=None): # log.info(f"argument lengths are {[len(a) for a in args]}") + if command_wrapper: + args = command_wrapper(args, fqname=fqname) + retcode = xrun(args[0], args[1:], shell=False, log=log, output_wrangler=cabstat.apply_wranglers, return_errcode=True, command_name=command_name, diff --git a/stimela/backends/slurm.py b/stimela/backends/slurm.py new file mode 100644 index 00000000..f95a61af --- /dev/null +++ b/stimela/backends/slurm.py @@ -0,0 +1,64 @@ +import subprocess +import os +import re +import logging +from stimela import utils +import stimela +from shutil import which +from dataclasses import dataclass, make_dataclass +from omegaconf import OmegaConf +from typing import Dict, List, Any, Optional, Tuple +from contextlib import ExitStack +from scabha.basetypes import EmptyListDefault, EmptyDictDefault +import datetime +from stimela.utils.xrun_asyncio import xrun + +from stimela.exceptions import BackendError + + +# path to default srun binary +_default_srun_path = None + +@dataclass +class SlurmOptions(object): + enable: bool = False # enables passing off jobs to slurm via srun + srun_path: Optional[str] = None # path to srun executable + srun_opts: Dict[str, str] = EmptyDictDefault() # extra options passed to srun. "--" prepended, and "_" replaced by "-" + build_local = True # if True, images will be built locally (i.e. 
on the head node) even when slurm is enabled + + def get_executable(self): + global _default_srun_path + if self.srun_path is None: + if _default_srun_path is None: + _default_srun_path = which("srun") + if not _default_srun_path: + _default_srun_path = False + if _default_srun_path is False: + raise BackendError(f"slurm 'srun' binary not found") + return _default_srun_path + else: + if not os.access(self.srun_path, os.X_OK): + raise BackendError(f"slurm.srun_path '{self.srun}' is not an executable") + return self.srun + + def run_command_wrapper(self, args: List[str], fqname: Optional[str]=None) -> List[str]: + output_args = [self.get_executable()] + + if fqname is not None: + output_args += ["-J", fqname] + + # add all base options that have been specified + for name, value in self.srun_opts.items(): + output_args += ["--" + name.replace("_", "-"), value] + + output_args += args + return output_args + + def build_command_wrapper(self, args: List[str], fqname: Optional[str]=None) -> List[str]: + if self.build_local: + return args + return self.run_command_wrapper(args, fqname=fqname) + + +SlurmOptionsSchema = OmegaConf.structured(SlurmOptions) + diff --git a/stimela/commands/build.py b/stimela/commands/build.py index 9e2cb93d..287ea766 100644 --- a/stimela/commands/build.py +++ b/stimela/commands/build.py @@ -1,63 +1,42 @@ import click from typing import List from stimela.main import cli -import stimela +import click +from typing import List, Optional, Tuple +from .run import run -@cli.command( - help="""(Re)build stimela base images. Specify a list of image names and versions (if a version is omitted, builds all known versions of image), - or else use --all to build all required base images. 
- """, - short_help="build stimela base images", - no_args_is_help=True) -@click.argument("images", nargs=-1, metavar="NAME[:VERSION]", required=False) -# help="image/version to build (builds all versions of image by default)") -@click.option("-a", "--all", is_flag=True, - help="build all unavailable images (all images, in combination with --rebuild)") -@click.option("-r", "--rebuild", is_flag=True, - help="force rebuild of image(s)") -def build(images: List[str], all=False, rebuild=False): - from stimela.main import BACKEND - from stimela import CONFIG - log = stimela.logger() - - available_images = BACKEND.available_images() - if all: - build_images = CONFIG.base.keys() - else: - build_images = images - - if not build_images: - log.info("No images specified. Run 'stimela build -h' for help.") - return 0 - - for imagename in build_images: - if ':' in imagename: - imagename, version = imagename.split(":", 1) - else: - version = None - - if imagename not in CONFIG.base: - log.error(f"base image '{imagename}' is not known to Stimela") - return 2 - - image = CONFIG.base[imagename] - - if version is None: - build_versions = image.images.keys() - elif version in image.images: - build_versions = [version] - else: - log.error(f"version '{version}' is not defined for base image '{imagename}'") - return 2 - - # now loop over build versions - for version in build_versions: - - # check if already exists - if imagename in available_images and version in available_images[imagename]: - if not rebuild: - log.info(f"image '{imagename}:{version}' already exists, skipping") - continue +from stimela.main import cli - BACKEND.build(image, version) +@cli.command("build", + help=""" + Builds singularity images required by the recipe. Only available if the singularity backend is selected. 
+ """, + no_args_is_help=True) +@click.option("-r", "--rebuild", is_flag=True, + help="""rebuilds all images from scratch.""") +@click.option("-s", "--step", "step_ranges", metavar="STEP(s)", multiple=True, + help="""only look at specific step(s) from the recipe. Use commas, or give multiple times to cherry-pick steps. + Use [BEGIN]:[END] to specify a range of steps. Note that cherry-picking an individual step via this option + also impies --enable-step.""") +@click.option("-t", "--tags", "tags", metavar="TAG(s)", multiple=True, + help="""only look at steps wth the given tags (and also steps tagged as "always"). + Use commas, or give multiple times for multiple tags.""") +@click.option("--skip-tags", "skip_tags", metavar="TAG(s)", multiple=True, + help="""explicitly skips steps wth the given tags. + Use commas, or give multiple times for multiple tags.""") +@click.option("-e", "--enable-step", "enable_steps", metavar="STEP(s)", multiple=True, + help="""Force-enable steps even if the recipe marks them as skipped. 
Use commas, or give multiple times + for multiple steps.""") +@click.option("-a", "--assign", metavar="PARAM VALUE", nargs=2, multiple=True, + help="""assigns values to parameters: equivalent to PARAM=VALUE, but plays nicer with the shell's + tab completion.""") +@click.option("-l", "--last-recipe", is_flag=True, + help="""if multiple recipes are defined, selects the last one for building.""") +@click.argument("what", metavar="filename.yml|cab name") +def build(what: str, last_recipe: bool = False, rebuild: bool = False, + assign: List[Tuple[str, str]] = [], + step_ranges: List[str] = [], tags: List[str] = [], skip_tags: List[str] = [], enable_steps: List[str] = []): + return run.callback(what, last_recipe=last_recipe, assign=assign, step_ranges=step_ranges, + tags=tags, skip_tags=skip_tags, enable_steps=enable_steps, + build=True, rebuild=rebuild) diff --git a/stimela/commands/run.py b/stimela/commands/run.py index 9a479291..094fc06a 100644 --- a/stimela/commands/run.py +++ b/stimela/commands/run.py @@ -86,7 +86,8 @@ def load_recipe_file(filename: str): @click.argument("parameters", nargs=-1, metavar="[recipe name] [PARAM=VALUE] [X.Y.Z=FOO] ...", required=False) def run(what: str, parameters: List[str] = [], dry_run: bool = False, last_recipe: bool = False, profile: Optional[int] = None, assign: List[Tuple[str, str]] = [], - step_ranges: List[str] = [], tags: List[str] = [], skip_tags: List[str] = [], enable_steps: List[str] = []): + step_ranges: List[str] = [], tags: List[str] = [], skip_tags: List[str] = [], enable_steps: List[str] = [], + build=False, rebuild=False): log = logger() params = OrderedDict() @@ -288,38 +289,59 @@ def convert_value(value): def elapsed(): return str(datetime.now() - start_time).split('.', 1)[0] - try: - outputs = outer_step.run(backend=stimela.CONFIG.opts.backend) - except Exception as exc: - stimela.backends.close_backends(log) + # build the images + if build: + try: + outer_step.build(backend=stimela.CONFIG.opts.backend, 
rebuild=rebuild, log=log) + except Exception as exc: + stimela.backends.close_backends(log) - task_stats.save_profiling_stats(outer_step.log, - print_depth=profile if profile is not None else stimela.CONFIG.opts.profile.print_depth, - unroll_loops=stimela.CONFIG.opts.profile.unroll_loops) - if not isinstance(exc, ScabhaBaseException) or not exc.logged: - log_exception(StimelaRuntimeError(f"run failed after {elapsed()}", exc, - tb=not isinstance(exc, ScabhaBaseException))) - else: - log.error("run failed, exiting with error code 1") - for line in traceback.format_exc().split("\n"): - log.debug(line) - last_log_dir = stimelogging.get_logfile_dir(outer_step.log) or '.' - outer_step.log.info(f"last log directory was {stimelogging.apply_style(last_log_dir, 'bold green')}") - sys.exit(1) - - if outputs and outer_step.log.isEnabledFor(logging.DEBUG): - outer_step.log.debug(f"run successful after {elapsed()}, outputs follow:") - for name, value in outputs.items(): - if name in recipe.outputs: - outer_step.log.debug(f" {name}: {value}") + if not isinstance(exc, ScabhaBaseException) or not exc.logged: + log_exception(StimelaRuntimeError(f"build failed after {elapsed()}", exc, + tb=not isinstance(exc, ScabhaBaseException))) + else: + log.error("build failed, exiting with error code 1") + for line in traceback.format_exc().split("\n"): + log.debug(line) + last_log_dir = stimelogging.get_logfile_dir(outer_step.log) or '.' 
+ outer_step.log.info(f"last log directory was {stimelogging.apply_style(last_log_dir, 'bold green')}") + sys.exit(1) + + # else run the recipe else: - outer_step.log.info(f"run successful after {elapsed()}") + try: + outputs = outer_step.run(backend=stimela.CONFIG.opts.backend) + except Exception as exc: + stimela.backends.close_backends(log) + + task_stats.save_profiling_stats(outer_step.log, + print_depth=profile if profile is not None else stimela.CONFIG.opts.profile.print_depth, + unroll_loops=stimela.CONFIG.opts.profile.unroll_loops) + if not isinstance(exc, ScabhaBaseException) or not exc.logged: + log_exception(StimelaRuntimeError(f"run failed after {elapsed()}", exc, + tb=not isinstance(exc, ScabhaBaseException))) + else: + log.error("run failed, exiting with error code 1") + for line in traceback.format_exc().split("\n"): + log.debug(line) + last_log_dir = stimelogging.get_logfile_dir(outer_step.log) or '.' + outer_step.log.info(f"last log directory was {stimelogging.apply_style(last_log_dir, 'bold green')}") + sys.exit(1) + + if outputs and outer_step.log.isEnabledFor(logging.DEBUG): + outer_step.log.debug(f"run successful after {elapsed()}, outputs follow:") + for name, value in outputs.items(): + if name in recipe.outputs: + outer_step.log.debug(f" {name}: {value}") + else: + outer_step.log.info(f"run successful after {elapsed()}") stimela.backends.close_backends(log) - task_stats.save_profiling_stats(outer_step.log, - print_depth=profile if profile is not None else stimela.CONFIG.opts.profile.print_depth, - unroll_loops=stimela.CONFIG.opts.profile.unroll_loops) + if not build: + task_stats.save_profiling_stats(outer_step.log, + print_depth=profile if profile is not None else stimela.CONFIG.opts.profile.print_depth, + unroll_loops=stimela.CONFIG.opts.profile.unroll_loops) last_log_dir = stimelogging.get_logfile_dir(outer_step.log) or '.' 
outer_step.log.info(f"last log directory was {stimelogging.apply_style(last_log_dir, 'bold green')}") diff --git a/stimela/kitchen/recipe.py b/stimela/kitchen/recipe.py index 54f88228..2cfa0151 100644 --- a/stimela/kitchen/recipe.py +++ b/stimela/kitchen/recipe.py @@ -1,4 +1,4 @@ -import os, os.path, re, fnmatch, copy, traceback +import os, os.path, re, fnmatch, copy, traceback, logging from typing import Any, Tuple, List, Dict, Optional, Union from dataclasses import dataclass from omegaconf import MISSING, OmegaConf, DictConfig, ListConfig @@ -1128,6 +1128,16 @@ def _iterate_loop_worker(self, params, info, subst, backend, count, iter_var, ra return task_stats.collect_stats(), outputs, exception, tb + def build(self, backend={}, rebuild=False, log: Optional[logging.Logger] = None): + # set up backend + backend = OmegaConf.merge(backend, self.backend or {}) + # build recursively + log = log or self.log + log.info(f"building image(s) for recipe '{self.fqname}'") + for step in self.steps.values(): + step.build(backend, rebuild=rebuild, log=log) + + def _run(self, params, subst=None, backend={}) -> Dict[str, Any]: """Internal recipe run method. Meant to be called from a wrapper Step object (which validates the parameters, etc.) diff --git a/stimela/kitchen/step.py b/stimela/kitchen/step.py index 1745ecd0..0c3642d6 100644 --- a/stimela/kitchen/step.py +++ b/stimela/kitchen/step.py @@ -317,6 +317,30 @@ def assign_value(self, key: str, value: Any, override: bool = False): raise AssignmentError(f"{self.name}: invalid assignment {key}={value}", exc) + def build(self, backend={}, rebuild=False, log: Optional[logging.Logger] = None): + # skipping step? 
ignore the build + if self.skip is True: + return + log = log or self.log + # recurse into sub-recipe + from .recipe import Recipe + if type(self.cargo) is Recipe: + return self.cargo.build(backend, rebuild=rebuild, log=log) + # else build + else: + # validate backend settings and call the build function + try: + backend = OmegaConf.merge(backend, self.cargo.backend or {}, self.backend or {}) + backend = OmegaConf.to_object(OmegaConf.merge(StimelaBackendSchema, backend)) + backend_wrapper = runner.validate_backend_settings(backend) + except Exception as exc: + newexc = BackendError("error validating backend settings", exc) + raise newexc from None + log.info(f"building image for step '{self.fqname}'") + with stimelogging.declare_subtask(self.name): + return backend_wrapper.build(self.cargo, log=log, rebuild=rebuild) + + def run(self, backend={}, subst=None, parent_log=None): """Runs the step""" from .recipe import Recipe @@ -330,12 +354,10 @@ def run(self, backend={}, subst=None, parent_log=None): backend = OmegaConf.merge(backend, self.cargo.backend or {}, self.backend or {}) backend_opts = evaluate_and_substitute_object(backend, subst, recursion_level=-1, location=[self.fqname, "backend"]) backend_opts = OmegaConf.to_object(OmegaConf.merge(StimelaBackendSchema, backend_opts)) - backend_opts, backend_main, backend_wrapper = runner.validate_backend_settings(backend_opts) + backend_wrapper = runner.validate_backend_settings(backend_opts) except Exception as exc: newexc = BackendError("error validating backend settings", exc) raise newexc from None - # self.log_exception(newexc) - remote_backend = backend_main.is_remote() # if step is being explicitly skipped, omit from profiling, and drop info/warning messages to debug level explicit_skip = self.skip is True @@ -343,7 +365,7 @@ def run(self, backend={}, subst=None, parent_log=None): context = nullcontext() parent_log_info = parent_log_warning = parent_log.debug else: - context = 
stimelogging.declare_subtask(self.name, hide_local_metrics=remote_backend) + context = stimelogging.declare_subtask(self.name, hide_local_metrics=backend_wrapper.is_remote) stimelogging.declare_chapter(f"{self.fqname}") parent_log_info, parent_log_warning = parent_log.info, parent_log.warning @@ -373,7 +395,7 @@ def run(self, backend={}, subst=None, parent_log=None): self.log.debug(f"validating inputs {subst and list(subst.keys())}") validated = None try: - params = self.cargo.validate_inputs(params, loosely=skip, remote_fs=remote_backend, subst=subst) + params = self.cargo.validate_inputs(params, loosely=skip, remote_fs=backend_wrapper.is_remote_fs, subst=subst) validated = True except ScabhaBaseException as exc: @@ -425,7 +447,7 @@ def run(self, backend={}, subst=None, parent_log=None): ## check if we need to skip based on existing/fresh file outputs ## if skip on fresh outputs is in effect, find mtime of most recent input - if not remote_backend and not skip and self.skip_if_outputs: + if not backend_wrapper.is_remote_fs and not skip and self.skip_if_outputs: # max_mtime will remain 0 if we're not echecking for freshness, or if there are no file-type inputs max_mtime, max_mtime_path = 0, None if self.skip_if_outputs == OUTPUTS_FRESH: @@ -515,7 +537,7 @@ def run(self, backend={}, subst=None, parent_log=None): if not skip: # check for outputs that need removal - if not remote_backend: + if not backend_wrapper.is_remote_fs: for name, schema in self.outputs.items(): if name in params and schema.remove_if_exists and schema.is_file_type: path = params[name] @@ -528,7 +550,7 @@ def run(self, backend={}, subst=None, parent_log=None): if type(self.cargo) is Recipe: self.cargo._run(params, subst, backend=backend) elif type(self.cargo) is Cab: - cabstat = backend_main.run(self.cargo, params=params, log=self.log, subst=subst, backend=backend_opts, fqname=self.fqname) + cabstat = backend_wrapper.run(self.cargo, params=params, log=self.log, subst=subst, fqname=self.fqname) # 
check for runstate if cabstat.success is False: raise StimelaCabRuntimeError(f"error running cab '{self.cargo.name}'", cabstat.errors) @@ -547,7 +569,7 @@ def run(self, backend={}, subst=None, parent_log=None): validated = False try: - params = self.cargo.validate_outputs(params, loosely=skip,remote_fs=remote_backend, subst=subst) + params = self.cargo.validate_outputs(params, loosely=skip,remote_fs=backend_wrapper.is_remote_fs, subst=subst) validated = True except ScabhaBaseException as exc: severity = "warning" if skip else "error" diff --git a/stimela/main.py b/stimela/main.py index 200136cf..e34b15e7 100644 --- a/stimela/main.py +++ b/stimela/main.py @@ -89,9 +89,9 @@ def cli(config_files=[], config_dotlist=[], include=[], backend=None, if config.CONFIG_LOADED: log.info(f"loaded config from {config.CONFIG_LOADED}") - # select backend + # select backend, passing it any config options that have been set up if backend: - if backends.get_backend(backend) is None: + if backends.get_backend(backend, getattr(stimela.CONFIG.opts.backend, backend, None)) is None: log.error(f"backend '{backend}' not available: {backends.get_backend_status(backend)}") sys.exit(1) @@ -118,7 +118,7 @@ def cli(config_files=[], config_dotlist=[], include=[], backend=None, # import commands -from stimela.commands import doc, run, save_config, cleanup +from stimela.commands import doc, run, build, save_config, cleanup ## These one needs to be reimplemented, current backed auto-pulls and auto-builds: # images, pull, build, clean diff --git a/tests/stimela_tests/test_slurm.yml b/tests/stimela_tests/test_slurm.yml new file mode 100644 index 00000000..e852189f --- /dev/null +++ b/tests/stimela_tests/test_slurm.yml @@ -0,0 +1,34 @@ +_include: + - (cultcargo)breizorro.yml + # note that this looks in . 
first, then at the location of testy_kube.yml, if different + - test_slurm_config.yml + +opts: + backend: + select: singularity + singularity: + auto_build: false # on slurm, we pre-build on the head node explicitly + slurm: + enable: true + +recipe: + name: "demo recipe" + info: 'top level recipe definition' + for_loop: + var: threshold + over: [5,6,7,8,9] + scatter: -1 + steps: + mask1: + cab: breizorro + params: + restored-image: im3-MFS-image.fits + threshold: =recipe.threshold + mask: =STRIPEXT(current.restored-image) + "th{recipe.threshold}.mask.fits" + mask2: + recipe: + cab: breizorro + params: + mask-image: =previous.mask + dilate: 2 + mask: =STRIPEXT(previous.restored-image) + "th{recipe.threshold}.mask2.fits" diff --git a/tests/stimela_tests/test_slurm_config.yml b/tests/stimela_tests/test_slurm_config.yml new file mode 100644 index 00000000..e69de29b