diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 0cb08033b4d78..4742d3a1538ca 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -23,7 +23,6 @@
 import logging
 import math
 import re
-import shlex
 import string
 import warnings
 from collections.abc import Container
@@ -35,8 +34,7 @@
 import kubernetes
 import tenacity
 from deprecated import deprecated
-from kubernetes.client import CoreV1Api, V1Pod, models as k8s
-from kubernetes.stream import stream
+from kubernetes.client import CoreV1Api, models as k8s
 from urllib3.exceptions import HTTPError

 from airflow.configuration import conf
@@ -80,6 +78,7 @@
     PodPhase,
     check_exception_is_kubernetes_api_unauthorized,
     container_is_succeeded,
+    get_container_status,
     get_container_termination_message,
 )
 from airflow.settings import pod_mutation_hook
@@ -233,8 +232,6 @@ class KubernetesPodOperator(BaseOperator):

     # This field can be overloaded at the instance level via base_container_name
     BASE_CONTAINER_NAME = "base"
-    ISTIO_CONTAINER_NAME = "istio-proxy"
-    KILL_ISTIO_PROXY_SUCCESS_MSG = "HTTP/1.1 200"
     POD_CHECKED_KEY = "already_checked"
     POST_TERMINATION_TIMEOUT = 120

@@ -631,10 +628,7 @@ def execute_sync(self, context: Context):
             if self.do_xcom_push:
                 self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod)
                 result = self.extract_xcom(pod=self.pod)
