diff --git a/airflow-core/src/airflow/serialization/helpers.py b/airflow-core/src/airflow/serialization/helpers.py index 723c113709a87..021bf35f73d7c 100644 --- a/airflow-core/src/airflow/serialization/helpers.py +++ b/airflow-core/src/airflow/serialization/helpers.py @@ -29,6 +29,36 @@ from airflow.timetables.base import Timetable as CoreTimetable +def _truncate_rendered_value(rendered: str, max_length: int) -> str: + if max_length <= 0: + return "" + + prefix = ( + "Truncated. You can change this behaviour in " + "[core]max_templated_field_length. " + ) + suffix = "..." + value = str(rendered) + + trunc_only = f"{prefix}{suffix}" + + if max_length < len(trunc_only): + return trunc_only + + overhead = len(prefix) + 2 + len(suffix) + available = max_length - overhead + + if available <= 0: + return trunc_only + + return f"{prefix}'{value[:available]}'{suffix}" + + + +def _safe_truncate_rendered_value(rendered: Any, max_length: int) -> str: + return _truncate_rendered_value(str(rendered), max_length) + + def serialize_template_field(template_field: Any, name: str) -> str | dict | list | int | float: """ Return a serializable representation of the templated field. @@ -74,10 +104,7 @@ def sort_dict_recursively(obj: Any) -> Any: serialized = str(template_field) if len(serialized) > max_length: rendered = redact(serialized, name) - return ( - "Truncated. You can change this behaviour in [core]max_templated_field_length. " - f"{rendered[: max_length - 79]!r}... " - ) + return _safe_truncate_rendered_value(rendered, max_length) return serialized if not template_field and not isinstance(template_field, tuple): # Avoid unnecessary serialization steps for empty fields unless they are tuples @@ -91,10 +118,7 @@ def sort_dict_recursively(obj: Any) -> Any: serialized = str(template_field) if len(serialized) > max_length: rendered = redact(serialized, name) - return ( - "Truncated. You can change this behaviour in [core]max_templated_field_length. " - f"{rendered[: max_length - 79]!r}... " - ) + return _safe_truncate_rendered_value(rendered, max_length) return template_field diff --git a/airflow-core/tests/unit/serialization/test_helpers.py b/airflow-core/tests/unit/serialization/test_helpers.py new file mode 100644 index 0000000000000..27111dc7e7dbf --- /dev/null +++ b/airflow-core/tests/unit/serialization/test_helpers.py @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + + +def test_serialize_template_field_with_very_small_max_length(monkeypatch): + monkeypatch.setenv("AIRFLOW__CORE__MAX_TEMPLATED_FIELD_LENGTH", "1") + + from airflow.serialization.helpers import serialize_template_field + + result = serialize_template_field("This is a long string", "test") + + assert result + assert len(result) <= 1 diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 85fd83551d4d8..95e213afa745b 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -826,6 +826,32 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]: return ti, ti.get_template_context(), log +def _truncate_rendered_value(rendered: str, max_length: int) -> str: + if max_length <= 0: + return "" + + prefix = ( + "Truncated. You can change this behaviour in " + "[core]max_templated_field_length. " + ) + suffix = "..." + value = str(rendered) + + trunc_only = f"{prefix}{suffix}" + + if max_length < len(trunc_only): + return trunc_only + + overhead = len(prefix) + 2 + len(suffix) + available = max_length - overhead + + if available <= 0: + return trunc_only + + return f"{prefix}'{value[:available]}'{suffix}" +def _safe_truncate_rendered_value(rendered: Any, max_length: int) -> str: + return _truncate_rendered_value(str(rendered), max_length) + def _serialize_template_field(template_field: Any, name: str) -> str | dict | list | int | float: """ @@ -877,10 +903,7 @@ def sort_dict_recursively(obj: Any) -> Any: serialized = str(template_field) if len(serialized) > max_length: rendered = redact(serialized, name) - return ( - "Truncated. You can change this behaviour in [core]max_templated_field_length. " - f"{rendered[: max_length - 79]!r}... " - ) + return _safe_truncate_rendered_value(rendered, max_length) return serialized if not template_field and not isinstance(template_field, tuple): # Avoid unnecessary serialization steps for empty fields unless they are tuples @@ -894,10 +917,7 @@ def sort_dict_recursively(obj: Any) -> Any: serialized = str(template_field) if len(serialized) > max_length: rendered = redact(serialized, name) - return ( - "Truncated. You can change this behaviour in [core]max_templated_field_length. " - f"{rendered[: max_length - 79]!r}... " - ) + return _safe_truncate_rendered_value(rendered, max_length) return template_field