From 823ea4a5407961e84aa82c9fae4b0f8a8e226c4f Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Wed, 17 May 2023 12:23:04 +0200 Subject: [PATCH 1/4] Wait for a pod terminated state before triggering a success event --- .../providers/cncf/kubernetes/triggers/pod.py | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py index faac3e30bc30c..27307ee6b0228 100644 --- a/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -133,15 +133,19 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] self.log.debug("Container %s status: %s", self.base_container_name, container_state) if container_state == ContainerState.TERMINATED: - yield TriggerEvent( - { - "name": self.pod_name, - "namespace": self.pod_namespace, - "status": "success", - "message": "All containers inside pod have started successfully.", - } - ) - return + if pod_status not in PodPhase.terminal_states: + self.log.info("Pod %s is still running. Sleeping for %s seconds.", self.poll_interval) + await asyncio.sleep(self.poll_interval) + else: + yield TriggerEvent( + { + "name": self.pod_name, + "namespace": self.pod_namespace, + "status": "success", + "message": "All containers inside pod have started successfully.", + } + ) + return elif self.should_wait(pod_phase=pod_status, container_state=container_state): self.log.info("Container is not completed and still working.") From 215848e5062a038f47f7d70d7281e2529a0630d4 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Wed, 17 May 2023 12:49:46 +0200 Subject: [PATCH 2/4] fix a bug in log format and fix tests --- airflow/providers/cncf/kubernetes/triggers/pod.py | 6 +++++- tests/providers/cncf/kubernetes/triggers/test_pod.py | 3 ++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py index 27307ee6b0228..6c7036effaa45 100644 --- a/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -134,7 +134,11 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] if container_state == ContainerState.TERMINATED: if pod_status not in PodPhase.terminal_states: - self.log.info("Pod %s is still running. Sleeping for %s seconds.", self.poll_interval) + self.log.info( + "Pod %s is still running. Sleeping for %s seconds.", + self.pod_name, + self.poll_interval, + ) await asyncio.sleep(self.poll_interval) else: yield TriggerEvent( diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py b/tests/providers/cncf/kubernetes/triggers/test_pod.py index 4f568addbae4f..451e0c61fbc52 100644 --- a/tests/providers/cncf/kubernetes/triggers/test_pod.py +++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py @@ -100,7 +100,8 @@ def test_serialize(self, trigger): @mock.patch(f"{TRIGGER_PATH}.define_container_state") @mock.patch(f"{TRIGGER_PATH}._get_async_hook") async def test_run_loop_return_success_event(self, mock_hook, mock_method, trigger): - mock_hook.return_value.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) + pod_mock = mock.MagicMock(**{"status.phase": "Succeeded"}) + mock_hook.return_value.get_pod.return_value = self._mock_pod_result(pod_mock) mock_method.return_value = ContainerState.TERMINATED expected_event = TriggerEvent( From 002edd1903239a788d3f9d4923f13497c11ce680 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Wed, 17 May 2023 13:28:03 +0200 Subject: [PATCH 3/4] add a unit test for the new waiting condition --- .../cncf/kubernetes/triggers/test_pod.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py b/tests/providers/cncf/kubernetes/triggers/test_pod.py index 451e0c61fbc52..f339fbab62958 100644 --- a/tests/providers/cncf/kubernetes/triggers/test_pod.py +++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py @@ -116,6 +116,35 @@ async def test_run_loop_return_success_event(self, mock_hook, mock_method, trigg assert actual_event == expected_event + @pytest.mark.asyncio + @mock.patch(f"{TRIGGER_PATH}.define_container_state") + @mock.patch(f"{TRIGGER_PATH}._get_async_hook") + async def test_run_loop_wait_pod_termination_before_returning_success_event( + self, mock_hook, mock_method, trigger + ): + running_state = mock.MagicMock(**{"status.phase": "Running"}) + succeeded_state = mock.MagicMock(**{"status.phase": "Succeeded"}) + mock_hook.return_value.get_pod.side_effect = [ + self._mock_pod_result(running_state), + self._mock_pod_result(running_state), + self._mock_pod_result(succeeded_state), + ] + mock_method.return_value = ContainerState.TERMINATED + + expected_event = TriggerEvent( + { + "name": POD_NAME, + "namespace": NAMESPACE, + "status": "success", + "message": "All containers inside pod have started successfully.", + } + ) + with mock.patch.object(asyncio, "sleep") as mock_sleep: + actual_event = await (trigger.run()).asend(None) + + assert actual_event == expected_event + assert mock_sleep.call_count == 2 + @pytest.mark.asyncio @mock.patch(f"{TRIGGER_PATH}.define_container_state") @mock.patch(f"{TRIGGER_PATH}._get_async_hook") From 2c6693fe242036ede44feff568059064a78d9983 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Wed, 17 May 2023 19:29:00 +0200 Subject: [PATCH 4/4] fix tests and test the new change --- .../google/cloud/triggers/test_kubernetes_engine.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py index 3f78fb3a049f3..4c472d3c9d673 100644 --- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py +++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py @@ -113,7 +113,13 @@ def test_serialize_should_execute_successfully(self, trigger): async def test_run_loop_return_success_event_should_execute_successfully( self, mock_hook, mock_method, trigger ): - mock_hook.return_value.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) + running_state = mock.MagicMock(**{"status.phase": "Running"}) + succeeded_state = mock.MagicMock(**{"status.phase": "Succeeded"}) + mock_hook.return_value.get_pod.side_effect = [ + self._mock_pod_result(running_state), + self._mock_pod_result(running_state), + self._mock_pod_result(succeeded_state), + ] mock_method.return_value = ContainerState.TERMINATED expected_event = TriggerEvent( @@ -124,9 +130,11 @@ async def test_run_loop_return_success_event_should_execute_successfully( "message": "All containers inside pod have started successfully.", } ) - actual_event = await (trigger.run()).asend(None) + with mock.patch.object(asyncio, "sleep") as mock_sleep: + actual_event = await (trigger.run()).asend(None) assert actual_event == expected_event + assert mock_sleep.call_count == 2 @pytest.mark.asyncio @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state")