From 2aa15d8503470c0a30b890acc0aaba45e2926846 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Mon, 15 Dec 2025 14:47:27 +0900 Subject: [PATCH 01/12] add static checker for preventing to increase dag version --- .../src/airflow/dag_processing/collection.py | 10 +- .../src/airflow/dag_processing/manager.py | 6 +- .../src/airflow/dag_processing/processor.py | 12 +- airflow-core/src/airflow/models/dagwarning.py | 1 + .../src/airflow/utils/static_checker.py | 468 +++++++++++ .../routes/public/test_dag_warning.py | 6 +- .../tests/unit/utils/test_static_checker.py | 726 ++++++++++++++++++ 7 files changed, 1219 insertions(+), 10 deletions(-) create mode 100644 airflow-core/src/airflow/utils/static_checker.py create mode 100644 airflow-core/tests/unit/utils/test_static_checker.py diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py index fd09df8fd36ec..25f554982cb53 100644 --- a/airflow-core/src/airflow/dag_processing/collection.py +++ b/airflow-core/src/airflow/dag_processing/collection.py @@ -257,7 +257,10 @@ def _sync_dag_perms(dag: LazyDeserializedDAG, session: Session): def _update_dag_warnings( - dag_ids: list[str], warnings: set[DagWarning], warning_types: tuple[DagWarningType], session: Session + dag_ids: list[str], + warnings: set[DagWarning], + warning_types: tuple[DagWarningType, DagWarningType], + session: Session, ): from airflow.models.dagwarning import DagWarning @@ -371,7 +374,10 @@ def update_dag_parsing_results_in_db( warnings: set[DagWarning], session: Session, *, - warning_types: tuple[DagWarningType] = (DagWarningType.NONEXISTENT_POOL,), + warning_types: tuple[DagWarningType, DagWarningType] = ( + DagWarningType.NONEXISTENT_POOL, + DagWarningType.PARSING_ERROR, + ), files_parsed: set[tuple[str, str]] | None = None, ): """ diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index bddeedd8ac134..0111a41c7725b 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -1211,13 +1211,17 @@ def process_parse_results( files_parsed = {(bundle_name, relative_fileloc)} files_parsed.update(import_errors.keys()) + warning = parsing_result.warnings + if parsing_result.warnings and isinstance(parsing_result.warnings[0], dict): + warning = [DagWarning(**warn) for warn in parsing_result.warnings] + update_dag_parsing_results_in_db( bundle_name=bundle_name, bundle_version=bundle_version, dags=parsing_result.serialized_dags, import_errors=import_errors, parse_duration=run_duration, - warnings=set(parsing_result.warnings or []), + warnings=set(warning or []), session=session, files_parsed=files_parsed, ) diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py index 77bc71df3c464..298f1eebe41e5 100644 --- a/airflow-core/src/airflow/dag_processing/processor.py +++ b/airflow-core/src/airflow/dag_processing/processor.py @@ -70,6 +70,7 @@ from airflow.serialization.serialized_objects import DagSerialization, LazyDeserializedDAG from airflow.utils.file import iter_airflow_imports from airflow.utils.state import TaskInstanceState +from airflow.utils.static_checker import check_dag_file_static, get_warning_dag_format_dict if TYPE_CHECKING: from structlog.typing import FilteringBoundLogger @@ -198,6 +199,7 @@ def _parse_file_entrypoint(): log = structlog.get_logger(logger_name="task") result = _parse_file(msg, log) + if result is not None: comms_decoder.send(result) @@ -205,12 +207,15 @@ def _parse_file_entrypoint(): def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileParsingResult | None: # TODO: Set known_pool names on DagBag! + warning_statement = check_dag_file_static(os.fspath(msg.file)) + bag = BundleDagBag( dag_folder=msg.file, bundle_path=msg.bundle_path, bundle_name=msg.bundle_name, load_op_links=False, ) + if msg.callback_requests: # If the request is for callback, we shouldn't serialize the DAGs _execute_callbacks(bag, msg.callback_requests, log) @@ -222,8 +227,7 @@ def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileP fileloc=msg.file, serialized_dags=serialized_dags, import_errors=bag.import_errors, - # TODO: Make `bag.dag_warnings` not return SQLA model objects - warnings=[], + warnings=get_warning_dag_format_dict(warning_statement, dag_ids=bag.dag_ids), ) return result @@ -506,10 +510,6 @@ def start( # type: ignore[override] client: Client, **kwargs, ) -> Self: - logger = kwargs["logger"] - - _pre_import_airflow_modules(os.fspath(path), logger) - proc: Self = super().start(target=target, client=client, **kwargs) proc.had_callbacks = bool(callbacks) # Track if this process had callbacks proc._on_child_started(callbacks, path, bundle_path, bundle_name) diff --git a/airflow-core/src/airflow/models/dagwarning.py b/airflow-core/src/airflow/models/dagwarning.py index eaf5945857108..5ab5c0f904c1d 100644 --- a/airflow-core/src/airflow/models/dagwarning.py +++ b/airflow-core/src/airflow/models/dagwarning.py @@ -103,3 +103,4 @@ class DagWarningType(str, Enum): ASSET_CONFLICT = "asset conflict" NONEXISTENT_POOL = "non-existent pool" + PARSING_ERROR = "parsing error" diff --git a/airflow-core/src/airflow/utils/static_checker.py b/airflow-core/src/airflow/utils/static_checker.py new file mode 100644 index 0000000000000..017bd5370e972 --- /dev/null +++ b/airflow-core/src/airflow/utils/static_checker.py @@ -0,0 +1,468 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import ast +from dataclasses import dataclass +from enum import Enum +from pathlib import Path + + +@dataclass +class RuntimeVaryingValueWarning: + """Warning information for runtime-varying value detection.""" + + line: int + col: int + code: str + message: str + + +class WarningContext(str, Enum): + """Context types for warnings.""" + + TASK_CONSTRUCTOR = "TASK constructor" + DAG_CONSTRUCTOR = "DAG constructor" + + +RUNTIME_VARYING_CALLS = [ + ("datetime", "now"), + ("datetime", "today"), + ("datetime", "utcnow"), + ("date", "today"), + ("time", "time"), + ("time", "localtime"), + ("random", "random"), + ("random", "randint"), + ("random", "choice"), + ("random", "uniform"), + ("uuid", "uuid4"), + ("uuid", "uuid1"), +] + + +class RuntimeVaryingValueAnalyzer: + """ + Analyzer dedicated to tracking and detecting runtime-varying values. + + This analyzer is responsible for identifying if a given AST node + contains values that change on every execution (datetime.now(), random(), etc.). + """ + + def __init__(self, varying_vars: dict, imports: dict, from_imports: dict): + self.varying_vars = varying_vars + self.imports = imports + self.from_imports = from_imports + + def get_varying_source(self, node: ast.expr) -> str | None: + """ + Check if an AST node contains runtime-varying values and return the source. + + Checks: + - Runtime-varying function calls (datetime.now(), etc.) + - Runtime-varying variable references + - Runtime-varying values in f-strings + - Runtime-varying values in expressions/collections + """ + # 1. Direct runtime-varying call + if isinstance(node, ast.Call) and self._is_runtime_varying_call(node): + return ast.unparse(node) + + # 2. Runtime-varying variable reference + if isinstance(node, ast.Name) and node.id in self.varying_vars: + _, source = self.varying_vars[node.id] + return source + + # 3. f-string + if isinstance(node, ast.JoinedStr): + return self._check_fstring_varying(node) + + # 4. Binary operation + if isinstance(node, ast.BinOp): + return self.get_varying_source(node.left) or self.get_varying_source(node.right) + + # 5. Collections (list/tuple/set) + if isinstance(node, (ast.List, ast.Tuple, ast.Set)): + return self._check_collection_varying(node.elts) + + # 6. List comprehension + if isinstance(node, ast.ListComp): + return self.get_varying_source(node.elt) + + # 7. Dictionary + if isinstance(node, ast.Dict): + return self._check_dict_varying(node) + + # 8. Method call chain + if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute): + return self.get_varying_source(node.func.value) + + return None + + def _check_fstring_varying(self, node: ast.JoinedStr) -> str | None: + """Check for runtime-varying values inside f-strings.""" + for value in node.values: + if isinstance(value, ast.FormattedValue): + source = self.get_varying_source(value.value) + if source: + return source + return None + + def _check_collection_varying(self, elements: list) -> str | None: + """Check for runtime-varying values in collection elements.""" + for elt in elements: + source = self.get_varying_source(elt) + if source: + return source + return None + + def _check_dict_varying(self, node: ast.Dict) -> str | None: + """Check for runtime-varying values in dictionary keys/values.""" + for key, value in zip(node.keys, node.values): + if key: + source = self.get_varying_source(key) + if source: + return source + if value: + source = self.get_varying_source(value) + if source: + return source + return None + + def _is_runtime_varying_call(self, node: ast.Call) -> bool: + """ + Check if a call is runtime-varying. + + 1. Is the function itself runtime-varying? + 2. Do the arguments contain runtime-varying values? + """ + # Check if the function itself is runtime-varying + if isinstance(node.func, ast.Attribute): + if self._is_runtime_varying_attribute_call(node.func): + return True + elif isinstance(node.func, ast.Name): + if self._is_runtime_varying_name_call(node.func): + return True + + # Check if arguments contain runtime-varying values + return self._has_varying_arguments(node) + + def _has_varying_arguments(self, node: ast.Call) -> bool: + """Check if function arguments contain runtime-varying values.""" + for arg in node.args: + if self.get_varying_source(arg): + return True + + for kw in node.keywords: + if self.get_varying_source(kw.value): + return True + + return False + + def _is_runtime_varying_attribute_call(self, attr: ast.Attribute) -> bool: + """Check for runtime-varying calls like datetime.now().""" + method_name = attr.attr + + if isinstance(attr.value, ast.Name): + module_or_alias = attr.value.id + actual_module = self.imports.get(module_or_alias, module_or_alias) + + # If imported via "from import" + if module_or_alias in self.from_imports: + _, original_name = self.from_imports[module_or_alias] + actual_module = original_name + + return (actual_module, method_name) in RUNTIME_VARYING_CALLS + + # Nested attribute (e.g., datetime.datetime.now) + if isinstance(attr.value, ast.Attribute): + inner_attr = attr.value + if isinstance(inner_attr.value, ast.Name): + return (inner_attr.attr, method_name) in RUNTIME_VARYING_CALLS + + return False + + def _is_runtime_varying_name_call(self, func: ast.Name) -> bool: + """Check for runtime-varying calls like now() (when imported via 'from import').""" + func_name = func.id + + if func_name in self.from_imports: + module, original_name = self.from_imports[func_name] + module_parts = module.split(".") + + for part in module_parts: + if (part, original_name) in RUNTIME_VARYING_CALLS: + return True + + return False + + +class DAGTaskDetector: + """ + Detector dedicated to identifying DAG and Task constructors. + + This detector identifies when code is creating DAG or Task objects + in Airflow. It needs to handle both traditional class instantiation and decorator styles. + """ + + def __init__(self, from_imports: dict): + self.from_imports = from_imports + self.dag_instances: set[str] = set() + self.is_in_dag_context: bool = False + + def is_dag_constructor(self, node: ast.Call) -> bool: + """Check if a call is a DAG constructor.""" + if not isinstance(node.func, ast.Name): + return False + + func_name = node.func.id + + # "from airflow import DAG" form or "from airflow.decorator import dag" + if func_name in self.from_imports: + module, original = self.from_imports[func_name] + if (module == "airflow" or module.startswith("airflow.")) and original in ("DAG", "dag"): + return True + + return False + + def is_task_constructor(self, node: ast.Call) -> bool: + """ + Check if a call is a Task constructor. + + Criteria: + 1. All calls within a DAG with block + 2. Calls that receive a DAG instance as an argument (dag=...) + """ + # Inside DAG with block + if self.is_in_dag_context: + return True + + # Passing DAG instance as argument + for arg in node.args: + if isinstance(arg, ast.Name) and arg.id in self.dag_instances: + return True + + for keyword in node.keywords: + if keyword.value and isinstance(keyword.value, ast.Name): + if keyword.value.id in self.dag_instances: + return True + + return False + + def register_dag_instance(self, var_name: str): + """Register a DAG instance variable name.""" + self.dag_instances.add(var_name) + + def enter_dag_context(self): + """Enter a DAG with block.""" + self.is_in_dag_context = True + + def exit_dag_context(self): + """Exit a DAG with block.""" + self.is_in_dag_context = False + + +class AirflowRuntimeVaryingValueChecker(ast.NodeVisitor): + """ + Main visitor class to detect runtime-varying value usage in Airflow DAG/Task. + + Main responsibilities: + - Traverse AST and visit nodes + - Detect DAG/Task creation + - Track runtime-varying values and generate warnings + """ + + def __init__(self): + self.warnings: list[RuntimeVaryingValueWarning] = [] + self.imports: dict[str, str] = {} + self.from_imports: dict[str, tuple[str, str]] = {} + self.varying_vars: dict[str, tuple[int, str]] = {} + + # Helper objects + self.value_analyzer = RuntimeVaryingValueAnalyzer(self.varying_vars, self.imports, self.from_imports) + self.dag_detector = DAGTaskDetector(self.from_imports) + + def visit_Import(self, node: ast.Import): + """Process import statements.""" + for alias in node.names: + name = alias.asname or alias.name + self.imports[name] = alias.name + + def visit_ImportFrom(self, node: ast.ImportFrom): + """Process from ... import statements.""" + if node.module: + for alias in node.names: + name = alias.asname or alias.name + self.from_imports[name] = (node.module, alias.name) + + def visit_Assign(self, node: ast.Assign): + """ + Process variable assignments. + + Checks: + 1. DAG instance assignment + 2. Task instance assignment + 3. Runtime-varying value assignment + """ + value = node.value + + # DAG constructor + if isinstance(value, ast.Call) and self.dag_detector.is_dag_constructor(value): + self._register_dag_instances(node.targets) + self._check_and_warn(value, WarningContext.DAG_CONSTRUCTOR) + + # Task constructor + elif isinstance(value, ast.Call) and self.dag_detector.is_task_constructor(value): + self._check_and_warn(value, WarningContext.TASK_CONSTRUCTOR) + + # Track runtime-varying values + else: + self._track_varying_assignment(node) + + def visit_Call(self, node: ast.Call): + """ + Process function calls. + + Check not assign but just call the function or dag definition via decorator. + """ + if self.dag_detector.is_dag_constructor(node): + self._check_and_warn(node, WarningContext.DAG_CONSTRUCTOR) + + elif self.dag_detector.is_task_constructor(node): + self._check_and_warn(node, WarningContext.TASK_CONSTRUCTOR) + + def visit_For(self, node: ast.For): + """ + Process for statements. + + Check if iteration target contains runtime-varying values. + """ + # check the iterator value is runtime-varying + # iter is runtime-varying : for iter in [datetime.now(), 3] + varying_source = self.value_analyzer.get_varying_source(node.iter) + if varying_source: + if isinstance(node.target, ast.Name): + self.varying_vars[node.target.id] = (node.lineno, varying_source) + + for body in node.body: + self.visit(body) + + if varying_source: + if isinstance(node.target, ast.Name): + self.varying_vars.pop(node.target.id) + + def visit_With(self, node: ast.With): + """ + Process with statements. + + Detect DAG context manager. + """ + is_with_dag_context = False + for item in node.items: + # check if the dag instance exists in with context + self.visit(item) + if isinstance(item.context_expr, ast.Call): + if self.dag_detector.is_dag_constructor(item.context_expr): + # check the value defined in with statement to detect entering DAG with block + is_with_dag_context = True + + if is_with_dag_context: + self.dag_detector.enter_dag_context() + + for body in node.body: + self.visit(body) + + # Exit DAG with block + self.dag_detector.exit_dag_context() + + def _register_dag_instances(self, targets: list): + """Register DAG instance variable names.""" + for target in targets: + if isinstance(target, ast.Name): + self.dag_detector.register_dag_instance(target.id) + + def _track_varying_assignment(self, node: ast.Assign): + """Track variable assignments with runtime-varying values.""" + varying_source = self.value_analyzer.get_varying_source(node.value) + if varying_source: + for target in node.targets: + if isinstance(target, ast.Name): + self.varying_vars[target.id] = (node.lineno, varying_source) + + def _check_and_warn(self, call: ast.Call, context: WarningContext): + """Check function call arguments and generate warnings.""" + varying_source = self.value_analyzer.get_varying_source(call) + if varying_source: + self.warnings.append( + RuntimeVaryingValueWarning( + line=call.lineno, + col=call.col_offset, + code=ast.unparse(call), + message=self._get_warning_message(context), + ) + ) + + def _get_warning_message(self, context: WarningContext): + """Get appropriate warning message based on context.""" + if self.dag_detector.is_in_dag_context and context == WarningContext.TASK_CONSTRUCTOR: + return "Don't use runtime-varying values as function arguments within with DAG block" + return f"Don't use runtime-varying value as argument in {context.value}" + + def format_warnings(self) -> str | None: + """Return formatted string of warning list.""" + if not self.warnings: + return None + + lines = [ + "⚠️ This DAG uses runtime-variable values in DAG construction.", + "⚠️ It causes the DAG version to increase as values change on every DAG parse.", + "", + ] + for w in self.warnings: + lines.append(f"Line {w.line}, Col {w.col}") + lines.append(f"Code: {w.code}") + lines.append(f"Issue: {w.message}") + lines.append("") + + return "\n".join(lines) + + +def check_dag_file_static(file_path): + try: + parsed = ast.parse(Path(file_path).read_bytes()) + except Exception: + return None + + checker = AirflowRuntimeVaryingValueChecker() + checker.visit(parsed) + return checker.format_warnings() + + +def get_warning_dag_format_dict(warning_statement, dag_ids): + """Convert warning statement to DAG warning format.""" + from airflow.models.dagwarning import DagWarningType + + if not warning_statement: + return [] + return [ + { + "dag_id": dag_id, + "warning_type": DagWarningType.PARSING_ERROR.value, + "message": warning_statement, + } + for dag_id in dag_ids + ] diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py index 287245127f8cc..f129f6253f46e 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py @@ -113,5 +113,9 @@ def test_should_respond_403(self, unauthorized_test_client): def test_get_dag_warnings_bad_request(self, test_client): response = test_client.get("/dagWarnings", params={"warning_type": "invalid"}) response_json = response.json() + print(response_json) assert response.status_code == 422 - assert response_json["detail"][0]["msg"] == "Input should be 'asset conflict' or 'non-existent pool'" + assert ( + response_json["detail"][0]["msg"] + == "Input should be 'asset conflict', 'non-existent pool' or 'parsing error'" + ) diff --git a/airflow-core/tests/unit/utils/test_static_checker.py b/airflow-core/tests/unit/utils/test_static_checker.py new file mode 100644 index 0000000000000..ff3f7da9e7741 --- /dev/null +++ b/airflow-core/tests/unit/utils/test_static_checker.py @@ -0,0 +1,726 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import ast + +from airflow.utils.static_checker import ( + AirflowRuntimeVaryingValueChecker, + DAGTaskDetector, + RuntimeVaryingValueAnalyzer, + RuntimeVaryingValueWarning, + WarningContext, +) + + +class TestRuntimeVaryingValueAnalyzer: + def setup_method(self): + """Each test gets a fresh analyzer instance.""" + self.varying_vars = {} + self.imports = {} + self.from_imports = {} + self.analyzer = RuntimeVaryingValueAnalyzer(self.varying_vars, self.imports, self.from_imports) + + def test_is_runtime_varying_attribute_call__detects_datetime_now(self): + """datetime.now() should be recognized as runtime-varying.""" + code = "datetime.now()" + call_node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + # The func is an Attribute node: datetime.now + assert isinstance(call_node.func, ast.Attribute) + result = self.analyzer._is_runtime_varying_attribute_call(call_node.func) + + assert result is True + + def test_is_runtime_varying_attribute_call__ignores_static_method(self): + """Static methods like str.upper() should NOT be detected.""" + code = "str.upper('hello')" + call_node = ast.parse(code, mode="eval").body + + assert isinstance(call_node.func, ast.Attribute) + result = self.analyzer._is_runtime_varying_attribute_call(call_node.func) + + assert result is False + + def test_is_runtime_varying_attribute_call__handles_aliased_imports(self): + """ + Should detect runtime-varying calls even with import aliases. + + Example: import datetime as dt; dt.now() + """ + code = "dt.now()" + call_node = ast.parse(code, mode="eval").body + self.imports["dt"] = "datetime" # dt is alias for datetime + + assert isinstance(call_node.func, ast.Attribute) + result = self.analyzer._is_runtime_varying_attribute_call(call_node.func) + + assert result is True + + def test_is_runtime_varying_name_call__detects_uuid4(self): + """Detect uuid4() when imported as "from uuid import uuid4.""" + code = "uuid4()" + call_node = ast.parse(code, mode="eval").body + self.from_imports["uuid4"] = ("uuid", "uuid4") + + assert isinstance(call_node.func, ast.Name) + result = self.analyzer._is_runtime_varying_name_call(call_node.func) + + assert result is True + + def test_is_runtime_varying_name_call__ignores_regular_function(self): + code = "my_function()" + call_node = ast.parse(code, mode="eval").body + + assert isinstance(call_node.func, ast.Name) + result = self.analyzer._is_runtime_varying_name_call(call_node.func) + + assert result is False + + def test_has_varying_arguments__detects_varying_positional_arg(self): + """ + Detect when a positional argument is runtime-varying. + + Example: print(datetime.now()) + """ + code = "print(datetime.now())" + call_node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + result = self.analyzer._has_varying_arguments(call_node) + + assert result is True + + def test_has_varying_arguments__detects_varying_keyword_arg(self): + """ + Detect when a keyword argument is runtime-varying. + + Example: func(param=random.randint(1, 10)) + """ + code = "func(param=func1(random.randint(1, 10)))" + call_node = ast.parse(code, mode="eval").body + self.imports["random"] = "random" + + result = self.analyzer._has_varying_arguments(call_node) + + assert result is True + + def test_has_varying_arguments__returns_false_for_static_args(self): + """ + Static arguments should return False. + + Example: print("hello", 123) + """ + code = 'print("hello", 123)' + call_node = ast.parse(code, mode="eval").body + + result = self.analyzer._has_varying_arguments(call_node) + + assert result is False + + def test_is_runtime_varying_call__true_when_function_itself_varies(self): + """ + Return True when the function call itself is runtime-varying. + + Example: datetime.now() - the function is the varying part + """ + code = "datetime.now()" + call_node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + result = self.analyzer._is_runtime_varying_call(call_node) + + assert result is True + + def test_is_runtime_varying_call__true_when_argument_varies(self): + """ + Return True when arguments contain runtime-varying values. + + Example: print(datetime.now()) - print is static but arg varies + """ + code = "print(datetime.now())" + call_node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + result = self.analyzer._is_runtime_varying_call(call_node) + + assert result is True + + def test_is_runtime_varying_call__false_when_completely_static(self): + """Return False when both function and arguments are static.""" + code = 'print("hello")' + call_node = ast.parse(code, mode="eval").body + + result = self.analyzer._is_runtime_varying_call(call_node) + + assert result is False + + def test_get_varying_source__detects_direct_call(self): + """Detect direct runtime-varying function calls.""" + code = "datetime.now()" + node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + result = self.analyzer.get_varying_source(node) + + assert result == "datetime.now()" + + def test_get_varying_source__detects_variable_reference(self): + """ + Detect when a variable holds a runtime-varying value. + + Example: current_time = datetime.now(); + """ + code = "current_time" + node = ast.parse(code, mode="eval").body + self.varying_vars["current_time"] = (10, "datetime.now()") + + result = self.analyzer.get_varying_source(node) + + assert result == "datetime.now()" + + def test_get_varying_source__detects_in_fstring(self): + """ + Detect runtime-varying values embedded in f-strings. + + Example: f"dag_{datetime.now()}" + """ + code = 'f"dag_{datetime.now()}"' + node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + result = self.analyzer.get_varying_source(node) + + assert result == "datetime.now()" + + def test_get_varying_source__detects_in_list(self): + """ + Detect runtime-varying values inside list literals. + + Example: [1, 2, datetime.now()] + """ + code = "[1, 2, datetime.now()]" + node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + result = self.analyzer.get_varying_source(node) + + assert result == "datetime.now()" + + def test_get_varying_source__detects_in_dict_value(self): + """ + Detect runtime-varying values in dictionary values. + + Example: {"key": datetime.now()} + """ + code = '{"key": datetime.now()}' + node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + result = self.analyzer.get_varying_source(node) + + assert result == "datetime.now()" + + def test_get_varying_source__detects_in_binary_operation(self): + """ + Detect runtime-varying values in binary operations. + + Example: "prefix_" + str(datetime.now()) + """ + code = '"prefix_" + str(datetime.now())' + node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + result = self.analyzer.get_varying_source(node) + + assert result is not None + assert "datetime.now()" in result + + def test_get_varying_source__returns_none_for_static_values(self): + """ + Return None for completely static values. + + Example: "static_string", 123, [1, 2, 3] + """ + static_values = [ + '"static_string"', + "123", + "[1, 2, 3]", + '{"key": "value"}', + ] + + for code in static_values: + node = ast.parse(code, mode="eval").body + result = self.analyzer.get_varying_source(node) + assert result is None, f"Expected None for static value: {code}" + + +class TestDAGTaskDetector: + def setup_method(self): + """Each test gets a fresh detector instance""" + self.from_imports = {} + self.detector = DAGTaskDetector(self.from_imports) + + def test_is_dag_constructor__detects_traditional_dag_call_uppercase(self): + """ + Detect uppercase DAG() when imported. + + Usage: dag = DAG(dag_id="my_dag") + """ + code = 'DAG(dag_id="my_dag")' + call_node = ast.parse(code, mode="eval").body + self.from_imports["DAG"] = ("airflow", "DAG") + + result = self.detector.is_dag_constructor(call_node) + + assert result is True + + def test_is_dag_constructor__detects_dag_generated_by_decorator(self): + """ + Detect Dag generated by decorator. + + Usage: @dag(dag_id="my_dag") + """ + code = 'dag(dag_id="my_dag")' + call_node = ast.parse(code, mode="eval").body + self.from_imports["dag"] = ("airflow.decorators", "dag") + + result = self.detector.is_dag_constructor(call_node) + + assert result is True + + def test_is_dag_constructor__ignores_non_dag_functions(self): + """Regular function calls should not be detected as DAG constructors.""" + code = "my_function()" + call_node = ast.parse(code, mode="eval").body + + result = self.detector.is_dag_constructor(call_node) + + assert result is False + + def test_is_task_constructor__true_when_inside_dag_context(self): + """ + Any function call inside a DAG with-block is considered a task. + + Example: + with DAG() as dag: + PythonOperator() # <- This is a task + """ + code = "PythonOperator(task_id='my_task')" + call_node = ast.parse(code, mode="eval").body + + self.detector.enter_dag_context() + result = self.detector.is_task_constructor(call_node) + + assert result is True + + def test_is_task_constructor__false_when_outside_dag_context(self): + """Same call outside DAG context is NOT automatically a task.""" + code = "PythonOperator(task_id='my_task')" + call_node = ast.parse(code, mode="eval").body + + result = self.detector.is_task_constructor(call_node) + assert result is False + + def test_is_task_constructor__true_when_dag_passed_as_argument(self): + """ + Detect task when dag= parameter references a DAG instance. + + Example: my_dag = DAG(dag_id='dag); task = PythonOperator(dag=my_dag) + """ + code = "PythonOperator(task_id='task', dag=my_dag)" + call_node = ast.parse(code, mode="eval").body + self.detector.register_dag_instance("my_dag") + + result = self.detector.is_task_constructor(call_node) + assert result is True + + def test_is_task_constructor__true_when_dag_in_positional_args(self): + """ + Detect task even when DAG is passed as positional argument. + + Example: my_dag = DAG(dag_id='dag); task = PythonOperator('task_id', my_dag) + """ + code = "PythonOperator('task_id', my_dag)" + call_node = ast.parse(code, mode="eval").body + self.detector.register_dag_instance("my_dag") + + result = self.detector.is_task_constructor(call_node) + assert result is True + + def test_enter_and_exit_dag_context(self): + """Properly track entering and exiting DAG with-blocks.""" + assert self.detector.is_in_dag_context is False + + self.detector.enter_dag_context() + assert self.detector.is_in_dag_context is True + + self.detector.exit_dag_context() + assert self.detector.is_in_dag_context is False + + def test_register_dag_instance(self): + """Remember variable names that hold DAG instances.""" + assert "my_dag" not in self.detector.dag_instances + + self.detector.register_dag_instance("my_dag") + + assert "my_dag" in self.detector.dag_instances + + +class TestAirflowRuntimeVaryingValueChecker: + """Tests for AirflowRuntimeVaryingValueChecker (Main Visitor).""" + + def setup_method(self): + """Each test gets a fresh checker instance""" + self.checker = AirflowRuntimeVaryingValueChecker() + + def test_visit_import__tracks_simple_import(self): + """Remember simple imports like 'import datetime'.""" + code = "import datetime" + tree = ast.parse(code) + + self.checker.visit(tree) + + assert "datetime" in self.checker.imports + assert self.checker.imports["datetime"] == "datetime" + + def test_visit_import__tracks_aliased_import(self): + """Remember import aliases like 'import datetime as dt'.""" + code = "import datetime as dt" + tree = ast.parse(code) + + self.checker.visit(tree) + + assert "dt" in self.checker.imports + assert self.checker.imports["dt"] == "datetime" + + def test_visit_importfrom__tracks_from_import(self): + """Remember 'from X import Y' style imports.""" + code = "from datetime import now" + tree = ast.parse(code) + + self.checker.visit(tree) + + assert "now" in self.checker.from_imports + assert self.checker.from_imports["now"] == ("datetime", "now") + + def test_visit_importfrom__tracks_aliased_from_import(self): + """Remember aliases in from imports.""" + code = "from datetime import now as current_time" + tree = ast.parse(code) + + self.checker.visit(tree) + + assert "current_time" in self.checker.from_imports + assert self.checker.from_imports["current_time"] == ("datetime", "now") + + def test_visit_assign__registers_dag_instance(self): + """When assigning DAG(), remember the variable name.""" + code = """ +from airflow import DAG +my_dag = DAG(dag_id="test") +""" + tree = ast.parse(code) + + self.checker.visit(tree) + + assert "my_dag" in self.checker.dag_detector.dag_instances + + def test_visit_assign__tracks_varying_variable(self): + """When assigning a runtime-varying value, track the variable.""" + code = """ +from datetime import datetime +current_time = datetime.now() +""" + tree = ast.parse(code) + + self.checker.visit(tree) + + assert "current_time" in self.checker.varying_vars + line, source = self.checker.varying_vars["current_time"] + assert "datetime.now()" in source + + def test_visit_assign__warns_on_dag_with_varying_value(self): + """Warn when DAG constructor uses runtime-varying values.""" + code = """ +from airflow import DAG +from datetime import datetime +dag = DAG(dag_id=f"dag_{datetime.now()}") +""" + tree = ast.parse(code) + + self.checker.visit(tree) + + assert len(self.checker.warnings) > 0 + assert any("DAG constructor" in w.message for w in self.checker.warnings) + + def test_visit_call__detects_task_in_dag_context(self): + """Detect task creation inside DAG with block.""" + code = """ +from airflow import DAG +from airflow.operators.python import PythonOperator +from datetime import datetime + +with DAG(dag_id="test") as dag: + task = PythonOperator(task_id=f"task_{datetime.now()}") # !problem +""" + tree = ast.parse(code) + + self.checker.visit(tree) + + assert len(self.checker.warnings) == 1 + assert any("PythonOperator" in w.code for w in self.checker.warnings) + + def test_visit_for__warns_on_varying_range(self): + """Warn when for-loop range is runtime-varying.""" + code = """ +from airflow import DAG +from airflow.operators.bash import BashOperator +from datetime import datetime + +with DAG( + dag_id=dag_id, + schedule_interval='@daily', +) as dag: + for i in [datetime.now(), "3"]: + task = BashOperator( + task_id='print_bash_hello_{i}', + bash_command=f'echo "Hello from DAG {i}!"', # !problem + dag=dag, + ) +""" + tree = ast.parse(code) + + self.checker.visit(tree) + warnings = self.checker.warnings + + assert len(warnings) == 1 + assert any("BashOperator" in w.code for w in warnings) + + def test_check_and_warn__creates_warning_for_varying_arg(self): + """Create a warning when detecting varying positional argument.""" + code = 'DAG(f"dag_{datetime.now()}")' + call_node = ast.parse(code, mode="eval").body + self.checker.from_imports["DAG"] = ("airflow", "DAG") + self.checker.imports["datetime"] = "datetime" + + self.checker._check_and_warn(call_node, WarningContext.DAG_CONSTRUCTOR) + + assert len(self.checker.warnings) == 1 + warning = self.checker.warnings[0] + assert WarningContext.DAG_CONSTRUCTOR.value in warning.message + assert "datetime.now()" in warning.code + + def test_check_and_warn__creates_warning_for_varying_kwarg(self): + """Create a warning when detecting varying keyword argument""" + code = "DAG(dag_id=datetime.now())" + call_node = ast.parse(code, mode="eval").body + self.checker.from_imports["DAG"] = ("airflow", "DAG") + self.checker.imports["datetime"] = "datetime" + + self.checker._check_and_warn(call_node, WarningContext.TASK_CONSTRUCTOR) + + assert len(self.checker.warnings) == 1 + warning = self.checker.warnings[0] + assert "dag_id" in warning.code + assert "datetime.now()" in warning.code + + +class TestIntegrationScenarios: + """ + Integration tests showing real-world Airflow patterns. + Demonstrate actual use cases and why they're problematic. + """ + + def _check_code(self, code: str) -> list[RuntimeVaryingValueWarning]: + """Helper to parse and check code""" + tree = ast.parse(code) + checker = AirflowRuntimeVaryingValueChecker() + checker.visit(tree) + return checker.warnings + + def test_antipattern__dynamic_dag_id_with_timestamp(self): + """ANTI-PATTERN: Using timestamps in DAG IDs.""" + code = """ +from airflow import DAG +from datetime import datetime + +# BAD: DAG ID includes current timestamp +dag = DAG(dag_id=f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}") +""" + warnings = self._check_code(code) + + assert len(warnings) == 1 + assert any("datetime.now()" in w.code for w in warnings) + + def test_define_dag_with_block(self): + code = """ +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.operators.bash import BashOperator +import uuid +from datetime import datetime as dt + +start_date = dt.now() + +default_args = { + 'start_date': start_date +} + +with DAG( + dag_id="my_dag", + default_args=default_args # !problem +) as dag, Test(default_args=default_args) as test: + task1 = PythonOperator( + task_id=f"task_{uuid.uuid4()}", # !problem + python_callable=lambda: None + ) + + task2 = BashOperator( + task_id=f"task_{dt.now()}" # !problem + ) + + task3 = BashOperator( + task_id="task_for_normal_case" + ) + + task1 >> task2 >> task3 +""" + warnings = self._check_code(code) + + assert len(warnings) == 3 + assert any("uuid.uuid4()" in w.code for w in warnings) + assert any("dt.now()" in w.code for w in warnings) + assert any("default_args" in w.code for w in warnings) + + def test_correct_pattern__static_dag_with_runtime_context(self): + code = """ +from airflow import DAG +from airflow.models.param import Param +from airflow.operators.bash import BashOperator +from datetime import datetime +from mydule import test_function + +import time + +# time 모듈 동적 함수들 +current_timestamp = time.time() +local_time = time.localtime() + +dag = DAG( + dag_id='time_module_dag', + start_date=datetime(2024, 1, 1), + schedule_interval='@daily', + params={ + "execution_date": Param( + default=f"manual_run_{datetime.now().isoformat()}", # !problem + description="Unique identifier for the run", + type="string", + minLength=10, + ) + }, +) + +b = test_function(time=current_timestamp) + +task1 = BashOperator( + task_id='time_task', + bash_command=f'echo "Timestamp: {current_timestamp}"', # !problem + dag=dag, +) + +task2 = BashOperator( + task_id='time_task2', + dag=dag, +) + +task1 >> task2 +""" + warnings = self._check_code(code) + + assert len(warnings) == 2 + assert any("Param(default=f'manual_run_{datetime.now().isoformat()}'" in w.code for w in warnings) + assert any("current_timestamp" in w.code for w in warnings) + + def test_dag_decorator_pattern__currently_not_detected(self): + """ + PATTERN: @dag decorator usage + """ + code = """ +from airflow.decorators import dag, task +from datetime import datetime + +@dag(dag_id=f"my_dag_{datetime.now()}") # !problem +def my_dag_function(): + + @task + def my_task(): + return "hello" + + my_task() +""" + warnings = self._check_code(code) + assert len(warnings) == 1 + + def test_dag_generated_in_for_or_function_statement(self): + code = """ +from airflow import DAG +from airflow.operators.bash import BashOperator +from datetime import datetime + +def create_dag(dag_id, task_id): + default_args = { + "depends_on_past": False, + "start_date": datetime.now() + } + + with DAG( + dag_id, + default_args=default_args, # !problem + ) as dag: + task1 = BashOperator( + task_id=task_id + ) + + return dag + +for i in [datetime.now(), "3"]: + dag_id = f"dag_{i}_{random.randint(1, 1000)}" + + dag = DAG( + dag_id=dag_id, # !problem + schedule_interval='@daily', + tags=[f"iteration_{i}_{datetime.now().day}"], # !problem + ) + + task1 = BashOperator( + task_id='print_bash_hello', + bash_command=f'echo "Hello from DAG {i}!"', # !problem + dag=dag, + ) + + task2 = BashOperator( + task_id=f'random_task_{random.randint(1, 100)}', # !problem + bash_command='echo "World"', + dag=dag, + ) + + task1 >> task2 +""" + warnings = self._check_code(code) + assert len(warnings) == 4 From 86a8ec60dfee087258b6f6abe65e5535332edf45 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Mon, 15 Dec 2025 15:46:45 +0900 Subject: [PATCH 02/12] fix test --- .../unit/api_fastapi/core_api/routes/public/test_dag_warning.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py index f129f6253f46e..6ab2948e0609d 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py @@ -113,7 +113,6 @@ def test_should_respond_403(self, unauthorized_test_client): def test_get_dag_warnings_bad_request(self, test_client): response = test_client.get("/dagWarnings", params={"warning_type": "invalid"}) response_json = response.json() - print(response_json) assert response.status_code == 422 assert ( response_json["detail"][0]["msg"] From 12c12d0c5c93345ec503dc97af87e7abd89b565f Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Mon, 15 Dec 2025 18:06:09 +0900 Subject: [PATCH 03/12] fix for test --- .../api_fastapi/core_api/openapi/v2-rest-api-generated.yaml | 1 + airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts | 2 +- airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- airflow-ctl/src/airflowctl/api/datamodels/generated.py | 1 + 4 files changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 1651927427f96..27bcff0015e73 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -11085,6 +11085,7 @@ components: enum: - asset conflict - non-existent pool + - parsing error title: DagWarningType description: 'Enum for DAG warning types. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index b44c2883a9551..7df6ebd4e17ec 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -3433,7 +3433,7 @@ export const $DagVersionResponse = { export const $DagWarningType = { type: 'string', - enum: ['asset conflict', 'non-existent pool'], + enum: ['asset conflict', 'non-existent pool', 'parsing error'], title: 'DagWarningType', description: `Enum for DAG warning types. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index af14ba786e646..965d1843f44df 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -877,7 +877,7 @@ export type DagVersionResponse = { * This is the set of allowable values for the ``warning_type`` field * in the DagWarning model. */ -export type DagWarningType = 'asset conflict' | 'non-existent pool'; +export type DagWarningType = 'asset conflict' | 'non-existent pool' | 'parsing error'; /** * Backfill collection serializer for responses in dry-run mode. diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 6472a722397df..d8cf3acde4e8d 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -453,6 +453,7 @@ class DagWarningType(str, Enum): ASSET_CONFLICT = "asset conflict" NON_EXISTENT_POOL = "non-existent pool" + PARSING_ERROR = "parsing error" class DryRunBackfillResponse(BaseModel): From 24e8474540a45bef0df4c484e746bdbf65eef5ba Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Thu, 18 Dec 2025 00:02:02 +0900 Subject: [PATCH 04/12] fix logic --- .../openapi/v2-rest-api-generated.yaml | 2 +- .../src/airflow/dag_processing/collection.py | 6 +-- .../src/airflow/dag_processing/manager.py | 7 ++- .../src/airflow/dag_processing/processor.py | 4 ++ airflow-core/src/airflow/models/dagwarning.py | 2 +- .../ui/openapi-gen/requests/schemas.gen.ts | 2 +- .../ui/openapi-gen/requests/types.gen.ts | 2 +- .../src/airflow/utils/static_checker.py | 44 +++++++++---------- .../routes/public/test_dag_warning.py | 2 +- .../tests/unit/utils/test_static_checker.py | 4 +- .../airflowctl/api/datamodels/generated.py | 2 +- 11 files changed, 40 insertions(+), 37 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 27bcff0015e73..299cbbbf0a254 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -11085,7 +11085,7 @@ components: enum: - asset conflict - non-existent pool - - parsing error + - runtime varying value title: DagWarningType description: 'Enum for DAG warning types. diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py index 25f554982cb53..72404b58c7c7b 100644 --- a/airflow-core/src/airflow/dag_processing/collection.py +++ b/airflow-core/src/airflow/dag_processing/collection.py @@ -259,7 +259,7 @@ def _sync_dag_perms(dag: LazyDeserializedDAG, session: Session): def _update_dag_warnings( dag_ids: list[str], warnings: set[DagWarning], - warning_types: tuple[DagWarningType, DagWarningType], + warning_types: tuple[DagWarningType, ...], session: Session, ): from airflow.models.dagwarning import DagWarning @@ -374,9 +374,9 @@ def update_dag_parsing_results_in_db( warnings: set[DagWarning], session: Session, *, - warning_types: tuple[DagWarningType, DagWarningType] = ( + warning_types: tuple[DagWarningType, ...] = ( DagWarningType.NONEXISTENT_POOL, - DagWarningType.PARSING_ERROR, + DagWarningType.RUNTIME_VARYING_VALUE, ), files_parsed: set[tuple[str, str]] | None = None, ): diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 0111a41c7725b..07279f72b267a 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -1211,9 +1211,8 @@ def process_parse_results( files_parsed = {(bundle_name, relative_fileloc)} files_parsed.update(import_errors.keys()) - warning = parsing_result.warnings - if parsing_result.warnings and isinstance(parsing_result.warnings[0], dict): - warning = [DagWarning(**warn) for warn in parsing_result.warnings] + if (warnings := parsing_result.warnings) and isinstance(warnings[0], dict): + warnings = [DagWarning(**warn) for warn in warnings] update_dag_parsing_results_in_db( bundle_name=bundle_name, @@ -1221,7 +1220,7 @@ def process_parse_results( dags=parsing_result.serialized_dags, import_errors=import_errors, parse_duration=run_duration, - warnings=set(warning or []), + warnings=set(warnings or []), session=session, files_parsed=files_parsed, ) diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py index 298f1eebe41e5..68b7669a19d34 100644 --- a/airflow-core/src/airflow/dag_processing/processor.py +++ b/airflow-core/src/airflow/dag_processing/processor.py @@ -510,6 +510,10 @@ def start( # type: ignore[override] client: Client, **kwargs, ) -> Self: + logger = kwargs["logger"] + + _pre_import_airflow_modules(os.fspath(path), logger) + proc: Self = super().start(target=target, client=client, **kwargs) proc.had_callbacks = bool(callbacks) # Track if this process had callbacks proc._on_child_started(callbacks, path, bundle_path, bundle_name) diff --git a/airflow-core/src/airflow/models/dagwarning.py b/airflow-core/src/airflow/models/dagwarning.py index 5ab5c0f904c1d..032f0fa6f6adf 100644 --- a/airflow-core/src/airflow/models/dagwarning.py +++ b/airflow-core/src/airflow/models/dagwarning.py @@ -103,4 +103,4 @@ class DagWarningType(str, Enum): ASSET_CONFLICT = "asset conflict" NONEXISTENT_POOL = "non-existent pool" - PARSING_ERROR = "parsing error" + RUNTIME_VARYING_VALUE = "runtime varying value" diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 7df6ebd4e17ec..20984479a8439 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -3433,7 +3433,7 @@ export const $DagVersionResponse = { export const $DagWarningType = { type: 'string', - enum: ['asset conflict', 'non-existent pool', 'parsing error'], + enum: ['asset conflict', 'non-existent pool', 'runtime varying value'], title: 'DagWarningType', description: `Enum for DAG warning types. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 965d1843f44df..478be8af1c0f1 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -877,7 +877,7 @@ export type DagVersionResponse = { * This is the set of allowable values for the ``warning_type`` field * in the DagWarning model. */ -export type DagWarningType = 'asset conflict' | 'non-existent pool' | 'parsing error'; +export type DagWarningType = 'asset conflict' | 'non-existent pool' | 'runtime varying value'; /** * Backfill collection serializer for responses in dry-run mode. diff --git a/airflow-core/src/airflow/utils/static_checker.py b/airflow-core/src/airflow/utils/static_checker.py index 017bd5370e972..62e5d4943e1b6 100644 --- a/airflow-core/src/airflow/utils/static_checker.py +++ b/airflow-core/src/airflow/utils/static_checker.py @@ -35,8 +35,8 @@ class RuntimeVaryingValueWarning: class WarningContext(str, Enum): """Context types for warnings.""" - TASK_CONSTRUCTOR = "TASK constructor" - DAG_CONSTRUCTOR = "DAG constructor" + TASK_CONSTRUCTOR = "Task constructor" + DAG_CONSTRUCTOR = "Dag constructor" RUNTIME_VARYING_CALLS = [ @@ -211,11 +211,11 @@ def _is_runtime_varying_name_call(self, func: ast.Name) -> bool: return False -class DAGTaskDetector: +class DagTaskDetector: """ - Detector dedicated to identifying DAG and Task constructors. + Detector dedicated to identifying Dag and Task constructors. - This detector identifies when code is creating DAG or Task objects + This detector identifies when code is creating Dag or Task objects in Airflow. It needs to handle both traditional class instantiation and decorator styles. """ @@ -282,7 +282,7 @@ class AirflowRuntimeVaryingValueChecker(ast.NodeVisitor): Main responsibilities: - Traverse AST and visit nodes - - Detect DAG/Task creation + - Detect Dag/Task creation - Track runtime-varying values and generate warnings """ @@ -294,7 +294,7 @@ def __init__(self): # Helper objects self.value_analyzer = RuntimeVaryingValueAnalyzer(self.varying_vars, self.imports, self.from_imports) - self.dag_detector = DAGTaskDetector(self.from_imports) + self.dag_detector = DagTaskDetector(self.from_imports) def visit_Import(self, node: ast.Import): """Process import statements.""" @@ -314,13 +314,13 @@ def visit_Assign(self, node: ast.Assign): Process variable assignments. Checks: - 1. DAG instance assignment + 1. Dag instance assignment 2. Task instance assignment 3. Runtime-varying value assignment """ value = node.value - # DAG constructor + # Dag constructor if isinstance(value, ast.Call) and self.dag_detector.is_dag_constructor(value): self._register_dag_instances(node.targets) self._check_and_warn(value, WarningContext.DAG_CONSTRUCTOR) @@ -337,7 +337,7 @@ def visit_Call(self, node: ast.Call): """ Process function calls. - Check not assign but just call the function or dag definition via decorator. + Check not assign but just call the function or Dag definition via decorator. """ if self.dag_detector.is_dag_constructor(node): self._check_and_warn(node, WarningContext.DAG_CONSTRUCTOR) @@ -369,15 +369,15 @@ def visit_With(self, node: ast.With): """ Process with statements. - Detect DAG context manager. + Detect Dag context manager. """ is_with_dag_context = False for item in node.items: - # check if the dag instance exists in with context + # check if the Dag instance exists in with context self.visit(item) if isinstance(item.context_expr, ast.Call): if self.dag_detector.is_dag_constructor(item.context_expr): - # check the value defined in with statement to detect entering DAG with block + # check the value defined in with statement to detect entering Dag with block is_with_dag_context = True if is_with_dag_context: @@ -386,11 +386,11 @@ def visit_With(self, node: ast.With): for body in node.body: self.visit(body) - # Exit DAG with block + # Exit Dag with block self.dag_detector.exit_dag_context() def _register_dag_instances(self, targets: list): - """Register DAG instance variable names.""" + """Register Dag instance variable names.""" for target in targets: if isinstance(target, ast.Name): self.dag_detector.register_dag_instance(target.id) @@ -419,7 +419,7 @@ def _check_and_warn(self, call: ast.Call, context: WarningContext): def _get_warning_message(self, context: WarningContext): """Get appropriate warning message based on context.""" if self.dag_detector.is_in_dag_context and context == WarningContext.TASK_CONSTRUCTOR: - return "Don't use runtime-varying values as function arguments within with DAG block" + return "Don't use runtime-varying values as function arguments within with Dag block" return f"Don't use runtime-varying value as argument in {context.value}" def format_warnings(self) -> str | None: @@ -428,8 +428,8 @@ def format_warnings(self) -> str | None: return None lines = [ - "⚠️ This DAG uses runtime-variable values in DAG construction.", - "⚠️ It causes the DAG version to increase as values change on every DAG parse.", + "⚠️ This Dag uses runtime-variable values in Dag construction.", + "⚠️ It causes the Dag version to increase as values change on every Dag parse.", "", ] for w in self.warnings: @@ -441,7 +441,7 @@ def format_warnings(self) -> str | None: return "\n".join(lines) -def check_dag_file_static(file_path): +def check_dag_file_static(file_path) -> str | None: try: parsed = ast.parse(Path(file_path).read_bytes()) except Exception: @@ -452,8 +452,8 @@ def check_dag_file_static(file_path): return checker.format_warnings() -def get_warning_dag_format_dict(warning_statement, dag_ids): - """Convert warning statement to DAG warning format.""" +def get_warning_dag_format_dict(warning_statement: str | None, dag_ids): + """Convert warning statement to Dag warning format.""" from airflow.models.dagwarning import DagWarningType if not warning_statement: @@ -461,7 +461,7 @@ def get_warning_dag_format_dict(warning_statement, dag_ids): return [ { "dag_id": dag_id, - "warning_type": DagWarningType.PARSING_ERROR.value, + "warning_type": DagWarningType.RUNTIME_VARYING_VALUE.value, "message": warning_statement, } for dag_id in dag_ids diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py index 6ab2948e0609d..0d9eb862e29f6 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py @@ -116,5 +116,5 @@ def test_get_dag_warnings_bad_request(self, test_client): assert response.status_code == 422 assert ( response_json["detail"][0]["msg"] - == "Input should be 'asset conflict', 'non-existent pool' or 'parsing error'" + == "Input should be 'asset conflict', 'non-existent pool' or 'runtime varying value'" ) diff --git a/airflow-core/tests/unit/utils/test_static_checker.py b/airflow-core/tests/unit/utils/test_static_checker.py index ff3f7da9e7741..cda8a18b85dc9 100644 --- a/airflow-core/tests/unit/utils/test_static_checker.py +++ b/airflow-core/tests/unit/utils/test_static_checker.py @@ -20,7 +20,7 @@ from airflow.utils.static_checker import ( AirflowRuntimeVaryingValueChecker, - DAGTaskDetector, + DagTaskDetector, RuntimeVaryingValueAnalyzer, RuntimeVaryingValueWarning, WarningContext, @@ -274,7 +274,7 @@ class TestDAGTaskDetector: def setup_method(self): """Each test gets a fresh detector instance""" self.from_imports = {} - self.detector = DAGTaskDetector(self.from_imports) + self.detector = DagTaskDetector(self.from_imports) def test_is_dag_constructor__detects_traditional_dag_call_uppercase(self): """ diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index d8cf3acde4e8d..e00fb0cda0ef7 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -453,7 +453,7 @@ class DagWarningType(str, Enum): ASSET_CONFLICT = "asset conflict" NON_EXISTENT_POOL = "non-existent pool" - PARSING_ERROR = "parsing error" + RUNTIME_VARYING_VALUE = "runtime varying value" class DryRunBackfillResponse(BaseModel): From cd63a38be7e2aac2ab8082eef31e7f8c823dbc70 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Thu, 18 Dec 2025 10:17:51 +0900 Subject: [PATCH 05/12] fix test --- airflow-core/tests/unit/utils/test_static_checker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/tests/unit/utils/test_static_checker.py b/airflow-core/tests/unit/utils/test_static_checker.py index cda8a18b85dc9..f66fd4acb73c8 100644 --- a/airflow-core/tests/unit/utils/test_static_checker.py +++ b/airflow-core/tests/unit/utils/test_static_checker.py @@ -466,8 +466,8 @@ def test_visit_assign__warns_on_dag_with_varying_value(self): self.checker.visit(tree) - assert len(self.checker.warnings) > 0 - assert any("DAG constructor" in w.message for w in self.checker.warnings) + assert len(self.checker.warnings) == 1 + assert any("Dag constructor" in w.message for w in self.checker.warnings) def test_visit_call__detects_task_in_dag_context(self): """Detect task creation inside DAG with block.""" From c9dde4636f3df7c0d05898cdb55b76d747d0acb3 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Tue, 23 Dec 2025 19:34:54 +0900 Subject: [PATCH 06/12] add config and fix logics --- .../src/airflow/config_templates/config.yml | 17 +++ .../src/airflow/dag_processing/processor.py | 16 ++- .../src/airflow/utils/static_checker.py | 118 ++++++++++++------ .../unit/dag_processing/test_processor.py | 35 ++++++ .../tests/unit/dags/test_dag_static_check.py | 97 ++++++++++++++ .../tests/unit/utils/test_static_checker.py | 36 ++++-- 6 files changed, 263 insertions(+), 56 deletions(-) create mode 100644 airflow-core/tests/unit/dags/test_dag_static_check.py diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index dac144f54cbf0..23a368f22b91b 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2645,6 +2645,23 @@ dag_processor: type: boolean example: ~ default: "True" + static_check_level: + description: | + Controls the behavior of static checker performed before DAG parsing in the dag-processor. + Static checks detect potential issues such as runtime-varying values in DAG/Task constructors + that could cause DAG version inflation. + + * ``off``: Disables static checks entirely. No errors or warnings are generated. + * ``warning``: Performs static checks. DAGs load normally but warnings are displayed in the UI + when issues are detected. + * ``error``: Treats static check failures as DAG import errors, preventing the DAG from loading. + + Default is "warning" to alert users of potential issues without blocking DAG execution. + version_added: 3.2.0 + type: string + example: ~ + default: "warning" + profiling: description: | Configuration for memory profiling in Airflow component. diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py index 68b7669a19d34..c2e165ab688d0 100644 --- a/airflow-core/src/airflow/dag_processing/processor.py +++ b/airflow-core/src/airflow/dag_processing/processor.py @@ -70,7 +70,7 @@ from airflow.serialization.serialized_objects import DagSerialization, LazyDeserializedDAG from airflow.utils.file import iter_airflow_imports from airflow.utils.state import TaskInstanceState -from airflow.utils.static_checker import check_dag_file_static, get_warning_dag_format_dict +from airflow.utils.static_checker import check_dag_file_static if TYPE_CHECKING: from structlog.typing import FilteringBoundLogger @@ -207,7 +207,15 @@ def _parse_file_entrypoint(): def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileParsingResult | None: # TODO: Set known_pool names on DagBag! - warning_statement = check_dag_file_static(os.fspath(msg.file)) + static_check_result = check_dag_file_static(os.fspath(msg.file)) + + if static_check_error_dict := static_check_result.get_error_format_dict(msg.file, msg.bundle_path): + # If static check level is error, we shouldn't parse the Dags and return the result early + return DagFileParsingResult( + fileloc=msg.file, + serialized_dags=[], + import_errors=static_check_error_dict, + ) bag = BundleDagBag( dag_folder=msg.file, @@ -217,7 +225,7 @@ def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileP ) if msg.callback_requests: - # If the request is for callback, we shouldn't serialize the DAGs + # If the request is for callback, we shouldn't serialize the Dags _execute_callbacks(bag, msg.callback_requests, log) return None @@ -227,7 +235,7 @@ def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileP fileloc=msg.file, serialized_dags=serialized_dags, import_errors=bag.import_errors, - warnings=get_warning_dag_format_dict(warning_statement, dag_ids=bag.dag_ids), + warnings=static_check_result.get_warning_dag_format_dict(bag.dag_ids), ) return result diff --git a/airflow-core/src/airflow/utils/static_checker.py b/airflow-core/src/airflow/utils/static_checker.py index 62e5d4943e1b6..38c1a6f959898 100644 --- a/airflow-core/src/airflow/utils/static_checker.py +++ b/airflow-core/src/airflow/utils/static_checker.py @@ -22,6 +22,68 @@ from pathlib import Path +class StaticCheckerResult: + """ + Represents the result of static analysis on a DAG file. + + Stores detected warnings and formats them appropriately based on the configured check level + (warning or error). + """ + + def __init__(self, check_level): + self.check_level: str = check_level + self.warnings: list[RuntimeVaryingValueWarning] = [] + self.runtime_varying_values: dict = {} + + def format_warnings(self) -> str | None: + """Return formatted string of warning list.""" + if not self.warnings: + return None + + lines = [ + "⚠️ This Dag uses runtime-variable values in Dag construction.", + "⚠️ It causes the Dag version to increase as values change on every Dag parse.", + "", + ] + for w in self.warnings: + lines.append(f"Line {w.line}, Col {w.col}") + lines.append(f"Code: {w.code}") + lines.append(f"Issue: {w.message}") + lines.append("") + + if self.runtime_varying_values: + lines.append("️⚠️ Don't use the variables as arguments in DAG/Task constructors:") + # Sort by line number + sorted_vars = sorted(self.runtime_varying_values.items(), key=lambda x: x[1][0]) + for var_name, (line, source) in sorted_vars: + lines.append(f" Line {line}: '{var_name}' related '{source}'") + lines.append("") + + return "\n".join(lines) + + def get_warning_dag_format_dict(self, dag_ids): + """Convert warning statement to Dag warning format.""" + from airflow.models.dagwarning import DagWarningType + + if not self.warnings or self.check_level != "warning": + return [] + return [ + { + "dag_id": dag_id, + "warning_type": DagWarningType.RUNTIME_VARYING_VALUE.value, + "message": self.format_warnings(), + } + for dag_id in dag_ids + ] + + def get_error_format_dict(self, file_path, bundle_path): + if not self.warnings or self.check_level != "error": + return None + + relative_file_path = str(Path(file_path).relative_to(bundle_path)) if bundle_path else file_path + return {relative_file_path: self.format_warnings()} + + @dataclass class RuntimeVaryingValueWarning: """Warning information for runtime-varying value detection.""" @@ -52,6 +114,10 @@ class WarningContext(str, Enum): ("random", "uniform"), ("uuid", "uuid4"), ("uuid", "uuid1"), + ("pendulum", "now"), + ("pendulum", "today"), + ("pendulum", "yesterday"), + ("pendulum", "tomorrow"), ] @@ -286,11 +352,12 @@ class AirflowRuntimeVaryingValueChecker(ast.NodeVisitor): - Track runtime-varying values and generate warnings """ - def __init__(self): - self.warnings: list[RuntimeVaryingValueWarning] = [] + def __init__(self, check_level="warn"): + self.static_check_result: StaticCheckerResult = StaticCheckerResult(check_level=check_level) self.imports: dict[str, str] = {} self.from_imports: dict[str, tuple[str, str]] = {} self.varying_vars: dict[str, tuple[int, str]] = {} + self.check_level = check_level # Helper objects self.value_analyzer = RuntimeVaryingValueAnalyzer(self.varying_vars, self.imports, self.from_imports) @@ -407,7 +474,7 @@ def _check_and_warn(self, call: ast.Call, context: WarningContext): """Check function call arguments and generate warnings.""" varying_source = self.value_analyzer.get_varying_source(call) if varying_source: - self.warnings.append( + self.static_check_result.warnings.append( RuntimeVaryingValueWarning( line=call.lineno, col=call.col_offset, @@ -422,47 +489,20 @@ def _get_warning_message(self, context: WarningContext): return "Don't use runtime-varying values as function arguments within with Dag block" return f"Don't use runtime-varying value as argument in {context.value}" - def format_warnings(self) -> str | None: - """Return formatted string of warning list.""" - if not self.warnings: - return None - lines = [ - "⚠️ This Dag uses runtime-variable values in Dag construction.", - "⚠️ It causes the Dag version to increase as values change on every Dag parse.", - "", - ] - for w in self.warnings: - lines.append(f"Line {w.line}, Col {w.col}") - lines.append(f"Code: {w.code}") - lines.append(f"Issue: {w.message}") - lines.append("") - - return "\n".join(lines) +def check_dag_file_static(file_path) -> StaticCheckerResult: + from airflow.configuration import conf + check_level = conf.get("dag_processor", "static_check_level").lower() + if check_level == "off" or check_level not in ("warning", "error"): + return StaticCheckerResult(check_level=check_level) -def check_dag_file_static(file_path) -> str | None: try: parsed = ast.parse(Path(file_path).read_bytes()) except Exception: - return None + return StaticCheckerResult(check_level=check_level) - checker = AirflowRuntimeVaryingValueChecker() + checker = AirflowRuntimeVaryingValueChecker(check_level) checker.visit(parsed) - return checker.format_warnings() - - -def get_warning_dag_format_dict(warning_statement: str | None, dag_ids): - """Convert warning statement to Dag warning format.""" - from airflow.models.dagwarning import DagWarningType - - if not warning_statement: - return [] - return [ - { - "dag_id": dag_id, - "warning_type": DagWarningType.RUNTIME_VARYING_VALUE.value, - "message": warning_statement, - } - for dag_id in dag_ids - ] + checker.static_check_result.runtime_varying_values = checker.varying_vars + return checker.static_check_result diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py index 960b8440902de..b2b900046e27c 100644 --- a/airflow-core/tests/unit/dag_processing/test_processor.py +++ b/airflow-core/tests/unit/dag_processing/test_processor.py @@ -609,6 +609,41 @@ def fake_collect_dags(self, *args, **kwargs): assert called is True +@conf_vars({("dag_processor", "static_check_level"): "error"}) +def test_parse_file_static_check_with_error(): + result = _parse_file( + DagFileParseRequest( + file=f"{TEST_DAG_FOLDER}/test_dag_static_check.py", + bundle_path=TEST_DAG_FOLDER, + bundle_name="testing", + ), + log=structlog.get_logger(), + ) + + assert result.serialized_dags == [] + assert list(result.import_errors.keys()) == ["test_dag_static_check.py"] + assert result.warnings is None + assert "Don't use the variables as arguments" in next(iter(result.import_errors.values())) + + +def test_parse_file_static_check_with_default_warning(): + result = _parse_file( + DagFileParseRequest( + file=f"{TEST_DAG_FOLDER}/test_dag_static_check.py", + bundle_path=TEST_DAG_FOLDER, + bundle_name="testing", + ), + log=structlog.get_logger(), + ) + + assert len(result.serialized_dags) > 0 + assert len(result.warnings) == len(result.serialized_dags) + assert all( + warning.get("dag_id") and warning.get("warning_type") and warning.get("message") + for warning in result.warnings + ) + + def test_callback_processing_does_not_update_timestamps(session): """Callback processing should not update last_finish_time to prevent stale DAG detection.""" stat = process_parse_results( diff --git a/airflow-core/tests/unit/dags/test_dag_static_check.py b/airflow-core/tests/unit/dags/test_dag_static_check.py new file mode 100644 index 0000000000000..89578c479c32f --- /dev/null +++ b/airflow-core/tests/unit/dags/test_dag_static_check.py @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import random +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.providers.standard.operators.bash import BashOperator +from airflow.providers.standard.operators.python import PythonOperator + + +def print_hello(): + print("Hello from Airflow!") + return "Success" + + +def print_date(): + print(f"Current date: {datetime.now()}") + return datetime.now() + + +def calculate_sum(): + result = sum(range(1, 11)) + print(f"Sum of 1 to 10: {result}") + return result + + +default_args = { + "owner": "airflow", + "depends_on_past": False, + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=5), +} + +random_task_id = random.randint(1, 10000) + +for i in range(1, 10): + dag_id = f"dynamic_dag_{i:03d}" + + dag = DAG( + dag_id=dag_id, + default_args=default_args, + description=f"number {i}", + schedule=timedelta(minutes=1), + start_date=datetime(2024, 1, 1), + catchup=False, + is_paused_upon_creation=False, + ) + + task1 = BashOperator( + task_id=f"{str(random_task_id)}", + bash_command=f'echo "Hello from DAG {i}!"', + dag=dag, + ) + + task2 = PythonOperator( + task_id="print_python_hello", + python_callable=print_hello, + dag=dag, + ) + + task3 = PythonOperator( + task_id="print_current_date", + python_callable=print_date, + dag=dag, + ) + + task4 = PythonOperator( + task_id="calculate_sum", + python_callable=calculate_sum, + dag=dag, + ) + + task5 = BashOperator( + task_id="final_task", + bash_command=f'echo "DAG {i} - All tasks completed!"', + dag=dag, + ) + + task1 >> [task2, task3] >> task4 >> task5 diff --git a/airflow-core/tests/unit/utils/test_static_checker.py b/airflow-core/tests/unit/utils/test_static_checker.py index f66fd4acb73c8..a68175071b2db 100644 --- a/airflow-core/tests/unit/utils/test_static_checker.py +++ b/airflow-core/tests/unit/utils/test_static_checker.py @@ -466,8 +466,8 @@ def test_visit_assign__warns_on_dag_with_varying_value(self): self.checker.visit(tree) - assert len(self.checker.warnings) == 1 - assert any("Dag constructor" in w.message for w in self.checker.warnings) + assert len(self.checker.static_check_result.warnings) == 1 + assert any("Dag constructor" in w.message for w in self.checker.static_check_result.warnings) def test_visit_call__detects_task_in_dag_context(self): """Detect task creation inside DAG with block.""" @@ -483,8 +483,8 @@ def test_visit_call__detects_task_in_dag_context(self): self.checker.visit(tree) - assert len(self.checker.warnings) == 1 - assert any("PythonOperator" in w.code for w in self.checker.warnings) + assert len(self.checker.static_check_result.warnings) == 1 + assert any("PythonOperator" in w.code for w in self.checker.static_check_result.warnings) def test_visit_for__warns_on_varying_range(self): """Warn when for-loop range is runtime-varying.""" @@ -507,7 +507,7 @@ def test_visit_for__warns_on_varying_range(self): tree = ast.parse(code) self.checker.visit(tree) - warnings = self.checker.warnings + warnings = self.checker.static_check_result.warnings assert len(warnings) == 1 assert any("BashOperator" in w.code for w in warnings) @@ -521,8 +521,8 @@ def test_check_and_warn__creates_warning_for_varying_arg(self): self.checker._check_and_warn(call_node, WarningContext.DAG_CONSTRUCTOR) - assert len(self.checker.warnings) == 1 - warning = self.checker.warnings[0] + assert len(self.checker.static_check_result.warnings) == 1 + warning = self.checker.static_check_result.warnings[0] assert WarningContext.DAG_CONSTRUCTOR.value in warning.message assert "datetime.now()" in warning.code @@ -535,8 +535,8 @@ def test_check_and_warn__creates_warning_for_varying_kwarg(self): self.checker._check_and_warn(call_node, WarningContext.TASK_CONSTRUCTOR) - assert len(self.checker.warnings) == 1 - warning = self.checker.warnings[0] + assert len(self.checker.static_check_result.warnings) == 1 + warning = self.checker.static_check_result.warnings[0] assert "dag_id" in warning.code assert "datetime.now()" in warning.code @@ -552,7 +552,7 @@ def _check_code(self, code: str) -> list[RuntimeVaryingValueWarning]: tree = ast.parse(code) checker = AirflowRuntimeVaryingValueChecker() checker.visit(tree) - return checker.warnings + return checker.static_check_result.warnings def test_antipattern__dynamic_dag_id_with_timestamp(self): """ANTI-PATTERN: Using timestamps in DAG IDs.""" @@ -682,6 +682,7 @@ def test_dag_generated_in_for_or_function_statement(self): from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime +import pendulum def create_dag(dag_id, task_id): default_args = { @@ -699,13 +700,16 @@ def create_dag(dag_id, task_id): return dag +now = pendulum.now() +seoul = now.in_timezone('Asia/Seoul') + for i in [datetime.now(), "3"]: dag_id = f"dag_{i}_{random.randint(1, 1000)}" dag = DAG( dag_id=dag_id, # !problem schedule_interval='@daily', - tags=[f"iteration_{i}_{datetime.now().day}"], # !problem + tags=[f"iteration_{i}"], ) task1 = BashOperator( @@ -720,7 +724,13 @@ def create_dag(dag_id, task_id): dag=dag, ) - task1 >> task2 + task3 = BashOperator( + task_id=f'random_task_in_{seoul}', # !problem + bash_command='echo "World"', + dag=dag, + ) + + task1 >> task2 >> task3 """ warnings = self._check_code(code) - assert len(warnings) == 4 + assert len(warnings) == 5 From 5369cd8796c8556251be2718b6c694c136c03df5 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Wed, 24 Dec 2025 06:24:07 +0900 Subject: [PATCH 07/12] fix module static checker -> dag stability checker --- .../src/airflow/config_templates/config.yml | 11 +++++------ .../src/airflow/dag_processing/processor.py | 12 ++++++------ ...tatic_checker.py => dag_stability_checker.py} | 16 +++++++++------- .../tests/unit/dag_processing/test_processor.py | 2 +- ..._checker.py => test_dag_stability_checker.py} | 2 +- 5 files changed, 22 insertions(+), 21 deletions(-) rename airflow-core/src/airflow/utils/{static_checker.py => dag_stability_checker.py} (97%) rename airflow-core/tests/unit/utils/{test_static_checker.py => test_dag_stability_checker.py} (99%) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 23a368f22b91b..4a90e7e015a0e 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2645,16 +2645,15 @@ dag_processor: type: boolean example: ~ default: "True" - static_check_level: + dag_stability_check_level: description: | - Controls the behavior of static checker performed before DAG parsing in the dag-processor. + Controls the behavior of dag stability checker performed before DAG parsing in the dag-processor. Static checks detect potential issues such as runtime-varying values in DAG/Task constructors that could cause DAG version inflation. - * ``off``: Disables static checks entirely. No errors or warnings are generated. - * ``warning``: Performs static checks. DAGs load normally but warnings are displayed in the UI - when issues are detected. - * ``error``: Treats static check failures as DAG import errors, preventing the DAG from loading. + * ``off``: Disables dag stability checks entirely. No errors or warnings are generated. + * ``warning``: DAGs load normally but warnings are displayed in the UI when issues are detected. + * ``error``: Treats dag stability failures as DAG import errors, preventing the DAG from loading. Default is "warning" to alert users of potential issues without blocking DAG execution. version_added: 3.2.0 diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py index c2e165ab688d0..03738ef70ef3f 100644 --- a/airflow-core/src/airflow/dag_processing/processor.py +++ b/airflow-core/src/airflow/dag_processing/processor.py @@ -68,9 +68,9 @@ from airflow.sdk.execution_time.supervisor import WatchedSubprocess from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance, _send_error_email_notification from airflow.serialization.serialized_objects import DagSerialization, LazyDeserializedDAG +from airflow.utils.dag_stability_checker import check_dag_file_stability from airflow.utils.file import iter_airflow_imports from airflow.utils.state import TaskInstanceState -from airflow.utils.static_checker import check_dag_file_static if TYPE_CHECKING: from structlog.typing import FilteringBoundLogger @@ -207,14 +207,14 @@ def _parse_file_entrypoint(): def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileParsingResult | None: # TODO: Set known_pool names on DagBag! - static_check_result = check_dag_file_static(os.fspath(msg.file)) + stability_check_result = check_dag_file_stability(os.fspath(msg.file)) - if static_check_error_dict := static_check_result.get_error_format_dict(msg.file, msg.bundle_path): - # If static check level is error, we shouldn't parse the Dags and return the result early + if stability_check_error_dict := stability_check_result.get_error_format_dict(msg.file, msg.bundle_path): + # If Dag stability check level is error, we shouldn't parse the Dags and return the result early return DagFileParsingResult( fileloc=msg.file, serialized_dags=[], - import_errors=static_check_error_dict, + import_errors=stability_check_error_dict, ) bag = BundleDagBag( @@ -235,7 +235,7 @@ def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileP fileloc=msg.file, serialized_dags=serialized_dags, import_errors=bag.import_errors, - warnings=static_check_result.get_warning_dag_format_dict(bag.dag_ids), + warnings=stability_check_result.get_warning_dag_format_dict(bag.dag_ids), ) return result diff --git a/airflow-core/src/airflow/utils/static_checker.py b/airflow-core/src/airflow/utils/dag_stability_checker.py similarity index 97% rename from airflow-core/src/airflow/utils/static_checker.py rename to airflow-core/src/airflow/utils/dag_stability_checker.py index 38c1a6f959898..2046ddfecf89a 100644 --- a/airflow-core/src/airflow/utils/static_checker.py +++ b/airflow-core/src/airflow/utils/dag_stability_checker.py @@ -22,9 +22,9 @@ from pathlib import Path -class StaticCheckerResult: +class DagStabilityCheckerResult: """ - Represents the result of static analysis on a DAG file. + Represents the result of stability analysis on a DAG file. Stores detected warnings and formats them appropriately based on the configured check level (warning or error). @@ -353,7 +353,9 @@ class AirflowRuntimeVaryingValueChecker(ast.NodeVisitor): """ def __init__(self, check_level="warn"): - self.static_check_result: StaticCheckerResult = StaticCheckerResult(check_level=check_level) + self.static_check_result: DagStabilityCheckerResult = DagStabilityCheckerResult( + check_level=check_level + ) self.imports: dict[str, str] = {} self.from_imports: dict[str, tuple[str, str]] = {} self.varying_vars: dict[str, tuple[int, str]] = {} @@ -490,17 +492,17 @@ def _get_warning_message(self, context: WarningContext): return f"Don't use runtime-varying value as argument in {context.value}" -def check_dag_file_static(file_path) -> StaticCheckerResult: +def check_dag_file_stability(file_path) -> DagStabilityCheckerResult: from airflow.configuration import conf - check_level = conf.get("dag_processor", "static_check_level").lower() + check_level = conf.get("dag_processor", "dag_stability_check_level").lower() if check_level == "off" or check_level not in ("warning", "error"): - return StaticCheckerResult(check_level=check_level) + return DagStabilityCheckerResult(check_level=check_level) try: parsed = ast.parse(Path(file_path).read_bytes()) except Exception: - return StaticCheckerResult(check_level=check_level) + return DagStabilityCheckerResult(check_level=check_level) checker = AirflowRuntimeVaryingValueChecker(check_level) checker.visit(parsed) diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py index b2b900046e27c..bf65c03238a1b 100644 --- a/airflow-core/tests/unit/dag_processing/test_processor.py +++ b/airflow-core/tests/unit/dag_processing/test_processor.py @@ -609,7 +609,7 @@ def fake_collect_dags(self, *args, **kwargs): assert called is True -@conf_vars({("dag_processor", "static_check_level"): "error"}) +@conf_vars({("dag_processor", "dag_stability_check_level"): "error"}) def test_parse_file_static_check_with_error(): result = _parse_file( DagFileParseRequest( diff --git a/airflow-core/tests/unit/utils/test_static_checker.py b/airflow-core/tests/unit/utils/test_dag_stability_checker.py similarity index 99% rename from airflow-core/tests/unit/utils/test_static_checker.py rename to airflow-core/tests/unit/utils/test_dag_stability_checker.py index a68175071b2db..6c066dfa74efd 100644 --- a/airflow-core/tests/unit/utils/test_static_checker.py +++ b/airflow-core/tests/unit/utils/test_dag_stability_checker.py @@ -18,7 +18,7 @@ import ast -from airflow.utils.static_checker import ( +from airflow.utils.dag_stability_checker import ( AirflowRuntimeVaryingValueChecker, DagTaskDetector, RuntimeVaryingValueAnalyzer, From 32eca376dd01c97bf655a3f4886c6bfed8e518e4 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Fri, 2 Jan 2026 12:10:59 +0900 Subject: [PATCH 08/12] fix config description --- airflow-core/src/airflow/config_templates/config.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 4a90e7e015a0e..09dedccaeb4ad 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2648,14 +2648,14 @@ dag_processor: dag_stability_check_level: description: | Controls the behavior of dag stability checker performed before DAG parsing in the dag-processor. - Static checks detect potential issues such as runtime-varying values in DAG/Task constructors - that could cause DAG version inflation. + The check detects detect potential issues such as runtime-varying values in Dag/Task constructors + that could cause Dag version inflation. * ``off``: Disables dag stability checks entirely. No errors or warnings are generated. - * ``warning``: DAGs load normally but warnings are displayed in the UI when issues are detected. - * ``error``: Treats dag stability failures as DAG import errors, preventing the DAG from loading. + * ``warning``: Dags load normally but warnings are displayed in the UI when issues are detected. + * ``error``: Treats dag stability failures as DAG import errors, preventing the Dag from loading. - Default is "warning" to alert users of potential issues without blocking DAG execution. + Default is "warning" to alert users of potential issues without blocking Dag execution. version_added: 3.2.0 type: string example: ~ From 0affb6e669170f5c8e9c70bdea85bda53b7bc527 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Tue, 6 Jan 2026 15:57:08 +0900 Subject: [PATCH 09/12] fix logics --- .../src/airflow/config_templates/config.yml | 6 +- .../airflow/utils/dag_stability_checker.py | 145 ++++++++++-------- .../unit/dag_processing/test_processor.py | 6 +- ...c_check.py => test_dag_stability_check.py} | 4 +- .../unit/utils/test_dag_stability_checker.py | 29 ++-- 5 files changed, 106 insertions(+), 84 deletions(-) rename airflow-core/tests/unit/dags/{test_dag_static_check.py => test_dag_stability_check.py} (95%) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 09dedccaeb4ad..64a861fb9b10e 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2647,13 +2647,13 @@ dag_processor: default: "True" dag_stability_check_level: description: | - Controls the behavior of dag stability checker performed before DAG parsing in the dag-processor. + Controls the behavior of Dag stability checker performed before Dag parsing in the Dag processor. The check detects detect potential issues such as runtime-varying values in Dag/Task constructors that could cause Dag version inflation. - * ``off``: Disables dag stability checks entirely. No errors or warnings are generated. + * ``off``: Disables Dag stability checks entirely. No errors or warnings are generated. * ``warning``: Dags load normally but warnings are displayed in the UI when issues are detected. - * ``error``: Treats dag stability failures as DAG import errors, preventing the Dag from loading. + * ``error``: Treats Dag stability failures as Dag import errors, preventing the Dag from loading. Default is "warning" to alert users of potential issues without blocking Dag execution. version_added: 3.2.0 diff --git a/airflow-core/src/airflow/utils/dag_stability_checker.py b/airflow-core/src/airflow/utils/dag_stability_checker.py index 2046ddfecf89a..dba25510b3910 100644 --- a/airflow-core/src/airflow/utils/dag_stability_checker.py +++ b/airflow-core/src/airflow/utils/dag_stability_checker.py @@ -21,17 +21,44 @@ from enum import Enum from pathlib import Path +RUNTIME_VARYING_CALLS = [ + ("datetime", "now"), + ("datetime", "today"), + ("datetime", "utcnow"), + ("date", "today"), + ("time", "time"), + ("time", "localtime"), + ("random", "random"), + ("random", "randint"), + ("random", "choice"), + ("random", "uniform"), + ("uuid", "uuid4"), + ("uuid", "uuid1"), + ("pendulum", "now"), + ("pendulum", "today"), + ("pendulum", "yesterday"), + ("pendulum", "tomorrow"), +] + + +class DagStabilityCheckLevel(Enum): + """enum class for Dag stability check level.""" + + off = "off" + warning = "warning" + error = "error" + class DagStabilityCheckerResult: """ - Represents the result of stability analysis on a DAG file. + Represents the result of stability analysis on a Dag file. Stores detected warnings and formats them appropriately based on the configured check level (warning or error). """ - def __init__(self, check_level): - self.check_level: str = check_level + def __init__(self, check_level: DagStabilityCheckLevel): + self.check_level: DagStabilityCheckLevel = check_level self.warnings: list[RuntimeVaryingValueWarning] = [] self.runtime_varying_values: dict = {} @@ -41,31 +68,42 @@ def format_warnings(self) -> str | None: return None lines = [ - "⚠️ This Dag uses runtime-variable values in Dag construction.", - "⚠️ It causes the Dag version to increase as values change on every Dag parse.", + "This Dag uses runtime-variable values in Dag construction.", + "It causes the Dag version to increase as values change on every Dag parse.", "", ] for w in self.warnings: - lines.append(f"Line {w.line}, Col {w.col}") - lines.append(f"Code: {w.code}") - lines.append(f"Issue: {w.message}") - lines.append("") + lines.extend( + [ + f"Line {w.line}, Col {w.col}", + f"Code: {w.code}", + f"Issue: {w.message}", + "", + ] + ) if self.runtime_varying_values: - lines.append("️⚠️ Don't use the variables as arguments in DAG/Task constructors:") - # Sort by line number - sorted_vars = sorted(self.runtime_varying_values.items(), key=lambda x: x[1][0]) - for var_name, (line, source) in sorted_vars: - lines.append(f" Line {line}: '{var_name}' related '{source}'") - lines.append("") + lines.extend( + [ + "️Don't use the variables as arguments in Dag/Task constructors:", + *( + f" Line {line}: '{var_name}' related '{source}'" + for var_name, (line, source) in sorted( + self.runtime_varying_values.items(), + key=lambda x: x[1][0], + ) + ), + "", + ] + ) return "\n".join(lines) - def get_warning_dag_format_dict(self, dag_ids): + def get_warning_dag_format_dict(self, dag_ids: list[str]) -> list[dict[str, str]]: """Convert warning statement to Dag warning format.""" from airflow.models.dagwarning import DagWarningType - if not self.warnings or self.check_level != "warning": + if not self.warnings or self.check_level != DagStabilityCheckLevel.warning: return [] return [ { @@ -77,7 +115,7 @@ def get_warning_dag_format_dict(self, dag_ids): ] def get_error_format_dict(self, file_path, bundle_path): - if not self.warnings or self.check_level != "error": + if not self.warnings or self.check_level != DagStabilityCheckLevel.error: return None relative_file_path = str(Path(file_path).relative_to(bundle_path)) if bundle_path else file_path @@ -101,26 +139,6 @@ class WarningContext(str, Enum): DAG_CONSTRUCTOR = "Dag constructor" -RUNTIME_VARYING_CALLS = [ - ("datetime", "now"), - ("datetime", "today"), - ("datetime", "utcnow"), - ("date", "today"), - ("time", "time"), - ("time", "localtime"), - ("random", "random"), - ("random", "randint"), - ("random", "choice"), - ("random", "uniform"), - ("uuid", "uuid4"), - ("uuid", "uuid1"), - ("pendulum", "now"), - ("pendulum", "today"), - ("pendulum", "yesterday"), - ("pendulum", "tomorrow"), -] - - class RuntimeVaryingValueAnalyzer: """ Analyzer dedicated to tracking and detecting runtime-varying values. @@ -129,7 +147,12 @@ class RuntimeVaryingValueAnalyzer: contains values that change on every execution (datetime.now(), random(), etc.). """ - def __init__(self, varying_vars: dict, imports: dict, from_imports: dict): + def __init__( + self, + varying_vars: dict[str, tuple[str, str]], + imports: dict[str, str], + from_imports: dict[str, tuple[str, str]], + ): self.varying_vars = varying_vars self.imports = imports self.from_imports = from_imports @@ -145,7 +168,7 @@ def get_varying_source(self, node: ast.expr) -> str | None: - Runtime-varying values in expressions/collections """ # 1. Direct runtime-varying call - if isinstance(node, ast.Call) and self._is_runtime_varying_call(node): + if isinstance(node, ast.Call) and self.is_runtime_varying_call(node): return ast.unparse(node) # 2. Runtime-varying variable reference @@ -155,7 +178,7 @@ def get_varying_source(self, node: ast.expr) -> str | None: # 3. f-string if isinstance(node, ast.JoinedStr): - return self._check_fstring_varying(node) + return self.get_varying_fstring(node) # 4. Binary operation if isinstance(node, ast.BinOp): @@ -163,7 +186,7 @@ def get_varying_source(self, node: ast.expr) -> str | None: # 5. Collections (list/tuple/set) if isinstance(node, (ast.List, ast.Tuple, ast.Set)): - return self._check_collection_varying(node.elts) + return self.get_varying_collection(node.elts) # 6. List comprehension if isinstance(node, ast.ListComp): @@ -171,7 +194,7 @@ def get_varying_source(self, node: ast.expr) -> str | None: # 7. Dictionary if isinstance(node, ast.Dict): - return self._check_dict_varying(node) + return self.get_varying_dict(node) # 8. Method call chain if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute): @@ -179,16 +202,15 @@ def get_varying_source(self, node: ast.expr) -> str | None: return None - def _check_fstring_varying(self, node: ast.JoinedStr) -> str | None: + def get_varying_fstring(self, node: ast.JoinedStr) -> str | None: """Check for runtime-varying values inside f-strings.""" for value in node.values: if isinstance(value, ast.FormattedValue): - source = self.get_varying_source(value.value) - if source: + if source := self.get_varying_source(value.value): return source return None - def _check_collection_varying(self, elements: list) -> str | None: + def get_varying_collection(self, elements: list) -> str | None: """Check for runtime-varying values in collection elements.""" for elt in elements: source = self.get_varying_source(elt) @@ -196,7 +218,7 @@ def _check_collection_varying(self, elements: list) -> str | None: return source return None - def _check_dict_varying(self, node: ast.Dict) -> str | None: + def get_varying_dict(self, node: ast.Dict) -> str | None: """Check for runtime-varying values in dictionary keys/values.""" for key, value in zip(node.keys, node.values): if key: @@ -209,7 +231,7 @@ def _check_dict_varying(self, node: ast.Dict) -> str | None: return source return None - def _is_runtime_varying_call(self, node: ast.Call) -> bool: + def is_runtime_varying_call(self, node: ast.Call) -> bool: """ Check if a call is runtime-varying. @@ -291,7 +313,7 @@ def __init__(self, from_imports: dict): self.is_in_dag_context: bool = False def is_dag_constructor(self, node: ast.Call) -> bool: - """Check if a call is a DAG constructor.""" + """Check if a call is a Dag constructor.""" if not isinstance(node.func, ast.Name): return False @@ -310,14 +332,14 @@ def is_task_constructor(self, node: ast.Call) -> bool: Check if a call is a Task constructor. Criteria: - 1. All calls within a DAG with block - 2. Calls that receive a DAG instance as an argument (dag=...) + 1. All calls within a Dag with block + 2. Calls that receive a Dag instance as an argument (dag=...) """ - # Inside DAG with block + # Inside Dag with block if self.is_in_dag_context: return True - # Passing DAG instance as argument + # Passing Dag instance as argument for arg in node.args: if isinstance(arg, ast.Name) and arg.id in self.dag_instances: return True @@ -330,21 +352,21 @@ def is_task_constructor(self, node: ast.Call) -> bool: return False def register_dag_instance(self, var_name: str): - """Register a DAG instance variable name.""" + """Register a Dag instance variable name.""" self.dag_instances.add(var_name) def enter_dag_context(self): - """Enter a DAG with block.""" + """Enter a Dag with block.""" self.is_in_dag_context = True def exit_dag_context(self): - """Exit a DAG with block.""" + """Exit a Dag with block.""" self.is_in_dag_context = False class AirflowRuntimeVaryingValueChecker(ast.NodeVisitor): """ - Main visitor class to detect runtime-varying value usage in Airflow DAG/Task. + Main visitor class to detect runtime-varying value usage in Airflow Dag/Task. Main responsibilities: - Traverse AST and visit nodes @@ -352,7 +374,7 @@ class AirflowRuntimeVaryingValueChecker(ast.NodeVisitor): - Track runtime-varying values and generate warnings """ - def __init__(self, check_level="warn"): + def __init__(self, check_level: DagStabilityCheckLevel = DagStabilityCheckLevel.warning): self.static_check_result: DagStabilityCheckerResult = DagStabilityCheckerResult( check_level=check_level ) @@ -495,8 +517,9 @@ def _get_warning_message(self, context: WarningContext): def check_dag_file_stability(file_path) -> DagStabilityCheckerResult: from airflow.configuration import conf - check_level = conf.get("dag_processor", "dag_stability_check_level").lower() - if check_level == "off" or check_level not in ("warning", "error"): + check_level = conf.getenum("dag_processor", "dag_stability_check_level", DagStabilityCheckLevel) + + if check_level == DagStabilityCheckLevel.off: return DagStabilityCheckerResult(check_level=check_level) try: diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py index bf65c03238a1b..6370955078d79 100644 --- a/airflow-core/tests/unit/dag_processing/test_processor.py +++ b/airflow-core/tests/unit/dag_processing/test_processor.py @@ -613,7 +613,7 @@ def fake_collect_dags(self, *args, **kwargs): def test_parse_file_static_check_with_error(): result = _parse_file( DagFileParseRequest( - file=f"{TEST_DAG_FOLDER}/test_dag_static_check.py", + file=f"{TEST_DAG_FOLDER}/test_dag_stability_check.py", bundle_path=TEST_DAG_FOLDER, bundle_name="testing", ), @@ -621,7 +621,7 @@ def test_parse_file_static_check_with_error(): ) assert result.serialized_dags == [] - assert list(result.import_errors.keys()) == ["test_dag_static_check.py"] + assert list(result.import_errors.keys()) == ["test_dag_stability_check.py"] assert result.warnings is None assert "Don't use the variables as arguments" in next(iter(result.import_errors.values())) @@ -629,7 +629,7 @@ def test_parse_file_static_check_with_error(): def test_parse_file_static_check_with_default_warning(): result = _parse_file( DagFileParseRequest( - file=f"{TEST_DAG_FOLDER}/test_dag_static_check.py", + file=f"{TEST_DAG_FOLDER}/test_dag_stability_check.py", bundle_path=TEST_DAG_FOLDER, bundle_name="testing", ), diff --git a/airflow-core/tests/unit/dags/test_dag_static_check.py b/airflow-core/tests/unit/dags/test_dag_stability_check.py similarity index 95% rename from airflow-core/tests/unit/dags/test_dag_static_check.py rename to airflow-core/tests/unit/dags/test_dag_stability_check.py index 89578c479c32f..96715b50308e0 100644 --- a/airflow-core/tests/unit/dags/test_dag_static_check.py +++ b/airflow-core/tests/unit/dags/test_dag_stability_check.py @@ -66,7 +66,7 @@ def calculate_sum(): task1 = BashOperator( task_id=f"{str(random_task_id)}", - bash_command=f'echo "Hello from DAG {i}!"', + bash_command=f'echo "Hello from Dag {i}!"', dag=dag, ) @@ -90,7 +90,7 @@ def calculate_sum(): task5 = BashOperator( task_id="final_task", - bash_command=f'echo "DAG {i} - All tasks completed!"', + bash_command=f'echo "Dag {i} - All tasks completed!"', dag=dag, ) diff --git a/airflow-core/tests/unit/utils/test_dag_stability_checker.py b/airflow-core/tests/unit/utils/test_dag_stability_checker.py index 6c066dfa74efd..d5b8e37785d1b 100644 --- a/airflow-core/tests/unit/utils/test_dag_stability_checker.py +++ b/airflow-core/tests/unit/utils/test_dag_stability_checker.py @@ -143,7 +143,7 @@ def test_is_runtime_varying_call__true_when_function_itself_varies(self): call_node = ast.parse(code, mode="eval").body self.imports["datetime"] = "datetime" - result = self.analyzer._is_runtime_varying_call(call_node) + result = self.analyzer.is_runtime_varying_call(call_node) assert result is True @@ -157,7 +157,7 @@ def test_is_runtime_varying_call__true_when_argument_varies(self): call_node = ast.parse(code, mode="eval").body self.imports["datetime"] = "datetime" - result = self.analyzer._is_runtime_varying_call(call_node) + result = self.analyzer.is_runtime_varying_call(call_node) assert result is True @@ -166,7 +166,7 @@ def test_is_runtime_varying_call__false_when_completely_static(self): code = 'print("hello")' call_node = ast.parse(code, mode="eval").body - result = self.analyzer._is_runtime_varying_call(call_node) + result = self.analyzer.is_runtime_varying_call(call_node) assert result is False @@ -305,7 +305,7 @@ def test_is_dag_constructor__detects_dag_generated_by_decorator(self): assert result is True def test_is_dag_constructor__ignores_non_dag_functions(self): - """Regular function calls should not be detected as DAG constructors.""" + """Regular function calls should not be detected as Dag constructors.""" code = "my_function()" call_node = ast.parse(code, mode="eval").body @@ -315,7 +315,7 @@ def test_is_dag_constructor__ignores_non_dag_functions(self): def test_is_task_constructor__true_when_inside_dag_context(self): """ - Any function call inside a DAG with-block is considered a task. + Any function call inside a Dag with-block is considered a task. Example: with DAG() as dag: @@ -330,7 +330,7 @@ def test_is_task_constructor__true_when_inside_dag_context(self): assert result is True def test_is_task_constructor__false_when_outside_dag_context(self): - """Same call outside DAG context is NOT automatically a task.""" + """Same call outside Dag context is NOT automatically a task.""" code = "PythonOperator(task_id='my_task')" call_node = ast.parse(code, mode="eval").body @@ -339,7 +339,7 @@ def test_is_task_constructor__false_when_outside_dag_context(self): def test_is_task_constructor__true_when_dag_passed_as_argument(self): """ - Detect task when dag= parameter references a DAG instance. + Detect task when dag= parameter references a Dag instance. Example: my_dag = DAG(dag_id='dag); task = PythonOperator(dag=my_dag) """ @@ -352,7 +352,7 @@ def test_is_task_constructor__true_when_dag_passed_as_argument(self): def test_is_task_constructor__true_when_dag_in_positional_args(self): """ - Detect task even when DAG is passed as positional argument. + Detect task even when Dag is passed as positional argument. Example: my_dag = DAG(dag_id='dag); task = PythonOperator('task_id', my_dag) """ @@ -364,7 +364,7 @@ def test_is_task_constructor__true_when_dag_in_positional_args(self): assert result is True def test_enter_and_exit_dag_context(self): - """Properly track entering and exiting DAG with-blocks.""" + """Properly track entering and exiting Dag with-blocks.""" assert self.detector.is_in_dag_context is False self.detector.enter_dag_context() @@ -374,7 +374,7 @@ def test_enter_and_exit_dag_context(self): assert self.detector.is_in_dag_context is False def test_register_dag_instance(self): - """Remember variable names that hold DAG instances.""" + """Remember variable names that hold Dag instances.""" assert "my_dag" not in self.detector.dag_instances self.detector.register_dag_instance("my_dag") @@ -456,7 +456,7 @@ def test_visit_assign__tracks_varying_variable(self): assert "datetime.now()" in source def test_visit_assign__warns_on_dag_with_varying_value(self): - """Warn when DAG constructor uses runtime-varying values.""" + """Warn when Dag constructor uses runtime-varying values.""" code = """ from airflow import DAG from datetime import datetime @@ -470,7 +470,7 @@ def test_visit_assign__warns_on_dag_with_varying_value(self): assert any("Dag constructor" in w.message for w in self.checker.static_check_result.warnings) def test_visit_call__detects_task_in_dag_context(self): - """Detect task creation inside DAG with block.""" + """Detect task creation inside Dag with block.""" code = """ from airflow import DAG from airflow.operators.python import PythonOperator @@ -555,12 +555,12 @@ def _check_code(self, code: str) -> list[RuntimeVaryingValueWarning]: return checker.static_check_result.warnings def test_antipattern__dynamic_dag_id_with_timestamp(self): - """ANTI-PATTERN: Using timestamps in DAG IDs.""" + """ANTI-PATTERN: Using timestamps in Dag IDs.""" code = """ from airflow import DAG from datetime import datetime -# BAD: DAG ID includes current timestamp +# BAD: Dag ID includes current timestamp dag = DAG(dag_id=f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}") """ warnings = self._check_code(code) @@ -618,7 +618,6 @@ def test_correct_pattern__static_dag_with_runtime_context(self): import time -# time 모듈 동적 함수들 current_timestamp = time.time() local_time = time.localtime() From 279d9ca02516d15d11f14f71a2a70098e52593e3 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Tue, 6 Jan 2026 17:05:34 +0900 Subject: [PATCH 10/12] fix logics --- airflow-core/src/airflow/utils/dag_stability_checker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/utils/dag_stability_checker.py b/airflow-core/src/airflow/utils/dag_stability_checker.py index dba25510b3910..9484418390f02 100644 --- a/airflow-core/src/airflow/utils/dag_stability_checker.py +++ b/airflow-core/src/airflow/utils/dag_stability_checker.py @@ -99,7 +99,7 @@ def format_warnings(self) -> str | None: return "\n".join(lines) - def get_warning_dag_format_dict(self, dag_ids: list[str]) -> list[dict[str, str]]: + def get_warning_dag_format_dict(self, dag_ids: list[str]) -> list[dict[str, str | None]]: """Convert warning statement to Dag warning format.""" from airflow.models.dagwarning import DagWarningType @@ -149,7 +149,7 @@ class RuntimeVaryingValueAnalyzer: def __init__( self, - varying_vars: dict[str, tuple[str, str]], + varying_vars: dict[str, tuple[int, str]], imports: dict[str, str], from_imports: dict[str, tuple[str, str]], ): From d365db9df4961151d6cf609daaee083231b4d1f2 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Fri, 9 Jan 2026 10:04:57 +0900 Subject: [PATCH 11/12] fix logics --- .../src/airflow/config_templates/config.yml | 2 +- .../src/airflow/dag_processing/processor.py | 4 +- ...er.py => dag_version_inflation_checker.py} | 104 +++++++++--------- .../unit/dag_processing/test_processor.py | 8 +- ...py => test_dag_version_inflation_check.py} | 0 ... => test_dag_version_inflation_checker.py} | 18 +-- 6 files changed, 68 insertions(+), 68 deletions(-) rename airflow-core/src/airflow/utils/{dag_stability_checker.py => dag_version_inflation_checker.py} (84%) rename airflow-core/tests/unit/dags/{test_dag_stability_check.py => test_dag_version_inflation_check.py} (100%) rename airflow-core/tests/unit/utils/{test_dag_stability_checker.py => test_dag_version_inflation_checker.py} (97%) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 64a861fb9b10e..c3c5863a11e3d 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2645,7 +2645,7 @@ dag_processor: type: boolean example: ~ default: "True" - dag_stability_check_level: + dag_version_inflation_check_level: description: | Controls the behavior of Dag stability checker performed before Dag parsing in the Dag processor. The check detects detect potential issues such as runtime-varying values in Dag/Task constructors diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py index 03738ef70ef3f..628e94df61500 100644 --- a/airflow-core/src/airflow/dag_processing/processor.py +++ b/airflow-core/src/airflow/dag_processing/processor.py @@ -68,7 +68,7 @@ from airflow.sdk.execution_time.supervisor import WatchedSubprocess from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance, _send_error_email_notification from airflow.serialization.serialized_objects import DagSerialization, LazyDeserializedDAG -from airflow.utils.dag_stability_checker import check_dag_file_stability +from airflow.utils.dag_version_inflation_checker import check_dag_file_stability from airflow.utils.file import iter_airflow_imports from airflow.utils.state import TaskInstanceState @@ -235,7 +235,7 @@ def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileP fileloc=msg.file, serialized_dags=serialized_dags, import_errors=bag.import_errors, - warnings=stability_check_result.get_warning_dag_format_dict(bag.dag_ids), + warnings=stability_check_result.get_formatted_warnings(bag.dag_ids), ) return result diff --git a/airflow-core/src/airflow/utils/dag_stability_checker.py b/airflow-core/src/airflow/utils/dag_version_inflation_checker.py similarity index 84% rename from airflow-core/src/airflow/utils/dag_stability_checker.py rename to airflow-core/src/airflow/utils/dag_version_inflation_checker.py index 9484418390f02..9dec604adf0fa 100644 --- a/airflow-core/src/airflow/utils/dag_stability_checker.py +++ b/airflow-core/src/airflow/utils/dag_version_inflation_checker.py @@ -41,15 +41,15 @@ ] -class DagStabilityCheckLevel(Enum): - """enum class for Dag stability check level.""" +class DagVersionInflationCheckLevel(Enum): + """enum class for Dag version inflation check level.""" off = "off" warning = "warning" error = "error" -class DagStabilityCheckerResult: +class DagVersionInflationCheckResult: """ Represents the result of stability analysis on a Dag file. @@ -57,8 +57,8 @@ class DagStabilityCheckerResult: (warning or error). """ - def __init__(self, check_level: DagStabilityCheckLevel): - self.check_level: DagStabilityCheckLevel = check_level + def __init__(self, check_level: DagVersionInflationCheckLevel): + self.check_level: DagVersionInflationCheckLevel = check_level self.warnings: list[RuntimeVaryingValueWarning] = [] self.runtime_varying_values: dict = {} @@ -99,11 +99,11 @@ def format_warnings(self) -> str | None: return "\n".join(lines) - def get_warning_dag_format_dict(self, dag_ids: list[str]) -> list[dict[str, str | None]]: + def get_formatted_warnings(self, dag_ids: list[str]) -> list[dict[str, str | None]]: """Convert warning statement to Dag warning format.""" from airflow.models.dagwarning import DagWarningType - if not self.warnings or self.check_level != DagStabilityCheckLevel.warning: + if not self.warnings or self.check_level != DagVersionInflationCheckLevel.warning: return [] return [ { @@ -115,7 +115,7 @@ def get_warning_dag_format_dict(self, dag_ids: list[str]) -> list[dict[str, str ] def get_error_format_dict(self, file_path, bundle_path): - if not self.warnings or self.check_level != DagStabilityCheckLevel.error: + if not self.warnings or self.check_level != DagVersionInflationCheckLevel.error: return None relative_file_path = str(Path(file_path).relative_to(bundle_path)) if bundle_path else file_path @@ -167,39 +167,40 @@ def get_varying_source(self, node: ast.expr) -> str | None: - Runtime-varying values in f-strings - Runtime-varying values in expressions/collections """ - # 1. Direct runtime-varying call - if isinstance(node, ast.Call) and self.is_runtime_varying_call(node): - return ast.unparse(node) + if isinstance(node, ast.Call): + # 1. Direct runtime-varying call + if self.is_runtime_varying_call(node): + return ast.unparse(node) - # 2. Runtime-varying variable reference + # 2. Method call chain + if isinstance(node.func, ast.Attribute): + return self.get_varying_source(node.func.value) + + # 3. Runtime-varying variable reference if isinstance(node, ast.Name) and node.id in self.varying_vars: _, source = self.varying_vars[node.id] return source - # 3. f-string + # 4. f-string if isinstance(node, ast.JoinedStr): return self.get_varying_fstring(node) - # 4. Binary operation + # 5. Binary operation if isinstance(node, ast.BinOp): return self.get_varying_source(node.left) or self.get_varying_source(node.right) - # 5. Collections (list/tuple/set) + # 6. Collections (list/tuple/set) if isinstance(node, (ast.List, ast.Tuple, ast.Set)): return self.get_varying_collection(node.elts) - # 6. List comprehension + # 7. List comprehension if isinstance(node, ast.ListComp): return self.get_varying_source(node.elt) - # 7. Dictionary + # 8. Dictionary if isinstance(node, ast.Dict): return self.get_varying_dict(node) - # 8. Method call chain - if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute): - return self.get_varying_source(node.func.value) - return None def get_varying_fstring(self, node: ast.JoinedStr) -> str | None: @@ -213,8 +214,7 @@ def get_varying_fstring(self, node: ast.JoinedStr) -> str | None: def get_varying_collection(self, elements: list) -> str | None: """Check for runtime-varying values in collection elements.""" for elt in elements: - source = self.get_varying_source(elt) - if source: + if source := self.get_varying_source(elt): return source return None @@ -222,12 +222,10 @@ def get_varying_dict(self, node: ast.Dict) -> str | None: """Check for runtime-varying values in dictionary keys/values.""" for key, value in zip(node.keys, node.values): if key: - source = self.get_varying_source(key) - if source: + if source := self.get_varying_source(key): return source if value: - source = self.get_varying_source(value) - if source: + if source := self.get_varying_source(value): return source return None @@ -239,17 +237,16 @@ def is_runtime_varying_call(self, node: ast.Call) -> bool: 2. Do the arguments contain runtime-varying values? """ # Check if the function itself is runtime-varying - if isinstance(node.func, ast.Attribute): - if self._is_runtime_varying_attribute_call(node.func): - return True - elif isinstance(node.func, ast.Name): - if self._is_runtime_varying_name_call(node.func): - return True + if isinstance(node.func, ast.Attribute) and self.is_runtime_varying_attribute_call(node.func): + return True + + if isinstance(node.func, ast.Name) and self.is_runtime_varying_name_call(node.func): + return True # Check if arguments contain runtime-varying values - return self._has_varying_arguments(node) + return self.has_varying_arguments(node) - def _has_varying_arguments(self, node: ast.Call) -> bool: + def has_varying_arguments(self, node: ast.Call) -> bool: """Check if function arguments contain runtime-varying values.""" for arg in node.args: if self.get_varying_source(arg): @@ -261,7 +258,7 @@ def _has_varying_arguments(self, node: ast.Call) -> bool: return False - def _is_runtime_varying_attribute_call(self, attr: ast.Attribute) -> bool: + def is_runtime_varying_attribute_call(self, attr: ast.Attribute) -> bool: """Check for runtime-varying calls like datetime.now().""" method_name = attr.attr @@ -284,7 +281,7 @@ def _is_runtime_varying_attribute_call(self, attr: ast.Attribute) -> bool: return False - def _is_runtime_varying_name_call(self, func: ast.Name) -> bool: + def is_runtime_varying_name_call(self, func: ast.Name) -> bool: """Check for runtime-varying calls like now() (when imported via 'from import').""" func_name = func.id @@ -307,8 +304,8 @@ class DagTaskDetector: in Airflow. It needs to handle both traditional class instantiation and decorator styles. """ - def __init__(self, from_imports: dict): - self.from_imports = from_imports + def __init__(self, from_imports: dict[str, tuple[str, str]]): + self.from_imports: dict[str, tuple[str, str]] = from_imports self.dag_instances: set[str] = set() self.is_in_dag_context: bool = False @@ -374,8 +371,8 @@ class AirflowRuntimeVaryingValueChecker(ast.NodeVisitor): - Track runtime-varying values and generate warnings """ - def __init__(self, check_level: DagStabilityCheckLevel = DagStabilityCheckLevel.warning): - self.static_check_result: DagStabilityCheckerResult = DagStabilityCheckerResult( + def __init__(self, check_level: DagVersionInflationCheckLevel = DagVersionInflationCheckLevel.warning): + self.static_check_result: DagVersionInflationCheckResult = DagVersionInflationCheckResult( check_level=check_level ) self.imports: dict[str, str] = {} @@ -444,8 +441,7 @@ def visit_For(self, node: ast.For): """ # check the iterator value is runtime-varying # iter is runtime-varying : for iter in [datetime.now(), 3] - varying_source = self.value_analyzer.get_varying_source(node.iter) - if varying_source: + if varying_source := self.value_analyzer.get_varying_source(node.iter): if isinstance(node.target, ast.Name): self.varying_vars[node.target.id] = (node.lineno, varying_source) @@ -488,8 +484,7 @@ def _register_dag_instances(self, targets: list): def _track_varying_assignment(self, node: ast.Assign): """Track variable assignments with runtime-varying values.""" - varying_source = self.value_analyzer.get_varying_source(node.value) - if varying_source: + if varying_source := self.value_analyzer.get_varying_source(node.value): for target in node.targets: if isinstance(target, ast.Name): self.varying_vars[target.id] = (node.lineno, varying_source) @@ -507,25 +502,30 @@ def _check_and_warn(self, call: ast.Call, context: WarningContext): ) ) - def _get_warning_message(self, context: WarningContext): + def _get_warning_message(self, context: WarningContext) -> str: """Get appropriate warning message based on context.""" if self.dag_detector.is_in_dag_context and context == WarningContext.TASK_CONSTRUCTOR: return "Don't use runtime-varying values as function arguments within with Dag block" return f"Don't use runtime-varying value as argument in {context.value}" -def check_dag_file_stability(file_path) -> DagStabilityCheckerResult: +def check_dag_file_stability(file_path) -> DagVersionInflationCheckResult: from airflow.configuration import conf - check_level = conf.getenum("dag_processor", "dag_stability_check_level", DagStabilityCheckLevel) + try: + check_level = DagVersionInflationCheckLevel( + conf.get("dag_processor", "dag_version_inflation_check_level") + ) + except ValueError: + check_level = DagVersionInflationCheckLevel.warning - if check_level == DagStabilityCheckLevel.off: - return DagStabilityCheckerResult(check_level=check_level) + if check_level == DagVersionInflationCheckLevel.off: + return DagVersionInflationCheckResult(check_level=check_level) try: parsed = ast.parse(Path(file_path).read_bytes()) - except Exception: - return DagStabilityCheckerResult(check_level=check_level) + except (SyntaxError, ValueError, TypeError, FileNotFoundError): + return DagVersionInflationCheckResult(check_level=check_level) checker = AirflowRuntimeVaryingValueChecker(check_level) checker.visit(parsed) diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py index 6370955078d79..76d13c99f958b 100644 --- a/airflow-core/tests/unit/dag_processing/test_processor.py +++ b/airflow-core/tests/unit/dag_processing/test_processor.py @@ -609,11 +609,11 @@ def fake_collect_dags(self, *args, **kwargs): assert called is True -@conf_vars({("dag_processor", "dag_stability_check_level"): "error"}) +@conf_vars({("dag_processor", "dag_version_inflation_check_level"): "error"}) def test_parse_file_static_check_with_error(): result = _parse_file( DagFileParseRequest( - file=f"{TEST_DAG_FOLDER}/test_dag_stability_check.py", + file=f"{TEST_DAG_FOLDER}/test_dag_version_inflation_check.py", bundle_path=TEST_DAG_FOLDER, bundle_name="testing", ), @@ -621,7 +621,7 @@ def test_parse_file_static_check_with_error(): ) assert result.serialized_dags == [] - assert list(result.import_errors.keys()) == ["test_dag_stability_check.py"] + assert list(result.import_errors.keys()) == ["test_dag_version_inflation_check.py"] assert result.warnings is None assert "Don't use the variables as arguments" in next(iter(result.import_errors.values())) @@ -629,7 +629,7 @@ def test_parse_file_static_check_with_error(): def test_parse_file_static_check_with_default_warning(): result = _parse_file( DagFileParseRequest( - file=f"{TEST_DAG_FOLDER}/test_dag_stability_check.py", + file=f"{TEST_DAG_FOLDER}/test_dag_version_inflation_check.py", bundle_path=TEST_DAG_FOLDER, bundle_name="testing", ), diff --git a/airflow-core/tests/unit/dags/test_dag_stability_check.py b/airflow-core/tests/unit/dags/test_dag_version_inflation_check.py similarity index 100% rename from airflow-core/tests/unit/dags/test_dag_stability_check.py rename to airflow-core/tests/unit/dags/test_dag_version_inflation_check.py diff --git a/airflow-core/tests/unit/utils/test_dag_stability_checker.py b/airflow-core/tests/unit/utils/test_dag_version_inflation_checker.py similarity index 97% rename from airflow-core/tests/unit/utils/test_dag_stability_checker.py rename to airflow-core/tests/unit/utils/test_dag_version_inflation_checker.py index d5b8e37785d1b..fdc2160328ffe 100644 --- a/airflow-core/tests/unit/utils/test_dag_stability_checker.py +++ b/airflow-core/tests/unit/utils/test_dag_version_inflation_checker.py @@ -18,7 +18,7 @@ import ast -from airflow.utils.dag_stability_checker import ( +from airflow.utils.dag_version_inflation_checker import ( AirflowRuntimeVaryingValueChecker, DagTaskDetector, RuntimeVaryingValueAnalyzer, @@ -43,7 +43,7 @@ def test_is_runtime_varying_attribute_call__detects_datetime_now(self): # The func is an Attribute node: datetime.now assert isinstance(call_node.func, ast.Attribute) - result = self.analyzer._is_runtime_varying_attribute_call(call_node.func) + result = self.analyzer.is_runtime_varying_attribute_call(call_node.func) assert result is True @@ -53,7 +53,7 @@ def test_is_runtime_varying_attribute_call__ignores_static_method(self): call_node = ast.parse(code, mode="eval").body assert isinstance(call_node.func, ast.Attribute) - result = self.analyzer._is_runtime_varying_attribute_call(call_node.func) + result = self.analyzer.is_runtime_varying_attribute_call(call_node.func) assert result is False @@ -68,7 +68,7 @@ def test_is_runtime_varying_attribute_call__handles_aliased_imports(self): self.imports["dt"] = "datetime" # dt is alias for datetime assert isinstance(call_node.func, ast.Attribute) - result = self.analyzer._is_runtime_varying_attribute_call(call_node.func) + result = self.analyzer.is_runtime_varying_attribute_call(call_node.func) assert result is True @@ -79,7 +79,7 @@ def test_is_runtime_varying_name_call__detects_uuid4(self): self.from_imports["uuid4"] = ("uuid", "uuid4") assert isinstance(call_node.func, ast.Name) - result = self.analyzer._is_runtime_varying_name_call(call_node.func) + result = self.analyzer.is_runtime_varying_name_call(call_node.func) assert result is True @@ -88,7 +88,7 @@ def test_is_runtime_varying_name_call__ignores_regular_function(self): call_node = ast.parse(code, mode="eval").body assert isinstance(call_node.func, ast.Name) - result = self.analyzer._is_runtime_varying_name_call(call_node.func) + result = self.analyzer.is_runtime_varying_name_call(call_node.func) assert result is False @@ -102,7 +102,7 @@ def test_has_varying_arguments__detects_varying_positional_arg(self): call_node = ast.parse(code, mode="eval").body self.imports["datetime"] = "datetime" - result = self.analyzer._has_varying_arguments(call_node) + result = self.analyzer.has_varying_arguments(call_node) assert result is True @@ -116,7 +116,7 @@ def test_has_varying_arguments__detects_varying_keyword_arg(self): call_node = ast.parse(code, mode="eval").body self.imports["random"] = "random" - result = self.analyzer._has_varying_arguments(call_node) + result = self.analyzer.has_varying_arguments(call_node) assert result is True @@ -129,7 +129,7 @@ def test_has_varying_arguments__returns_false_for_static_args(self): code = 'print("hello", 123)' call_node = ast.parse(code, mode="eval").body - result = self.analyzer._has_varying_arguments(call_node) + result = self.analyzer.has_varying_arguments(call_node) assert result is False From 539157d4f3b8c4b0b29def71192d44ada6a6b485 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Wed, 14 Jan 2026 19:14:12 +0900 Subject: [PATCH 12/12] fix logics --- .../utils/dag_version_inflation_checker.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/airflow-core/src/airflow/utils/dag_version_inflation_checker.py b/airflow-core/src/airflow/utils/dag_version_inflation_checker.py index 9dec604adf0fa..ca7f58b26026f 100644 --- a/airflow-core/src/airflow/utils/dag_version_inflation_checker.py +++ b/airflow-core/src/airflow/utils/dag_version_inflation_checker.py @@ -206,9 +206,8 @@ def get_varying_source(self, node: ast.expr) -> str | None: def get_varying_fstring(self, node: ast.JoinedStr) -> str | None: """Check for runtime-varying values inside f-strings.""" for value in node.values: - if isinstance(value, ast.FormattedValue): - if source := self.get_varying_source(value.value): - return source + if isinstance(value, ast.FormattedValue) and (source := self.get_varying_source(value.value)): + return source return None def get_varying_collection(self, elements: list) -> str | None: @@ -221,12 +220,10 @@ def get_varying_collection(self, elements: list) -> str | None: def get_varying_dict(self, node: ast.Dict) -> str | None: """Check for runtime-varying values in dictionary keys/values.""" for key, value in zip(node.keys, node.values): - if key: - if source := self.get_varying_source(key): - return source - if value: - if source := self.get_varying_source(value): - return source + if key and (source := self.get_varying_source(key)): + return source + if value and (source := self.get_varying_source(value)): + return source return None def is_runtime_varying_call(self, node: ast.Call) -> bool: @@ -491,8 +488,7 @@ def _track_varying_assignment(self, node: ast.Assign): def _check_and_warn(self, call: ast.Call, context: WarningContext): """Check function call arguments and generate warnings.""" - varying_source = self.value_analyzer.get_varying_source(call) - if varying_source: + if self.value_analyzer.get_varying_source(call): self.static_check_result.warnings.append( RuntimeVaryingValueWarning( line=call.lineno,