diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 05829a16154fe..65a264eeeeacb 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -408,25 +408,52 @@ def consume_logs(
             """
             last_captured_timestamp = None
             try:
-                if not logs:
-                    logs = self.read_pod_logs(
-                        pod=pod,
-                        container_name=container_name,
-                        timestamps=True,
-                        since_seconds=(
-                            math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
-                        ),
-                        follow=follow,
-                        post_termination_timeout=post_termination_timeout,
-                    )
-                for raw_line in logs:
-                    line = raw_line.decode("utf-8", errors="backslashreplace")
-                    line_timestamp, message = self.parse_log_line(line)
+                logs = self.read_pod_logs(
+                    pod=pod,
+                    container_name=container_name,
+                    timestamps=True,
+                    since_seconds=(
+                        math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
+                    ),
+                    follow=follow,
+                    post_termination_timeout=post_termination_timeout,
+                )
+                message_to_log = None
+                message_timestamp = None
+                progress_callback_lines = []
+                try:
+                    for raw_line in logs:
+                        line = raw_line.decode("utf-8", errors="backslashreplace")
+                        line_timestamp, message = self.parse_log_line(line)
+                        if line_timestamp:  # detect new log line
+                            if message_to_log is None:  # first line in the log
+                                message_to_log = message
+                                message_timestamp = line_timestamp
+                                progress_callback_lines.append(line)
+                            else:  # previous log line is complete
+                                if self._progress_callback:
+                                    # a separate loop name keeps `line` (the current raw line) intact
+                                    for callback_line in progress_callback_lines:
+                                        self._progress_callback(callback_line)
+                                self.log.info("[%s] %s", container_name, message_to_log)
+                                last_captured_timestamp = message_timestamp
+                                message_to_log = message
+                                message_timestamp = line_timestamp
+                                progress_callback_lines = [line]
+                        elif message_to_log is None:  # text without timestamp before any timestamped line
+                            message_to_log = message
+                            progress_callback_lines.append(line)
+                        else:  # continuation of the previous log line
+                            message_to_log = f"{message_to_log}\n{message}"
+                            progress_callback_lines.append(line)
+                finally:
+                    # log the last line and update the last_captured_timestamp
                     if self._progress_callback:
-                        self._progress_callback(line)
-                    if line_timestamp is not None:
-                        last_captured_timestamp = line_timestamp
-                    self.log.info("[%s] %s", container_name, message)
+                        for callback_line in progress_callback_lines:
+                            self._progress_callback(callback_line)
+                    if message_to_log is not None:  # nothing was read: do not log a literal "None"
+                        self.log.info("[%s] %s", container_name, message_to_log)
+                    last_captured_timestamp = message_timestamp
             except BaseHTTPError as e:
                 self.log.warning(
                     "Reading of logs interrupted for container %r with error %r; will retry. "
@@ -570,16 +597,10 @@ def parse_log_line(self, line: str) -> tuple[DateTime | None, str]:
         """
         timestamp, sep, message = line.strip().partition(" ")
         if not sep:
-            self.log.error(
-                "Error parsing timestamp (no timestamp in message %r). "
-                "Will continue execution but won't update timestamp",
-                line,
-            )
             return None, line
         try:
             last_log_time = cast(DateTime, pendulum.parse(timestamp))
         except ParserError:
-            self.log.error("Error parsing timestamp. Will continue execution but won't update timestamp")
             return None, line
         return last_log_time, message
 
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 2a3803d6c1637..6b56bc9cd2a60 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -518,7 +518,7 @@ def test_volume_mount(self, mock_get_connection):
             )
             context = create_context(k)
             k.execute(context=context)
-            mock_logger.info.assert_any_call("[%s] %s", "base", "retrieved from mount")
+            mock_logger.info.assert_any_call("[%s] %s", "base", "retrieved from mount\n")
             actual_pod = self.api_client.sanitize_for_serialization(k.pod)
             self.expected_pod["spec"]["containers"][0]["args"] = args
             self.expected_pod["spec"]["containers"][0]["volumeMounts"] = [
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index 42e2a5c6104da..d0e54088019cb 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -309,11 +309,30 @@ def consumer_iter():
         assert status.last_log_time == cast(DateTime, pendulum.parse(last_timestamp_string))
         assert self.mock_progress_callback.call_count == expected_call_count
 
-    def test_parse_invalid_log_line(self, caplog):
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
+    def test_parse_multi_line_logs(self, mock_read_pod_logs, mock_container_is_running, caplog):
+        log = (
+            "2020-10-08T14:16:17.793417674Z message1 line1\n"
+            "message1 line2\n"
+            "message1 line3\n"
+            "2020-10-08T14:16:18.793417674Z message2 line1\n"
+            "message2 line2\n"
+            "2020-10-08T14:16:19.793417674Z message3 line1\n"
+        )
+        mock_read_pod_logs.return_value = [bytes(log_line, "utf-8") for log_line in log.split("\n")]
+        mock_container_is_running.return_value = False
+
         with caplog.at_level(logging.INFO):
-            self.pod_manager.parse_log_line("2020-10-08T14:16:17.793417674ZInvalidmessage\n")
-        assert "Invalidmessage" in caplog.text
-        assert "no timestamp in message" in caplog.text
+            self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True)
+
+        assert "message1 line1" in caplog.text
+        assert "message1 line2" in caplog.text
+        assert "message1 line3" in caplog.text
+        assert "message2 line1" in caplog.text
+        assert "message2 line2" in caplog.text
+        assert "message3 line1" in caplog.text
+        assert "ERROR" not in caplog.text
 
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async")
     def test_start_pod_retries_on_409_error(self, mock_run_pod_async):