diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index e4fc78b179a66..a23acd3a05a6d 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -215,6 +215,7 @@ def __init__( termination_grace_period: int | None = None, configmaps: list[str] | None = None, resources: dict[str, Any] | None = None, + istio_enabled: bool = False, **kwargs, ) -> None: @@ -287,6 +288,7 @@ def __init__( self.termination_grace_period = termination_grace_period self.pod_request_obj: k8s.V1Pod | None = None self.pod: k8s.V1Pod | None = None + self.istio_enabled = istio_enabled def _render_nested_template_fields( self, @@ -342,7 +344,7 @@ def _get_ti_pod_labels(context: Context | None = None, include_try_number: bool @cached_property def pod_manager(self) -> PodManager: - return PodManager(kube_client=self.client) + return PodManager(kube_client=self.client, istio_enabled=self.istio_enabled) def get_hook(self): warnings.warn("get_hook is deprecated. Please use hook instead.", DeprecationWarning, stacklevel=2) @@ -453,7 +455,7 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): if not self.is_delete_operator_pod: with _suppress(Exception): self.patch_already_checked(remote_pod) - if pod_phase != PodPhase.SUCCEEDED: + if (not self.istio_enabled and pod_phase != PodPhase.SUCCEEDED) or (self.istio_enabled and pod_phase != PodPhase.SUCCEEDED and not self.pod_manager.container_is_succeeded(pod, 'base')): if self.log_events_on_failure: with _suppress(Exception): for event in self.pod_manager.read_pod_events(pod).items: diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index c65150b4181d4..528e4597a78d0 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -84,6 +84,36 @@ def container_is_running(pod: V1Pod, container_name: str) -> bool: return container_status.state.running is not None +def container_is_completed(pod: V1Pod, container_name: str) -> bool: + """ + Examines V1Pod ``pod`` to determine whether ``container_name`` is completed. + If that container is present and completed, returns True. Returns False otherwise. + """ + container_statuses = pod.status.container_statuses if pod and pod.status else None + if not container_statuses: + return False + container_status = next(iter([x for x in container_statuses if x.name == container_name]), None) + if not container_status: + return False + return container_status.state.terminated is not None + + +def container_is_succeeded(pod: V1Pod, container_name: str) -> bool: + """ + Examines V1Pod ``pod`` to determine whether ``container_name`` is completed and succeeded. + If that container is present and completed and succeeded, returns True. Returns False otherwise. + """ + if not container_is_completed(pod, container_name): + return False + container_statuses = pod.status.container_statuses if pod and pod.status else None + if not container_statuses: + return False + container_status = next(iter([x for x in container_statuses if x.name == container_name]), None) + if not container_status: + return False + return container_status.state.terminated.exit_code == 0 + + def get_container_termination_message(pod: V1Pod, container_name: str): try: container_statuses = pod.status.container_statuses @@ -112,6 +142,7 @@ def __init__( kube_client: client.CoreV1Api = None, in_cluster: bool = True, cluster_context: str | None = None, + istio_enabled: bool = False, ): """ Creates the launcher. @@ -123,6 +154,7 @@ def __init__( super().__init__() self._client = kube_client or get_kube_client(in_cluster=in_cluster, cluster_context=cluster_context) self._watch = watch.Watch() + self.istio_enabled = istio_enabled def run_pod_async(self, pod: V1Pod, **kwargs) -> V1Pod: """Runs POD asynchronously""" @@ -272,6 +304,10 @@ def await_pod_completion(self, pod: V1Pod) -> V1Pod: remote_pod = self.read_pod(pod) if remote_pod.status.phase in PodPhase.terminal_states: break + if self.istio_enabled and remote_pod.status.phase == PodPhase.RUNNING and self.container_is_completed( + remote_pod, 'base' + ): + break self.log.info('Pod %s has phase %s', pod.metadata.name, remote_pod.status.phase) time.sleep(2) return remote_pod @@ -306,6 +342,16 @@ def container_is_running(self, pod: V1Pod, container_name: str) -> bool: remote_pod = self.read_pod(pod) return container_is_running(pod=remote_pod, container_name=container_name) + def container_is_completed(self, pod: V1Pod, container_name: str) -> bool: + """Reads pod and checks if container is completed""" + remote_pod = self.read_pod(pod) + return container_is_completed(pod=remote_pod, container_name=container_name) + + def container_is_succeeded(self, pod: V1Pod, container_name: str) -> bool: + """Reads pod and checks if container is succeeded""" + remote_pod = self.read_pod(pod) + return container_is_succeeded(pod=remote_pod, container_name=container_name) + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def read_pod_logs( self, diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index 5299b6a4be778..5e42ea9cd564c 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -28,7 +28,13 @@ from urllib3.exceptions import HTTPError as BaseHTTPError from airflow.exceptions import AirflowException -from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager, PodPhase, container_is_running +from airflow.providers.cncf.kubernetes.utils.pod_manager import ( + PodManager, + PodPhase, + container_is_completed, + container_is_running, + container_is_succeeded, +) class TestPodManager: @@ -388,3 +394,119 @@ def test_container_is_running(remote_pod, result): an object `e` such that `e.status.container_statuses` is None, and so on. This test verifies the expected behavior.""" assert container_is_running(remote_pod, 'base') is result + + +def params_for_test_container_is_completed(): + """The `container_is_completed` method is designed to handle an assortment of bad objects + returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, + an object `e` such that `e.status.container_statuses` is None, and so on. This function + emits params used in `test_container_is_completed` to verify this behavior. + + We create mock classes not derived from MagicMock because with an instance `e` of MagicMock, + tests like `e.hello is not None` are always True. + """ + + class RemotePodMock: + pass + + class ContainerStatusMock: + def __init__(self, name): + self.name = name + + def remote_pod(completed=None, not_completed=None): + e = RemotePodMock() + e.status = RemotePodMock() + e.status.container_statuses = [] + for r in not_completed or []: + e.status.container_statuses.append(container(r, False)) + for r in completed or []: + e.status.container_statuses.append(container(r, True)) + return e + + def container(name, completed): + c = ContainerStatusMock(name) + c.state = RemotePodMock() + c.state.terminated = {'a': 'b'} if completed else None + return c + + pod_mock_list = [] + pod_mock_list.append(pytest.param(None, False, id='None remote_pod')) + p = RemotePodMock() + p.status = None + pod_mock_list.append(pytest.param(p, False, id='None remote_pod.status')) + p = RemotePodMock() + p.status = RemotePodMock() + p.status.container_statuses = [] + pod_mock_list.append(pytest.param(p, False, id='empty remote_pod.status.container_statuses')) + pod_mock_list.append(pytest.param(remote_pod(), False, id='filter empty')) + pod_mock_list.append(pytest.param(remote_pod(None, ['base']), False, id='filter 0 completed')) + pod_mock_list.append(pytest.param(remote_pod(['hello'], ['base']), False, id='filter 1 not completed')) + pod_mock_list.append(pytest.param(remote_pod(['base'], ['hello']), True, id='filter 1 completed')) + return pod_mock_list + + +@pytest.mark.parametrize('remote_pod, result', params_for_test_container_is_completed()) +def test_container_is_completed(remote_pod, result): + """The `container_is_completed` function is designed to handle an assortment of bad objects + returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, + an object `e` such that `e.status.container_statuses` is None, and so on. This test + verifies the expected behavior.""" + assert container_is_completed(remote_pod, 'base') is result + + +def params_for_test_container_is_succeeded(): + """The `container_is_succeeded` method is designed to handle an assortment of bad objects + returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, + an object `e` such that `e.status.container_statuses` is None, and so on. This function + emits params used in `test_container_is_succeeded` to verify this behavior. + + We create mock classes not derived from MagicMock because with an instance `e` of MagicMock, + tests like `e.hello is not None` are always True. + """ + + class RemotePodMock: + pass + + class ContainerStatusMock: + def __init__(self, name): + self.name = name + + def remote_pod(succeeded=None, not_succeeded=None): + e = RemotePodMock() + e.status = RemotePodMock() + e.status.container_statuses = [] + for r in not_succeeded or []: + e.status.container_statuses.append(container(r, False)) + for r in succeeded or []: + e.status.container_statuses.append(container(r, True)) + return e + + def container(name, succeeded): + c = ContainerStatusMock(name) + c.state = RemotePodMock() + c.state.terminated = {'exit_code': 0} if succeeded else None + return c + + pod_mock_list = [] + pod_mock_list.append(pytest.param(None, False, id='None remote_pod')) + p = RemotePodMock() + p.status = None + pod_mock_list.append(pytest.param(p, False, id='None remote_pod.status')) + p = RemotePodMock() + p.status = RemotePodMock() + p.status.container_statuses = [] + pod_mock_list.append(pytest.param(p, False, id='empty remote_pod.status.container_statuses')) + pod_mock_list.append(pytest.param(remote_pod(), False, id='filter empty')) + pod_mock_list.append(pytest.param(remote_pod(None, ['base']), False, id='filter 0 succeeded')) + pod_mock_list.append(pytest.param(remote_pod(['hello'], ['base']), False, id='filter 1 not succeeded')) + pod_mock_list.append(pytest.param(remote_pod(['base'], ['hello']), True, id='filter 1 succeeded')) + return pod_mock_list + + +@pytest.mark.parametrize('remote_pod, result', params_for_test_container_is_succeeded()) +def test_container_is_succeeded(remote_pod, result): + """The `container_is_succeeded` function is designed to handle an assortment of bad objects + returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, + an object `e` such that `e.status.container_statuses` is None, and so on. This test + verifies the expected behavior.""" + assert container_is_succeeded(remote_pod, 'base') is result