Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 10 additions & 65 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import logging
import math
import re
import shlex
import string
import warnings
from collections.abc import Container
Expand All @@ -35,8 +34,7 @@
import kubernetes
import tenacity
from deprecated import deprecated
from kubernetes.client import CoreV1Api, V1Pod, models as k8s
from kubernetes.stream import stream
from kubernetes.client import CoreV1Api, models as k8s
from urllib3.exceptions import HTTPError

from airflow.configuration import conf
Expand Down Expand Up @@ -80,6 +78,7 @@
PodPhase,
check_exception_is_kubernetes_api_unauthorized,
container_is_succeeded,
get_container_status,
get_container_termination_message,
)
from airflow.settings import pod_mutation_hook
Expand Down Expand Up @@ -233,8 +232,6 @@ class KubernetesPodOperator(BaseOperator):

# This field can be overloaded at the instance level via base_container_name
BASE_CONTAINER_NAME = "base"
ISTIO_CONTAINER_NAME = "istio-proxy"
KILL_ISTIO_PROXY_SUCCESS_MSG = "HTTP/1.1 200"
POD_CHECKED_KEY = "already_checked"
POST_TERMINATION_TIMEOUT = 120

Expand Down Expand Up @@ -631,10 +628,7 @@ def execute_sync(self, context: Context):
if self.do_xcom_push:
self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod)
result = self.extract_xcom(pod=self.pod)
istio_enabled = self.is_istio_enabled(self.pod)
self.remote_pod = self.pod_manager.await_pod_completion(
self.pod, istio_enabled, self.base_container_name
)
self.remote_pod = self.pod_manager.await_pod_completion(self.pod, self.base_container_name)
finally:
pod_to_clean = self.pod or self.pod_request_obj
self.cleanup(
Expand Down Expand Up @@ -784,13 +778,10 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
def _clean(self, event: dict[str, Any]) -> None:
if event["status"] == "running":
return
istio_enabled = self.is_istio_enabled(self.pod)
# Skip await_pod_completion when the event is 'timeout', because the pod can hang
# on the ErrImagePull or ContainerCreating step and would then never complete
if event["status"] != "timeout":
self.pod = self.pod_manager.await_pod_completion(
self.pod, istio_enabled, self.base_container_name
)
self.pod = self.pod_manager.await_pod_completion(self.pod, self.base_container_name)
if self.pod is not None:
self.post_complete_action(
pod=self.pod,
Expand Down Expand Up @@ -842,30 +833,20 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
if self._killed or not remote_pod:
return

istio_enabled = self.is_istio_enabled(remote_pod)
pod_phase = remote_pod.status.phase if hasattr(remote_pod, "status") else None
# Only consider pod failed if the base container within the pod has not succeeded
failed = not container_is_succeeded(remote_pod, self.base_container_name)

# if the pod failed, or it succeeded but we don't want to delete it
if pod_phase != PodPhase.SUCCEEDED or self.on_finish_action == OnFinishAction.KEEP_POD:
if failed or self.on_finish_action == OnFinishAction.KEEP_POD:
self.patch_already_checked(remote_pod, reraise=False)

failed = (pod_phase != PodPhase.SUCCEEDED and not istio_enabled) or (
istio_enabled and not container_is_succeeded(remote_pod, self.base_container_name)
)

if failed:
if self.log_events_on_failure:
self._read_pod_events(pod, reraise=False)
if failed and self.log_events_on_failure:
self._read_pod_events(pod, reraise=False)

self.process_pod_deletion(remote_pod, reraise=False)

if self.skip_on_exit_code:
container_statuses = (
remote_pod.status.container_statuses if remote_pod and remote_pod.status else None
) or []
base_container_status = next(
(x for x in container_statuses if x.name == self.base_container_name), None
)
base_container_status = get_container_status(remote_pod, self.base_container_name)
exit_code = (
base_container_status.state.terminated.exit_code
if base_container_status
Expand Down Expand Up @@ -902,42 +883,6 @@ def _read_pod_events(self, pod, *, reraise=True) -> None:
else:
self.log.error("Pod Event: %s - %s", event.reason, event.message)

def is_istio_enabled(self, pod: V1Pod) -> bool:
    """
    Check whether the pod runs an istio sidecar container.

    The pod is re-read through ``self.pod_manager.read_pod`` and its container
    list is scanned for a container named ``self.ISTIO_CONTAINER_NAME``.

    :param pod: the pod to inspect; a falsy value yields ``False`` without any lookup
    :return: ``True`` if a container named ``ISTIO_CONTAINER_NAME`` is present in the
        pod spec, ``False`` otherwise (including when the pod has no spec/containers)
    """
    if not pod:
        return False

    remote_pod = self.pod_manager.read_pod(pod)

    # Tolerate a pod whose spec or container list is missing instead of raising
    # AttributeError/TypeError while cleaning up.
    spec = getattr(remote_pod, "spec", None)
    containers = (spec.containers if spec else None) or []

    return any(container.name == self.ISTIO_CONTAINER_NAME for container in containers)

def kill_istio_sidecar(self, pod: V1Pod) -> None:
    """
    Ask the istio sidecar of ``pod`` to shut down.

    Executes ``curl -fsI -X POST http://localhost:15020/quitquitquit`` inside the
    container named ``self.ISTIO_CONTAINER_NAME`` and collects its stdout.

    :param pod: the pod whose istio sidecar should be stopped; its
        ``metadata.name`` and ``metadata.namespace`` address the exec call
    :raises AirflowException: if the collected output does not contain
        ``self.KILL_ISTIO_PROXY_SUCCESS_MSG``
    """
    command = "/bin/sh -c 'curl -fsI -X POST http://localhost:15020/quitquitquit'"
    command_to_container = shlex.split(command)
    resp = stream(
        self.client.connect_get_namespaced_pod_exec,
        name=pod.metadata.name,
        namespace=pod.metadata.namespace,
        container=self.ISTIO_CONTAINER_NAME,
        command=command_to_container,
        stderr=True,
        stdin=True,
        stdout=True,
        tty=False,
        _preload_content=False,
    )
    output = []
    try:
        while resp.is_open():
            # Wait briefly for data instead of spinning with a zero-timeout peek.
            if resp.peek_stdout(timeout=1):
                output.append(resp.read_stdout())
    finally:
        # Close exactly once, even if reading from the stream raises.
        resp.close()

    output_str = "".join(output)
    self.log.info("Output of curl command to kill istio: %s", output_str)
    if self.KILL_ISTIO_PROXY_SUCCESS_MSG not in output_str:
        # Exceptions do not interpolate logging-style arguments; format the message here.
        raise AirflowException(f"Error while deleting istio-proxy sidecar: {output_str}")

def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
with _optionally_suppress(reraise=reraise):
if pod is not None:
Expand Down
8 changes: 3 additions & 5 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,13 +603,10 @@ def await_container_completion(self, pod: V1Pod, container_name: str) -> None:
self.log.info("Waiting for container '%s' state to be completed", container_name)
time.sleep(1)

def await_pod_completion(
self, pod: V1Pod, istio_enabled: bool = False, container_name: str = "base"
) -> V1Pod:
def await_pod_completion(self, pod: V1Pod, container_name: str = "base") -> V1Pod:
"""
Monitor a pod and return the final state.

:param istio_enabled: whether istio is enabled in the namespace
:param pod: pod spec that will be monitored
:param container_name: name of the container within the pod
:return: the pod object (V1Pod) in its final observed state
Expand All @@ -618,7 +615,8 @@ def await_pod_completion(
remote_pod = self.read_pod(pod)
if remote_pod.status.phase in PodPhase.terminal_states:
break
if istio_enabled and container_is_completed(remote_pod, container_name):
if container_is_completed(remote_pod, container_name):
self.log.info("Base container %s has completed", container_name)
break
self.log.info("Pod %s has phase %s", pod.metadata.name, remote_pod.status.phase)
time.sleep(2)
Expand Down
37 changes: 17 additions & 20 deletions tests/providers/cncf/kubernetes/operators/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import pendulum
import pytest
from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext, V1PodStatus, models as k8s
from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext, models as k8s
from kubernetes.client.rest import ApiException
from urllib3 import HTTPResponse

Expand All @@ -44,7 +44,7 @@
)
from airflow.providers.cncf.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction, PodLoggingStatus, PodPhase
from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction, PodLoggingStatus
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
from airflow.utils import timezone
from airflow.utils.session import create_session
Expand Down Expand Up @@ -699,14 +699,12 @@ def test_termination_message_policy_default_value_correctly_set(self):
],
)
@patch(f"{POD_MANAGER_CLASS}.delete_pod")
@patch(f"{KPO_MODULE}.KubernetesPodOperator.is_istio_enabled")
@patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
@patch(f"{KPO_MODULE}.KubernetesPodOperator.find_pod")
def test_pod_with_istio_delete_after_await_container_error(
def test_pod_with_sidecar_container_delete_after_await_container_error(
self,
find_pod_mock,
await_pod_completion_mock,
is_istio_enabled_mock,
delete_pod_mock,
task_kwargs,
base_container_fail,
Expand All @@ -716,13 +714,6 @@ def test_pod_with_istio_delete_after_await_container_error(
When KPO fails unexpectedly during await_container, we should still try to delete the pod,
and the pod we try to delete should be the one returned from find_pod earlier.
"""
sidecar = MagicMock()
sidecar.name = "istio-proxy"
sidecar.namespace = "default"
sidecar.image = "istio/proxyv2:1.18.2"
sidecar.args = []
sidecar.state.running = True

cont_status_1 = MagicMock()
cont_status_1.name = "base"
cont_status_1.state.running = False
Expand All @@ -732,20 +723,20 @@ def test_pod_with_istio_delete_after_await_container_error(
cont_status_1.state.terminated.message = "my-failure"

cont_status_2 = MagicMock()
cont_status_2.name = "istio-proxy"
cont_status_2.name = "sidecar"
cont_status_2.state.running = True
cont_status_2.state.terminated = False

await_pod_completion_mock.return_value.spec.containers = [sidecar, cont_status_1, cont_status_2]
await_pod_completion_mock.return_value.spec.containers = [cont_status_1, cont_status_2]
await_pod_completion_mock.return_value.status.phase = "Running"
await_pod_completion_mock.return_value.status.container_statuses = [cont_status_1, cont_status_2]
await_pod_completion_mock.return_value.metadata.name = "pod-with-istio-sidecar"
await_pod_completion_mock.return_value.metadata.name = "pod-with-sidecar"
await_pod_completion_mock.return_value.metadata.namespace = "default"

find_pod_mock.return_value.spec.containers = [sidecar, cont_status_1, cont_status_2]
find_pod_mock.return_value.spec.containers = [cont_status_1, cont_status_2]
find_pod_mock.return_value.status.phase = "Running"
find_pod_mock.return_value.status.container_statuses = [cont_status_1, cont_status_2]
find_pod_mock.return_value.metadata.name = "pod-with-istio-sidecar"
find_pod_mock.return_value.metadata.name = "pod-with-sidecar"
find_pod_mock.return_value.metadata.namespace = "default"

k = KubernetesPodOperator(task_id="task", **task_kwargs)
Expand All @@ -761,7 +752,6 @@ def test_pod_with_istio_delete_after_await_container_error(
k.execute(context=context)

if expect_to_delete_pod:
assert k.is_istio_enabled(find_pod_mock.return_value)
delete_pod_mock.assert_called_with(await_pod_completion_mock.return_value)
else:
delete_pod_mock.assert_not_called()
Expand Down Expand Up @@ -2069,10 +2059,17 @@ def test_async_write_logs_should_execute_successfully(
(False, r"Pod task-.* returned a failure.(?!\nremote_pod:)"),
],
)
def test_cleanup_log_pod_spec_on_failure(self, log_pod_spec_on_failure, expect_match):
@patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_succeeded")
def test_cleanup_log_pod_spec_on_failure(
self,
log_pod_spec_on_failure,
expect_match,
container_is_succeeded,
):
container_is_succeeded.return_value = False

k = KubernetesPodOperator(task_id="task", log_pod_spec_on_failure=log_pod_spec_on_failure)
pod = k.build_pod_request_obj(create_context(k))
pod.status = V1PodStatus(phase=PodPhase.FAILED)
with pytest.raises(AirflowException, match=expect_match):
k.cleanup(pod, pod)

Expand Down