Skip to content

Commit 46520b1

Browse files
AutomationDev85AutomationDev85
AutomationDev85
authored and
AutomationDev85
committed
KubernetesPodOperator uses different timeouts to check for schedule timeout and startup timeout
1 parent f3acdd2 commit 46520b1

File tree

3 files changed

+85
-21
lines changed

3 files changed

+85
-21
lines changed

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,9 @@ class KubernetesPodOperator(BaseOperator):
157157
:param reattach_on_restart: if the worker dies while the pod is running, reattach and monitor
158158
during the next try. If False, always create a new pod for each try.
159159
:param labels: labels to apply to the Pod. (templated)
160-
:param startup_timeout_seconds: timeout in seconds to startup the pod.
160+
:param startup_timeout_seconds: timeout in seconds to startup the pod after pod was scheduled.
161161
:param startup_check_interval_seconds: interval in seconds to check if the pod has already started
162+
:param schedule_timeout_seconds: timeout in seconds to schedule pod in cluster.
162163
:param get_logs: get the stdout of the base container as logs of the tasks.
163164
:param init_container_logs: list of init containers whose logs will be published to stdout
164165
Takes a sequence of containers, a single container name or True. If True,
@@ -289,6 +290,7 @@ def __init__(
289290
reattach_on_restart: bool = True,
290291
startup_timeout_seconds: int = 120,
291292
startup_check_interval_seconds: int = 5,
293+
schedule_timeout_seconds: int | None = None,
292294
get_logs: bool = True,
293295
base_container_name: str | None = None,
294296
base_container_status_polling_interval: float = 1,
@@ -347,6 +349,10 @@ def __init__(
347349
self.labels = labels or {}
348350
self.startup_timeout_seconds = startup_timeout_seconds
349351
self.startup_check_interval_seconds = startup_check_interval_seconds
352+
# New parameter startup_timeout_seconds adds breaking change, to handle this as smooth as possible just reuse startup time
353+
self.schedule_timeout_seconds = startup_timeout_seconds
354+
if schedule_timeout_seconds:
355+
self.schedule_timeout_seconds = schedule_timeout_seconds
350356
env_vars = convert_env_vars(env_vars) if env_vars else []
351357
self.env_vars = env_vars
352358
pod_runtime_info_envs = (
@@ -574,8 +580,9 @@ def await_pod_start(self, pod: k8s.V1Pod) -> None:
574580
try:
575581
self.pod_manager.await_pod_start(
576582
pod=pod,
583+
schedule_timeout=self.schedule_timeout_seconds,
577584
startup_timeout=self.startup_timeout_seconds,
578-
startup_check_interval=self.startup_check_interval_seconds,
585+
check_interval=self.startup_check_interval_seconds,
579586
)
580587
except PodLaunchFailedException:
581588
if self.log_events_on_failure:

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
from kubernetes.client.models.core_v1_event_list import CoreV1EventList
4949
from kubernetes.client.models.v1_container_status import V1ContainerStatus
5050
from kubernetes.client.models.v1_pod import V1Pod
51+
from kubernetes.client.models.v1_pod_condition import V1PodCondition
5152
from urllib3.response import HTTPResponse
5253

5354

@@ -375,30 +376,54 @@ def create_pod(self, pod: V1Pod) -> V1Pod:
375376
return self.run_pod_async(pod)
376377

377378
def await_pod_start(
378-
self, pod: V1Pod, startup_timeout: int = 120, startup_check_interval: int = 1
379+
self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: int = 1
379380
) -> None:
380381
"""
381382
Wait for the pod to reach phase other than ``Pending``.
382383
383384
:param pod:
385+
:param schedule_timeout: Timeout (in seconds) for pod stay in schedule state
386+
(if pod is taking to long in schedule state, fails task)
384387
:param startup_timeout: Timeout (in seconds) for startup of the pod
385-
(if pod is pending for too long, fails task)
386-
:param startup_check_interval: Interval (in seconds) between checks
388+
(if pod is pending for too long after being scheduled, fails task)
389+
:param check_interval: Interval (in seconds) between checks
387390
:return:
388391
"""
392+
self.log.info("::group::Waiting until %ss to get the POD scheduled...", schedule_timeout)
393+
pod_was_scheduled = False
389394
curr_time = time.time()
390395
while True:
391396
remote_pod = self.read_pod(pod)
392-
if remote_pod.status.phase != PodPhase.PENDING:
397+
pod_status = remote_pod.status
398+
if pod_status.phase != PodPhase.PENDING:
399+
self.keep_watching_for_events = False
400+
self.log.info("::endgroup::")
393401
break
394-
self.log.warning("Pod not yet started: %s", pod.metadata.name)
395-
if time.time() - curr_time >= startup_timeout:
396-
msg = (
397-
f"Pod took longer than {startup_timeout} seconds to start. "
398-
"Check the pod events in kubernetes to determine why."
399-
)
400-
raise PodLaunchFailedException(msg)
401-
time.sleep(startup_check_interval)
402+
403+
# Check for timeout
404+
pod_conditions: list[V1PodCondition] = pod_status.conditions
405+
if pod_conditions and any(
406+
(condition.type == "PodScheduled" and condition.status == "True")
407+
for condition in pod_conditions
408+
):
409+
if not pod_was_scheduled:
410+
# POD was initially scheduled update timeout for getting POD launched
411+
pod_was_scheduled = True
412+
self.log.info("Waiting %ss to get the POD running...", startup_timeout)
413+
414+
if time.time() - curr_time >= startup_timeout:
415+
self.log.info("::endgroup::")
416+
raise PodLaunchFailedException(
417+
f"Pod took too long to start. More than {startup_timeout}s. Check the pod events in kubernetes."
418+
)
419+
else:
420+
if time.time() - curr_time >= schedule_timeout:
421+
self.log.info("::endgroup::")
422+
raise PodLaunchFailedException(
423+
f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes."
424+
)
425+
426+
time.sleep(check_interval)
402427

403428
def fetch_container_logs(
404429
self,

providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -392,39 +392,71 @@ def test_start_pod_retries_three_times(self, mock_run_pod_async):
392392

393393
assert mock_run_pod_async.call_count == 3
394394

395-
def test_start_pod_raises_informative_error_on_timeout(self):
395+
def test_start_pod_raises_informative_error_on_scheduled_timeout(self):
396396
pod_response = mock.MagicMock()
397397
pod_response.status.phase = "Pending"
398398
self.mock_kube_client.read_namespaced_pod.return_value = pod_response
399-
expected_msg = "Check the pod events in kubernetes"
399+
expected_msg = "Pod took too long to be scheduled on the cluster, giving up. More than 0s. Check the pod events in kubernetes."
400400
mock_pod = MagicMock()
401401
with pytest.raises(AirflowException, match=expected_msg):
402402
self.pod_manager.await_pod_start(
403403
pod=mock_pod,
404+
schedule_timeout=0,
405+
startup_timeout=0,
406+
)
407+
408+
def test_start_pod_raises_informative_error_on_startup_timeout(self):
409+
pod_response = mock.MagicMock()
410+
pod_response.status.phase = "Pending"
411+
condition = mock.MagicMock()
412+
condition.type = "PodScheduled"
413+
condition.status = "True"
414+
pod_response.status.conditions = [condition]
415+
416+
self.mock_kube_client.read_namespaced_pod.return_value = pod_response
417+
expected_msg = "Pod took too long to start. More than 0s. Check the pod events in kubernetes."
418+
mock_pod = MagicMock()
419+
with pytest.raises(AirflowException, match=expected_msg):
420+
self.pod_manager.await_pod_start(
421+
pod=mock_pod,
422+
schedule_timeout=0,
404423
startup_timeout=0,
405424
)
406425

407426
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.time.sleep")
408-
def test_start_pod_startup_interval_seconds(self, mock_time_sleep):
427+
def test_start_pod_startup_interval_seconds(self, mock_time_sleep, caplog):
428+
condition_scheduled = mock.MagicMock()
429+
condition_scheduled.type = "PodScheduled"
430+
condition_scheduled.status = "True"
431+
409432
pod_info_pending = mock.MagicMock(**{"status.phase": PodPhase.PENDING})
433+
pod_info_pending_scheduled = mock.MagicMock(
434+
**{"status.phase": PodPhase.PENDING, "status.conditions": [condition_scheduled]}
435+
)
410436
pod_info_succeeded = mock.MagicMock(**{"status.phase": PodPhase.SUCCEEDED})
411437

412438
def pod_state_gen():
413439
yield pod_info_pending
414-
yield pod_info_pending
440+
yield pod_info_pending_scheduled
441+
yield pod_info_pending_scheduled
415442
while True:
416443
yield pod_info_succeeded
417444

418445
self.mock_kube_client.read_namespaced_pod.side_effect = pod_state_gen()
419446
startup_check_interval = 10 # Any value is fine, as time.sleep is mocked to do nothing
447+
schedule_timeout = 30
448+
startup_timeout = 60
420449
mock_pod = MagicMock()
421450
self.pod_manager.await_pod_start(
422451
pod=mock_pod,
423-
startup_timeout=60, # Never hit, any value is fine, as time.sleep is mocked to do nothing
424-
startup_check_interval=startup_check_interval,
452+
schedule_timeout=schedule_timeout, # Never hit, any value is fine, as time.sleep is mocked to do nothing
453+
startup_timeout=startup_timeout, # Never hit, any value is fine, as time.sleep is mocked to do nothing
454+
check_interval=startup_check_interval,
425455
)
426456
mock_time_sleep.assert_called_with(startup_check_interval)
427-
assert mock_time_sleep.call_count == 2
457+
assert mock_time_sleep.call_count == 3
458+
assert f"::group::Waiting until {schedule_timeout}s to get the POD scheduled..." in caplog.text
459+
assert f"Waiting {startup_timeout}s to get the POD running..." in caplog.text
428460

429461
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
430462
def test_container_is_running(self, container_is_running_mock):

0 commit comments

Comments
 (0)