Skip to content

Commit

Permalink
Merge pull request #236 from caracal-pipeline/issue-235
Browse files Browse the repository at this point in the history
Issue 235
  • Loading branch information
o-smirnov authored Feb 18, 2024
2 parents c351635 + c0a7b31 commit 78bfbdb
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "stimela"
version = "2.0rc10"
version = "2.0rc11"
description = "Framework for system agnostic pipelines for (not just) radio interferometry"
authors = ["Sphesihle Makhathini <sphemakh@gmail.com>", "Oleg Smirnov and RATT <osmirnov@gmail.com>"]
readme = "README.rst"
Expand Down
31 changes: 24 additions & 7 deletions scabha/cargo.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ def __post_init__(self):
self._dyn_schema = getattr(mod, funcname, None)
if not callable(self._dyn_schema):
raise DefinitionError(f"{modulename}.{funcname} is not a valid callable")
# make backup copy of original inputs/outputs
self._original_inputs_outputs = self.inputs.copy(), self.outputs.copy()

@property
def inputs_outputs(self):
Expand All @@ -363,12 +365,12 @@ def finalize(self, config=None, log=None, fqname=None, backend=None, nesting=0):
self.log = log
self.logopts = config.opts.log.copy()

def apply_dynamic_schemas(self, params):
def apply_dynamic_schemas(self, params, subst: Optional[SubstitutionNS]=None):
# update schemas, if dynamic schema is enabled
if self._dyn_schema:
self._inputs_outputs = None
try:
self.inputs, self.outputs = self._dyn_schema(params, self.inputs, self.outputs)
self.inputs, self.outputs = self._dyn_schema(params, *self._original_inputs_outputs)
except Exception as exc:
raise SchemaError(f"error evaluating dynamic schema", exc) # [exc, sys.exc_info()[2]])
for io in self.inputs, self.outputs:
Expand All @@ -383,10 +385,18 @@ def apply_dynamic_schemas(self, params):
for schema in self.outputs.values():
schema._is_input = False
# re-resolve implicits
self._resolve_implicit_parameters(params)

def _resolve_implicit_parameters(self, params):
self._resolve_implicit_parameters(params, subst)

def _resolve_implicit_parameters(self, params, subst: Optional[SubstitutionNS]=None):
# remove previously defined implicits
current = subst and getattr(subst, 'current', None)
for p in self._implicit_params:
if p in params:
del params[p]
if current and p in current:
del current[p]
self._implicit_params = set()
# regenerate
for name, schema in self.inputs_outputs.items():
if schema.implicit is not None and type(schema.implicit) is not Unresolved:
if name in params and name not in self._implicit_params and params[name] != schema.implicit:
Expand All @@ -395,6 +405,8 @@ def _resolve_implicit_parameters(self, params):
raise SchemaError(f"implicit parameter {name} also has a default value")
params[name] = schema.implicit
self._implicit_params.add(name)
if current:
current[name] = schema.implicit


def prevalidate(self, params: Optional[Dict[str, Any]], subst: Optional[SubstitutionNS]=None, root=False):
Expand All @@ -403,7 +415,12 @@ def prevalidate(self, params: Optional[Dict[str, Any]], subst: Optional[Substitu
A dynamic schema, if defined, is applied at this point."""
self.finalize()
# add implicits, if resolved
self._resolve_implicit_parameters(params)
# remove previous ones from substitution namespace
if subst and hasattr(subst, 'current'):
for p in self._implicit_params:
if p in subst.current:
del subst.current[p]
self._resolve_implicit_parameters(params, subst)
# assign unset categories
for name, schema in self.inputs_outputs.items():
schema.get_category()
Expand All @@ -422,7 +439,7 @@ def validate_inputs(self, params: Dict[str, Any], subst: Optional[SubstitutionNS
If remote_fs is True, doesn't check files and directories.
"""
assert(self.finalized)
self._resolve_implicit_parameters(params)
self._resolve_implicit_parameters(params, subst)

# check inputs
params1 = validate_parameters(params, self.inputs, defaults=self.defaults, subst=subst, fqname=self.fqname,
Expand Down
2 changes: 1 addition & 1 deletion stimela/kitchen/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ def prevalidate_steps():

try:
step_params = step.prevalidate(subst)
subst.current._merge_(step_params) # these may have changed in prevalidation
subst.current._merge_(step_params)
except ScabhaBaseException as exc:
errors.append(RecipeValidationError(f"step '{label}' failed prevalidation", exc))
except Exception as exc:
Expand Down
1 change: 1 addition & 0 deletions stimela/kitchen/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ def finalize(self, config=None, log=None, fqname=None, backend=None, nesting=0):

def prevalidate(self, subst: Optional[SubstitutionNS]=None, root=False):
self.finalize()
self.cargo.apply_dynamic_schemas(self.params, subst)
# validate cab or recipe
params = self.validated_params = self.cargo.prevalidate(self.params, subst, root=root)
# add missing outputs
Expand Down

0 comments on commit 78bfbdb

Please sign in to comment.