Skip to content

Commit

Permalink
Merge pull request #298 from caracal-pipeline/py11
Browse files Browse the repository at this point in the history
make Py11 compliant
  • Loading branch information
o-smirnov authored Jun 1, 2024
2 parents cf25412 + 4087ac8 commit 9448658
Show file tree
Hide file tree
Showing 16 changed files with 102 additions and 127 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10"]
python-version: ["3.8", "3.9", "3.10", "3.12"]

steps:
- uses: actions/checkout@v3
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "stimela"
version = "2.0"
version = "2.0.1"
description = "Framework for system agnostic pipelines for (not just) radio interferometry"
authors = ["Sphesihle Makhathini <sphemakh@gmail.com>", "Oleg Smirnov and RATT <osmirnov@gmail.com>"]
readme = "README.rst"
Expand All @@ -14,7 +14,7 @@ packages = [
]

[tool.poetry.dependencies]
python = "^3.8"
python = ">=3.8,<3.13"
munch = "^2.5.0"
omegaconf = "^2.1"
importlib_metadata = { version = "4.13.0", python = "3.7" }
Expand Down
3 changes: 3 additions & 0 deletions scabha/basetypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ def ListDefault(*args):
def DictDefault(**kw):
return field(default_factory=lambda:dict(**kw))

def EmptyClassDefault(obj):
return field(default_factory=obj)


@dataclass
class Unresolved(object):
Expand Down
27 changes: 13 additions & 14 deletions scabha/cargo.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import dataclasses
import re, importlib, sys
import re, importlib
from collections import OrderedDict
from enum import Enum, IntEnum
from dataclasses import dataclass, field
from omegaconf import MISSING, ListConfig, DictConfig, OmegaConf
from enum import IntEnum
from dataclasses import dataclass
from omegaconf import ListConfig, DictConfig, OmegaConf

import rich.box
import rich.markup
Expand All @@ -13,7 +13,6 @@
from .exceptions import ParameterValidationError, DefinitionError, SchemaError, AssignmentError
from .validate import validate_parameters, Unresolved
from .substitutions import SubstitutionNS
from .basetypes import EmptyDictDefault, EmptyListDefault, UNSET, is_file_list_type, is_file_type

# need * imports from both to make eval(self.dtype, globals()) work
from typing import *
Expand Down Expand Up @@ -147,7 +146,7 @@ class Parameter(object):
nom_de_guerre: Optional[str] = None

# policies object, specifying a non-default way to handle this parameter
policies: ParameterPolicies = field(default_factory=ParameterPolicies)
policies: ParameterPolicies = EmptyClassDefault(ParameterPolicies)

