Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence

import kubernetes
from kubernetes.client import CoreV1Api, V1Pod, models as k8s
from kubernetes.stream import stream
from urllib3.exceptions import HTTPError
Expand Down Expand Up @@ -380,6 +381,7 @@ def __init__(

self._config_dict: dict | None = None # TODO: remove it when removing convert_config_file_to_dict
self._progress_callback = progress_callback
self._killed: bool = False

@cached_property
def _incluster_namespace(self):
Expand Down Expand Up @@ -577,6 +579,7 @@ def execute_sync(self, context: Context):
pod=self.pod or self.pod_request_obj,
remote_pod=self.remote_pod,
)

if self.do_xcom_push:
return result

Expand Down Expand Up @@ -676,6 +679,11 @@ def post_complete_action(self, *, pod, remote_pod, **kwargs):
)

def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
# If a task got marked as failed, "on_kill" method would be called and the pod will be cleaned up
# there. Cleaning it up again will raise an exception (which might cause retry).
if self._killed:
return

istio_enabled = self.is_istio_enabled(remote_pod)
pod_phase = remote_pod.status.phase if hasattr(remote_pod, "status") else None

Expand Down Expand Up @@ -818,6 +826,7 @@ def patch_already_checked(self, pod: k8s.V1Pod, *, reraise=True):
)

def on_kill(self) -> None:
self._killed = True
if self.pod:
pod = self.pod
kwargs = {
Expand All @@ -826,7 +835,11 @@ def on_kill(self) -> None:
}
if self.termination_grace_period is not None:
kwargs.update(grace_period_seconds=self.termination_grace_period)
self.client.delete_namespaced_pod(**kwargs)

try:
self.client.delete_namespaced_pod(**kwargs)
except kubernetes.client.exceptions.ApiException:
self.log.exception("Unable to delete pod %s", self.pod.metadata.name)

def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod:
"""
Expand Down