From 49f2d3cdc161434fd041741afcefe46539f4cbe3 Mon Sep 17 00:00:00 2001 From: Freddy Demiane Date: Fri, 18 Aug 2023 16:28:14 +0000 Subject: [PATCH 1/4] Fix kpo logging not capturing the latest timestamp --- .../cncf/kubernetes/utils/pod_manager.py | 8 ++++--- .../cncf/kubernetes/utils/test_pod_manager.py | 21 +++++++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 650be681d3ee8..93bcc31a0911d 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -383,7 +383,7 @@ def consume_logs( Returns the last timestamp observed in logs. """ - timestamp = None + last_captured_timestamp = None try: logs = self.read_pod_logs( pod=pod, @@ -397,7 +397,9 @@ def consume_logs( ) for raw_line in logs: line = raw_line.decode("utf-8", errors="backslashreplace") - timestamp, message = self.parse_log_line(line) + line_timestamp, message = self.parse_log_line(line) + if line_timestamp is not None: + last_captured_timestamp = line_timestamp self.log.info("[%s] %s", container_name, message) except BaseHTTPError as e: self.log.warning( @@ -411,7 +413,7 @@ def consume_logs( pod.metadata.name, exc_info=True, ) - return timestamp or since_time + return last_captured_timestamp or since_time # note: `read_pod_logs` follows the logs, so we shouldn't necessarily *need* to # loop as we do here. But in a long-running process we might temporarily lose connectivity. diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index c915a03a4aa08..d117530e0398a 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -19,6 +19,7 @@ import logging from datetime import datetime from json.decoder import JSONDecodeError +from typing import cast from unittest import mock from unittest.mock import MagicMock @@ -252,6 +253,26 @@ def test_parse_log_line(self): assert timestamp == pendulum.parse(real_timestamp) assert line == log_message + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs") + def test_fetch_container_logs_returning_last_timestamp(self, mock_read_pod_logs): + timestamp_string = "2020-10-08T14:16:17.793417674Z" + + mock_read_pod_logs.return_value = [bytes(f"{timestamp_string} message", "utf-8"), b"notimestamp"] + + self.first_time = True + + def mock_container_is_running(pod, container_name): + if self.first_time: + self.first_time = False + return True + return False + + self.pod_manager.container_is_running = mock_container_is_running + + status = self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock()) + + assert status.last_log_time == cast(DateTime, pendulum.parse(timestamp_string)) + def test_parse_invalid_log_line(self, caplog): with caplog.at_level(logging.INFO): self.pod_manager.parse_log_line("2020-10-08T14:16:17.793417674ZInvalidmessage\n") From b5b281884bbea8349d98614e2b37c37f2cf06abf Mon Sep 17 00:00:00 2001 From: Freddy Demiane Date: Tue, 22 Aug 2023 09:09:50 +0000 Subject: [PATCH 2/4] Fix PR comments --- .../cncf/kubernetes/utils/test_pod_manager.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index d117530e0398a..c7932e895bf25 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -253,21 +253,21 @@ def test_parse_log_line(self): assert timestamp == pendulum.parse(real_timestamp) assert line == log_message + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs") - def test_fetch_container_logs_returning_last_timestamp(self, mock_read_pod_logs): - timestamp_string = "2020-10-08T14:16:17.793417674Z" - - mock_read_pod_logs.return_value = [bytes(f"{timestamp_string} message", "utf-8"), b"notimestamp"] - - self.first_time = True - - def mock_container_is_running(pod, container_name): - if self.first_time: - self.first_time = False + def test_fetch_container_logs_returning_last_timestamp( + self, mock_read_pod_logs, mock_container_is_running + ): + def mock_container_is_running_func(pod, container_name): + if mock_container_is_running.first_time: + mock_container_is_running.first_time = False return True return False - self.pod_manager.container_is_running = mock_container_is_running + mock_container_is_running.first_time = True + mock_container_is_running.side_effect = mock_container_is_running_func + timestamp_string = "2020-10-08T14:16:17.793417674Z" + mock_read_pod_logs.return_value = [bytes(f"{timestamp_string} message", "utf-8"), b"notimestamp"] status = self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock()) From 2a7d8038d99903b864fe9777acacd0de1b09ee2a Mon Sep 17 00:00:00 2001 From: Freddy Demiane Date: Tue, 22 Aug 2023 11:13:11 +0000 Subject: [PATCH 3/4] Fix PR comments --- .../providers/cncf/kubernetes/utils/test_pod_manager.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index c7932e895bf25..abcad9df1b1ff 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -258,16 +258,9 @@ def test_parse_log_line(self): def test_fetch_container_logs_returning_last_timestamp( self, mock_read_pod_logs, mock_container_is_running ): - def mock_container_is_running_func(pod, container_name): - if mock_container_is_running.first_time: - mock_container_is_running.first_time = False - return True - return False - - mock_container_is_running.first_time = True - mock_container_is_running.side_effect = mock_container_is_running_func timestamp_string = "2020-10-08T14:16:17.793417674Z" mock_read_pod_logs.return_value = [bytes(f"{timestamp_string} message", "utf-8"), b"notimestamp"] + mock_container_is_running.side_effect = [True, False] status = self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock()) From 6d1d49e94016d0b86a5a91bbb2e377ce9be396b5 Mon Sep 17 00:00:00 2001 From: Freddy Demiane Date: Tue, 22 Aug 2023 11:47:29 +0000 Subject: [PATCH 4/4] Fix PR comments --- tests/providers/cncf/kubernetes/utils/test_pod_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index abcad9df1b1ff..d11c8b7d41d3c 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -262,7 +262,7 @@ def test_fetch_container_logs_returning_last_timestamp( mock_read_pod_logs.return_value = [bytes(f"{timestamp_string} message", "utf-8"), b"notimestamp"] mock_container_is_running.side_effect = [True, False] - status = self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock()) + status = self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True) assert status.last_log_time == cast(DateTime, pendulum.parse(timestamp_string))