Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 235 #236

Merged
merged 3 commits into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "stimela"
version = "2.0rc10"
version = "2.0rc11"
description = "Framework for system agnostic pipelines for (not just) radio interferometry"
authors = ["Sphesihle Makhathini <sphemakh@gmail.com>", "Oleg Smirnov and RATT <osmirnov@gmail.com>"]
readme = "README.rst"
Expand Down
31 changes: 24 additions & 7 deletions scabha/cargo.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ def __post_init__(self):
self._dyn_schema = getattr(mod, funcname, None)
if not callable(self._dyn_schema):
raise DefinitionError(f"{modulename}.{funcname} is not a valid callable")
# make backup copy of original inputs/outputs
self._original_inputs_outputs = self.inputs.copy(), self.outputs.copy()

@property
def inputs_outputs(self):
Expand All @@ -363,12 +365,12 @@ def finalize(self, config=None, log=None, fqname=None, backend=None, nesting=0):
self.log = log
self.logopts = config.opts.log.copy()

def apply_dynamic_schemas(self, params):
def apply_dynamic_schemas(self, params, subst: Optional[SubstitutionNS]=None):
# update schemas, if dynamic schema is enabled
if self._dyn_schema:
self._inputs_outputs = None
try:
self.inputs, self.outputs = self._dyn_schema(params, self.inputs, self.outputs)
self.inputs, self.outputs = self._dyn_schema(params, *self._original_inputs_outputs)
except Exception as exc:
raise SchemaError(f"error evaluating dynamic schema", exc) # [exc, sys.exc_info()[2]])
for io in self.inputs, self.outputs:
Expand All @@ -383,10 +385,18 @@ def apply_dynamic_schemas(self, params):
for schema in self.outputs.values():
schema._is_input = False
# re-resolve implicits
self._resolve_implicit_parameters(params)

def _resolve_implicit_parameters(self, params):
self._resolve_implicit_parameters(params, subst)

def _resolve_implicit_parameters(self, params, subst: Optional[SubstitutionNS]=None):
# remove previously defined implicits
current = subst and getattr(subst, 'current', None)
for p in self._implicit_params:
if p in params:
del params[p]
if current and p in current:
del current[p]
self._implicit_params = set()
# regenerate
for name, schema in self.inputs_outputs.items():
if schema.implicit is not None and type(schema.implicit) is not Unresolved:
if name in params and name not in self._implicit_params and params[name] != schema.implicit:
Expand All @@ -395,6 +405,8 @@ def _resolve_implicit_parameters(self, params):
raise SchemaError(f"implicit parameter {name} also has a default value")
params[name] = schema.implicit
self._implicit_params.add(name)
if current:
current[name] = schema.implicit


def prevalidate(self, params: Optional[Dict[str, Any]], subst: Optional[SubstitutionNS]=None, root=False):
Expand All @@ -403,7 +415,12 @@ def prevalidate(self, params: Optional[Dict[str, Any]], subst: Optional[Substitu
A dynamic schema, if defined, is applied at this point."""
self.finalize()
# add implicits, if resolved
self._resolve_implicit_parameters(params)
# remove previous ones from substitution namespace
if subst and hasattr(subst, 'current'):
for p in self._implicit_params:
if p in subst.current:
del subst.current[p]
self._resolve_implicit_parameters(params, subst)
# assign unset categories
for name, schema in self.inputs_outputs.items():
schema.get_category()
Expand All @@ -422,7 +439,7 @@ def validate_inputs(self, params: Dict[str, Any], subst: Optional[SubstitutionNS
If remote_fs is True, doesn't check files and directories.
"""
assert(self.finalized)
self._resolve_implicit_parameters(params)
self._resolve_implicit_parameters(params, subst)

# check inputs
params1 = validate_parameters(params, self.inputs, defaults=self.defaults, subst=subst, fqname=self.fqname,
Expand Down
2 changes: 1 addition & 1 deletion stimela/kitchen/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ def prevalidate_steps():

try:
step_params = step.prevalidate(subst)
subst.current._merge_(step_params) # these may have changed in prevalidation
subst.current._merge_(step_params)
except ScabhaBaseException as exc:
errors.append(RecipeValidationError(f"step '{label}' failed prevalidation", exc))
except Exception as exc:
Expand Down
1 change: 1 addition & 0 deletions stimela/kitchen/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ def finalize(self, config=None, log=None, fqname=None, backend=None, nesting=0):

def prevalidate(self, subst: Optional[SubstitutionNS]=None, root=False):
self.finalize()
self.cargo.apply_dynamic_schemas(self.params, subst)
# validate cab or recipe
params = self.validated_params = self.cargo.prevalidate(self.params, subst, root=root)
# add missing outputs
Expand Down
Loading