From 746aa888091eee0959d1e597d9b28cf4ba907ede Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Fri, 15 Sep 2023 23:51:22 +0200 Subject: [PATCH 1/4] Fix parsing KubernetesPodOperator multiline logs --- .../cncf/kubernetes/utils/pod_manager.py | 32 ++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 142528ddad639..08cd1460efc64 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -413,14 +413,30 @@ def consume_logs( follow=follow, post_termination_timeout=termination_timeout, ) + message_to_log = None + message_timestamp = None for raw_line in logs: line = raw_line.decode("utf-8", errors="backslashreplace") line_timestamp, message = self.parse_log_line(line) - if self._progress_callback: - self._progress_callback(line) - if line_timestamp is not None: - last_captured_timestamp = line_timestamp - self.log.info("[%s] %s", container_name, message) + if line_timestamp: # detect new log line + if message_to_log is None: # first line in the log + message_to_log = message + message_timestamp = line_timestamp + else: # previous log line is complete + if self._progress_callback: + self._progress_callback(message_to_log or "") + self.log.info("[%s] %s", container_name, message_to_log) + last_captured_timestamp = message_timestamp + message_to_log = message + message_timestamp = line_timestamp + else: # continuation of the previous log line + message_to_log = f"{message_to_log}\n{message}" + + # log the last line and update the last_captured_timestamp + if self._progress_callback: + self._progress_callback(message_to_log or "") + self.log.info("[%s] %s", container_name, message_to_log) + last_captured_timestamp = message_timestamp except BaseHTTPError as e: self.log.warning( "Reading of logs interrupted for container %r with error %r; will retry. " @@ -560,16 +576,10 @@ def parse_log_line(self, line: str) -> tuple[DateTime | None, str]: """ timestamp, sep, message = line.strip().partition(" ") if not sep: - self.log.error( - "Error parsing timestamp (no timestamp in message %r). " - "Will continue execution but won't update timestamp", - line, - ) return None, line try: last_log_time = cast(DateTime, pendulum.parse(timestamp)) except ParserError: - self.log.error("Error parsing timestamp. Will continue execution but won't update timestamp") return None, line return last_log_time, message From 1fa57ea464f60bc25ee3a68613b411f16de275b8 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sat, 16 Sep 2023 22:03:23 +0200 Subject: [PATCH 2/4] make it b/c with progress_callback --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 08cd1460efc64..92fc9f711be15 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -415,6 +415,7 @@ def consume_logs( ) message_to_log = None message_timestamp = None + progress_callback_lines = [] for raw_line in logs: line = raw_line.decode("utf-8", errors="backslashreplace") line_timestamp, message = self.parse_log_line(line) @@ -422,19 +423,24 @@ def consume_logs( if message_to_log is None: # first line in the log message_to_log = message message_timestamp = line_timestamp + progress_callback_lines.append(line) else: # previous log line is complete if self._progress_callback: - self._progress_callback(message_to_log or "") + for line in progress_callback_lines: + self._progress_callback(line) self.log.info("[%s] %s", container_name, message_to_log) last_captured_timestamp = message_timestamp message_to_log = message message_timestamp = line_timestamp + progress_callback_lines = [line] else: # continuation of the previous log line message_to_log = f"{message_to_log}\n{message}" + progress_callback_lines.append(line) # log the last line and update the last_captured_timestamp if self._progress_callback: - self._progress_callback(message_to_log or "") + for line in progress_callback_lines: + self._progress_callback(line) self.log.info("[%s] %s", container_name, message_to_log) last_captured_timestamp = message_timestamp except BaseHTTPError as e: From 7af486c85d930282323880da41e121171fd42ed8 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sat, 16 Sep 2023 22:03:58 +0200 Subject: [PATCH 3/4] add a unit test --- .../cncf/kubernetes/utils/test_pod_manager.py | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index dfe06d9a74648..304029114fb9b 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -284,11 +284,30 @@ def test_fetch_container_logs_invoke_progress_callback( self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True) self.mock_progress_callback.assert_has_calls([mock.call(message), mock.call(no_ts_message)]) - def test_parse_invalid_log_line(self, caplog): + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs") + def test_parse_multi_line_logs(self, mock_read_pod_logs, mock_container_is_running, caplog): + log = ( + "2020-10-08T14:16:17.793417674Z message1 line1\n" + "message1 line2\n" + "message1 line3\n" + "2020-10-08T14:16:18.793417674Z message2 line1\n" + "message2 line2\n" + "2020-10-08T14:16:19.793417674Z message3 line1\n" + ) + mock_read_pod_logs.return_value = [bytes(log_line, "utf-8") for log_line in log.split("\n")] + mock_container_is_running.return_value = False + with caplog.at_level(logging.INFO): - self.pod_manager.parse_log_line("2020-10-08T14:16:17.793417674ZInvalidmessage\n") - assert "Invalidmessage" in caplog.text - assert "no timestamp in message" in caplog.text + self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True) + + assert "message1 line1" in caplog.text + assert "message1 line2" in caplog.text + assert "message1 line3" in caplog.text + assert "message2 line1" in caplog.text + assert "message2 line2" in caplog.text + assert "message3 line1" in caplog.text + assert "ERROR" not in caplog.text @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async") def test_start_pod_retries_on_409_error(self, mock_run_pod_async): From 905b36a64461c2ae0772810b6bd12c0ecdb4d2f2 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 17 Sep 2023 16:33:29 +0200 Subject: [PATCH 4/4] fix integration test --- kubernetes_tests/test_kubernetes_pod_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index 899bcf1f6f36d..26afb70a97c15 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -518,7 +518,7 @@ def test_volume_mount(self, mock_get_connection): ) context = create_context(k) k.execute(context=context) - mock_logger.info.assert_any_call("[%s] %s", "base", "retrieved from mount") + mock_logger.info.assert_any_call("[%s] %s", "base", "retrieved from mount\n") actual_pod = self.api_client.sanitize_for_serialization(k.pod) self.expected_pod["spec"]["containers"][0]["args"] = args self.expected_pod["spec"]["containers"][0]["volumeMounts"] = [