diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py
index 4dc9051c02559..0b53a1c6f8522 100644
--- a/airflow-core/src/airflow/utils/log/file_task_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_task_handler.py
@@ -23,11 +23,12 @@
 import io
 import logging
 import os
+import time
 from collections.abc import Callable, Generator, Iterator
 from contextlib import suppress
 from datetime import datetime
 from enum import Enum
-from itertools import chain, islice
+from itertools import chain
 from pathlib import Path
 from types import GeneratorType
 from typing import IO, TYPE_CHECKING, TypedDict, cast
@@ -40,7 +41,6 @@
 from airflow.configuration import conf
 from airflow.executors.executor_loader import ExecutorLoader
 from airflow.utils.helpers import parse_template_string, render_template
-from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator
 from airflow.utils.log.logging_mixin import SetContextPropagate
 from airflow.utils.log.non_caching_file_handler import NonCachingRotatingFileHandler
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -65,6 +65,9 @@
 """
 HEAP_DUMP_SIZE = 5000
 HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2
+# Time-based flushing configuration
+FLUSH_INTERVAL_SECONDS = 2.0  # Flush logs every 2 seconds if there are any records
+MIN_RECORDS_FOR_TIME_FLUSH = 1  # Minimum records to trigger a time-based flush

 # These types are similar, but have distinct names to make processing them less error prone
 LogMessages: TypeAlias = list[str]
@@ -103,7 +106,7 @@ class LogMetadata(TypedDict):
     """Metadata about the log fetching process, including `end_of_log` and `log_pos`."""

     end_of_log: bool
-    log_pos: NotRequired[int]
+    log_pos: NotRequired[int]  # No longer populated by FileTaskHandler._read; kept for compatibility
     # the following attributes are used for Elasticsearch and OpenSearch log handlers
     offset: NotRequired[str | int]
     # Ensure a string here. Large offset numbers will get JSON.parsed incorrectly
@@ -198,6 +201,41 @@ def _parse_timestamp(line: str):
         return pendulum.parse(timestamp_str.strip("[]"))


+def stream_file_until_close(
+    file_path: Path, poll_interval: float = 0.1, idle_timeout: float = 10.0
+) -> RawLogStream:
+    """
+    Stream lines from a file until the writer appears to have closed it.
+
+    :param file_path: Path to the file to stream.
+    :param poll_interval: how often to check for new data
+    :param idle_timeout: how long to wait with no growth before stopping
+    """
+    with open(file_path, encoding="utf-8") as f:
+        last_size = os.stat(file_path).st_size
+        idle_start = None
+
+        while True:
+            line = f.readline()
+            if line:
+                yield line
+                idle_start = None
+                last_size = os.stat(file_path).st_size
+            else:
+                time.sleep(poll_interval)
+                st = os.stat(file_path)
+
+                # If the file stopped growing, start the idle timer
+                if st.st_size == last_size:
+                    if idle_start is None:
+                        idle_start = time.time()
+                    elif time.time() - idle_start >= idle_timeout:
+                        break  # no growth for too long, assume the writer closed the file
+                else:
+                    idle_start = None
+                    last_size = st.st_size
+
+
 def _stream_lines_by_chunk(
     log_io: IO[str],
 ) -> RawLogStream:
@@ -357,6 +395,9 @@ def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream:
     By yielding HALF_CHUNK_SIZE records when heap size exceeds CHUNK_SIZE, we can reduce the chance
     of messing up the global order. Since there are multiple log streams, we can't guarantee that
     the records are in global order.
+
+    Additionally, implements time-based flushing to prevent frontend delays when only a few records
+    are added over long time intervals.

     e.g.
     log_stream1: ----------
@@ -374,15 +415,33 @@ def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream:
     parsed_log_streams: dict[int, ParsedLogStream] = {
         idx: _log_stream_to_parsed_log_stream(log_stream) for idx, log_stream in enumerate(log_streams)
     }
+    last_flush_time = time.time()

     # keep adding records from logs until all logs are empty
     last_log_container: list[StructuredLogMessage | None] = [None]
     while parsed_log_streams:
         _add_log_from_parsed_log_streams_to_heap(heap, parsed_log_streams)

-        # yield HALF_HEAP_DUMP_SIZE records when heap size exceeds HEAP_DUMP_SIZE
-        if len(heap) >= HEAP_DUMP_SIZE:
+        current_time = time.time()
+        time_since_last_flush = current_time - last_flush_time
+
+        # Check if we should flush based on heap size or time interval
+        should_flush_by_size = len(heap) >= HEAP_DUMP_SIZE
+        should_flush_by_time = (
+            len(heap) >= MIN_RECORDS_FOR_TIME_FLUSH and time_since_last_flush >= FLUSH_INTERVAL_SECONDS
+        )
+
+        if should_flush_by_size:
+            # Size-based flush: yield HALF_HEAP_DUMP_SIZE records when heap size exceeds HEAP_DUMP_SIZE
             yield from _flush_logs_out_of_heap(heap, HALF_HEAP_DUMP_SIZE, last_log_container)
+            last_flush_time = current_time
+        elif should_flush_by_time:
+            # Time-based flush: yield all available records to prevent frontend delays
+            flush_count = min(
+                len(heap), HALF_HEAP_DUMP_SIZE
+            )  # Don't flush more than half to maintain ordering
+            yield from _flush_logs_out_of_heap(heap, flush_count, last_log_container)
+            last_flush_time = current_time

     # yield remaining records
     yield from _flush_logs_out_of_heap(heap, len(heap), last_log_container)
@@ -586,16 +645,11 @@ def _read(
         :param metadata: log metadata,
                          can be used for steaming log reading and auto-tailing.
                          Following attributes are used:
-                         log_pos: (absolute) Char position to which the log
-                                  which was retrieved in previous calls, this
-                                  part will be skipped and only following test
-                                  returned to be added to tail.
         :return: log message as a string and metadata.
                  Following attributes are used in metadata:
                  end_of_log: Boolean, True if end of log is reached or False
                              if further calls might get more log text.
                              This is determined by the status of the TaskInstance
-                 log_pos: (absolute) Char position to which the log is retrieved
         """
         # Task instance here might be different from task instance when
         # initializing the handler. Thus explicitly getting log location
@@ -613,10 +667,7 @@ def _read(
             remote_logs = []
         elif isinstance(logs, list) and isinstance(logs[0], str):
             # If the logs are in legacy format, convert them to a generator of log lines
-            remote_logs = [
-                # We don't need to use the log_pos here, as we are using the metadata to track the position
-                _get_compatible_log_stream(cast("list[str]", logs))
-            ]
+            remote_logs = [_get_compatible_log_stream(cast("list[str]", logs))]
         elif isinstance(logs, list) and _is_logs_stream_like(logs[0]):
             # If the logs are already in a stream-like format, we can use them directly
             remote_logs = cast("list[RawLogStream]", logs)
@@ -669,23 +720,10 @@ def _read(
                 TaskInstanceState.DEFERRED,
             )

-        with LogStreamAccumulator(out_stream, HEAP_DUMP_SIZE) as stream_accumulator:
-            log_pos = stream_accumulator.total_lines
-            out_stream = stream_accumulator.stream
+        return chain(header, out_stream), {
+            "end_of_log": end_of_log,
+        }
-
-            # skip log stream until the last position
-            if metadata and "log_pos" in metadata:
-                islice(out_stream, metadata["log_pos"])
-            else:
-                # first time reading log, add messages before interleaved log stream
-                out_stream = chain(header, out_stream)
-
-            return out_stream, {
-                "end_of_log": end_of_log,
-                "log_pos": log_pos,
-            }
-
     @staticmethod
     def _get_pod_namespace(ti: TaskInstance | TaskInstanceHistory):
         pod_override = ti.executor_config.get("pod_override")
@@ -857,7 +895,7 @@ def _read_from_local(
         for path in paths:
             sources.append(os.fspath(path))
             # Read the log file and yield lines
-            log_streams.append(_stream_lines_by_chunk(open(path, encoding="utf-8")))
+            log_streams.append(stream_file_until_close(path))
         return sources, log_streams

     def _read_from_logs_server(
diff --git a/airflow-core/src/airflow/utils/log/log_reader.py b/airflow-core/src/airflow/utils/log/log_reader.py
index 3dc61342afaf9..5431ae87d9559 100644
--- a/airflow-core/src/airflow/utils/log/log_reader.py
+++ b/airflow-core/src/airflow/utils/log/log_reader.py
@@ -93,7 +93,7 @@ def read_log_stream(
         if try_number is None:
             try_number = ti.try_number

-        for key in ("end_of_log", "max_offset", "offset", "log_pos"):
+        for key in ("end_of_log", "max_offset", "offset"):
             # https://mypy.readthedocs.io/en/stable/typed_dict.html#supported-operations
             metadata.pop(key, None)  # type: ignore[misc]
         empty_iterations = 0
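
Reviewer note, not part of the patch: a minimal standalone sketch of the flush decision added to _interleave_logs, with the heap and stream plumbing stripped away so the size-based and time-based triggers are easy to compare. The records_to_flush helper is hypothetical; only the constants mirror the ones introduced in this diff.

HEAP_DUMP_SIZE = 5000
HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2
FLUSH_INTERVAL_SECONDS = 2.0
MIN_RECORDS_FOR_TIME_FLUSH = 1


def records_to_flush(heap_size: int, last_flush_time: float, now: float) -> int:
    """Return how many buffered records to yield now; 0 means keep buffering."""
    if heap_size >= HEAP_DUMP_SIZE:
        # Size-based flush: drain half the heap, as before this change.
        return HALF_HEAP_DUMP_SIZE
    if heap_size >= MIN_RECORDS_FOR_TIME_FLUSH and now - last_flush_time >= FLUSH_INTERVAL_SECONDS:
        # Time-based flush: drain what is available (capped at half the dump size
        # to preserve ordering) so a slow trickle of records still reaches the UI.
        return min(heap_size, HALF_HEAP_DUMP_SIZE)
    return 0


# Three records that have been sitting in the heap for 3 seconds are flushed
# immediately instead of waiting for the 5000-record threshold.
assert records_to_flush(heap_size=3, last_flush_time=100.0, now=103.0) == 3
assert records_to_flush(heap_size=3, last_flush_time=100.0, now=100.5) == 0
assert records_to_flush(heap_size=6000, last_flush_time=100.0, now=100.5) == HALF_HEAP_DUMP_SIZE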
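
Also not part of the patch: a rough local harness for the tailing behaviour of stream_file_until_close. It assumes the helper is importable from airflow.utils.log.file_task_handler once this change lands; the slow_writer function and the temporary log path are made up for the demo. The generator keeps yielding lines while the file grows and returns once the file has not grown for idle_timeout seconds.

import tempfile
import threading
import time
from pathlib import Path

from airflow.utils.log.file_task_handler import stream_file_until_close  # assumed import path


def slow_writer(path: Path, lines: int = 5, delay: float = 0.3) -> None:
    # Simulates a task process appending to its log file, then going quiet.
    with open(path, "a", encoding="utf-8") as f:
        for i in range(lines):
            f.write(f"line {i}\n")
            f.flush()
            time.sleep(delay)


if __name__ == "__main__":
    with tempfile.NamedTemporaryFile("w", suffix=".log", delete=False) as tmp:
        log_path = Path(tmp.name)

    threading.Thread(target=slow_writer, args=(log_path,), daemon=True).start()

    # Yields lines as they are appended; stops about 2 seconds after the writer goes idle.
    for line in stream_file_until_close(log_path, poll_interval=0.1, idle_timeout=2.0):
        print(line, end="")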