|
19 | 19 | from __future__ import print_function
|
20 | 20 |
|
21 | 21 | import datetime
|
| 22 | +import json |
22 | 23 | import os
|
23 | 24 | import re
|
24 | 25 | import shutil
|
|
59 | 60 | from tfx.types import component_spec
|
60 | 61 | from tfx.types import standard_artifacts
|
61 | 62 | from tfx.types.standard_artifacts import Model
|
| 63 | +from tfx.utils import kube_utils |
62 | 64 |
|
63 | 65 |
|
| 66 | +# TODO(jiyongjung): Merge with kube_utils.PodStatus |
64 | 67 | # Various execution status of a KFP pipeline.
|
65 | 68 | KFP_RUNNING_STATUS = 'running'
|
66 | 69 | KFP_SUCCESS_STATUS = 'succeeded'
|
@@ -113,7 +116,7 @@ def poll_kfp_with_retry(host: Text, run_id: Text, retry_limit: int,
|
113 | 116 | # https://github.com/kubeflow/pipelines/issues/3669
|
114 | 117 | # by wait-and-retry when ApiException is hit.
|
115 | 118 | try:
|
116 |
| - get_run_response = client._run_api.get_run(run_id=run_id) # pylint: disable=protected-access |
| 119 | + get_run_response = client.get_run(run_id=run_id) |
117 | 120 | except rest.ApiException as api_err:
|
118 | 121 | # If get_run failed with ApiException, wait _POLLING_INTERVAL and retry.
|
119 | 122 | if retry_count < retry_limit:
|
@@ -144,6 +147,39 @@ def poll_kfp_with_retry(host: Text, run_id: Text, retry_limit: int,
|
144 | 147 | time.sleep(polling_interval)
|
145 | 148 |
|
146 | 149 |
|
def print_failure_log_for_run(host: Text, run_id: Text, namespace: Text):
  """Prints logs of failed components of a run.

  Prints execution logs for failed components using `logging.info`.
  This resembles the behavior of `argo logs` but uses K8s API directly.
  Doesn't print anything if the run was successful.

  Args:
    host: address of the KFP deployment.
    run_id: id of the execution of the pipeline.
    namespace: namespace of K8s cluster.
  """
  client = kfp.Client(host=host)
  run = client.get_run(run_id=run_id)
  workflow_manifest = json.loads(run.pipeline_runtime.workflow_manifest)
  # Nothing to print when the workflow as a whole did not fail.
  if kube_utils.PodPhase(
      workflow_manifest['status']['phase']) != kube_utils.PodPhase.FAILED:
    return

  k8s_client = kube_utils.make_core_v1_api()
  # NOTE: In the Argo Workflow CRD, `status.nodes` is a map keyed by node id,
  # not a list — iterate over the values, otherwise each `i` is a string key
  # and `i['type']` raises TypeError.
  pods = [
      i for i in workflow_manifest['status']['nodes'].values()
      if i['type'] == 'Pod'
  ]
  for pod in pods:
    # Only surface logs for pods that actually failed.
    if kube_utils.PodPhase(pod['phase']) != kube_utils.PodPhase.FAILED:
      continue
    display_name = pod['displayName']
    pod_id = pod['id']

    # Argo puts the user container in a container named 'main';
    # reading its log resembles `argo logs` output.
    log = k8s_client.read_namespaced_pod_log(
        pod_id, namespace=namespace, container='main')
    for line in log.splitlines():
      logging.info('%s:%s', display_name, line)
| 181 | + |
| 182 | + |
147 | 183 | # Custom component definitions for testing purpose.
|
148 | 184 | class _HelloWorldSpec(component_spec.ComponentSpec):
|
149 | 185 | INPUTS = {}
|
|
0 commit comments