Skip to content
Closed
4 changes: 2 additions & 2 deletions airflow-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ exclude = [
"../shared/module_loading/src/airflow_shared/module_loading" = "src/airflow/_shared/module_loading"
"../shared/observability/src/airflow_shared/observability" = "src/airflow/_shared/observability"
"../shared/secrets_backend/src/airflow_shared/secrets_backend" = "src/airflow/_shared/secrets_backend"
"../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/_shared/secrets_masker"
"../shared/timezones/src/airflow_shared/timezones" = "src/airflow/_shared/timezones"
"../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/_shared/secrets_masker"
"../shared/timezones/src/airflow_shared/timezones" = "src/airflow/_shared/timezones"
Comment on lines -232 to +233
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do not implement a shared module then you should revert this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done sir , i have reverted the changes


[tool.hatch.build.targets.custom]
path = "./hatch_build.py"
Expand Down
41 changes: 32 additions & 9 deletions airflow-core/src/airflow/serialization/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
from __future__ import annotations

import contextlib
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Union
from collections.abc import Sequence

from airflow._shared.secrets_masker import redact
from airflow.configuration import conf
Expand All @@ -29,6 +30,34 @@
from airflow.timetables.base import Timetable as CoreTimetable


def truncate_rendered_value(rendered: Any, max_length: int) -> str:
"""
Truncate rendered value with a reasonable minimum length to avoid edge cases.

Args:
rendered: The rendered value to truncate
max_length: The maximum allowed length for the output

Returns:
Truncated string that respects the max_length constraint
"""
# Set a reasonable minimum to avoid complex edge cases with very small values
if max_length < 100:
max_length = 100

prefix = "Truncated. You can change this behaviour in [core]max_templated_field_length. "
suffix = "... "

# Apply repr() to the content first to account for quotes that will be added
content_repr = repr(rendered)
available_length = max_length - len(prefix) - len(suffix)
if available_length <= 0:
return (prefix + suffix)[:max_length]

content_part = content_repr[:available_length]
return f"{prefix}{content_part}{suffix}"


def serialize_template_field(template_field: Any, name: str) -> str | dict | list | int | float:
"""
Return a serializable representation of the templated field.
Expand Down Expand Up @@ -74,10 +103,7 @@ def sort_dict_recursively(obj: Any) -> Any:
serialized = str(template_field)
if len(serialized) > max_length:
rendered = redact(serialized, name)
return (
"Truncated. You can change this behaviour in [core]max_templated_field_length. "
f"{rendered[: max_length - 79]!r}... "
)
return truncate_rendered_value(rendered, max_length)
return serialized
if not template_field and not isinstance(template_field, tuple):
# Avoid unnecessary serialization steps for empty fields unless they are tuples
Expand All @@ -91,10 +117,7 @@ def sort_dict_recursively(obj: Any) -> Any:
serialized = str(template_field)
if len(serialized) > max_length:
rendered = redact(serialized, name)
return (
"Truncated. You can change this behaviour in [core]max_templated_field_length. "
f"{rendered[: max_length - 79]!r}... "
)
return truncate_rendered_value(rendered, max_length)
return template_field


Expand Down
8 changes: 4 additions & 4 deletions task-sdk/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,10 @@ path = "src/airflow/sdk/__init__.py"
"../shared/dagnode/src/airflow_shared/dagnode" = "src/airflow/sdk/_shared/dagnode"
"../shared/logging/src/airflow_shared/logging" = "src/airflow/sdk/_shared/logging"
"../shared/module_loading/src/airflow_shared/module_loading" = "src/airflow/sdk/_shared/module_loading"
"../shared/observability/src/airflow_shared/observability" = "src/airflow/_shared/observability"
"../shared/secrets_backend/src/airflow_shared/secrets_backend" = "src/airflow/sdk/_shared/secrets_backend"
"../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/sdk/_shared/secrets_masker"
"../shared/timezones/src/airflow_shared/timezones" = "src/airflow/sdk/_shared/timezones"
"../shared/observability/src/airflow_shared/observability" = "src/airflow/sdk/_shared/observability"
"../shared/secrets_backend/src/airflow_shared/secrets_backend" = "src/airflow/sdk/_shared/secrets_backend"
"../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/sdk/_shared/secrets_masker"
"../shared/timezones/src/airflow_shared/timezones" = "src/airflow/sdk/_shared/timezones"
Comment on lines -122 to +125
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do not implement a shared module then you should revert this.


[tool.hatch.build.targets.wheel]
packages = ["src/airflow"]
Expand Down
46 changes: 35 additions & 11 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import os
import sys
import time
from collections.abc import Callable, Iterable, Iterator, Mapping
from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence
from contextlib import suppress
from datetime import datetime, timezone
from itertools import product
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any, Literal
from typing import TYPE_CHECKING, Annotated, Any, Literal, Union
from urllib.parse import quote

import attrs
Expand Down Expand Up @@ -104,6 +104,7 @@
TriggerDagRun,
ValidateInletsAndOutlets,
)

from airflow.sdk.execution_time.context import (
ConnectionAccessor,
InletEventsAccessors,
Expand Down Expand Up @@ -136,6 +137,35 @@ class TaskRunnerMarker:
"""Marker for listener hooks, to properly detect from which component they are called."""


def truncate_rendered_value(rendered: Any, max_length: int) -> str:
"""
Truncate rendered value with a reasonable minimum length to avoid edge cases.

Args:
rendered: The rendered value to truncate
max_length: The maximum allowed length for the output

Returns:
Truncated string that respects the max_length constraint
"""
# Set a reasonable minimum to avoid complex edge cases with very small values
if max_length < 100:
max_length = 100

prefix = "Truncated. You can change this behaviour in [core]max_templated_field_length. "
suffix = "... "

# Apply repr() to the content first to account for quotes that will be added
content_repr = repr(rendered)
available_length = max_length - len(prefix) - len(suffix)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done sir

if available_length <= 0:
return (prefix + suffix)[:max_length]

content_part = content_repr[:available_length]
return f"{prefix}{content_part}{suffix}"



# TODO: Move this entire class into a separate file:
# `airflow/sdk/execution_time/task_instance.py`
# or `airflow/sdk/execution_time/runtime_ti.py`
Expand Down Expand Up @@ -877,10 +907,7 @@ def sort_dict_recursively(obj: Any) -> Any:
serialized = str(template_field)
if len(serialized) > max_length:
rendered = redact(serialized, name)
return (
"Truncated. You can change this behaviour in [core]max_templated_field_length. "
f"{rendered[: max_length - 79]!r}... "
)
return truncate_rendered_value(rendered, max_length)
return serialized
if not template_field and not isinstance(template_field, tuple):
# Avoid unnecessary serialization steps for empty fields unless they are tuples
Expand All @@ -894,10 +921,7 @@ def sort_dict_recursively(obj: Any) -> Any:
serialized = str(template_field)
if len(serialized) > max_length:
rendered = redact(serialized, name)
return (
"Truncated. You can change this behaviour in [core]max_templated_field_length. "
f"{rendered[: max_length - 79]!r}... "
)
return truncate_rendered_value(rendered, max_length)
return template_field


Expand Down Expand Up @@ -1683,4 +1707,4 @@ def reinit_supervisor_comms() -> None:


if __name__ == "__main__":
main()
main()