# Parameter category, purely cosmetic, used for generating help and debug messages.
# Assigned automatically if None, but a schema may explicitly mark parameters as e.g.
Expand Down Expand Up @@ -275,8 +274,8 @@ def flatten_schemas(io_dest, io, label, prefix=""):
value = value.strip()
default = default.strip()
if (default.startswith('"') and default.endswith('"')) or \
(default.startswith("'") and default.endswith("'")):
default = default[1:-1]
(default.startswith("'") and default.endswith("'")):
default = default[1:-1]
schema['default'] = default
# does value end with "*"? Mark as required
elif value.endswith("*"):
Expand Down Expand Up @@ -410,7 +409,7 @@ def _resolve_implicit_parameters(self, params, subst: Optional[SubstitutionNS]=N
if name in params and name not in self._implicit_params and params[name] != schema.implicit:
raise ParameterValidationError(f"implicit parameter {name} was supplied explicitly")
if name in self.defaults:
raise SchemaError(f"implicit parameter {name} also has a default value")
raise SchemaError(f"implicit parameter {name} also has a default value")
params[name] = schema.implicit
self._implicit_params.add(name)
if current:
Expand All @@ -434,9 +433,9 @@ def prevalidate(self, params: Optional[Dict[str, Any]], subst: Optional[Substitu
schema.get_category()

params = validate_parameters(params, self.inputs_outputs, defaults=self.defaults, subst=subst, fqname=self.fqname,
check_unknowns=True, check_required=False,
check_inputs_exist=False, check_outputs_exist=False,
create_dirs=False, ignore_subst_errors=True)
check_unknowns=True, check_required=False,
check_inputs_exist=False, check_outputs_exist=False,
create_dirs=False, ignore_subst_errors=True)

return params

Expand Down Expand Up @@ -517,8 +516,8 @@ def rich_help(self, tree, max_category=ParameterCategory.Optional):
schema.info and info.append(rich.markup.escape(schema.info))
attrs and info.append(f"[dim]\[{rich.markup.escape(', '.join(attrs))}][/dim]")
table.add_row(f"[bold]{name}[/bold]",
f"[dim]{rich.markup.escape(str(schema.dtype))}[/dim]",
" ".join(info))
f"[dim]{rich.markup.escape(str(schema.dtype))}[/dim]",
" ".join(info))

def assign_value(self, key: str, value: Any, override: bool = False):
"""assigns a parameter value to the cargo.
Expand Down
14 changes: 7 additions & 7 deletions stimela/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import logging
from dataclasses import dataclass
from typing import Union, Dict, Any, List, Optional
from typing import Dict, Any, Optional
from enum import Enum
from omegaconf import ListConfig, OmegaConf
from stimela.exceptions import BackendSpecificationError, BackendError
from stimela.stimelogging import log_exception
from scabha.basetypes import EmptyDictDefault
from scabha.basetypes import EmptyDictDefault, EmptyClassDefault

from .singularity import SingularityBackendOptions
from .kube import KubeBackendOptions
Expand Down Expand Up @@ -49,17 +49,17 @@ class StimelaBackendOptions(object):

select: Any = "singularity,native" # should be Union[str, List[str]], but OmegaConf doesn't support it, so handle in __post_init__ for now

singularity: Optional[SingularityBackendOptions] = None
kube: Optional[KubeBackendOptions] = None
native: Optional[NativeBackendOptions] = None
singularity: Optional[SingularityBackendOptions] = EmptyClassDefault(SingularityBackendOptions)
kube: Optional[KubeBackendOptions] = EmptyClassDefault(KubeBackendOptions)
native: Optional[NativeBackendOptions] = EmptyClassDefault(NativeBackendOptions)
docker: Optional[Dict] = None # placeholder for future impl
slurm: Optional[SlurmOptions] = None
slurm: Optional[SlurmOptions] = EmptyClassDefault(SlurmOptions)

## Resource limits applied during run -- see resource module
rlimits: Dict[str, Any] = EmptyDictDefault()

verbose: int = 0 # be verbose about backend selections. Higher levels mean more verbosity

def __post_init__(self):
# resolve "select" field
if type(self.select) is str:
Expand Down
41 changes: 11 additions & 30 deletions stimela/backends/flavours/casa.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
from typing import Optional, Any, Union, Dict, List
from dataclasses import dataclass
import tempfile

import stimela
from scabha.basetypes import EmptyListDefault
Expand Down Expand Up @@ -39,7 +40,7 @@ def get_image_name(self, cab: Cab, backend: 'stimela.backend.StimelaBackendOptio
return resolve_image_name(backend, cab.image or CONFIG.images['default-casa'])

def get_arguments(self, cab: Cab, params: Dict[str, Any], subst: Dict[str, Any],
virtual_env: Optional[str]=None, check_executable: bool = True):
virtual_env: Optional[str]=None, check_executable: bool = True):

with substitutions_from(subst, raise_errors=True) as context:
try:
Expand All @@ -64,35 +65,15 @@ def get_arguments(self, cab: Cab, params: Dict[str, Any], subst: Dict[str, Any],
command = f"{virtual_env}/bin/{command}"

self.command_name = command
# convert inputs into a JSON string
pass_params = cab.filter_input_params(params)
params_string = json.dumps(pass_params)

# unicode instance only exists in python2, python3 bytes
code = f"""
import sys, json
kw = json.loads(sys.argv[-1])
try:
utype = unicode
except NameError:
utype = bytes
def stringify(x):
if isinstance(x, (utype, str)):
return str(x)
elif isinstance(x, list):
return [stringify(y) for y in x]
else:
return x
kw = {{key: stringify(value) for key, value in kw.items()}}
{command}(**kw)
"""

args = casa.strip().split() + list(casa_opts) + ["-c", code, params_string]
pass_params = dict(cab.filter_input_params(params))

# parse the params direcly as python dictionary
# no need to string conversion between strings/bytes/unicode
# this works for both python 2.7 and 3.x
code = f"{command}(**{pass_params})"

args = casa.strip().split() + list(casa_opts) + ["-c", code]
return args



32 changes: 17 additions & 15 deletions stimela/backends/kube/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, List, Optional, Any, Callable
from typing import Dict, List, Optional, Any
from enum import Enum
from omegaconf import OmegaConf
from dataclasses import dataclass
Expand All @@ -14,14 +14,15 @@
import os

import stimela
from scabha.basetypes import EmptyDictDefault, DictDefault, EmptyListDefault, ListDefault
from scabha.basetypes import ( EmptyDictDefault, DictDefault, EmptyListDefault,
ListDefault, EmptyClassDefault)
from stimela.exceptions import BackendError

session_id = secrets.token_hex(8)
session_user = getpass.getuser()

resource_labels = dict(stimela_user=session_user,
stimela_session_id=session_id)
stimela_session_id=session_id)

try:
import kubernetes
Expand Down Expand Up @@ -59,8 +60,8 @@ class PodLimits(object):
# selects a specific pod type from a KubeBackendOptions.predefined_pod_specs
type: Optional[str] = None
# memory limit/requirement
memory: Optional[PodLimits] = None
cpu: Optional[PodLimits] = None
memory: Optional[PodLimits] = EmptyClassDefault(PodLimits)
cpu: Optional[PodLimits] = EmptyClassDefault(PodLimits)
# arbitrary additional structure copied into the pod spec
custom_pod_spec: Dict[str, Any] = EmptyDictDefault()

Expand All @@ -84,8 +85,8 @@ class StartupOptions(object):
report_pvcs: bool = True # report any transient PVCs
cleanup_pvcs: bool = True # cleanup any transient PVCs

on_exit: ExitOptions = ExitOptions() # startup behaviour options
on_startup: StartupOptions = StartupOptions() # cleanup behaviour options
on_exit: ExitOptions = EmptyClassDefault(ExitOptions) # startup behaviour options
on_startup: StartupOptions = EmptyClassDefault(StartupOptions) # cleanup behaviour options

@dataclass
class Volume(object):
Expand Down Expand Up @@ -140,8 +141,8 @@ class DaskCluster(object):
num_workers: int = 1
threads_per_worker: int = 1
memory_limit: Optional[str] = None
worker_pod: KubePodSpec = KubePodSpec()
scheduler_pod: KubePodSpec = KubePodSpec()
worker_pod: KubePodSpec = EmptyClassDefault(KubePodSpec)
scheduler_pod: KubePodSpec = EmptyClassDefault(KubePodSpec)
forward_dashboard_port: int = 8787 # set to non-0 to forward the http dashboard to this local port

@dataclass
Expand All @@ -157,15 +158,15 @@ class LocalMount(object):
mkdir: bool = False # create dir, if it is missing


enable: bool = True
enable: bool = True

# infrastructure settings are global and can't be changed per cab or per step
infrastructure: Infrastructure = Infrastructure()
infrastructure: Infrastructure = EmptyClassDefault(Infrastructure)

context: Optional[str] = None # k8s context -- use default if not given -- can't change
namespace: Optional[str] = None # k8s namespace

dask_cluster: Optional[DaskCluster] = None # if set, a DaskJob will be created
dask_cluster: Optional[DaskCluster] = EmptyClassDefault(DaskCluster) # if set, a DaskJob will be created
service_account: str = "compute-runner"
kubectl_path: str = "kubectl"

Expand Down Expand Up @@ -198,9 +199,10 @@ class DebugOptions(object):
event_colors: Dict[str, str] = DictDefault(
warning="blue", error="yellow", default="grey50")

debug: DebugOptions = DebugOptions()
debug: DebugOptions = EmptyClassDefault(DebugOptions)

job_pod: KubePodSpec = KubePodSpec()

job_pod: KubePodSpec = EmptyClassDefault(KubePodSpec)

capture_logs_style: Optional[str] = "blue"

Expand All @@ -216,7 +218,7 @@ class UserInfo(object):
home_ramdisk: bool = True # home dir mounted as RAM disk, else local disk
inject_nss: bool = True # inject user info for NSS_WRAPPER

user: UserInfo = UserInfo()
user: UserInfo = EmptyClassDefault(UserInfo)

# user-defined set of pod types -- each is a pod spec structure keyed by pod_type
predefined_pod_specs: Dict[str, Dict[str, Any]] = EmptyDictDefault()
Expand Down
6 changes: 2 additions & 4 deletions stimela/backends/kube/infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@
import atexit
import json
import time
import rich
from typing import Optional, Dict, List

from stimela.backends import StimelaBackendOptions
from stimela.stimelogging import log_exception, declare_chapter
from stimela.stimelogging import log_exception
from stimela.task_stats import update_process_status
from scabha.basetypes import EmptyListDefault

from stimela.exceptions import BackendError
from . import session_id, session_user, resource_labels, run_kube, KubeBackendOptions, get_kube_api, get_context_namespace
Expand Down Expand Up @@ -104,7 +102,7 @@ def init(backend: StimelaBackendOptions, log: logging.Logger, cleanup: bool = Fa

# cleanup transient PVCs
transient_pvcs = {name: pvc for name, pvc in active_pvcs.items()
if pvc.metadata.labels and 'stimela_transient_pvc' in pvc.metadata.labels}
if pvc.metadata.labels and 'stimela_transient_pvc' in pvc.metadata.labels}

if transient_pvcs:
if cleanup or kube.infrastructure.on_startup.cleanup_pvcs:
Expand Down
3 changes: 1 addition & 2 deletions stimela/backends/kube/kube_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from typing import Dict, Any
import re
from datetime import datetime
import rich
from rich.markup import escape
from requests import ConnectionError
from urllib3.exceptions import HTTPError
Expand All @@ -12,7 +11,7 @@
from stimela.exceptions import StimelaCabRuntimeError

from kubernetes.client.rest import ApiException
from . import get_kube_api, KubeBackendOptions, session_id
from . import get_kube_api, KubeBackendOptions

k8s_cpu_units = {
"": 1,
Expand Down
5 changes: 4 additions & 1 deletion stimela/backends/native/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import logging
import stimela

# add these as module attributes
from .run_native import run, build_command_line, update_rlimits

def is_available(opts = None):
return True

Expand All @@ -12,12 +15,12 @@ def get_status():
def is_remote():
return False

from .run_native import run, build_command_line, update_rlimits

@dataclass
class NativeBackendOptions(object):
enable: bool = True
virtual_env: Optional[str] = None


def init(backend: 'stimela.backend.StimelaBackendOptions', log: logging.Logger):
pass
2 changes: 1 addition & 1 deletion stimela/backends/native/run_native.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging, datetime, resource, os.path

from typing import Dict, Optional, Any, List, Callable
from typing import Dict, Optional, Any

import stimela
import stimela.kitchen
Expand Down
Loading

0 comments on commit 9448658

Please sign in to comment.