Merge pull request #368 from caracal-pipeline/issue-152
Enables list item syntax e.g. =current.foo[2]. Adds option to skip freshness checks
o-smirnov authored Mar 3, 2025
2 parents cdd6d8f + 69da3f8 commit 757f6fc
Showing 16 changed files with 308 additions and 22 deletions.
2 changes: 2 additions & 0 deletions docs/source/conf.py
@@ -27,6 +27,8 @@

language = 'en'

highlight_language = 'yaml'

# -- Options for HTML output -------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output

10 changes: 6 additions & 4 deletions docs/source/fundamentals/substitutions.rst
@@ -143,7 +143,9 @@ As we saw above, a parameter value starting with ``=`` invokes the formula parse

* the keyword ``UNSET``. A formula evaluating to ``UNSET`` will result in that parameter becoming unset.

* the keyword ``EMPTY``, evaluating to an empty string
* the keyword ``EMPTY``, evaluating to an empty string.

* item lookup, e.g. ``current.foo[item]``. The item may be any valid expression.

* built-in functions. The list of available functions is growing with every new stimela version; at time of writing the following are available:

@@ -173,9 +175,9 @@ As we saw above, a parameter value starting with ``=`` invokes the formula parse

* ``IS_STR(arg)`` true if the argument is a string type.

* ``VALID(arg)`` true if the argument is valid, and evaluates to non-zero. This is a useful pattern when dealing
with parameters of a mixed type (that can be e.g. strings or numbers). For example, ``recipe.a > 0`` would throw an
error is ``a`` is a string, but ``VALID(recipe.a > 0)`` would return False in this case.
* ``VALID(arg)`` true if the argument is valid, and evaluates to non-zero. This is a useful pattern when dealing with parameters of a mixed type (that can be e.g. strings or numbers). For example, ``recipe.a > 0`` would throw an error if ``a`` is a string, but ``VALID(recipe.a > 0)`` would return False in this case.

* ``GETITEM(list, item)`` equivalent to ``list[item]``.


As should be evident from the list above, certain functions expect arguments of a particular type (for example, the pathname manipulation functions expect strings).
124 changes: 124 additions & 0 deletions docs/source/reference/clickify.rst
@@ -0,0 +1,124 @@
.. highlight:: yml
.. _clickify:


Clickify parameters
===================


For any given command-line tool, most of the information in the cab schema (argument names and types, help strings) mirrors what is already provided to the tool's command-line parser. When wrapping a third-party package in a cab, this duplication of effort is unavoidable (with all the attendant potential for inconsistencies): the package developer has already implemented their own command-line interface (CLI) parser, and this CLI still needs to be described to stimela. Note, however, that the schema itself contains all the information needed to construct a CLI in the first place. For newly developed packages, this offers a substantial labour-saving opportunity. Stimela includes a utility function that converts a schema into a CLI using the `click <https://click.palletsprojects.com>`_ package. As a notional example, consider this
``hello_schema.yml`` file defining a simple schema with two inputs::

    inputs:
      name:
        dtype: str
        info: Your name
        required: true
        policies:
          positional: true
      count:
        dtype: int
        default: 1
        info: Number of greetings

This file can be instantly converted into a CLI as follows:

.. code-block:: python

    #!/usr/bin/env python
    import click
    from scabha.schema_utils import clickify_parameters

    @click.command()
    @clickify_parameters("hello_schema.yml")
    def hello(count, name):
        """Simple program that greets NAME for a
        total of COUNT times."""
        for x in range(count):
            print(f"Hello {name}!")

    if __name__ == '__main__':
        hello()

The resulting tool now has a fully-functional CLI:

.. code-block:: none

    $ ./hello.py --help
    Usage: hello.py [OPTIONS] NAME

      Simple program that greets NAME for a total
      of COUNT times.

    Options:
      --count INTEGER  Number of greetings
      --help           Show this message and exit.

To integrate the tool into stimela, all we need is a cab definition, which can directly include the schema file::

    cabs:
      hello:
        _include: hello_schema.yml
        command: hello.py


This mechanism ensures that all inputs and outputs need only be defined by the developer once, in a single place. It provides both a CLI and stimela integration with no additional effort, and ensures that the two are mutually consistent by construction. The `QuartiCal <https://quartical.readthedocs.io/en/latest/>`_, `pfb-imaging <https://github.com/ratt-ru/pfb-imaging>`_ and `breizorro <https://github.com/ratt-ru/breizorro>`_ packages, for example, make extensive use of this.

In the above example, ``clickify_parameters()`` is passed a filename to read the schema from. An alternative to this is to pass it a Dict containing ``inputs``, ``outputs`` and (optionally) ``policies`` sections (see :ref:`policies_reference`). One can also pass a second argument containing a Dict of policies that will override the policies in the first Dict. This is useful when you ship a package containing full cab definitions, and want to read the schemas directly from the latter. Here we combine it with click's subcommand feature:

.. code-block:: python

    import os
    import click
    from scabha.schema_utils import clickify_parameters
    from omegaconf import OmegaConf

    # load the cab definitions shipped with the package
    schemas = OmegaConf.load(os.path.join(os.path.dirname(__file__), "cabs/mypackage.yml"))

    # top-level command group to which the subcommands are attached
    @click.group()
    def cli():
        pass

    @cli.command("hello",
        help=schemas.cabs.get("hello-world").info,
        no_args_is_help=True)
    @clickify_parameters(schemas.cabs.get("hello-world"))
    def hello_world(name, count):
        for x in range(count):
            print(f"Hello {name}!")

where ``cabs/mypackage.yml`` contains::

    cabs:
      hello-world:
        info: Greets NAME for a total of COUNT times
        inputs:
          name:
            dtype: str
            info: Your name
            required: true
            policies:
              positional: true
          count:
            dtype: int
            default: 1
            info: Number of greetings

If your package defines multiple commands, it can be useful to create a new decorator that you can then reuse for multiple functions:

.. code-block:: python

    import click
    from scabha.schema_utils import clickify_parameters
    from omegaconf import OmegaConf

    # cli (a click group) and schemas (the loaded cab definitions)
    # are assumed to be defined as in the previous example

    def clickify(command_name, schema_name=None):
        schema_name = schema_name or command_name
        return lambda func: \
            cli.command(command_name, help=schemas.cabs.get(schema_name).info, no_args_is_help=True)(
                clickify_parameters(schemas.cabs.get(schema_name))(func)
            )

    @clickify("hello", "hello-world")
    def hello_world(name, count):
        for x in range(count):
            print(f"Hello {name}!")
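
Note on the schema-to-CLI idea documented in ``clickify.rst``: it can be illustrated without scabha or click. The following is a minimal sketch, assuming a plain dict with the same layout as ``hello_schema.yml``. It uses the standard-library ``argparse`` module instead of click, and ``DTYPES``, ``SCHEMA``, ``build_parser`` and ``greetings`` are hypothetical names that are not part of scabha.

```python
import argparse

# Hypothetical mapping from schema dtype strings to Python types
DTYPES = {"str": str, "int": int, "float": float}

# Same layout as hello_schema.yml, written as a plain dict
SCHEMA = {
    "inputs": {
        "name": {"dtype": "str", "info": "Your name", "required": True,
                 "policies": {"positional": True}},
        "count": {"dtype": "int", "default": 1, "info": "Number of greetings"},
    }
}

def build_parser(schema, prog="hello.py"):
    """Builds an argparse parser from the 'inputs' section of a schema dict."""
    parser = argparse.ArgumentParser(prog=prog)
    for name, spec in schema["inputs"].items():
        kind = DTYPES[spec.get("dtype", "str")]
        if spec.get("policies", {}).get("positional"):
            # inputs marked positional become bare arguments
            parser.add_argument(name, type=kind, help=spec.get("info"))
        else:
            # all other inputs become --options
            parser.add_argument(f"--{name}", type=kind,
                                default=spec.get("default"),
                                required=spec.get("required", False),
                                help=spec.get("info"))
    return parser

def greetings(args):
    """Returns the lines that the hello tool would print."""
    return [f"Hello {args.name}!" for _ in range(args.count)]

args = build_parser(SCHEMA).parse_args(["World", "--count", "2"])
print("\n".join(greetings(args)))
```

The point of the sketch is only that names, types, defaults and help strings all come from one place; the real ``clickify_parameters()`` handles far more (outputs, policies, richer dtypes).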
3 changes: 3 additions & 0 deletions docs/source/reference/reference.rst
@@ -15,5 +15,8 @@ Stimela reference
cabdefs
schema_ref
policies
clickify




2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "stimela"
version = "2.0.2"
version = "2.0.3"
description = "Framework for system agnostic pipelines for (not just) radio interferometry"
authors = ["Oleg Smirnov and RATT <osmirnov@gmail.com>", "Sphesihle Makhathini <sphemakh@gmail.com>"]
readme = "README.rst"
41 changes: 31 additions & 10 deletions scabha/evaluator.py
@@ -120,6 +120,24 @@ def pa(s, l, t):
i += incr
return ret

class GetItemHandler(ResultsHandler):
    def __init__(self, base):
        self.base, self.index = base[0], base[1]

    @staticmethod
    def pa(s, l, t):
        # https://stackoverflow.com/questions/4571441/recursive-expressions-with-pyparsing
        return GetItemHandler(*t)

    def evaluate(self, evaluator):
        base = evaluator._evaluate_result(self.base)
        index = evaluator._evaluate_result(self.index)
        if type(base) is UNSET:
            return base
        if type(index) is UNSET:
            return index
        return base[index]

class FunctionHandler(ResultsHandler):
    def __init__(self, func, *args):
        self.func, self.args = func, args
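
Note on ``GetItemHandler.evaluate()``: an unset base or index is passed through rather than raising. A minimal standalone sketch of that lookup order follows, assuming a simplified stand-in for scabha's ``UNSET`` marker; the ``UNSET`` class and ``get_item`` helper below are hypothetical and only mimic the behaviour shown in the hunk above.

```python
class UNSET:
    """Simplified stand-in for scabha's UNSET marker (hypothetical)."""
    def __init__(self, name):
        self.name = name

def get_item(base, index):
    """Mirrors the lookup order of GetItemHandler.evaluate()."""
    # an unset base wins: the whole lookup evaluates to that UNSET
    if type(base) is UNSET:
        return base
    # otherwise an unset index propagates in the same way
    if type(index) is UNSET:
        return index
    # ordinary Python subscripting: works for lists, dicts, strings
    return base[index]

files = ["a.ms", "b.ms", "c.ms"]
missing = UNSET("current.foo")

print(get_item(files, 2))
print(get_item(missing, 2) is missing)
print(get_item({"x": 1}, "x"))
```

An out-of-range or missing key is not caught here, so it surfaces as the usual ``IndexError`` or ``KeyError`` from Python.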
@@ -174,6 +192,11 @@ def MIN(self, evaluator, args):
    def MAX(self, evaluator, args):
        return self.evaluate_generic_callable(evaluator, "MAX", max, args, min_args=1)

    def GETITEM(self, evaluator, args):
        def get_item(x, y):
            return x[y]
        return self.evaluate_generic_callable(evaluator, "GETITEM", get_item, args, min_args=2, max_args=2)

    def IS_STR(self, evaluator, args):
        def is_str(x):
            return type(x) is str
@@ -298,8 +321,8 @@ def _sort_impl(self, evaluator, args, funcname, reverse=False):
def construct_parser():
    lparen = Literal("(").suppress()
    rparen = Literal(")").suppress()
    lbrack = Keyword("[").suppress()
    rbrack = Keyword("]").suppress()
    lbrack = Literal("[").suppress()
    rbrack = Literal("]").suppress()
    comma = Literal(",").suppress()
    period = Literal(".").suppress()
    string = (QuotedString('"') | QuotedString("'"))("constant")
@@ -320,7 +343,7 @@ def construct_parser():
    expr = Forward()

    # functions
    functions = reduce(operator.or_, map(Keyword, ["IF", "IFSET", "GLOB", "EXISTS", "LIST",
    functions = reduce(operator.or_, map(Keyword, ["IF", "IFSET", "GLOB", "EXISTS", "LIST", "GETITEM",
        "BASENAME", "DIRNAME", "EXTENSION", "STRIPEXT", "MIN", "MAX", "IS_STR", "IS_NUM", "VALID", "RANGE", "NOSUBST", "SORT", "RSORT"]))
    # these functions take one argument, which could also be a sequence
    anyseq_functions = reduce(operator.or_, map(Keyword, ["GLOB", "EXISTS"]))
@@ -331,7 +354,9 @@ def construct_parser():
    function_call = Group(functions + lparen +
                    Opt(delimited_list(expr|SELF)) +
                    rparen).setParseAction(FunctionHandler.pa)
    operators = (

    operators = [
        ((lbrack + expr + rbrack), 1, opAssoc.LEFT, GetItemHandler.pa),
        (Literal("**"), 2, opAssoc.LEFT, BinaryHandler.pa),
        (Literal("-")|Literal("+")|Literal("~"), 1, opAssoc.RIGHT, UnaryHandler.pa),
        (Literal("*")|Literal("//")|Literal("/")|Literal("%"), 2, opAssoc.LEFT, BinaryHandler.pa),
@@ -344,7 +369,7 @@ def construct_parser():
        (CaselessKeyword("in")|CaselessKeyword("not in"), 2, opAssoc.LEFT, BinaryHandler.pa),
        (CaselessKeyword("not"), 1, opAssoc.RIGHT, UnaryHandler.pa),
        (CaselessKeyword("and")|CaselessKeyword("or"), 2, opAssoc.LEFT, BinaryHandler.pa),
    )
    ]

    infix = infix_notation(atomic_value | function_call | function_call_anyseq | nested_field,
                    operators)("subexpression")
@@ -616,11 +641,7 @@ def evaluate_dict(self, params: Dict[str, Any],
            if type(value) is str:
                try:
                    new_value = self.evaluate(value, sublocation=sublocation + [name])
                except AttributeError as err:
                    if raise_substitution_errors:
                        raise
                    new_value = Unresolved(errors=[err])
                except SubstitutionError as err:
                except (AttributeError, SubstitutionError, ParserError, FormulaError) as err:
                    if raise_substitution_errors:
                        raise
                    new_value = Unresolved(errors=[err])
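
Note on the consolidated ``except`` clause: the hunk above merges two handlers into one tuple and widens it to parser and formula errors, so that any of them either re-raises or is recorded in an ``Unresolved`` value. A minimal standalone sketch of that pattern follows; the exception classes, ``Unresolved`` and ``toy_evaluate`` are simplified, hypothetical stand-ins, not scabha's actual definitions.

```python
# Simplified stand-ins for the scabha types named in the hunk (hypothetical)
class SubstitutionError(Exception): pass
class ParserError(Exception): pass
class FormulaError(Exception): pass

class Unresolved:
    """Records why a value could not be evaluated."""
    def __init__(self, errors):
        self.errors = errors

def evaluate_dict(params, evaluate, raise_substitution_errors=True):
    """Evaluates string values, capturing known error types as Unresolved."""
    result = {}
    for name, value in params.items():
        if type(value) is str:
            try:
                value = evaluate(value)
            # one handler for every error type that means "cannot resolve"
            except (AttributeError, SubstitutionError, ParserError, FormulaError) as err:
                if raise_substitution_errors:
                    raise
                value = Unresolved(errors=[err])
        result[name] = value
    return result

def toy_evaluate(value):
    """Toy evaluator: any formula (leading '=') fails, plain strings pass."""
    if value.startswith("="):
        raise FormulaError(f"cannot evaluate {value!r}")
    return value

out = evaluate_dict({"a": "x", "b": "=bad", "c": 3}, toy_evaluate,
                    raise_substitution_errors=False)
print(type(out["b"]).__name__, out["b"].errors)
```

Errors outside the tuple (a ``TypeError``, say) still propagate unchanged, which keeps genuine bugs visible.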
2 changes: 2 additions & 0 deletions stimela/backends/kube/run_kube.py
@@ -55,6 +55,8 @@ def run(cab: Cab, params: Dict[str, Any], fqname: str,
    tmp_name = session_user + "--" + fqname.replace(".", "--").replace("_", "--")
    token_hex = secrets.token_hex(4)
    podname = tmp_name[0:50] + "--" + token_hex
    # K8s don't like uppercase
    podname = podname.lower()

    namespace, kube_api, custom_obj_api = get_kube_api()
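
Note on the pod name change: Kubernetes object names are restricted to lowercase alphanumerics, ``-`` and ``.``, so a recipe or step name containing capitals would otherwise yield an invalid pod name. A minimal sketch of the naming steps from the hunk above, wrapped in a hypothetical ``make_podname`` helper (the function and its ``token_hex`` override are illustrative only):

```python
import secrets

def make_podname(session_user, fqname, token_hex=None):
    """Builds a pod name the same way as the hunk above, then lowercases it."""
    # dots and underscores in the step name are replaced by double dashes
    tmp_name = session_user + "--" + fqname.replace(".", "--").replace("_", "--")
    # random suffix keeps names unique; overridable here to make the demo repeatable
    token_hex = token_hex or secrets.token_hex(4)
    # truncate the readable part to 50 characters before adding the suffix
    podname = tmp_name[0:50] + "--" + token_hex
    return podname.lower()

print(make_podname("alice", "MyRecipe.step_1", token_hex="deadbeef"))
```

With the 50-character truncation and an 8-character token, the result is at most 60 characters long.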

10 changes: 10 additions & 0 deletions stimela/backends/singularity.py
@@ -115,6 +115,16 @@ def build(cab: 'stimela.kitchen.cab.Cab', backend: 'stimela.backend.StimelaBacke
        str: path to corresponding singularity image
    """

    # ensure image directory exists
    if os.path.exists(backend.singularity.image_dir):
        if not os.path.isdir(backend.singularity.image_dir):
            raise BackendError(f"invalid singularity image directory {backend.singularity.image_dir}")
    else:
        try:
            os.mkdir(backend.singularity.image_dir)
        except OSError as exc:
            raise BackendError(f"failed to create singularity image directory {backend.singularity.image_dir}: {exc}")

    image_name, simg_path = get_image_info(cab, backend)

    # this is True if we're allowed to build missing images
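
Note on the image directory check: the same logic can be exercised in isolation. The sketch below assumes a simplified ``BackendError`` stand-in and a hypothetical ``ensure_image_dir`` helper that takes the path directly instead of reading ``backend.singularity.image_dir``.

```python
import os
import tempfile

class BackendError(RuntimeError):
    """Simplified stand-in for stimela's BackendError (hypothetical)."""

def ensure_image_dir(image_dir):
    """Creates image_dir if missing; rejects a path that is not a directory."""
    if os.path.exists(image_dir):
        if not os.path.isdir(image_dir):
            raise BackendError(f"invalid singularity image directory {image_dir}")
    else:
        try:
            # os.mkdir creates one level only, so a missing parent is an error
            os.mkdir(image_dir)
        except OSError as exc:
            raise BackendError(f"failed to create singularity image directory {image_dir}: {exc}")

with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "images")
    ensure_image_dir(target)   # creates the directory
    ensure_image_dir(target)   # second call finds it and does nothing
    print(os.path.isdir(target))
```

Because ``os.mkdir`` is used rather than ``os.makedirs``, a path whose parent does not exist is reported as a ``BackendError`` instead of being created silently.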
12 changes: 11 additions & 1 deletion stimela/commands/run.py
@@ -169,8 +169,12 @@ def load_recipe_files(filenames: List[str]):
                help="""explicitly skips steps with the given tags.
                Use commas, or give multiple times for multiple tags.""")
@click.option("-e", "--enable-step", "enable_steps", metavar="STEP(s)", multiple=True,
                help="""Force-enable steps even if the recipe marks them as skipped. Use commas, or give multiple times
                help="""force-enable steps even if the recipe marks them as skipped. Use commas, or give multiple times
                for multiple steps.""")
@click.option("-f", "--disable-fresh-skips", "disable_fresh_skips", is_flag=True,
                help="""forces execution of steps with a skip_if_outputs: fresh property.""")
@click.option("-F", "--disable-exist-skips", "disable_exist_skips", is_flag=True,
                help="""forces execution of steps with a skip_if_outputs: exist property.""")
@click.option("-c", "--config", "config_equals", metavar="X.Y.Z=VALUE", nargs=1, multiple=True,
                help="""tweak configuration options.""")
@click.option("-a", "--assign", metavar="PARAM VALUE", nargs=2, multiple=True,
@@ -202,6 +206,7 @@ def run(parameters: List[str] = [], dump_config: bool = False, dry_run: bool = F
        config_assign: List[Tuple[str, str]] = [],
        step_ranges: List[str] = [], tags: List[str] = [], skip_tags: List[str] = [], enable_steps: List[str] = [],
        skip_ranges: List[str] = [],
        disable_fresh_skips=False, disable_exist_skips=False,
        build=False, rebuild=False, build_skips=False,
        enable_native=False,
        enable_singularity=False,
@@ -214,6 +219,11 @@ def run(parameters: List[str] = [], dump_config: bool = False, dry_run: bool = F
    recipe_or_cab = None
    files_to_load = []

    if disable_fresh_skips:
        stimela.CONFIG.opts.disable_skips.fresh = True
    if disable_exist_skips:
        stimela.CONFIG.opts.disable_skips.exist = True

    def convert_value(value):
        if value == "=UNSET":
            return UNSET
11 changes: 8 additions & 3 deletions stimela/config.py
@@ -41,7 +41,12 @@ class StimelaLogConfig(object):
class StimelaProfilingOptions(object):
    print_depth: int = 9999
    unroll_loops: bool = False


@dataclass
class StimelaDisableSkipOptions(object):
    fresh: bool = False
    exist: bool = False

@dataclass
class StimelaOptions(object):
    backend: StimelaBackendOptions = EmptyClassDefault(StimelaBackendOptions)
@@ -52,8 +57,8 @@ class StimelaOptions(object):
    runtime: Dict[str, Any] = EmptyDictDefault()
    ## Profiling options
    profile: StimelaProfilingOptions = EmptyClassDefault(StimelaProfilingOptions)


    ## Disables skip_if_outputs checks
    disable_skips: StimelaDisableSkipOptions = EmptyClassDefault(StimelaDisableSkipOptions)

def DefaultDirs():
    return field(default_factory=lambda:dict(indir='.', outdir='.'))
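
Note on how the new ``-f`` and ``-F`` flags reach the configuration: the ``run`` command copies each flag into the matching field of ``opts.disable_skips``. The sketch below shows that flow in isolation, under stated assumptions: it uses the standard-library ``argparse`` in place of click, and ``DisableSkipOptions``, ``Options`` and ``parse_flags`` are hypothetical, simplified stand-ins for the stimela classes.

```python
import argparse
from dataclasses import dataclass, field

@dataclass
class DisableSkipOptions:
    """Stand-in for StimelaDisableSkipOptions (hypothetical, simplified)."""
    fresh: bool = False
    exist: bool = False

@dataclass
class Options:
    """Stand-in for the relevant part of StimelaOptions."""
    disable_skips: DisableSkipOptions = field(default_factory=DisableSkipOptions)

def parse_flags(argv):
    """Parses -f/-F and returns an Options object with the flags applied."""
    parser = argparse.ArgumentParser(prog="run")
    parser.add_argument("-f", "--disable-fresh-skips", action="store_true")
    parser.add_argument("-F", "--disable-exist-skips", action="store_true")
    args = parser.parse_args(argv)
    opts = Options()
    # flags only ever switch an option on; absent flags leave the default
    if args.disable_fresh_skips:
        opts.disable_skips.fresh = True
    if args.disable_exist_skips:
        opts.disable_skips.exist = True
    return opts

print(parse_flags(["-f"]))
```

Since the flags only set the options to true, a value already enabled through the configuration file is not switched off by omitting the flag.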
2 changes: 1 addition & 1 deletion stimela/kitchen/recipe.py
@@ -928,7 +928,7 @@ def validate_for_loop(self, params, strict=False):
            else:
                raise ParameterValidationError(f"recipe '{self.name}': for_loop.over={self.for_loop.over} is unset")
            if strict and isinstance(values, Unresolved):
                raise ParameterValidationError(f"recipe '{self.name}': for_loop.over={self.for_loop.over} is unresolved")
                raise ParameterValidationError(f"recipe '{self.name}': for_loop.over={self.for_loop.over} is unresolved", [values])
        else:
            if self._for_loop_values is None:
                raise ParameterValidationError(f"recipe '{self.name}': for_loop.over is unset")
18 changes: 16 additions & 2 deletions stimela/kitchen/step.py
@@ -487,11 +487,25 @@ def run(self, backend: Optional[Dict] = None, subst: Optional[Dict[str, Any]] =
            raise StepValidationError(f"step '{self.name}': invalid inputs: {join_quote(invalid)}", log=self.log)

        ## check if we need to skip based on existing/fresh file outputs
        skip_if_outputs = self.skip_if_outputs
        # don't check if skipping anyway
        if skip:
            skip_if_outputs = None
        # don't check if remote filesystem
        elif backend_runner.is_remote_fs:
            parent_log_info(f"ignoring skip_if_outputs: {skip_if_outputs} because backend has remote filesystem")
            skip_if_outputs = None
        # don't check if force-disabled
        elif (skip_if_outputs == OUTPUTS_EXISTS and stimela.CONFIG.opts.disable_skips.exist) or \
            (skip_if_outputs == OUTPUTS_FRESH and stimela.CONFIG.opts.disable_skips.fresh):
            parent_log_info(f"ignoring skip_if_outputs: {skip_if_outputs} because it has been force-disabled")
            skip_if_outputs = None

        ## if skip on fresh outputs is in effect, find mtime of most recent input
        if not backend_runner.is_remote_fs and not skip and self.skip_if_outputs:
        if skip_if_outputs:
            # max_mtime will remain 0 if we're not checking for freshness, or if there are no file-type inputs
            max_mtime, max_mtime_path = 0, None
            if self.skip_if_outputs == OUTPUTS_FRESH:
            if skip_if_outputs == OUTPUTS_FRESH:
                parent_log_info("checking if file-type outputs of step are fresh")
                for name, value in params.items():
                    schema = self.inputs_outputs[name]
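
Note on the new skip logic in ``step.py``: the effective ``skip_if_outputs`` setting is now resolved once, in a fixed order, before any file checks happen. The sketch below restates that order as a pure function. The constant values ``"exist"`` and ``"fresh"`` are assumptions inferred from the help strings in ``run.py``, and ``effective_skip_if_outputs`` is a hypothetical helper, not a stimela function.

```python
# Assumed values, inferred from the "skip_if_outputs: fresh/exist" help strings
OUTPUTS_EXISTS = "exist"
OUTPUTS_FRESH = "fresh"

def effective_skip_if_outputs(skip_if_outputs, skip=False, is_remote_fs=False,
                              disable_exist=False, disable_fresh=False):
    """Returns the setting to act on, or None if output checks are bypassed."""
    # 1. the step is being skipped anyway, so there is nothing to check
    if skip:
        return None
    # 2. outputs on a remote filesystem cannot be inspected locally
    if is_remote_fs:
        return None
    # 3. the matching check has been force-disabled (-F for exist, -f for fresh)
    if (skip_if_outputs == OUTPUTS_EXISTS and disable_exist) or \
       (skip_if_outputs == OUTPUTS_FRESH and disable_fresh):
        return None
    return skip_if_outputs

print(effective_skip_if_outputs(OUTPUTS_FRESH))
print(effective_skip_if_outputs(OUTPUTS_FRESH, disable_fresh=True))
print(effective_skip_if_outputs(OUTPUTS_FRESH, disable_exist=True))
```

Each disable option affects only its own mode: disabling the ``exist`` check leaves a ``fresh`` setting in force, and vice versa.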
1 change: 1 addition & 0 deletions tests/scabha_tests/test_parsing.py
@@ -15,6 +15,7 @@ def test_parser():
            "a.b <= 0",
            "a.b",
            "IFSET(a.b)",
            "a.b[c.d]",
            ]:
        print(f"\n\n\n=====================\nExpression: {string}\n")
        a = expr.parseString(string, parse_all=True)