Skip to content

Commit fe4912b

Browse files
authored
Merge pull request #2362 from dhruvesh09/r0.23.0
Wait indefinitely for pod to complete in Kubernetes launcher.
2 parents ed7640c + 588b02c commit fe4912b

File tree

1 file changed

+11
-6
lines changed

1 file changed

+11
-6
lines changed

tfx/orchestration/launcher/kubernetes_component_launcher.py

Lines changed: 11 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -138,13 +138,15 @@ def _run_executor(self, execution_id: int,
138138
'Failed to created container executor pod!\nReason: %s\nBody: %s' %
139139
(e.reason, e.body))
140140

141+
# Wait up to 300 seconds for the pod to move from pending to another status.
141142
logging.info('Waiting for pod "%s:%s" to start.', namespace, pod_name)
142143
self._wait_pod(
143144
core_api,
144145
pod_name,
145146
namespace,
146147
exit_condition_lambda=_pod_is_not_pending,
147-
condition_description='non-pending status')
148+
condition_description='non-pending status',
149+
timeout_sec=300)
148150

149151
logging.info('Start log streaming for pod "%s:%s".', namespace, pod_name)
150152
try:
@@ -162,12 +164,14 @@ def _run_executor(self, execution_id: int,
162164
for log in logs:
163165
logging.info(log.decode().rstrip('\n'))
164166

167+
# Wait indefinitely for the pod to complete.
165168
resp = self._wait_pod(
166169
core_api,
167170
pod_name,
168171
namespace,
169172
exit_condition_lambda=_pod_is_done,
170-
condition_description='done state')
173+
condition_description='done state',
174+
timeout_sec=0)
171175

172176
if resp.status.phase == kube_utils.PodPhase.FAILED.value:
173177
raise RuntimeError('Pod "%s:%s" failed with status "%s".' %
@@ -254,7 +258,7 @@ def _wait_pod(self,
254258
namespace: Text,
255259
exit_condition_lambda: Callable[[client.V1Pod], bool],
256260
condition_description: Text,
257-
timeout_sec: int = 300) -> client.V1Pod:
261+
timeout_sec: int) -> client.V1Pod:
258262
"""Wait for a POD to meet an exit condition.
259263
260264
Args:
@@ -265,7 +269,8 @@ def _wait_pod(self,
265269
for a POD to exit. The function returns True to exit.
266270
condition_description: The description of the exit condition which will be
267271
set in the error message if the wait times out.
268-
timeout_sec: The seconds for the function to wait. Defaults to 300s.
272+
timeout_sec: The seconds for the function to wait. Waits indefinitely if
273+
value is 0.
269274
270275
Returns:
271276
The POD object which meets the exit condition.
@@ -279,8 +284,8 @@ def _wait_pod(self,
279284
logging.info(resp.status.phase)
280285
if exit_condition_lambda(resp):
281286
return resp
282-
elapse_time = datetime.datetime.utcnow() - start_time
283-
if elapse_time.seconds >= timeout_sec:
287+
elapsed_time = datetime.datetime.utcnow() - start_time
288+
if timeout_sec != 0 and elapsed_time.seconds >= timeout_sec:
284289
raise RuntimeError(
285290
'Pod "%s:%s" does not reach "%s" within %s seconds.' %
286291
(namespace, pod_name, condition_description, timeout_sec))

0 commit comments

Comments
 (0)