From b9d8a02dc858f17d18466f4cfa311818e01b640e Mon Sep 17 00:00:00 2001 From: Joshua Date: Wed, 28 Sep 2022 12:13:16 +0800 Subject: [PATCH 1/2] Added IstioKubernetesPodOperator and IstioPodManager --- .../operators/istio_kubernetes_pod.py | 58 ++++++++ .../kubernetes/utils/istio_pod_manager.py | 65 +++++++++ .../cncf/kubernetes/utils/pod_manager.py | 30 +++++ .../cncf/kubernetes/utils/test_pod_manager.py | 124 +++++++++++++++++- 4 files changed, 276 insertions(+), 1 deletion(-) create mode 100644 airflow/providers/cncf/kubernetes/operators/istio_kubernetes_pod.py create mode 100644 airflow/providers/cncf/kubernetes/utils/istio_pod_manager.py diff --git a/airflow/providers/cncf/kubernetes/operators/istio_kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/istio_kubernetes_pod.py new file mode 100644 index 0000000000000..ab6401f65cd3a --- /dev/null +++ b/airflow/providers/cncf/kubernetes/operators/istio_kubernetes_pod.py @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Executes task in a Kubernetes POD with Istio enabled""" +from __future__ import annotations + +from kubernetes.client import models as k8s + +from airflow import AirflowException +from airflow.compat.functools import cached_property +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator, _suppress +from airflow.providers.cncf.kubernetes.utils.istio_pod_manager import IstioPodManager +from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase + + +class IstioKubernetesPodOperator(KubernetesPodOperator): + """ + Execute a task in a Kubernetes Pod with Istio enabled + All parameters are the same with KubernetesPodOperator, the only difference is the cleanup part + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:KubernetesPodOperator` + """ + + @cached_property + def pod_manager(self) -> IstioPodManager: + return IstioPodManager(kube_client=self.client) + + def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): + pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None + if pod_phase != PodPhase.SUCCEEDED and not self.pod_manager.container_is_succeeded(pod, 'base'): + if self.log_events_on_failure: + with _suppress(Exception): + for event in self.pod_manager.read_pod_events(pod).items: + self.log.error("Pod Event: %s - %s", event.reason, event.message) + if not self.is_delete_operator_pod: + with _suppress(Exception): + self.patch_already_checked(pod) + with _suppress(Exception): + self.process_pod_deletion(pod) + raise AirflowException(f'Pod {pod and pod.metadata.name} returned a failure: {remote_pod}') + else: + with _suppress(Exception): + self.process_pod_deletion(pod) diff --git a/airflow/providers/cncf/kubernetes/utils/istio_pod_manager.py b/airflow/providers/cncf/kubernetes/utils/istio_pod_manager.py new file mode 100644 index 0000000000000..2248d565181f0 --- /dev/null +++ 
b/airflow/providers/cncf/kubernetes/utils/istio_pod_manager.py @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import time + +from kubernetes.client.models.v1_pod import V1Pod + +from airflow.providers.cncf.kubernetes.utils.pod_manager import ( + PodManager, + PodPhase, + container_is_completed, + container_is_succeeded, +) + + +class IstioPodManager(PodManager): + """ + Helper class for checking the status of base container in Kubernetes pods + for use with the IstioKubernetesPodOperator + """ + + def container_is_completed(self, pod: V1Pod, container_name: str) -> bool: + """Reads pod and checks if container is completed""" + remote_pod = self.read_pod(pod) + return container_is_completed(pod=remote_pod, container_name=container_name) + + def container_is_succeeded(self, pod: V1Pod, container_name: str) -> bool: + """Reads pod and checks if container is succeeded""" + remote_pod = self.read_pod(pod) + return container_is_succeeded(pod=remote_pod, container_name=container_name) + + def await_pod_completion(self, pod: V1Pod) -> V1Pod: + """ + Monitors a pod and returns the final state + (neglect sidecar state e.g. 
istio-proxy, vault-agent) + :param pod: pod spec that will be monitored + :return: Tuple[State, Optional[str]] + """ + while True: + remote_pod = self.read_pod(pod) + if remote_pod.status.phase in PodPhase.terminal_states: + break + if remote_pod.status.phase == PodPhase.RUNNING and self.container_is_completed( + remote_pod, 'base' + ): + break + self.log.info('Pod %s has phase %s', pod.metadata.name, remote_pod.status.phase) + time.sleep(2) + return remote_pod diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index c65150b4181d4..7aa1fda519937 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -84,6 +84,36 @@ def container_is_running(pod: V1Pod, container_name: str) -> bool: return container_status.state.running is not None +def container_is_completed(pod: V1Pod, container_name: str) -> bool: + """ + Examines V1Pod ``pod`` to determine whether ``container_name`` is completed. + If that container is present and completed, returns True. Returns False otherwise. + """ + container_statuses = pod.status.container_statuses if pod and pod.status else None + if not container_statuses: + return False + container_status = next(iter([x for x in container_statuses if x.name == container_name]), None) + if not container_status: + return False + return container_status.state.terminated is not None + + +def container_is_succeeded(pod: V1Pod, container_name: str) -> bool: + """ + Examines V1Pod ``pod`` to determine whether ``container_name`` is completed and succeeded. + If that container is present and completed and succeeded, returns True. Returns False otherwise. 
+ """ + if not container_is_completed(pod, container_name): + return False + container_statuses = pod.status.container_statuses if pod and pod.status else None + if not container_statuses: + return False + container_status = next(iter([x for x in container_statuses if x.name == container_name]), None) + if not container_status: + return False + return container_status.state.terminated.exit_code == 0 + + def get_container_termination_message(pod: V1Pod, container_name: str): try: container_statuses = pod.status.container_statuses diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index 5299b6a4be778..5e42ea9cd564c 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -28,7 +28,13 @@ from urllib3.exceptions import HTTPError as BaseHTTPError from airflow.exceptions import AirflowException -from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager, PodPhase, container_is_running +from airflow.providers.cncf.kubernetes.utils.pod_manager import ( + PodManager, + PodPhase, + container_is_completed, + container_is_running, + container_is_succeeded, +) class TestPodManager: @@ -388,3 +394,119 @@ def test_container_is_running(remote_pod, result): an object `e` such that `e.status.container_statuses` is None, and so on. This test verifies the expected behavior.""" assert container_is_running(remote_pod, 'base') is result + + +def params_for_test_container_is_completed(): + """The `container_is_completed` method is designed to handle an assortment of bad objects + returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, + an object `e` such that `e.status.container_statuses` is None, and so on. This function + emits params used in `test_container_is_completed` to verify this behavior. 
+ + We create mock classes not derived from MagicMock because with an instance `e` of MagicMock, + tests like `e.hello is not None` are always True. + """ + + class RemotePodMock: + pass + + class ContainerStatusMock: + def __init__(self, name): + self.name = name + + def remote_pod(completed=None, not_completed=None): + e = RemotePodMock() + e.status = RemotePodMock() + e.status.container_statuses = [] + for r in not_completed or []: + e.status.container_statuses.append(container(r, False)) + for r in completed or []: + e.status.container_statuses.append(container(r, True)) + return e + + def container(name, completed): + c = ContainerStatusMock(name) + c.state = RemotePodMock() + c.state.terminated = {'a': 'b'} if completed else None + return c + + pod_mock_list = [] + pod_mock_list.append(pytest.param(None, False, id='None remote_pod')) + p = RemotePodMock() + p.status = None + pod_mock_list.append(pytest.param(p, False, id='None remote_pod.status')) + p = RemotePodMock() + p.status = RemotePodMock() + p.status.container_statuses = [] + pod_mock_list.append(pytest.param(p, False, id='empty remote_pod.status.container_statuses')) + pod_mock_list.append(pytest.param(remote_pod(), False, id='filter empty')) + pod_mock_list.append(pytest.param(remote_pod(None, ['base']), False, id='filter 0 completed')) + pod_mock_list.append(pytest.param(remote_pod(['hello'], ['base']), False, id='filter 1 not completed')) + pod_mock_list.append(pytest.param(remote_pod(['base'], ['hello']), True, id='filter 1 completed')) + return pod_mock_list + + +@pytest.mark.parametrize('remote_pod, result', params_for_test_container_is_completed()) +def test_container_is_completed(remote_pod, result): + """The `container_is_completed` function is designed to handle an assortment of bad objects + returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, + an object `e` such that `e.status.container_statuses` is None, and so on. 
This test + verifies the expected behavior.""" + assert container_is_completed(remote_pod, 'base') is result + + +def params_for_test_container_is_succeeded(): + """The `container_is_succeeded` method is designed to handle an assortment of bad objects + returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, + an object `e` such that `e.status.container_statuses` is None, and so on. This function + emits params used in `test_container_is_succeeded` to verify this behavior. + + We create mock classes not derived from MagicMock because with an instance `e` of MagicMock, + tests like `e.hello is not None` are always True. + """ + + class RemotePodMock: + pass + + class ContainerStatusMock: + def __init__(self, name): + self.name = name + + def remote_pod(succeeded=None, not_succeeded=None): + e = RemotePodMock() + e.status = RemotePodMock() + e.status.container_statuses = [] + for r in not_succeeded or []: + e.status.container_statuses.append(container(r, False)) + for r in succeeded or []: + e.status.container_statuses.append(container(r, True)) + return e + + def container(name, succeeded): + c = ContainerStatusMock(name) + c.state = RemotePodMock() + c.state.terminated = type('TerminatedMock', (), {'exit_code': 0})() if succeeded else None + return c + + pod_mock_list = [] + pod_mock_list.append(pytest.param(None, False, id='None remote_pod')) + p = RemotePodMock() + p.status = None + pod_mock_list.append(pytest.param(p, False, id='None remote_pod.status')) + p = RemotePodMock() + p.status = RemotePodMock() + p.status.container_statuses = [] + pod_mock_list.append(pytest.param(p, False, id='empty remote_pod.status.container_statuses')) + pod_mock_list.append(pytest.param(remote_pod(), False, id='filter empty')) + pod_mock_list.append(pytest.param(remote_pod(None, ['base']), False, id='filter 0 succeeded')) + pod_mock_list.append(pytest.param(remote_pod(['hello'], ['base']), False, id='filter 1 not succeeded')) +
pod_mock_list.append(pytest.param(remote_pod(['base'], ['hello']), True, id='filter 1 succeeded')) + return pod_mock_list + + +@pytest.mark.parametrize('remote_pod, result', params_for_test_container_is_succeeded()) +def test_container_is_succeeded(remote_pod, result): + """The `container_is_succeeded` function is designed to handle an assortment of bad objects + returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, + an object `e` such that `e.status.container_statuses` is None, and so on. This test + verifies the expected behavior.""" + assert container_is_succeeded(remote_pod, 'base') is result From 3a8103d78eba2e5d161113c80e00d3d106a5c48e Mon Sep 17 00:00:00 2001 From: Joshua Date: Fri, 21 Oct 2022 14:31:03 +0800 Subject: [PATCH 2/2] Merged into KubernetesPodOperator(istio_enabled=True) --- .../operators/istio_kubernetes_pod.py | 58 ----------------- .../kubernetes/operators/kubernetes_pod.py | 6 +- .../kubernetes/utils/istio_pod_manager.py | 65 ------------------- .../cncf/kubernetes/utils/pod_manager.py | 16 +++++ 4 files changed, 20 insertions(+), 125 deletions(-) delete mode 100644 airflow/providers/cncf/kubernetes/operators/istio_kubernetes_pod.py delete mode 100644 airflow/providers/cncf/kubernetes/utils/istio_pod_manager.py diff --git a/airflow/providers/cncf/kubernetes/operators/istio_kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/istio_kubernetes_pod.py deleted file mode 100644 index ab6401f65cd3a..0000000000000 --- a/airflow/providers/cncf/kubernetes/operators/istio_kubernetes_pod.py +++ /dev/null @@ -1,58 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""Executes task in a Kubernetes POD with Istio enabled""" -from __future__ import annotations - -from kubernetes.client import models as k8s - -from airflow import AirflowException -from airflow.compat.functools import cached_property -from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator, _suppress -from airflow.providers.cncf.kubernetes.utils.istio_pod_manager import IstioPodManager -from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase - - -class IstioKubernetesPodOperator(KubernetesPodOperator): - """ - Execute a task in a Kubernetes Pod with Istio enabled - All parameters are the same with KubernetesPodOperator, the only difference is the cleanup part - - .. 
seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:KubernetesPodOperator` - """ - - @cached_property - def pod_manager(self) -> IstioPodManager: - return IstioPodManager(kube_client=self.client) - - def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): - pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None - if pod_phase != PodPhase.SUCCEEDED and not self.pod_manager.container_is_succeeded(pod, 'base'): - if self.log_events_on_failure: - with _suppress(Exception): - for event in self.pod_manager.read_pod_events(pod).items: - self.log.error("Pod Event: %s - %s", event.reason, event.message) - if not self.is_delete_operator_pod: - with _suppress(Exception): - self.patch_already_checked(pod) - with _suppress(Exception): - self.process_pod_deletion(pod) - raise AirflowException(f'Pod {pod and pod.metadata.name} returned a failure: {remote_pod}') - else: - with _suppress(Exception): - self.process_pod_deletion(pod) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index e4fc78b179a66..a23acd3a05a6d 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -215,6 +215,7 @@ def __init__( termination_grace_period: int | None = None, configmaps: list[str] | None = None, resources: dict[str, Any] | None = None, + istio_enabled: bool = False, **kwargs, ) -> None: @@ -287,6 +288,7 @@ def __init__( self.termination_grace_period = termination_grace_period self.pod_request_obj: k8s.V1Pod | None = None self.pod: k8s.V1Pod | None = None + self.istio_enabled = istio_enabled def _render_nested_template_fields( self, @@ -342,7 +344,7 @@ def _get_ti_pod_labels(context: Context | None = None, include_try_number: bool @cached_property def pod_manager(self) -> PodManager: - return PodManager(kube_client=self.client) + return 
PodManager(kube_client=self.client, istio_enabled=self.istio_enabled) def get_hook(self): warnings.warn("get_hook is deprecated. Please use hook instead.", DeprecationWarning, stacklevel=2) @@ -453,7 +455,9 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): if not self.is_delete_operator_pod: with _suppress(Exception): self.patch_already_checked(remote_pod) - if pod_phase != PodPhase.SUCCEEDED: + if pod_phase != PodPhase.SUCCEEDED and not ( + self.istio_enabled and self.pod_manager.container_is_succeeded(pod, 'base') + ): if self.log_events_on_failure: with _suppress(Exception): for event in self.pod_manager.read_pod_events(pod).items: diff --git a/airflow/providers/cncf/kubernetes/utils/istio_pod_manager.py b/airflow/providers/cncf/kubernetes/utils/istio_pod_manager.py deleted file mode 100644 index 2248d565181f0..0000000000000 --- a/airflow/providers/cncf/kubernetes/utils/istio_pod_manager.py +++ /dev/null @@ -1,65 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -from __future__ import annotations - -import time - -from kubernetes.client.models.v1_pod import V1Pod - -from airflow.providers.cncf.kubernetes.utils.pod_manager import ( - PodManager, - PodPhase, - container_is_completed, - container_is_succeeded, -) - - -class IstioPodManager(PodManager): - """ - Helper class for checking the status of base container in Kubernetes pods - for use with the IstioKubernetesPodOperator - """ - - def container_is_completed(self, pod: V1Pod, container_name: str) -> bool: - """Reads pod and checks if container is completed""" - remote_pod = self.read_pod(pod) - return container_is_completed(pod=remote_pod, container_name=container_name) - - def container_is_succeeded(self, pod: V1Pod, container_name: str) -> bool: - """Reads pod and checks if container is succeeded""" - remote_pod = self.read_pod(pod) - return container_is_succeeded(pod=remote_pod, container_name=container_name) - - def await_pod_completion(self, pod: V1Pod) -> V1Pod: - """ - Monitors a pod and returns the final state - (neglect sidecar state e.g. 
istio-proxy, vault-agent) - :param pod: pod spec that will be monitored - :return: Tuple[State, Optional[str]] - """ - while True: - remote_pod = self.read_pod(pod) - if remote_pod.status.phase in PodPhase.terminal_states: - break - if remote_pod.status.phase == PodPhase.RUNNING and self.container_is_completed( - remote_pod, 'base' - ): - break - self.log.info('Pod %s has phase %s', pod.metadata.name, remote_pod.status.phase) - time.sleep(2) - return remote_pod diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 7aa1fda519937..528e4597a78d0 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -142,6 +142,7 @@ def __init__( kube_client: client.CoreV1Api = None, in_cluster: bool = True, cluster_context: str | None = None, + istio_enabled: bool = False, ): """ Creates the launcher. @@ -153,6 +154,7 @@ def __init__( super().__init__() self._client = kube_client or get_kube_client(in_cluster=in_cluster, cluster_context=cluster_context) self._watch = watch.Watch() + self.istio_enabled = istio_enabled def run_pod_async(self, pod: V1Pod, **kwargs) -> V1Pod: """Runs POD asynchronously""" @@ -302,6 +304,10 @@ def await_pod_completion(self, pod: V1Pod) -> V1Pod: remote_pod = self.read_pod(pod) if remote_pod.status.phase in PodPhase.terminal_states: break + if self.istio_enabled and remote_pod.status.phase == PodPhase.RUNNING and self.container_is_completed( + remote_pod, 'base' + ): + break self.log.info('Pod %s has phase %s', pod.metadata.name, remote_pod.status.phase) time.sleep(2) return remote_pod @@ -336,6 +342,16 @@ def container_is_running(self, pod: V1Pod, container_name: str) -> bool: remote_pod = self.read_pod(pod) return container_is_running(pod=remote_pod, container_name=container_name) + def container_is_completed(self, pod: V1Pod, container_name: str) -> bool: + """Reads pod and checks if container is 
completed""" + remote_pod = self.read_pod(pod) + return container_is_completed(pod=remote_pod, container_name=container_name) + + def container_is_succeeded(self, pod: V1Pod, container_name: str) -> bool: + """Reads pod and checks if container has succeeded (terminated with exit code 0)""" + remote_pod = self.read_pod(pod) + return container_is_succeeded(pod=remote_pod, container_name=container_name) + @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def read_pod_logs( self,