From bebda17380b2e12952f75cff5c398f8a4acf3ca0 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Fri, 16 May 2025 16:27:33 -0600 Subject: [PATCH 1/3] Stop streaming task logs if end of log mark is missing Sometimes, somehow, the end of log mark can be missing, and when that happens the streaming log reader enters an infinite loop. Instead, if the task is in a non-running state and we stop receiving log lines but never get the end of log mark, we assume we won't and stop trying. We do tell emit that we are stopping though. --- airflow/utils/log/log_reader.py | 9 +++++++++ tests/utils/log/test_log_reader.py | 21 +++++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py index ad61a139086c3..ef45d37b67f2e 100644 --- a/airflow/utils/log/log_reader.py +++ b/airflow/utils/log/log_reader.py @@ -39,6 +39,9 @@ class TaskLogReader: STREAM_LOOP_SLEEP_SECONDS = 1 """Time to sleep between loops while waiting for more logs""" + STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS = 5 + """Number of empty loop iterations before stopping the stream""" + def read_log_chunks( self, ti: TaskInstance, try_number: int | None, metadata ) -> tuple[list[tuple[tuple[str, str]]], dict[str, str]]: @@ -83,6 +86,7 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di metadata.pop("max_offset", None) metadata.pop("offset", None) metadata.pop("log_pos", None) + empty_iterations = 0 while True: logs, metadata = self.read_log_chunks(ti, current_try_number, metadata) for host, log in logs[0]: @@ -95,6 +99,11 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di # we did not receive any logs in this loop # sleeping to conserve resources / limit requests on external services time.sleep(self.STREAM_LOOP_SLEEP_SECONDS) + empty_iterations += 1 + if empty_iterations >= self.STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS: + # we have not received any logs for a while, so we stop the stream + yield "\n(Log stream stopped - End of log marker not found, so logs may be incomplete)\n" + break else: break diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py index d4417bfba8220..0f26687ad3536 100644 --- a/tests/utils/log/test_log_reader.py +++ b/tests/utils/log/test_log_reader.py @@ -225,6 +225,27 @@ def test_read_log_stream_should_read_each_try_in_turn(self, mock_read): any_order=False, ) + @mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read") + def test_read_log_stream_no_end_of_log_marker(self, mock_read): + mock_read.side_effect = [ + ([[("", "hello")]], [{"end_of_log": False}]), + ([[]], [{"end_of_log": False}]), + ([[]], [{"end_of_log": False}]), + ([[]], [{"end_of_log": False}]), + ([[]], [{"end_of_log": False}]), + ([[]], [{"end_of_log": False}]), + ] + + self.ti.state = TaskInstanceState.SUCCESS + task_log_reader = TaskLogReader() + task_log_reader.STREAM_LOOP_SLEEP_SECONDS = 0.001 # to speed up the test + log_stream = task_log_reader.read_log_stream(ti=self.ti, try_number=1, metadata={}) + assert list(log_stream) == [ + "\nhello\n", + "\n(Log stream stopped - End of log marker not found, so logs may be incomplete)\n", + ] + assert mock_read.call_count == 6 + def test_supports_external_link(self): task_log_reader = TaskLogReader() From c0778b698d4787baf2df4b790495f8f087d31015 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Wed, 18 Jun 2025 14:06:31 -0600 Subject: [PATCH 2/3] Reset empty iterations --- airflow/utils/log/log_reader.py | 10 ++++++---- tests/utils/log/test_log_reader.py | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py index ef45d37b67f2e..f52644a78f340 100644 --- a/airflow/utils/log/log_reader.py +++ b/airflow/utils/log/log_reader.py @@ -95,17 +95,19 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di not metadata["end_of_log"] and ti.state not in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) ): - if not logs[0]: + if logs[0]: + empty_iterations = 0 + else: # we did not receive any logs in this loop # sleeping to conserve resources / limit requests on external services time.sleep(self.STREAM_LOOP_SLEEP_SECONDS) empty_iterations += 1 if empty_iterations >= self.STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS: # we have not received any logs for a while, so we stop the stream - yield "\n(Log stream stopped - End of log marker not found, so logs may be incomplete)\n" - break + yield "\n(Log stream stopped - End of log marker not found; logs may be incomplete.)\n" + return else: - break + return @cached_property def log_handler(self): diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py index 0f26687ad3536..598ed8bcbba35 100644 --- a/tests/utils/log/test_log_reader.py +++ b/tests/utils/log/test_log_reader.py @@ -242,7 +242,7 @@ def test_read_log_stream_no_end_of_log_marker(self, mock_read): log_stream = task_log_reader.read_log_stream(ti=self.ti, try_number=1, metadata={}) assert list(log_stream) == [ "\nhello\n", - "\n(Log stream stopped - End of log marker not found, so logs may be incomplete)\n", + "\n(Log stream stopped - End of log marker not found; logs may be incomplete.)\n", ] assert mock_read.call_count == 6 From 093960a6444a79ba295a7f28ad5e5ac1c935fbae Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Thu, 19 Jun 2025 23:16:29 -0600 Subject: [PATCH 3/3] Return != break when we have another loop! --- airflow/utils/log/log_reader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py index f52644a78f340..d863999617810 100644 --- a/airflow/utils/log/log_reader.py +++ b/airflow/utils/log/log_reader.py @@ -105,9 +105,9 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di if empty_iterations >= self.STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS: # we have not received any logs for a while, so we stop the stream yield "\n(Log stream stopped - End of log marker not found; logs may be incomplete.)\n" - return + break else: - return + break @cached_property def log_handler(self):