diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 8a701b675a19d..70f8bc2252bfb 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -29,6 +29,7 @@ from functools import cached_property from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence +import kubernetes from kubernetes.client import CoreV1Api, V1Pod, models as k8s from kubernetes.stream import stream from urllib3.exceptions import HTTPError @@ -380,6 +381,7 @@ def __init__( self._config_dict: dict | None = None # TODO: remove it when removing convert_config_file_to_dict self._progress_callback = progress_callback + self._killed: bool = False @cached_property def _incluster_namespace(self): @@ -577,6 +579,7 @@ def execute_sync(self, context: Context): pod=self.pod or self.pod_request_obj, remote_pod=self.remote_pod, ) + if self.do_xcom_push: return result @@ -676,6 +679,11 @@ def post_complete_action(self, *, pod, remote_pod, **kwargs): ) def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): + # If a task got marked as failed, "on_kill" method would be called and the pod will be cleaned up + # there. Cleaning it up again will raise an exception (which might cause retry). + if self._killed: + return + istio_enabled = self.is_istio_enabled(remote_pod) pod_phase = remote_pod.status.phase if hasattr(remote_pod, "status") else None @@ -818,6 +826,7 @@ def patch_already_checked(self, pod: k8s.V1Pod, *, reraise=True): ) def on_kill(self) -> None: + self._killed = True if self.pod: pod = self.pod kwargs = { @@ -826,7 +835,11 @@ def on_kill(self) -> None: } if self.termination_grace_period is not None: kwargs.update(grace_period_seconds=self.termination_grace_period) - self.client.delete_namespaced_pod(**kwargs) + + try: + self.client.delete_namespaced_pod(**kwargs) + except kubernetes.client.exceptions.ApiException: + self.log.exception("Unable to delete pod %s", self.pod.metadata.name) def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod: """