diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml index 3f3cfd996e49e..588cb536ca720 100644 --- a/airflow-core/pyproject.toml +++ b/airflow-core/pyproject.toml @@ -229,8 +229,8 @@ exclude = [ "../shared/module_loading/src/airflow_shared/module_loading" = "src/airflow/_shared/module_loading" "../shared/observability/src/airflow_shared/observability" = "src/airflow/_shared/observability" "../shared/secrets_backend/src/airflow_shared/secrets_backend" = "src/airflow/_shared/secrets_backend" -"../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/_shared/secrets_masker" -"../shared/timezones/src/airflow_shared/timezones" = "src/airflow/_shared/timezones" + "../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/_shared/secrets_masker" + "../shared/timezones/src/airflow_shared/timezones" = "src/airflow/_shared/timezones" [tool.hatch.build.targets.custom] path = "./hatch_build.py" diff --git a/airflow-core/src/airflow/serialization/helpers.py b/airflow-core/src/airflow/serialization/helpers.py index 723c113709a87..c0f3a2e59d056 100644 --- a/airflow-core/src/airflow/serialization/helpers.py +++ b/airflow-core/src/airflow/serialization/helpers.py @@ -19,7 +19,8 @@ from __future__ import annotations import contextlib -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Union +from collections.abc import Sequence from airflow._shared.secrets_masker import redact from airflow.configuration import conf @@ -29,6 +30,34 @@ from airflow.timetables.base import Timetable as CoreTimetable +def truncate_rendered_value(rendered: Any, max_length: int) -> str: + """ + Truncate rendered value with a reasonable minimum length to avoid edge cases. + + Args: + rendered: The rendered value to truncate + max_length: The maximum allowed length for the output + + Returns: + Truncated string that respects the max_length constraint + """ + # Set a reasonable minimum to avoid complex edge cases with very small values + if max_length < 100: + max_length = 100 + + prefix = "Truncated. You can change this behaviour in [core]max_templated_field_length. " + suffix = "... " + + # Apply repr() to the content first to account for quotes that will be added + content_repr = repr(rendered) + available_length = max_length - len(prefix) - len(suffix) + if available_length <= 0: + return (prefix + suffix)[:max_length] + + content_part = content_repr[:available_length] + return f"{prefix}{content_part}{suffix}" + + def serialize_template_field(template_field: Any, name: str) -> str | dict | list | int | float: """ Return a serializable representation of the templated field. @@ -74,10 +103,7 @@ def sort_dict_recursively(obj: Any) -> Any: serialized = str(template_field) if len(serialized) > max_length: rendered = redact(serialized, name) - return ( - "Truncated. You can change this behaviour in [core]max_templated_field_length. " - f"{rendered[: max_length - 79]!r}... " - ) + return truncate_rendered_value(rendered, max_length) return serialized if not template_field and not isinstance(template_field, tuple): # Avoid unnecessary serialization steps for empty fields unless they are tuples @@ -91,10 +117,7 @@ def sort_dict_recursively(obj: Any) -> Any: serialized = str(template_field) if len(serialized) > max_length: rendered = redact(serialized, name) - return ( - "Truncated. You can change this behaviour in [core]max_templated_field_length. " - f"{rendered[: max_length - 79]!r}... " - ) + return truncate_rendered_value(rendered, max_length) return template_field diff --git a/task-sdk/pyproject.toml b/task-sdk/pyproject.toml index 7c26f1babb58a..8f671094fb811 100644 --- a/task-sdk/pyproject.toml +++ b/task-sdk/pyproject.toml @@ -119,10 +119,10 @@ path = "src/airflow/sdk/__init__.py" "../shared/dagnode/src/airflow_shared/dagnode" = "src/airflow/sdk/_shared/dagnode" "../shared/logging/src/airflow_shared/logging" = "src/airflow/sdk/_shared/logging" "../shared/module_loading/src/airflow_shared/module_loading" = "src/airflow/sdk/_shared/module_loading" -"../shared/observability/src/airflow_shared/observability" = "src/airflow/_shared/observability" -"../shared/secrets_backend/src/airflow_shared/secrets_backend" = "src/airflow/sdk/_shared/secrets_backend" -"../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/sdk/_shared/secrets_masker" -"../shared/timezones/src/airflow_shared/timezones" = "src/airflow/sdk/_shared/timezones" +"../shared/observability/src/airflow_shared/observability" = "src/airflow/sdk/_shared/observability" + "../shared/secrets_backend/src/airflow_shared/secrets_backend" = "src/airflow/sdk/_shared/secrets_backend" + "../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/sdk/_shared/secrets_masker" + "../shared/timezones/src/airflow_shared/timezones" = "src/airflow/sdk/_shared/timezones" [tool.hatch.build.targets.wheel] packages = ["src/airflow"] diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 703cecf509cb2..77238dcc01d0d 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -25,12 +25,12 @@ import os import sys import time -from collections.abc import Callable, Iterable, Iterator, Mapping +from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence from contextlib import suppress from datetime import datetime, timezone from itertools import product from pathlib import Path -from typing import TYPE_CHECKING, Annotated, Any, Literal +from typing import TYPE_CHECKING, Annotated, Any, Literal, Union from urllib.parse import quote import attrs @@ -104,6 +104,7 @@ TriggerDagRun, ValidateInletsAndOutlets, ) + from airflow.sdk.execution_time.context import ( ConnectionAccessor, InletEventsAccessors, @@ -136,6 +137,35 @@ class TaskRunnerMarker: """Marker for listener hooks, to properly detect from which component they are called.""" +def truncate_rendered_value(rendered: Any, max_length: int) -> str: + """ + Truncate rendered value with a reasonable minimum length to avoid edge cases. + + Args: + rendered: The rendered value to truncate + max_length: The maximum allowed length for the output + + Returns: + Truncated string that respects the max_length constraint + """ + # Set a reasonable minimum to avoid complex edge cases with very small values + if max_length < 100: + max_length = 100 + + prefix = "Truncated. You can change this behaviour in [core]max_templated_field_length. " + suffix = "... " + + # Apply repr() to the content first to account for quotes that will be added + content_repr = repr(rendered) + available_length = max_length - len(prefix) - len(suffix) + if available_length <= 0: + return (prefix + suffix)[:max_length] + + content_part = content_repr[:available_length] + return f"{prefix}{content_part}{suffix}" + + + # TODO: Move this entire class into a separate file: # `airflow/sdk/execution_time/task_instance.py` # or `airflow/sdk/execution_time/runtime_ti.py` @@ -877,10 +907,7 @@ def sort_dict_recursively(obj: Any) -> Any: serialized = str(template_field) if len(serialized) > max_length: rendered = redact(serialized, name) - return ( - "Truncated. You can change this behaviour in [core]max_templated_field_length. " - f"{rendered[: max_length - 79]!r}... " - ) + return truncate_rendered_value(rendered, max_length) return serialized if not template_field and not isinstance(template_field, tuple): # Avoid unnecessary serialization steps for empty fields unless they are tuples @@ -894,10 +921,7 @@ def sort_dict_recursively(obj: Any) -> Any: serialized = str(template_field) if len(serialized) > max_length: rendered = redact(serialized, name) - return ( - "Truncated. You can change this behaviour in [core]max_templated_field_length. " - f"{rendered[: max_length - 79]!r}... " - ) + return truncate_rendered_value(rendered, max_length) return template_field @@ -1683,4 +1707,4 @@ def reinit_supervisor_comms() -> None: if __name__ == "__main__": - main() + main() \ No newline at end of file