From 06a1957313f49a031cf481a13e50fd9becf9a863 Mon Sep 17 00:00:00 2001 From: Ashwin Agate Date: Fri, 23 Jun 2023 17:32:18 -0700 Subject: [PATCH 1/5] Fix KubernetesPodOperator validate xcom json and add retries - Added retries to extract_xcom to guard against intermittent network connectivity failures. - xcom json is validated to make sure entire json was retrieved. - xcom sidecar is killed only if xcom json that was retrieved was valid. --- .../cncf/kubernetes/utils/pod_manager.py | 44 ++++++++++++++++- .../cncf/kubernetes/utils/test_pod_manager.py | 48 +++++++++++++++++++ 2 files changed, 91 insertions(+), 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 3bacb95f4ff76..995bdfbc04547 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -544,6 +544,21 @@ def await_xcom_sidecar_container_start(self, pod: V1Pod) -> None: def extract_xcom(self, pod: V1Pod) -> str: """Retrieves XCom value and kills xcom sidecar container.""" + try: + result = self.extract_xcom_json(pod) + return result + except Exception as ex: + raise ex + finally: + self.extract_xcom_kill(pod) + + @tenacity.retry( + stop=tenacity.stop_after_attempt(5), + wait=tenacity.wait_exponential(multiplier=1, min=4, max=10), + reraise=True, + ) + def extract_xcom_json(self, pod: V1Pod) -> str: + """Retrieves XCom value and also checks if xcom json is valid.""" with closing( kubernetes_stream( self._client.connect_get_namespaced_pod_exec, @@ -562,11 +577,38 @@ def extract_xcom(self, pod: V1Pod) -> str: resp, f"if [ -s {PodDefaults.XCOM_MOUNT_PATH}/return.json ]; then cat {PodDefaults.XCOM_MOUNT_PATH}/return.json; else echo __airflow_xcom_result_empty__; fi", # noqa ) - self._exec_pod_command(resp, "kill -s SIGINT 1") + if isinstance(result, str) and result.rstrip() != "__airflow_xcom_result_empty__": + try: + json.loads(result) + except Exception as ex: + raise ex if result is None: raise AirflowException(f"Failed to extract xcom from pod: {pod.metadata.name}") return result + @tenacity.retry( + stop=tenacity.stop_after_attempt(5), + wait=tenacity.wait_exponential(multiplier=1, min=4, max=10), + reraise=True, + ) + def extract_xcom_kill(self, pod: V1Pod): + """Kills xcom sidecar container.""" + with closing( + kubernetes_stream( + self._client.connect_get_namespaced_pod_exec, + pod.metadata.name, + pod.metadata.namespace, + container=PodDefaults.SIDECAR_CONTAINER_NAME, + command=["/bin/sh"], + stdin=True, + stdout=True, + stderr=True, + tty=False, + _preload_content=False, + ) + ) as resp: + self._exec_pod_command(resp, "kill -s SIGINT 1") + def _exec_pod_command(self, resp, command: str) -> str | None: res = None if resp.is_open(): diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index a55be38a5e227..25bcaf0feede6 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -18,6 +18,7 @@ import logging from datetime import datetime +from json.decoder import JSONDecodeError from unittest import mock from unittest.mock import MagicMock @@ -370,6 +371,53 @@ def test_container_is_terminated_with_waiting_state(self, container_state, expec pod_info.status.container_statuses = [container_status] assert container_is_terminated(pod_info, "base") == expected_is_terminated + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.kubernetes_stream") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager._exec_pod_command") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.extract_xcom_kill") + def test_extract_xcom_success(self, mock_exec_xcom_kill, mock_exec_pod_command, mock_kubernetes_stream): + """test when valid json is retrieved from xcom sidecar container.""" + xcom_json = """{"a": "true"}""" + mock_pod = MagicMock() + mock_exec_pod_command.return_value = xcom_json + ret = self.pod_manager.extract_xcom(pod=mock_pod) + assert ret == xcom_json + assert mock_exec_xcom_kill.call_count == 1 + + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.kubernetes_stream") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager._exec_pod_command") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.extract_xcom_kill") + def test_extract_xcom_failure(self, mock_exec_xcom_kill, mock_exec_pod_command, mock_kubernetes_stream): + """test when invalid json is retrieved from xcom sidecar container.""" + with pytest.raises(JSONDecodeError): + xcom_json = """{"a": "tru""" + mock_pod = MagicMock() + mock_exec_pod_command.return_value = xcom_json + self.pod_manager.extract_xcom(pod=mock_pod) + assert mock_exec_xcom_kill.call_count == 1 + + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.kubernetes_stream") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager._exec_pod_command") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.extract_xcom_kill") + def test_extract_xcom_empty(self, mock_exec_xcom_kill, mock_exec_pod_command, mock_kubernetes_stream): + """test when None is retrieved from xcom sidecar container.""" + mock_pod = MagicMock() + xcom_result = "__airflow_xcom_result_empty__" + mock_exec_pod_command.return_value = xcom_result + ret = self.pod_manager.extract_xcom(pod=mock_pod) + assert ret == xcom_result + assert mock_exec_xcom_kill.call_count == 1 + + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.kubernetes_stream") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager._exec_pod_command") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.extract_xcom_kill") + def test_extract_xcom_none(self, mock_exec_xcom_kill, mock_exec_pod_command, mock_kubernetes_stream): + """test when None is retrieved from xcom sidecar container.""" + with pytest.raises(AirflowException): + mock_pod = MagicMock() + mock_exec_pod_command.return_value = None + self.pod_manager.extract_xcom(pod=mock_pod) + assert mock_exec_xcom_kill.call_count == 1 + def params_for_test_container_is_running(): """The `container_is_running` method is designed to handle an assortment of bad objects From 3f797bc9abb9c85130d1be3177410305b90626b8 Mon Sep 17 00:00:00 2001 From: Ashwin Agate Date: Wed, 28 Jun 2023 16:51:46 -0700 Subject: [PATCH 2/5] Fix KubernetesPodOperator validate xcom json and add retries --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 995bdfbc04547..a7e9064f82719 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -547,8 +547,6 @@ def extract_xcom(self, pod: V1Pod) -> str: try: result = self.extract_xcom_json(pod) return result - except Exception as ex: - raise ex finally: self.extract_xcom_kill(pod) @@ -577,11 +575,9 @@ def extract_xcom_json(self, pod: V1Pod) -> str: resp, f"if [ -s {PodDefaults.XCOM_MOUNT_PATH}/return.json ]; then cat {PodDefaults.XCOM_MOUNT_PATH}/return.json; else echo __airflow_xcom_result_empty__; fi", # noqa ) - if isinstance(result, str) and result.rstrip() != "__airflow_xcom_result_empty__": - try: - json.loads(result) - except Exception as ex: - raise ex + if result and result.rstrip() != "__airflow_xcom_result_empty__": + json.loads(result) + if result is None: raise AirflowException(f"Failed to extract xcom from pod: {pod.metadata.name}") return result From d42d10bd81b45bb798e9eba8f6e11284f49e12a7 Mon Sep 17 00:00:00 2001 From: Ashwin Agate Date: Thu, 29 Jun 2023 16:40:10 -0700 Subject: [PATCH 3/5] Fix KubernetesPodOperator validate xcom json and add retries --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index a7e9064f82719..14e70580c1d9c 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -576,6 +576,8 @@ def extract_xcom_json(self, pod: V1Pod) -> str: f"if [ -s {PodDefaults.XCOM_MOUNT_PATH}/return.json ]; then cat {PodDefaults.XCOM_MOUNT_PATH}/return.json; else echo __airflow_xcom_result_empty__; fi", # noqa ) if result and result.rstrip() != "__airflow_xcom_result_empty__": + # Note: result string is parsed to check if its valid json. + # This function still returns a string which is converted into json dict in the calling method. json.loads(result) if result is None: From 0728c505336034493137591da20e20f49338a9f7 Mon Sep 17 00:00:00 2001 From: Ashwin Agate Date: Thu, 29 Jun 2023 20:01:32 -0700 Subject: [PATCH 4/5] Fix KubernetesPodOperator validate xcom json and add retries --- tests/providers/cncf/kubernetes/utils/test_pod_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index 25bcaf0feede6..8f28d33dfdea5 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -399,7 +399,7 @@ def test_extract_xcom_failure(self, mock_exec_xcom_kill, mock_exec_pod_command, @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager._exec_pod_command") @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.extract_xcom_kill") def test_extract_xcom_empty(self, mock_exec_xcom_kill, mock_exec_pod_command, mock_kubernetes_stream): - """test when None is retrieved from xcom sidecar container.""" + """test when __airflow_xcom_result_empty__ is retrieved from xcom sidecar container.""" mock_pod = MagicMock() xcom_result = "__airflow_xcom_result_empty__" mock_exec_pod_command.return_value = xcom_result From ea71471329bb57ebd6100acd5c706d7a28dbd0b8 Mon Sep 17 00:00:00 2001 From: Ashwin Agate Date: Fri, 30 Jun 2023 07:13:29 -0700 Subject: [PATCH 5/5] Fix KubernetesPodOperator validate xcom json and add retries --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 14e70580c1d9c..a531316666ca5 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -577,7 +577,7 @@ def extract_xcom_json(self, pod: V1Pod) -> str: ) if result and result.rstrip() != "__airflow_xcom_result_empty__": # Note: result string is parsed to check if its valid json. - # This function still returns a string which is converted into json dict in the calling method. + # This function still returns a string which is converted into json in the calling method. json.loads(result) if result is None: