From f6ea80a2e96abdd4853eb726d245935e84806c64 Mon Sep 17 00:00:00 2001 From: Jonathan Ostrander Date: Fri, 17 May 2024 14:53:32 -0400 Subject: [PATCH 1/4] fix: delete pod when base container completes Fixes https://github.com/apache/airflow/issues/39693. Special logic was added to `KubernetesPodOperator`'s lifecycle to handle the case where an istio proxy sidecar is running and preventing the pod from completing, but this logic should have been applied more generally to handle the case where multiple containers are run in the pod. The new behavior considers the pod complete once the container matching the base container name has completed. --- .../cncf/kubernetes/operators/pod.py | 50 ++----------------- .../cncf/kubernetes/utils/pod_manager.py | 6 +-- .../cncf/kubernetes/operators/test_pod.py | 22 +++----- 3 files changed, 13 insertions(+), 65 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 0cb08033b4d78..56532130ff402 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -233,8 +233,6 @@ class KubernetesPodOperator(BaseOperator): # This field can be overloaded at the instance level via base_container_name BASE_CONTAINER_NAME = "base" - ISTIO_CONTAINER_NAME = "istio-proxy" - KILL_ISTIO_PROXY_SUCCESS_MSG = "HTTP/1.1 200" POD_CHECKED_KEY = "already_checked" POST_TERMINATION_TIMEOUT = 120 @@ -631,9 +629,8 @@ def execute_sync(self, context: Context): if self.do_xcom_push: self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod) result = self.extract_xcom(pod=self.pod) - istio_enabled = self.is_istio_enabled(self.pod) self.remote_pod = self.pod_manager.await_pod_completion( - self.pod, istio_enabled, self.base_container_name + self.pod, self.base_container_name ) finally: pod_to_clean = self.pod or self.pod_request_obj @@ -784,12 +781,11 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any: def _clean(self, event: dict[str, Any]) -> None: if event["status"] == "running": return - istio_enabled = self.is_istio_enabled(self.pod) # Skip await_pod_completion when the event is 'timeout' due to the pod can hang # on the ErrImagePull or ContainerCreating step and it will never complete if event["status"] != "timeout": self.pod = self.pod_manager.await_pod_completion( - self.pod, istio_enabled, self.base_container_name + self.pod, self.base_container_name ) if self.pod is not None: self.post_complete_action( @@ -842,16 +838,14 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): if self._killed or not remote_pod: return - istio_enabled = self.is_istio_enabled(remote_pod) pod_phase = remote_pod.status.phase if hasattr(remote_pod, "status") else None # if the pod fails or success, but we don't want to delete it if pod_phase != PodPhase.SUCCEEDED or self.on_finish_action == OnFinishAction.KEEP_POD: self.patch_already_checked(remote_pod, reraise=False) - failed = (pod_phase != PodPhase.SUCCEEDED and not istio_enabled) or ( - istio_enabled and not container_is_succeeded(remote_pod, self.base_container_name) - ) + # Consider pod failed if the pod has not succeeded and the base container within the pod has not succeeded + failed = pod_phase != PodPhase.SUCCEEDED and not container_is_succeeded(remote_pod, self.base_container_name) if failed: if self.log_events_on_failure: @@ -902,42 +896,6 @@ def _read_pod_events(self, pod, *, reraise=True) -> None: else: self.log.error("Pod Event: %s - %s", event.reason, event.message) - def 
is_istio_enabled(self, pod: V1Pod) -> bool: - """Check if istio is enabled for the namespace of the pod by inspecting the namespace labels.""" - if not pod: - return False - - remote_pod = self.pod_manager.read_pod(pod) - - return any(container.name == self.ISTIO_CONTAINER_NAME for container in remote_pod.spec.containers) - - def kill_istio_sidecar(self, pod: V1Pod) -> None: - command = "/bin/sh -c 'curl -fsI -X POST http://localhost:15020/quitquitquit'" - command_to_container = shlex.split(command) - resp = stream( - self.client.connect_get_namespaced_pod_exec, - name=pod.metadata.name, - namespace=pod.metadata.namespace, - container=self.ISTIO_CONTAINER_NAME, - command=command_to_container, - stderr=True, - stdin=True, - stdout=True, - tty=False, - _preload_content=False, - ) - output = [] - while resp.is_open(): - if resp.peek_stdout(): - output.append(resp.read_stdout()) - - resp.close() - output_str = "".join(output) - self.log.info("Output of curl command to kill istio: %s", output_str) - resp.close() - if self.KILL_ISTIO_PROXY_SUCCESS_MSG not in output_str: - raise AirflowException("Error while deleting istio-proxy sidecar: %s", output_str) - def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True): with _optionally_suppress(reraise=reraise): if pod is not None: diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 5668462ad6df4..e3246635c04bf 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -604,12 +604,11 @@ def await_container_completion(self, pod: V1Pod, container_name: str) -> None: time.sleep(1) def await_pod_completion( - self, pod: V1Pod, istio_enabled: bool = False, container_name: str = "base" + self, pod: V1Pod, container_name: str = "base" ) -> V1Pod: """ Monitor a pod and return the final state. 
- :param istio_enabled: whether istio is enabled in the namespace :param pod: pod spec that will be monitored :param container_name: name of the container within the pod :return: tuple[State, str | None] @@ -618,7 +617,8 @@ def await_pod_completion( remote_pod = self.read_pod(pod) if remote_pod.status.phase in PodPhase.terminal_states: break - if istio_enabled and container_is_completed(remote_pod, container_name): + if container_is_completed(remote_pod, container_name): + self.log.info("Base container %s has completed", container_name) break self.log.info("Pod %s has phase %s", pod.metadata.name, remote_pod.status.phase) time.sleep(2) diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index 8b7c238e9c143..af4a44c673f83 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -699,14 +699,12 @@ def test_termination_message_policy_default_value_correctly_set(self): ], ) @patch(f"{POD_MANAGER_CLASS}.delete_pod") - @patch(f"{KPO_MODULE}.KubernetesPodOperator.is_istio_enabled") @patch(f"{POD_MANAGER_CLASS}.await_pod_completion") @patch(f"{KPO_MODULE}.KubernetesPodOperator.find_pod") - def test_pod_with_istio_delete_after_await_container_error( + def test_pod_with_sidecar_container_delete_after_await_container_error( self, find_pod_mock, await_pod_completion_mock, - is_istio_enabled_mock, delete_pod_mock, task_kwargs, base_container_fail, @@ -716,13 +714,6 @@ def test_pod_with_istio_delete_after_await_container_error( When KPO fails unexpectedly during await_container, we should still try to delete the pod, and the pod we try to delete should be the one returned from find_pod earlier. """ - sidecar = MagicMock() - sidecar.name = "istio-proxy" - sidecar.namespace = "default" - sidecar.image = "istio/proxyv2:1.18.2" - sidecar.args = [] - sidecar.state.running = True - cont_status_1 = MagicMock() cont_status_1.name = "base" cont_status_1.state.running = False @@ -732,20 +723,20 @@ def test_pod_with_istio_delete_after_await_container_error( cont_status_1.state.terminated.message = "my-failure" cont_status_2 = MagicMock() - cont_status_2.name = "istio-proxy" + cont_status_2.name = "sidecar" cont_status_2.state.running = True cont_status_2.state.terminated = False - await_pod_completion_mock.return_value.spec.containers = [sidecar, cont_status_1, cont_status_2] + await_pod_completion_mock.return_value.spec.containers = [cont_status_1, cont_status_2] await_pod_completion_mock.return_value.status.phase = "Running" await_pod_completion_mock.return_value.status.container_statuses = [cont_status_1, cont_status_2] - await_pod_completion_mock.return_value.metadata.name = "pod-with-istio-sidecar" + await_pod_completion_mock.return_value.metadata.name = "pod-with-sidecar" await_pod_completion_mock.return_value.metadata.namespace = "default" - find_pod_mock.return_value.spec.containers = [sidecar, cont_status_1, cont_status_2] + find_pod_mock.return_value.spec.containers = [cont_status_1, cont_status_2] find_pod_mock.return_value.status.phase = "Running" find_pod_mock.return_value.status.container_statuses = [cont_status_1, cont_status_2] - find_pod_mock.return_value.metadata.name = "pod-with-istio-sidecar" + find_pod_mock.return_value.metadata.name = "pod-with-sidecar" find_pod_mock.return_value.metadata.namespace = "default" k = KubernetesPodOperator(task_id="task", **task_kwargs) @@ -761,7 +752,6 @@ def test_pod_with_istio_delete_after_await_container_error( 
k.execute(context=context) if expect_to_delete_pod: - assert k.is_istio_enabled(find_pod_mock.return_value) delete_pod_mock.assert_called_with(await_pod_completion_mock.return_value) else: delete_pod_mock.assert_not_called() From 26e734e76a7ceed67f782dfcbbd38bf9119746e0 Mon Sep 17 00:00:00 2001 From: Jonathan Ostrander Date: Tue, 21 May 2024 11:00:05 -0400 Subject: [PATCH 2/4] fix: pre-commit hooks --- .../providers/cncf/kubernetes/operators/pod.py | 16 ++++++---------- .../cncf/kubernetes/utils/pod_manager.py | 4 +--- .../auth_manager/cli_commands/user_command.py | 8 +++----- 3 files changed, 10 insertions(+), 18 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 56532130ff402..d892ef701162c 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -23,7 +23,6 @@ import logging import math import re -import shlex import string import warnings from collections.abc import Container @@ -35,8 +34,7 @@ import kubernetes import tenacity from deprecated import deprecated -from kubernetes.client import CoreV1Api, V1Pod, models as k8s -from kubernetes.stream import stream +from kubernetes.client import CoreV1Api, models as k8s from urllib3.exceptions import HTTPError from airflow.configuration import conf @@ -629,9 +627,7 @@ def execute_sync(self, context: Context): if self.do_xcom_push: self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod) result = self.extract_xcom(pod=self.pod) - self.remote_pod = self.pod_manager.await_pod_completion( - self.pod, self.base_container_name - ) + self.remote_pod = self.pod_manager.await_pod_completion(self.pod, self.base_container_name) finally: pod_to_clean = self.pod or self.pod_request_obj self.cleanup( @@ -784,9 +780,7 @@ def _clean(self, event: dict[str, Any]) -> None: # Skip await_pod_completion when the event is 'timeout' due to the pod can hang # on the ErrImagePull or ContainerCreating step and it will never complete if event["status"] != "timeout": - self.pod = self.pod_manager.await_pod_completion( - self.pod, self.base_container_name - ) + self.pod = self.pod_manager.await_pod_completion(self.pod, self.base_container_name) if self.pod is not None: self.post_complete_action( pod=self.pod, @@ -845,7 +839,9 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): self.patch_already_checked(remote_pod, reraise=False) # Consider pod failed if the pod has not succeeded and the base container within the pod has not succeeded - failed = pod_phase != PodPhase.SUCCEEDED and not container_is_succeeded(remote_pod, self.base_container_name) + failed = pod_phase != PodPhase.SUCCEEDED and not container_is_succeeded( + remote_pod, self.base_container_name + ) if failed: if self.log_events_on_failure: diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index e3246635c04bf..033da88bf90c3 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -603,9 +603,7 @@ def await_container_completion(self, pod: V1Pod, container_name: str) -> None: self.log.info("Waiting for container '%s' state to be completed", container_name) time.sleep(1) - def await_pod_completion( - self, pod: V1Pod, container_name: str = "base" - ) -> V1Pod: + def await_pod_completion(self, pod: V1Pod, container_name: str = "base") -> V1Pod: """ Monitor a pod and return the final state. 
diff --git a/airflow/providers/fab/auth_manager/cli_commands/user_command.py b/airflow/providers/fab/auth_manager/cli_commands/user_command.py index 3050a9e250e58..8b6db12017876 100644 --- a/airflow/providers/fab/auth_manager/cli_commands/user_command.py +++ b/airflow/providers/fab/auth_manager/cli_commands/user_command.py @@ -212,10 +212,10 @@ def users_import(args): users_created, users_updated = _import_users(users_list) if users_created: - print("Created the following users:\n\t{}".format("\n\t".join(users_created))) + print(f"Created the following users:\n\t{'\\n\\t'.join(users_created)}") if users_updated: - print("Updated the following users:\n\t{}".format("\n\t".join(users_updated))) + print(f"Updated the following users:\n\t{'\\n\\t'.join(users_updated)}") def _import_users(users_list: list[dict[str, Any]]): @@ -231,9 +231,7 @@ def _import_users(users_list: list[dict[str, Any]]): msg.append(f"[Item {row_num}]") for key, value in failure.items(): msg.append(f"\t{key}: {value}") - raise SystemExit( - "Error: Input file didn't pass validation. See below:\n{}".format("\n".join(msg)) - ) + raise SystemExit(f"Error: Input file didn't pass validation. See below:\n{'\\n'.join(msg)}") for user in users_list: roles = [] From 67f720188944ec4f59aac1d8c8704e21907196a8 Mon Sep 17 00:00:00 2001 From: Jonathan Ostrander Date: Tue, 21 May 2024 11:56:19 -0400 Subject: [PATCH 3/4] fix: change failed condition for pods during clean-up to only depend on base container state --- .../cncf/kubernetes/operators/pod.py | 23 ++++++------------- .../cncf/kubernetes/operators/test_pod.py | 15 ++++++++---- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index d892ef701162c..4742d3a1538ca 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -78,6 +78,7 @@ PodPhase, check_exception_is_kubernetes_api_unauthorized, container_is_succeeded, + get_container_status, get_container_termination_message, ) from airflow.settings import pod_mutation_hook @@ -832,30 +833,20 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): if self._killed or not remote_pod: return - pod_phase = remote_pod.status.phase if hasattr(remote_pod, "status") else None + # Only consider pod failed if the base container within the pod has not succeeded + failed = not container_is_succeeded(remote_pod, self.base_container_name) # if the pod fails or success, but we don't want to delete it - if pod_phase != PodPhase.SUCCEEDED or self.on_finish_action == OnFinishAction.KEEP_POD: + if failed or self.on_finish_action == OnFinishAction.KEEP_POD: self.patch_already_checked(remote_pod, reraise=False) - # Consider pod failed if the pod has not succeeded and the base container within the pod has not succeeded - failed = pod_phase != PodPhase.SUCCEEDED and not container_is_succeeded( - remote_pod, self.base_container_name - ) - - if failed: - if self.log_events_on_failure: - self._read_pod_events(pod, reraise=False) + if failed and self.log_events_on_failure: + self._read_pod_events(pod, reraise=False) self.process_pod_deletion(remote_pod, reraise=False) if self.skip_on_exit_code: - container_statuses = ( - remote_pod.status.container_statuses if remote_pod and remote_pod.status else None - ) or [] - base_container_status = next( - (x for x in container_statuses if x.name == self.base_container_name), None - ) + base_container_status = 
get_container_status(remote_pod, self.base_container_name) exit_code = ( base_container_status.state.terminated.exit_code if base_container_status diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index af4a44c673f83..9cf96e5e21c42 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -24,7 +24,7 @@ import pendulum import pytest -from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext, V1PodStatus, models as k8s +from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext, models as k8s from kubernetes.client.rest import ApiException from urllib3 import HTTPResponse @@ -44,7 +44,7 @@ ) from airflow.providers.cncf.kubernetes.secret import Secret from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger -from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction, PodLoggingStatus, PodPhase +from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction, PodLoggingStatus from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults from airflow.utils import timezone from airflow.utils.session import create_session @@ -2059,10 +2059,17 @@ def test_async_write_logs_should_execute_successfully( (False, r"Pod task-.* returned a failure.(?!\nremote_pod:)"), ], ) - def test_cleanup_log_pod_spec_on_failure(self, log_pod_spec_on_failure, expect_match): + @patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_succeeded") + def test_cleanup_log_pod_spec_on_failure( + self, + log_pod_spec_on_failure, + expect_match, + container_is_succeeded, + ): + container_is_succeeded.return_value = False + k = KubernetesPodOperator(task_id="task", log_pod_spec_on_failure=log_pod_spec_on_failure) pod = k.build_pod_request_obj(create_context(k)) - pod.status = V1PodStatus(phase=PodPhase.FAILED) with pytest.raises(AirflowException, match=expect_match): k.cleanup(pod, pod) From ec1c1f5a1f3b199255f07a2f83d4ab455512dd5c Mon Sep 17 00:00:00 2001 From: Jonathan Ostrander Date: Tue, 21 May 2024 12:07:14 -0400 Subject: [PATCH 4/4] fix: revert user_command changes made by pre-commit --- .../fab/auth_manager/cli_commands/user_command.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/airflow/providers/fab/auth_manager/cli_commands/user_command.py b/airflow/providers/fab/auth_manager/cli_commands/user_command.py index 8b6db12017876..3050a9e250e58 100644 --- a/airflow/providers/fab/auth_manager/cli_commands/user_command.py +++ b/airflow/providers/fab/auth_manager/cli_commands/user_command.py @@ -212,10 +212,10 @@ def users_import(args): users_created, users_updated = _import_users(users_list) if users_created: - print(f"Created the following users:\n\t{'\\n\\t'.join(users_created)}") + print("Created the following users:\n\t{}".format("\n\t".join(users_created))) if users_updated: - print(f"Updated the following users:\n\t{'\\n\\t'.join(users_updated)}") + print("Updated the following users:\n\t{}".format("\n\t".join(users_updated))) def _import_users(users_list: list[dict[str, Any]]): @@ -231,7 +231,9 @@ def _import_users(users_list: list[dict[str, Any]]): msg.append(f"[Item {row_num}]") for key, value in failure.items(): msg.append(f"\t{key}: {value}") - raise SystemExit(f"Error: Input file didn't pass validation. See below:\n{'\\n'.join(msg)}") + raise SystemExit( + "Error: Input file didn't pass validation. 
See below:\n{}".format("\n".join(msg)) + ) for user in users_list: roles = []
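Note on the net behavior of this series: `await_pod_completion` now returns as soon as the container named by `base_container_name` reaches a terminated state, and `cleanup` treats the task as failed only when that container did not exit successfully, regardless of the pod phase or of any sidecars that are still running. The sketch below illustrates that rule with plain kubernetes client models; the helper functions and the example pod are illustrative only, not the provider's actual `container_is_completed` / `container_is_succeeded` implementations.

# A minimal sketch (not the provider's code) of the completion rule this series
# converges on: a pod counts as done once the container whose name matches
# ``base_container_name`` has terminated, and as succeeded only if that
# container exited with code 0, even while sidecars keep the pod phase Running.
from kubernetes.client import models as k8s


def base_container_done(pod: k8s.V1Pod, base_container_name: str = "base") -> bool:
    """Return True if the named container has reached a terminated state."""
    statuses = (pod.status.container_statuses or []) if pod.status else []
    base = next((s for s in statuses if s.name == base_container_name), None)
    return bool(base and base.state and base.state.terminated)


def base_container_succeeded(pod: k8s.V1Pod, base_container_name: str = "base") -> bool:
    """Return True if the named container terminated with exit code 0."""
    statuses = (pod.status.container_statuses or []) if pod.status else []
    base = next((s for s in statuses if s.name == base_container_name), None)
    return bool(
        base and base.state and base.state.terminated and base.state.terminated.exit_code == 0
    )


# Example: the "base" container exited 0 while a hypothetical sidecar keeps
# running, so the pod phase stays "Running" indefinitely. With this series the
# operator now treats such a pod as complete and successful.
pod = k8s.V1Pod(
    status=k8s.V1PodStatus(
        phase="Running",
        container_statuses=[
            k8s.V1ContainerStatus(
                name="base",
                image="busybox",
                image_id="",
                ready=False,
                restart_count=0,
                state=k8s.V1ContainerState(
                    terminated=k8s.V1ContainerStateTerminated(exit_code=0, reason="Completed")
                ),
            ),
            k8s.V1ContainerStatus(
                name="sidecar",
                image="some-sidecar-image",  # placeholder image, illustrative only
                image_id="",
                ready=True,
                restart_count=0,
                state=k8s.V1ContainerState(running=k8s.V1ContainerStateRunning()),
            ),
        ],
    )
)

assert base_container_done(pod)
assert base_container_succeeded(pod)

Keying completion and failure on the base container rather than on the pod phase is what lets long-running sidecars (istio-proxy or otherwise) coexist with the operator, without the istio-specific branch that this series removes.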