Issue 190 (#233)

Merged · 2 commits · Feb 18, 2024
stimela/backends/__init__.py: 2 additions & 0 deletions

@@ -58,6 +58,8 @@ class StimelaBackendOptions(object):
## Resource limits applied during run -- see resource module
rlimits: Dict[str, Any] = EmptyDictDefault()

+ verbose: int = 0 # be verbose about backend selections. Higher levels mean more verbosity

def __post_init__(self):
# resolve "select" field
if type(self.select) is str:
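The new `verbose` field is a plain integer level on the top-level backend options. A minimal sketch of how a backend module might gate its selection chatter on it — `log_backend_choice` is a hypothetical helper, not code from this PR:

```python
import logging

def log_backend_choice(backend_opts, log: logging.Logger, msg: str, level: int = 1):
    # emit the message only if the configured verbosity reaches this level
    if backend_opts.verbose >= level:
        log.info(msg)
```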
stimela/backends/kube/__init__.py: 30 additions & 16 deletions

@@ -133,10 +133,11 @@ def __post_init__ (self):
# subclasses for options
@dataclass
class DaskCluster(object):
+ enable: bool = False
capture_logs: bool = True
capture_logs_style: Optional[str] = "blue"
name: Optional[str] = None
- num_workers: int = 0
+ num_workers: int = 1
threads_per_worker: int = 1
memory_limit: Optional[str] = None
worker_pod: KubePodSpec = KubePodSpec()
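Two defaults change here: a Dask cluster is now requested via an explicit `enable` flag instead of being implied by a nonzero worker count, and `num_workers` starts at 1. Assuming the obvious gating semantics (the `launch_dask_cluster` helper below is hypothetical), a caller would now check:

```python
# a configured-but-disabled cluster spec is now representable
if kube.dask_cluster and kube.dask_cluster.enable:
    launch_dask_cluster(kube.dask_cluster)  # hypothetical launcher
```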
@@ -184,20 +185,22 @@ class LocalMount(object):

@dataclass
class DebugOptions(object):
- print: int = 0 # debug-print level. Higher numbers mean more verbosity
+ verbose: int = 0 # debug log level. Higher numbers mean more verbosity
pause_on_start: bool = False # pause instead of running payload
pause_on_cleanup: bool = False # pause before attempting cleanup

save_spec: Optional[str] = None # if set, pod/job specs will be saved as YaML to the named file. {}-substitutions apply to filename.

+ # if >0, events will be collected and reported
+ log_events: bool = False
+ # format string for reporting kubernetes events, this can include rich markup
+ event_format: str = "=NOSUBST('\[k8s event type: {event.type}, reason: {event.reason}] {event.message}')"
+ event_colors: Dict[str, str] = DictDefault(
+ warning="blue", error="yellow", default="grey50")

debug: DebugOptions = DebugOptions()

job_pod: KubePodSpec = KubePodSpec()

- # if >0, events will be collected and reported
- verbose_events: int = 0
- # format string for reporting kubernetes events, this can include rich markup
- verbose_event_format: str = "=NOSUBST('\[k8s event type: {event.type}, reason: {event.reason}] {event.message}')"
- verbose_event_colors: Dict[str, str] = DictDefault(
- warning="blue", error="yellow", default="grey50")

capture_logs_style: Optional[str] = "blue"
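Net effect of this hunk: the event-reporting knobs move from the top level of the kube options into the `debug` subsection and lose their `verbose_` prefix, and `debug.print` becomes `debug.verbose`. Expressed as a config-dict migration — `migrate_kube_opts` is a hypothetical illustration, stimela performs no such migration itself:

```python
def migrate_kube_opts(old: dict) -> dict:
    new = dict(old)
    debug = dict(new.get("debug", {}))
    # debug.print -> debug.verbose
    if "print" in debug:
        debug["verbose"] = debug.pop("print")
    # top-level verbose_* event options move under debug, minus the prefix;
    # note the integer verbose_events level becomes a plain bool
    if "verbose_events" in new:
        debug["log_events"] = bool(new.pop("verbose_events"))
    if "verbose_event_format" in new:
        debug["event_format"] = new.pop("verbose_event_format")
    if "verbose_event_colors" in new:
        debug["event_colors"] = new.pop("verbose_event_colors")
    new["debug"] = debug
    return new
```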

@@ -256,21 +259,32 @@ def run(cab: 'stimela.kitchen.cab.Cab', params: Dict[str, Any], fqname: str,
from . import run_kube
return run_kube.run(cab=cab, params=params, fqname=fqname, backend=backend, log=log, subst=subst)

- _kube_client = _kube_config = _kube_context = None
+ _kube_client = _kube_config = _kube_context = _kube_namespace = None

def get_kube_api(context: Optional[str]=None):
- global _kube_client
- global _kube_config
- global _kube_context
+ global _kube_client, _kube_config, _kube_context, _kube_namespace

if _kube_config is None:
_kube_config = True
_kube_context = context
kubernetes.config.load_kube_config(context=context)
- elif context != _kube_context:
+ contexts, current_context = kubernetes.config.list_kube_config_contexts()
+ if context is None:
+ context = current_context['name']
+ _kube_context = context
+ for ctx in contexts:
+ if ctx['name'] == context:
+ break
+ _kube_namespace = ctx['context']['namespace']
+ else:
+ _kube_namespace = "default"

+ elif context is not None and context != _kube_context:
raise BackendError(f"k8s context has changed (was {_kube_context}, now {context}), this is not permitted")

- return core_v1_api.CoreV1Api(), CustomObjectsApi()
+ return _kube_namespace, core_v1_api.CoreV1Api(), CustomObjectsApi()

+ def get_context_namespace():
+ return _kube_context, _kube_namespace

_uid = os.getuid()
_gid = os.getgid()
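`get_kube_api()` now resolves the namespace declared by the active kubeconfig context once, caches it, and returns it ahead of the API handles; `get_context_namespace()` exposes the cached (context, namespace) pair. The calling convention after this change, as a sketch (assumes a reachable cluster and a loaded kubeconfig):

```python
from stimela.backends.kube import get_kube_api, get_context_namespace

# first call loads kubeconfig and pins context/namespace for the session;
# a later call with a *different* context raises BackendError
namespace, core_api, custom_api = get_kube_api()
context, namespace = get_context_namespace()
pods = core_api.list_namespaced_pod(namespace=namespace)
```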
stimela/backends/kube/infrastructure.py: 23 additions & 21 deletions

@@ -11,7 +11,7 @@
from scabha.basetypes import EmptyListDefault

from stimela.exceptions import BackendError
- from . import session_id, session_user, resource_labels, run_kube, KubeBackendOptions, get_kube_api
+ from . import session_id, session_user, resource_labels, run_kube, KubeBackendOptions, get_kube_api, get_context_namespace

Lifecycle = KubeBackendOptions.Volume.Lifecycle

@@ -56,29 +56,27 @@ def init(backend: StimelaBackendOptions, log: logging.Logger, cleanup: bool = Fa
klog = log.getChild("kube")
kube = backend.kube

- if not kube.namespace:
- klog.error("kube.namespace not configured, kube backend will not be available")
- return False


if cleanup:
klog.info("cleaning up backend")
else:
atexit.register(close, backend, klog)
klog.info("initializing kube backend")

try:
- kube_api, _ = run_kube.get_kube_api(kube.context)
+ namespace, kube_api, _ = get_kube_api(kube.context)
except ConfigException as exc:
- log_exception(exc, log=klog)
+ log_exception(BackendError("error initializing kube backend", exc), log=klog)
return False

+ context, namespace = get_context_namespace()
+ klog.info(f"k8s context is {context}, namespace is {namespace}")

if cleanup or kube.infrastructure.on_startup.report_pods or kube.infrastructure.on_startup.cleanup_pods:
klog.info("checking for k8s pods from other sessions")

try:
- pods = kube_api.list_namespaced_pod(namespace=kube.namespace,
+ pods = kube_api.list_namespaced_pod(namespace=namespace,
label_selector=f"stimela_user={session_user}")
except ApiException as exc:
raise BackendError(f"k8s API error while listing pods", json.loads(exc.body))
@@ -94,7 +92,7 @@ def init(backend: StimelaBackendOptions, log: logging.Logger, cleanup: bool = Fa
if not cleanup:
klog.warning(f"since kube.infrastructure.on_starup.cleanup_pods is set, these will be terminated")
for podname in running_pods:
- _delete_pod(kube_api, podname, kube.namespace, klog)
+ _delete_pod(kube_api, podname, namespace, klog)

elif kube.infrastructure.on_startup.report_pods:
klog.warning(f"you have {len(running_pods)} pod(s) running from another stimela session")
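With this change `init()` no longer requires a `kube.namespace` setting: the namespace is taken from the kubeconfig context itself. The underlying lookup uses the standard kubernetes-client call, roughly as below ("default" assumed when the context declares no namespace):

```python
import kubernetes.config

# returns (all_contexts, active_context); each entry looks like
# {"name": ..., "context": {"cluster": ..., "user": ..., "namespace": ...}}
contexts, current = kubernetes.config.list_kube_config_contexts()
ns = current["context"].get("namespace", "default")
print(f"active context {current['name']} maps to namespace {ns}")
```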
@@ -130,11 +128,11 @@ def init(backend: StimelaBackendOptions, log: logging.Logger, cleanup: bool = Fa


def refresh_pvc_list(kube: KubeBackendOptions):
- kube_api, _ = get_kube_api()
+ namespace, kube_api, _ = get_kube_api()
global active_pvcs, terminating_pvcs
# get existing pvcs
try:
- list_pvcs = kube_api.list_namespaced_persistent_volume_claim(kube.namespace)
+ list_pvcs = kube_api.list_namespaced_persistent_volume_claim(namespace)
except ApiException as exc:
raise BackendError(f"k8s API error while listing PVCs", json.loads(exc.body)) from None
pvc_names = []
@@ -163,7 +161,7 @@ def refresh_pvc_list(kube: KubeBackendOptions):


def resolve_volumes(kube: KubeBackendOptions, log: logging.Logger, step_token=None, refresh=True):
- kube_api, _ = get_kube_api()
+ namespace, kube_api, _ = get_kube_api()
ExistsPolicy = KubeBackendOptions.Volume.ExistPolicy

if refresh:
@@ -218,7 +216,7 @@ def resolve_volumes(kube: KubeBackendOptions, log: logging.Logger, step_token=No
# if existing PVC with that is still terminating, wait
if pvc.name in terminating_pvcs:
log.info(f"waiting for existing PVC '{pvc.name}' to terminate before re-creating")
- _await_pvc_termination(kube.namespace, pvc, log=log)
+ _await_pvc_termination(namespace, pvc, log=log)
# create
newpvc = client.V1PersistentVolumeClaim()
newpvc.metadata = client.V1ObjectMeta(name=pvc.name, labels=labels)
@@ -236,7 +234,7 @@ def resolve_volumes(kube: KubeBackendOptions, log: logging.Logger, step_token=No
data_source=data_source,
resources=client.V1ResourceRequirements(requests={"storage": pvc.capacity}))
try:
- resp = kube_api.create_namespaced_persistent_volume_claim(kube.namespace, newpvc)
+ resp = kube_api.create_namespaced_persistent_volume_claim(namespace, newpvc)
except ApiException as exc:
raise BackendError(f"k8s API error while creating PVC '{pvc.name}'", json.loads(exc.body)) from None
pvc.owner = session_user
@@ -249,7 +247,7 @@ def resolve_volumes(kube: KubeBackendOptions, log: logging.Logger, step_token=No


def await_pvcs(namespace, pvc_names, log: logging.Logger):
- kube_api, _ = get_kube_api()
+ namespace, kube_api, _ = get_kube_api()

waiting_pvcs = set(pvc_names)
waiting_reported = set()
@@ -285,7 +283,7 @@ def await_pvcs(namespace, pvc_names, log: logging.Logger):
time.sleep(1)

def _await_pvc_termination(namespace, pvc: KubeBackendOptions.Volume, log: logging.Logger):
- kube_api, _ = get_kube_api()
+ namespace, kube_api, _ = get_kube_api()
time0 = time.time()
while True:
update_process_status()
@@ -308,7 +306,7 @@ def _await_pvc_termination(namespace, pvc: KubeBackendOptions.Volume, log: loggi


def delete_pvcs(kube: KubeBackendOptions, pvc_names, log: logging.Logger, force=False, step=True, session=False, refresh=True):
- kube_api, _ = get_kube_api()
+ namespace, kube_api, _ = get_kube_api()
global terminating_pvcs

if refresh:
Expand All @@ -328,7 +326,7 @@ def delete_pvcs(kube: KubeBackendOptions, pvc_names, log: logging.Logger, force=
(session and pvc.lifecycle == Lifecycle.session):
log.info(f"deleting PVC '{pvc.name}'")
try:
- resp = kube_api.delete_namespaced_persistent_volume_claim(name=pvc.name, namespace=kube.namespace)
+ resp = kube_api.delete_namespaced_persistent_volume_claim(name=pvc.name, namespace=namespace)
except ApiException as exc:
body = json.loads(exc.body)
log_exception(BackendError(f"k8s API error while deleting PVC '{pvc.name}'", (exc, body)),
@@ -341,17 +339,21 @@ def delete_pvcs(kube: KubeBackendOptions, pvc_names, log: logging.Logger, force=

def close(backend: StimelaBackendOptions, log: logging.Logger):
kube = backend.kube
+ context, namespace = get_context_namespace()
+ if context is None:
+ return

klog.info("closing kube backend")

# release PVCs
delete_pvcs(kube, list(active_pvcs.keys()), log=klog, session=True, step=True, refresh=False)

# cleanup pods, if any
if kube.infrastructure.on_exit.cleanup_pods:
- kube_api, _ = run_kube.get_kube_api()
+ namespace, kube_api, _ = run_kube.get_kube_api()

try:
- pods = kube_api.list_namespaced_pod(namespace=kube.namespace,
+ pods = kube_api.list_namespaced_pod(namespace=namespace,
label_selector=f"stimela_session_id={session_id}")
except ApiException as exc:
body = json.loads(exc.body)
@@ -367,7 +369,7 @@ def close(backend: StimelaBackendOptions, log: logging.Logger):
klog.warning(f"you have {len(running_pods)} pod(s) still pending or running from this session")
klog.warning(f"since kube.infrastructure.on_exit.cleanup_pods is set, these will be terminated")
for podname in running_pods:
- _delete_pod(kube_api, podname, kube.namespace, klog)
+ _delete_pod(kube_api, podname, namespace, klog)

atexit.unregister(close)

stimela/backends/kube/kube_utils.py: 7 additions & 8 deletions

@@ -80,15 +80,14 @@ def apply_pod_spec(kps, pod_spec: Dict[str, Any], predefined_pod_specs: Dict[str
return pod_spec

class StatusReporter(object):
- def __init__(self, namespace: str, log: logging.Logger,
+ def __init__(self, log: logging.Logger,
podname: str,
kube: KubeBackendOptions,
event_handler: None,
update_interval: float = 1,
enable_metrics: bool = True):
self.kube = kube
- self.kube_api, self.custom_api = get_kube_api()
- self.namespace = namespace
+ self.namespace, self.kube_api, self.custom_api = get_kube_api()
self.log = log
self.podname = podname
self.label_selector = f"stimela_job={podname}"
@@ -161,17 +160,17 @@ def log_events(self):
try:
self.event_handler(event)
except Exception as exc:
- self.log.error(self.kube.verbose_event_format.format(event=event))
+ self.log.error(self.kube.debug.event_format.format(event=event))
raise
# no error from handler, report event if configured to
- if self.kube.verbose_events:
- color = self.kube.verbose_event_colors.get(event.type.lower()) \
- or self.kube.verbose_event_colors.get("default")
+ if self.kube.debug.log_events:
+ color = self.kube.debug.event_colors.get(event.type.lower()) \
+ or self.kube.debug.event_colors.get("default")
# escape console markup on string fields
for key, value in event.__dict__.items():
if type(value) is str:
setattr(event, key, escape(value))
- self.log.info(self.kube.verbose_event_format.format(event=event),
+ self.log.info(self.kube.debug.event_format.format(event=event),
extra=dict(color=color) if color else {})

def update(self):
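With the options relocated, `StatusReporter` reads everything event-related from `kube.debug` and obtains its namespace from `get_kube_api()` rather than a constructor argument. The reporting path, reduced to essentials (a sketch; `report_event` is not a real method name, and `event` stands for a kubernetes event object with a `type` attribute and string fields):

```python
from rich.markup import escape

def report_event(kube, log, event):
    if kube.debug.log_events:
        color = (kube.debug.event_colors.get(event.type.lower())
                 or kube.debug.event_colors.get("default"))
        # escape rich console markup in string fields before rendering
        for key, value in event.__dict__.items():
            if isinstance(value, str):
                setattr(event, key, escape(value))
        log.info(kube.debug.event_format.format(event=event),
                 extra=dict(color=color) if color else {})
```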
stimela/backends/kube/pod_proxy.py: 8 additions & 8 deletions

@@ -19,7 +19,7 @@
class PodProxy(object):

def __init__(self, kube: KubeBackendOptions, podname: str, image_name: str, command: str, log: logging.Logger):
- self.kube_api, _ = get_kube_api()
+ self.namespace, self.kube_api, _ = get_kube_api()
self.kube = kube
self.name = podname
self.log = log
@@ -177,7 +177,7 @@ def dispatch_container_logs(self, style: str, job: bool = True):
for cont in containers:
contname = cont['name']
try:
- loglines = self.kube_api.read_namespaced_pod_log(name=self.name, namespace=self.kube.namespace, container=contname)
+ loglines = self.kube_api.read_namespaced_pod_log(name=self.name, namespace=self.namespace, container=contname)
for line in loglines.split("\n"):
if line:
dispatch_to_log(self.log, line, contname, "stdout", prefix=f"{contname}#",
@@ -195,7 +195,7 @@ def _print_logs(self, name, style, container=None):
try:
# rich.print(f"[yellow]started log thread for {name}[/yellow]")
# Open a stream to the logs
- stream = self.kube_api.read_namespaced_pod_log(name=name, namespace=self.kube.namespace, container=container,
+ stream = self.kube_api.read_namespaced_pod_log(name=name, namespace=self.namespace, container=container,
_preload_content=False, follow=True)
for line in stream.stream():
dispatch_to_log(self.log, line.decode().rstrip(), name, "stdout", prefix=prefix, style=style, output_wrangler=None)
@@ -215,12 +215,12 @@ def add_file_check_commands(self, params: Dict[str, Parameter], cab: Cab):
cont = self.step_init_container
for path in mkdir_list:
error = f"VALIDATION ERROR: mkdir {path} failed"
- self.add_init_container_command(cont, f"if mkdir -p {path}; then echo Created directory {path}; else echo {error}; exit 1; fi; ")
+ self.add_init_container_command(cont, f"ls {path} >/dev/null; if mkdir -p {path}; then echo Created directory {path}; else echo {error}; exit 1; fi; ")
for path in must_exist_list:
error = f"VALIDATION ERROR: {path} doesn\\'t exist"
- self.add_init_container_command(cont, f"if test -e '{path}'; then echo Checking {path}: exists; else echo {error}; exit 1; fi; ")
+ self.add_init_container_command(cont, f"ls {path} >/dev/null; if test -e '{path}'; then echo Checking {path}: exists; else echo {error}; exit 1; fi; ")
for path in remove_if_exists_list:
- self.add_init_container_command(cont, f"if test -e {path}; then echo Removing {path}; rm -fr {path}; true; fi; ")
+ self.add_init_container_command(cont, f"ls {path} >/dev/null; if test -e {path}; then echo Removing {path}; rm -fr {path}; true; fi; ")
# make sure the relevant mounts are provided inside init container
for mount in active_mounts:
self.add_init_container_mount(cont, self._mounts[mount], mount)
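Each generated init-container check now starts with `ls {path} >/dev/null`. The PR doesn't state the motivation, but the likely intent is to force the (often network-backed) filesystem to refresh its view of the path before the actual `test`/`mkdir` runs, so a freshly created file isn't reported missing by a stale cache. For a single "must exist" check the generated shell fragment comes out as below (illustrative path):

```python
path = "/mnt/data/input.ms"  # hypothetical input
error = f"VALIDATION ERROR: {path} doesn\\'t exist"
command = f"ls {path} >/dev/null; if test -e '{path}'; then echo Checking {path}: exists; else echo {error}; exit 1; fi; "
print(command)
```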
@@ -247,7 +247,7 @@ def cleanup_elapsed():
if thread.is_alive() and aux_pod:
self.log.info(f"{desc} alive, trying to delete associated pod")
try:
- self.kube_api.delete_namespaced_pod(name=aux_pod, namespace=self.kube.namespace)
+ self.kube_api.delete_namespaced_pod(name=aux_pod, namespace=self.namespace)
except ApiException as exc:
self.log.warning(f"deleting pod {aux_pod} failed: {exc}")
pass
@@ -264,7 +264,7 @@ def run_pod_command(self, command, cmdname, input=None, wrangler=None):
command = ["/bin/sh", "-c", command]
has_input = bool(input)

- resp = stream(self.kube_api.connect_get_namespaced_pod_exec, self.name, self.kube.namespace,
+ resp = stream(self.kube_api.connect_get_namespaced_pod_exec, self.name, self.namespace,
command=command,
stderr=True, stdin=has_input,
stdout=True, tty=False,