-            istio_enabled = self.is_istio_enabled(self.pod)
-            self.remote_pod = self.pod_manager.await_pod_completion(
-                self.pod, istio_enabled, self.base_container_name
-            )
+            self.remote_pod = self.pod_manager.await_pod_completion(self.pod, self.base_container_name)
         finally:
             pod_to_clean = self.pod or self.pod_request_obj
             self.cleanup(
@@ -784,13 +778,10 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
     def _clean(self, event: dict[str, Any]) -> None:
         if event["status"] == "running":
             return
-        istio_enabled = self.is_istio_enabled(self.pod)
         # Skip await_pod_completion when the event is 'timeout' due to the pod can hang
         # on the ErrImagePull or ContainerCreating step and it will never complete
         if event["status"] != "timeout":
-            self.pod = self.pod_manager.await_pod_completion(
-                self.pod, istio_enabled, self.base_container_name
-            )
+            self.pod = self.pod_manager.await_pod_completion(self.pod, self.base_container_name)
         if self.pod is not None:
             self.post_complete_action(
                 pod=self.pod,
@@ -842,30 +833,20 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
         if self._killed or not remote_pod:
             return

-        istio_enabled = self.is_istio_enabled(remote_pod)
-        pod_phase = remote_pod.status.phase if hasattr(remote_pod, "status") else None
+        # Only consider pod failed if the base container within the pod has not succeeded
+        failed = not container_is_succeeded(remote_pod, self.base_container_name)

         # if the pod fails or success, but we don't want to delete it
-        if pod_phase != PodPhase.SUCCEEDED or self.on_finish_action == OnFinishAction.KEEP_POD:
+        if failed or self.on_finish_action == OnFinishAction.KEEP_POD:
             self.patch_already_checked(remote_pod, reraise=False)

-        failed = (pod_phase != PodPhase.SUCCEEDED and not istio_enabled) or (
-            istio_enabled and not container_is_succeeded(remote_pod, self.base_container_name)
-        )
-
-        if failed:
-            if self.log_events_on_failure:
-                self._read_pod_events(pod, reraise=False)
+        if failed and self.log_events_on_failure:
+            self._read_pod_events(pod, reraise=False)

         self.process_pod_deletion(remote_pod, reraise=False)

         if self.skip_on_exit_code:
-            container_statuses = (
-                remote_pod.status.container_statuses if remote_pod and remote_pod.status else None
-            ) or []
-            base_container_status = next(
-                (x for x in container_statuses if x.name == self.base_container_name), None
-            )
+            base_container_status = get_container_status(remote_pod, self.base_container_name)
             exit_code = (
                 base_container_status.state.terminated.exit_code
                 if base_container_status
@@ -902,42 +883,6 @@ def _read_pod_events(self, pod, *, reraise=True) -> None:
             else:
                 self.log.error("Pod Event: %s - %s", event.reason, event.message)

-    def is_istio_enabled(self, pod: V1Pod) -> bool:
-        """Check if istio is enabled for the namespace of the pod by inspecting the namespace labels."""
-        if not pod:
-            return False
-
-        remote_pod = self.pod_manager.read_pod(pod)
-
-        return any(container.name == self.ISTIO_CONTAINER_NAME for container in remote_pod.spec.containers)
-
-    def kill_istio_sidecar(self, pod: V1Pod) -> None:
-        command = "/bin/sh -c 'curl -fsI -X POST http://localhost:15020/quitquitquit'"
-        command_to_container = shlex.split(command)
-        resp = stream(
-            self.client.connect_get_namespaced_pod_exec,
-            name=pod.metadata.name,
-            namespace=pod.metadata.namespace,
-            container=self.ISTIO_CONTAINER_NAME,
-            command=command_to_container,
-            stderr=True,
-            stdin=True,
-            stdout=True,
-            tty=False,
-            _preload_content=False,
-        )
-        output = []
-        while resp.is_open():
-            if resp.peek_stdout():
-                output.append(resp.read_stdout())
-
-        resp.close()
-        output_str = "".join(output)
-        self.log.info("Output of curl command to kill istio: %s", output_str)
-        resp.close()
-        if self.KILL_ISTIO_PROXY_SUCCESS_MSG not in output_str:
-            raise AirflowException("Error while deleting istio-proxy sidecar: %s", output_str)
-
     def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
         with _optionally_suppress(reraise=reraise):
             if pod is not None:
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 5668462ad6df4..033da88bf90c3 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -603,13 +603,10 @@ def await_container_completion(self, pod: V1Pod, container_name: str) -> None:
             self.log.info("Waiting for container '%s' state to be completed", container_name)
             time.sleep(1)

-    def await_pod_completion(
-        self, pod: V1Pod, istio_enabled: bool = False, container_name: str = "base"
-    ) -> V1Pod:
+    def await_pod_completion(self, pod: V1Pod, container_name: str = "base") -> V1Pod:
         """
         Monitor a pod and return the final state.

-        :param istio_enabled: whether istio is enabled in the namespace
         :param pod: pod spec that will be monitored
         :param container_name: name of the container within the pod
         :return: tuple[State, str | None]
@@ -618,7 +615,8 @@ def await_pod_completion(
             remote_pod = self.read_pod(pod)
             if remote_pod.status.phase in PodPhase.terminal_states:
                 break
-            if istio_enabled and container_is_completed(remote_pod, container_name):
+            if container_is_completed(remote_pod, container_name):
+                self.log.info("Base container %s has completed", container_name)
                 break
             self.log.info("Pod %s has phase %s", pod.metadata.name, remote_pod.status.phase)
             time.sleep(2)
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py
index 8b7c238e9c143..9cf96e5e21c42 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -24,7 +24,7 @@
 import pendulum
 import pytest
-from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext, V1PodStatus, models as k8s
+from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext, models as k8s
 from kubernetes.client.rest import ApiException
 from urllib3 import HTTPResponse

@@ -44,7 +44,7 @@
 )
 from airflow.providers.cncf.kubernetes.secret import Secret
 from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
-from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction, PodLoggingStatus, PodPhase
+from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction, PodLoggingStatus
 from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
 from airflow.utils import timezone
 from airflow.utils.session import create_session
@@ -699,14 +699,12 @@ def test_termination_message_policy_default_value_correctly_set(self):
         ],
     )
     @patch(f"{POD_MANAGER_CLASS}.delete_pod")
-    @patch(f"{KPO_MODULE}.KubernetesPodOperator.is_istio_enabled")
     @patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
     @patch(f"{KPO_MODULE}.KubernetesPodOperator.find_pod")
-    def test_pod_with_istio_delete_after_await_container_error(
+    def test_pod_with_sidecar_container_delete_after_await_container_error(
         self,
         find_pod_mock,
         await_pod_completion_mock,
-        is_istio_enabled_mock,
         delete_pod_mock,
         task_kwargs,
         base_container_fail,
@@ -716,13 +714,6 @@ def test_pod_with_istio_delete_after_await_container_error(
         """
         When KPO fails unexpectedly during await_container, we should still try to delete the pod,
         and the pod we try to delete should be the one returned from find_pod earlier.
         """
-        sidecar = MagicMock()
-        sidecar.name = "istio-proxy"
-        sidecar.namespace = "default"
-        sidecar.image = "istio/proxyv2:1.18.2"
-        sidecar.args = []
-        sidecar.state.running = True
-
         cont_status_1 = MagicMock()
         cont_status_1.name = "base"
         cont_status_1.state.running = False
@@ -732,20 +723,20 @@ def test_pod_with_istio_delete_after_await_container_error(
         cont_status_1.state.terminated.exit_code = 0
         if base_container_fail:
             cont_status_1.state.terminated.exit_code = 1
             cont_status_1.state.terminated.message = "my-failure"

         cont_status_2 = MagicMock()
-        cont_status_2.name = "istio-proxy"
+        cont_status_2.name = "sidecar"
         cont_status_2.state.running = True
         cont_status_2.state.terminated = False

-        await_pod_completion_mock.return_value.spec.containers = [sidecar, cont_status_1, cont_status_2]
+        await_pod_completion_mock.return_value.spec.containers = [cont_status_1, cont_status_2]
         await_pod_completion_mock.return_value.status.phase = "Running"
         await_pod_completion_mock.return_value.status.container_statuses = [cont_status_1, cont_status_2]
-        await_pod_completion_mock.return_value.metadata.name = "pod-with-istio-sidecar"
+        await_pod_completion_mock.return_value.metadata.name = "pod-with-sidecar"
         await_pod_completion_mock.return_value.metadata.namespace = "default"

-        find_pod_mock.return_value.spec.containers = [sidecar, cont_status_1, cont_status_2]
+        find_pod_mock.return_value.spec.containers = [cont_status_1, cont_status_2]
         find_pod_mock.return_value.status.phase = "Running"
         find_pod_mock.return_value.status.container_statuses = [cont_status_1, cont_status_2]
-        find_pod_mock.return_value.metadata.name = "pod-with-istio-sidecar"
+        find_pod_mock.return_value.metadata.name = "pod-with-sidecar"
         find_pod_mock.return_value.metadata.namespace = "default"

         k = KubernetesPodOperator(task_id="task", **task_kwargs)
@@ -761,7 +752,6 @@ def test_pod_with_istio_delete_after_await_container_error(
             k.execute(context=context)

         if expect_to_delete_pod:
-            assert k.is_istio_enabled(find_pod_mock.return_value)
             delete_pod_mock.assert_called_with(await_pod_completion_mock.return_value)
         else:
             delete_pod_mock.assert_not_called()
@@ -2069,10 +2059,17 @@ def test_async_write_logs_should_execute_successfully(
             (False, r"Pod task-.* returned a failure.(?!\nremote_pod:)"),
         ],
     )
-    def test_cleanup_log_pod_spec_on_failure(self, log_pod_spec_on_failure, expect_match):
+    @patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_succeeded")
+    def test_cleanup_log_pod_spec_on_failure(
+        self,
+        log_pod_spec_on_failure,
+        expect_match,
+        container_is_succeeded,
+    ):
+        container_is_succeeded.return_value = False
+
         k = KubernetesPodOperator(task_id="task", log_pod_spec_on_failure=log_pod_spec_on_failure)
         pod = k.build_pod_request_obj(create_context(k))
-        pod.status = V1PodStatus(phase=PodPhase.FAILED)
         with pytest.raises(AirflowException, match=expect_match):
             k.cleanup(pod, pod)