diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py index ad61a139086c3..d863999617810 100644 --- a/airflow/utils/log/log_reader.py +++ b/airflow/utils/log/log_reader.py @@ -39,6 +39,9 @@ class TaskLogReader: STREAM_LOOP_SLEEP_SECONDS = 1 """Time to sleep between loops while waiting for more logs""" + STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS = 5 + """Number of empty loop iterations before stopping the stream""" + def read_log_chunks( self, ti: TaskInstance, try_number: int | None, metadata ) -> tuple[list[tuple[tuple[str, str]]], dict[str, str]]: @@ -83,6 +86,7 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di metadata.pop("max_offset", None) metadata.pop("offset", None) metadata.pop("log_pos", None) + empty_iterations = 0 while True: logs, metadata = self.read_log_chunks(ti, current_try_number, metadata) for host, log in logs[0]: @@ -91,10 +95,17 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di not metadata["end_of_log"] and ti.state not in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) ): - if not logs[0]: + if logs[0]: + empty_iterations = 0 + else: # we did not receive any logs in this loop # sleeping to conserve resources / limit requests on external services time.sleep(self.STREAM_LOOP_SLEEP_SECONDS) + empty_iterations += 1 + if empty_iterations >= self.STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS: + # we have not received any logs for a while, so we stop the stream + yield "\n(Log stream stopped - End of log marker not found; logs may be incomplete.)\n" + break else: break diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py index d4417bfba8220..598ed8bcbba35 100644 --- a/tests/utils/log/test_log_reader.py +++ b/tests/utils/log/test_log_reader.py @@ -225,6 +225,27 @@ def test_read_log_stream_should_read_each_try_in_turn(self, mock_read): any_order=False, ) + @mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read") + def test_read_log_stream_no_end_of_log_marker(self, mock_read): + mock_read.side_effect = [ + ([[("", "hello")]], [{"end_of_log": False}]), + ([[]], [{"end_of_log": False}]), + ([[]], [{"end_of_log": False}]), + ([[]], [{"end_of_log": False}]), + ([[]], [{"end_of_log": False}]), + ([[]], [{"end_of_log": False}]), + ] + + self.ti.state = TaskInstanceState.SUCCESS + task_log_reader = TaskLogReader() + task_log_reader.STREAM_LOOP_SLEEP_SECONDS = 0.001 # to speed up the test + log_stream = task_log_reader.read_log_stream(ti=self.ti, try_number=1, metadata={}) + assert list(log_stream) == [ + "\nhello\n", + "\n(Log stream stopped - End of log marker not found; logs may be incomplete.)\n", + ] + assert mock_read.call_count == 6 + def test_supports_external_link(self): task_log_reader = TaskLogReader()