From 4d9eeac12982e9e11f5cb897a75dd57744647a74 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Wed, 1 May 2024 18:41:10 +0530 Subject: [PATCH 1/5] Fetch intermediate log in async GKEStartPod --- .../google/cloud/operators/kubernetes_engine.py | 13 ++++++++----- .../google/cloud/triggers/kubernetes_engine.py | 2 ++ .../google/cloud/triggers/test_kubernetes_engine.py | 2 ++ 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py index 0a28809dd1597..260a42e567558 100644 --- a/airflow/providers/google/cloud/operators/kubernetes_engine.py +++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py @@ -73,6 +73,7 @@ if TYPE_CHECKING: from kubernetes.client.models import V1Job, V1Pod + from pendulum import DateTime from airflow.utils.context import Context @@ -773,16 +774,16 @@ def fetch_cluster_info(self) -> tuple[str, str | None]: self._ssl_ca_cert = cluster.master_auth.cluster_ca_certificate return self._cluster_url, self._ssl_ca_cert - def invoke_defer_method(self): + def invoke_defer_method(self, last_log_time: DateTime | None = None): """Redefine triggers which are being used in child classes.""" trigger_start_time = utcnow() self.defer( trigger=GKEStartPodTrigger( - pod_name=self.pod.metadata.name, - pod_namespace=self.pod.metadata.namespace, + pod_name=self.pod.metadata.name, # type: ignore[union-attr] + pod_namespace=self.pod.metadata.namespace, # type: ignore[union-attr] trigger_start_time=trigger_start_time, - cluster_url=self._cluster_url, - ssl_ca_cert=self._ssl_ca_cert, + cluster_url=self._cluster_url, # type: ignore[arg-type] + ssl_ca_cert=self._ssl_ca_cert, # type: ignore[arg-type] get_logs=self.get_logs, startup_timeout=self.startup_timeout_seconds, cluster_context=self.cluster_context, @@ -792,6 +793,8 @@ def invoke_defer_method(self): on_finish_action=self.on_finish_action, gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, + logging_interval=self.logging_interval, + last_log_time=last_log_time, ), method_name="execute_complete", kwargs={"cluster_url": self._cluster_url, "ssl_ca_cert": self._ssl_ca_cert}, diff --git a/airflow/providers/google/cloud/triggers/kubernetes_engine.py b/airflow/providers/google/cloud/triggers/kubernetes_engine.py index 8557bea082410..f05bb0dc6c731 100644 --- a/airflow/providers/google/cloud/triggers/kubernetes_engine.py +++ b/airflow/providers/google/cloud/triggers/kubernetes_engine.py @@ -142,6 +142,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "on_finish_action": self.on_finish_action.value, "gcp_conn_id": self.gcp_conn_id, "impersonation_chain": self.impersonation_chain, + "logging_interval": self.logging_interval, + "last_log_time": self.last_log_time, }, ) diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py index 8a18dfcb902af..8a43f3627c03d 100644 --- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py +++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py @@ -125,6 +125,8 @@ def test_serialize_should_execute_successfully(self, trigger): "should_delete_pod": SHOULD_DELETE_POD, "gcp_conn_id": GCP_CONN_ID, "impersonation_chain": IMPERSONATION_CHAIN, + "last_log_time": None, + "logging_interval": None, } @pytest.mark.asyncio From e7a9af21b0bdb123a44e6cf10c7e27b8f58c59f2 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Sun, 5 May 2024 00:20:48 +0530 Subject: [PATCH 2/5] Add tests --- .../cloud/operators/test_kubernetes_engine.py | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py b/tests/providers/google/cloud/operators/test_kubernetes_engine.py index c71cc99c7e64b..fd9a7b41e7185 100644 --- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py +++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py @@ -673,6 +673,7 @@ def setup_method(self): namespace=NAMESPACE, image=IMAGE, deferrable=True, + on_finish_action="delete_pod", ) self.gke_op.pod = mock.MagicMock( name=TASK_NAME, @@ -703,6 +704,56 @@ def test_async_create_pod_should_execute_successfully( fetch_cluster_info_mock.assert_called_once() assert isinstance(exc.value.trigger, GKEStartPodTrigger) + @pytest.mark.parametrize("status", ["error", "failed", "timeout"]) + @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod") + @mock.patch(KUB_OP_PATH.format("_clean")) + @mock.patch(KUB_OP_PATH.format("write_logs")) + def test_execute_complete_failure(self, mock_write_logs, mock_clean, mock_get_pod, status): + self.gke_op._cluster_url = CLUSTER_URL + self.gke_op._ssl_ca_cert = SSL_CA_CERT + with pytest.raises(AirflowException): + self.gke_op.execute_complete( + context=mock.MagicMock(), + event={"name": "test", "status": status, "namespace": "default", "message": ""}, + cluster_url=self.gke_op._cluster_url, + ssl_ca_cert=self.gke_op._ssl_ca_cert, + ) + mock_write_logs.assert_called_once() + + @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod") + @mock.patch(KUB_OP_PATH.format("_clean")) + @mock.patch(KUB_OP_PATH.format("write_logs")) + def test_execute_complete_success(self, mock_write_logs, mock_clean, mock_get_pod): + self.gke_op._cluster_url = CLUSTER_URL + self.gke_op._ssl_ca_cert = SSL_CA_CERT + self.gke_op.execute_complete( + context=mock.MagicMock(), + event={"name": "test", "status": "success", "namespace": "default"}, + cluster_url=self.gke_op._cluster_url, + ssl_ca_cert=self.gke_op._ssl_ca_cert, + ) + mock_write_logs.assert_called_once() + + @mock.patch(KUB_OP_PATH.format("pod_manager")) + @mock.patch( + "airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator.invoke_defer_method" + ) + @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod") + @mock.patch(KUB_OP_PATH.format("_clean")) + def test_execute_complete_running( + self, mock_clean, mock_get_pod, mock_invoke_defer_method, mock_pod_manager + ): + self.gke_op._cluster_url = CLUSTER_URL + self.gke_op._ssl_ca_cert = SSL_CA_CERT + self.gke_op.execute_complete( + context=mock.MagicMock(), + event={"name": "test", "status": "running", "namespace": "default"}, + cluster_url=self.gke_op._cluster_url, + ssl_ca_cert=self.gke_op._ssl_ca_cert, + ) + mock_pod_manager.fetch_container_logs.assert_called_once() + mock_invoke_defer_method.assert_called_once() + class TestGKEStartJobOperator: def setup_method(self): From 3c6013299c036749799e77ac44e3072e5c325b64 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Sat, 25 May 2024 01:09:59 +0530 Subject: [PATCH 3/5] Remove deprecated code --- airflow/providers/google/cloud/operators/kubernetes_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py index 260a42e567558..f9a75d8b28b1d 100644 --- a/airflow/providers/google/cloud/operators/kubernetes_engine.py +++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py @@ -805,7 +805,7 @@ def execute_complete(self, context: Context, event: dict, **kwargs): self._cluster_url = kwargs["cluster_url"] self._ssl_ca_cert = kwargs["ssl_ca_cert"] - return super().execute_complete(context, event, **kwargs) + return super().trigger_reentry(context, event) class GKEStartJobOperator(KubernetesJobOperator): From c4a2163ca82eca0389f97a9e419eeab384b6a84b Mon Sep 17 00:00:00 2001 From: Pankaj Date: Sat, 25 May 2024 01:43:58 +0530 Subject: [PATCH 4/5] Fix tests --- .../google/cloud/operators/test_kubernetes_engine.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py b/tests/providers/google/cloud/operators/test_kubernetes_engine.py index fd9a7b41e7185..ab0df22d79a8c 100644 --- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py +++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py @@ -707,7 +707,7 @@ def test_async_create_pod_should_execute_successfully( @pytest.mark.parametrize("status", ["error", "failed", "timeout"]) @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod") @mock.patch(KUB_OP_PATH.format("_clean")) - @mock.patch(KUB_OP_PATH.format("write_logs")) + @mock.patch(KUB_OP_PATH.format("_write_logs")) def test_execute_complete_failure(self, mock_write_logs, mock_clean, mock_get_pod, status): self.gke_op._cluster_url = CLUSTER_URL self.gke_op._ssl_ca_cert = SSL_CA_CERT @@ -722,7 +722,7 @@ def test_execute_complete_failure(self, mock_write_logs, mock_clean, mock_get_po @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod") @mock.patch(KUB_OP_PATH.format("_clean")) - @mock.patch(KUB_OP_PATH.format("write_logs")) + @mock.patch(KUB_OP_PATH.format("_write_logs")) def test_execute_complete_success(self, mock_write_logs, mock_clean, mock_get_pod): self.gke_op._cluster_url = CLUSTER_URL self.gke_op._ssl_ca_cert = SSL_CA_CERT @@ -740,8 +740,9 @@ def test_execute_complete_success(self, mock_write_logs, mock_clean, mock_get_po ) @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod") @mock.patch(KUB_OP_PATH.format("_clean")) + @mock.patch("airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator.hook") def test_execute_complete_running( - self, mock_clean, mock_get_pod, mock_invoke_defer_method, mock_pod_manager + self, mock_gke_hook, mock_clean, mock_get_pod, mock_invoke_defer_method, mock_pod_manager ): self.gke_op._cluster_url = CLUSTER_URL self.gke_op._ssl_ca_cert = SSL_CA_CERT From 58e159b7d0d03435edd5c73000d72ffd76715d12 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Sat, 25 May 2024 01:59:12 +0530 Subject: [PATCH 5/5] Fix tests --- .../google/cloud/operators/test_kubernetes_engine.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py b/tests/providers/google/cloud/operators/test_kubernetes_engine.py index ab0df22d79a8c..2e27db59b3de6 100644 --- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py +++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py @@ -707,8 +707,9 @@ def test_async_create_pod_should_execute_successfully( @pytest.mark.parametrize("status", ["error", "failed", "timeout"]) @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod") @mock.patch(KUB_OP_PATH.format("_clean")) + @mock.patch("airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator.hook") @mock.patch(KUB_OP_PATH.format("_write_logs")) - def test_execute_complete_failure(self, mock_write_logs, mock_clean, mock_get_pod, status): + def test_execute_complete_failure(self, mock_write_logs, mock_gke_hook, mock_clean, mock_get_pod, status): self.gke_op._cluster_url = CLUSTER_URL self.gke_op._ssl_ca_cert = SSL_CA_CERT with pytest.raises(AirflowException): @@ -720,10 +721,11 @@ def test_execute_complete_failure(self, mock_write_logs, mock_clean, mock_get_po ) mock_write_logs.assert_called_once() + @mock.patch("airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator.hook") @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod") @mock.patch(KUB_OP_PATH.format("_clean")) @mock.patch(KUB_OP_PATH.format("_write_logs")) - def test_execute_complete_success(self, mock_write_logs, mock_clean, mock_get_pod): + def test_execute_complete_success(self, mock_write_logs, mock_clean, mock_get_pod, mock_gke_hook): self.gke_op._cluster_url = CLUSTER_URL self.gke_op._ssl_ca_cert = SSL_CA_CERT self.gke_op.execute_complete(