diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index cc33daf95805b..36e80151eece6 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -181,7 +181,7 @@ repos:
         entry: ruff --fix --no-update-check --force-exclude
         additional_dependencies: ['ruff==0.0.226']
         files: \.pyi?$
-        exclude: ^.*/.*_vendor/
+        exclude: ^.*/.*_vendor/|^tests/dags/test_imports.py
   - repo: https://github.com/asottile/blacken-docs
     rev: 1.13.0
     hooks:
@@ -905,7 +905,7 @@ repos:
         language: python
         entry: ./scripts/ci/pre_commit/pre_commit_mypy.py --namespace-packages
         files: \.py$
-        exclude: ^.*/.*_vendor/|^airflow/migrations|^airflow/providers|^dev|^docs|^provider_packages|^tests/providers|^tests/system/providers
+        exclude: ^.*/.*_vendor/|^airflow/migrations|^airflow/providers|^dev|^docs|^provider_packages|^tests/providers|^tests/system/providers|^tests/dags/test_imports.py
         require_serial: true
         additional_dependencies: ['rich>=12.4.4', 'inputimeout', 'pyyaml']
       - id: mypy-providers
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 29e02df9b3b6b..9d8e0ffdbb6dc 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2344,6 +2344,16 @@ scheduler:
       version_added: 2.0.0
       type: boolean
       default: "True"
+    parsing_pre_import_modules:
+      description: |
+        The scheduler reads dag files to extract the airflow modules that are going to be used,
+        and imports them ahead of time to avoid having to re-do it for each parsing process.
+        This flag can be set to False to disable this behavior in case an airflow module needs to be freshly
+        imported each time (at the cost of increased DAG parsing time).
+      version_added: 2.6.0
+      type: boolean
+      example: ~
+      default: "True"
     parsing_processes:
       description: |
         The scheduler can run multiple processes in parallel to parse dags.
         This defines how many processes will run.
@@ -2363,7 +2373,6 @@ scheduler:
         same host. This is useful when running with Scheduler in HA mode where each scheduler
         can parse different DAG files.
         * ``alphabetical``: Sort by filename
-
       version_added: 2.1.0
       type: string
       example: ~
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 257536cc038fd..44b1401f47df4 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1190,6 +1190,12 @@ max_dagruns_per_loop_to_schedule = 20
 # dags in some circumstances
 schedule_after_task_execution = True
 
+# The scheduler reads dag files to extract the airflow modules that are going to be used,
+# and imports them ahead of time to avoid having to re-do it for each parsing process.
+# This flag can be set to False to disable this behavior in case an airflow module needs to be freshly
+# imported each time (at the cost of increased DAG parsing time).
+parsing_pre_import_modules = True
+
 # The scheduler can run multiple processes in parallel to parse dags.
 # This defines how many processes will run.
 parsing_processes = 2
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index f1ba08e496aa0..442431afb5026 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import importlib
 import logging
 import multiprocessing
 import os
@@ -50,6 +51,7 @@ from airflow.stats import Stats
 from airflow.utils import timezone
 from airflow.utils.email import get_email_address_list, send_email
+from airflow.utils.file import iter_airflow_imports
 from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter, set_context
 from airflow.utils.mixins import MultiprocessingStartMethodMixin
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -187,6 +189,23 @@ def _handle_dag_file_processing():
 
     def start(self) -> None:
         """Launch the process and start processing the DAG."""
+        if conf.getboolean("scheduler", "parsing_pre_import_modules", fallback=True):
+            # Read the file to pre-import airflow modules used.
+            # This prevents them from being re-imported from zero in each "processing" process
+            # and saves CPU time and memory.
+            for module in iter_airflow_imports(self.file_path):
+                try:
+                    importlib.import_module(module)
+                except Exception as e:
+                    # only log as warning because an error here does not prevent anything from working, and
+                    # if it's serious, it's going to be surfaced to the user when the dag is actually parsed.
+                    self.log.warning(
+                        "Error when trying to pre-import module '%s' found in %s: %s",
+                        module,
+                        self.file_path,
+                        e,
+                    )
+
         context = self._get_multiprocessing_context()
 
         _parent_channel, _child_channel = context.Pipe(duplex=False)
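A note on why the pre-import in `start()` pays off: `start()` runs in the parent DAG-processing process, so every module imported here is already in `sys.modules` when the per-file child process is created, and with a fork start method the child inherits those modules instead of importing them from scratch. Below is a minimal standalone sketch of that mechanism; the module list and the `"fork"` context are illustrative assumptions, not taken from the change:

```python
import importlib
import logging
import multiprocessing
import sys

log = logging.getLogger(__name__)

# Illustrative stand-in for the names iter_airflow_imports() would yield.
MODULES = ["json", "decimal", "sqlite3"]


def pre_import(modules: list[str]) -> None:
    """Best-effort import: failures are logged, never raised, mirroring the
    warning-only handling in DagFileProcessorProcess.start() above."""
    for module in modules:
        try:
            importlib.import_module(module)
        except Exception as e:
            log.warning("Error when trying to pre-import module '%s': %s", module, e)


def child() -> None:
    # A forked child inherits the parent's sys.modules, so modules
    # pre-imported before the fork are already loaded here.
    print("sqlite3 pre-loaded in child:", "sqlite3" in sys.modules)


if __name__ == "__main__":
    pre_import(MODULES)
    ctx = multiprocessing.get_context("fork")  # assumption: Unix-only start method
    p = ctx.Process(target=child)
    p.start()
    p.join()
```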
diff --git a/airflow/utils/file.py b/airflow/utils/file.py
index b3b1e8f3d187f..81089e06d4c26 100644
--- a/airflow/utils/file.py
+++ b/airflow/utils/file.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import ast
 import io
 import logging
 import os
@@ -371,3 +372,23 @@ def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool:
         content = dag_file.read()
     content = content.lower()
     return all(s in content for s in (b"dag", b"airflow"))
+
+
+def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]:
+    for st in module.body:
+        if isinstance(st, ast.Import):
+            for n in st.names:
+                yield n.name
+        elif isinstance(st, ast.ImportFrom) and st.module is not None:
+            yield st.module
+
+
+def iter_airflow_imports(file_path: str) -> Generator[str, None, None]:
+    """Find Airflow modules imported in the given file."""
+    try:
+        parsed = ast.parse(Path(file_path).read_bytes())
+    except (OSError, SyntaxError, UnicodeDecodeError):
+        return
+    for m in _find_imported_modules(parsed):
+        if m.startswith("airflow."):
+            yield m
diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py
index 4758b140aa522..9a83b9feb2090 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -211,7 +211,8 @@ def test_max_runs_when_no_files(self):
         parent_pipe.close()
 
     @pytest.mark.backend("mysql", "postgres")
-    def test_start_new_processes_with_same_filepath(self):
+    @mock.patch("airflow.dag_processing.processor.iter_airflow_imports")
+    def test_start_new_processes_with_same_filepath(self, _):
         """
         Test that when a processor already exist with a filepath, a new processor won't be created
         with that filepath. The filepath will just be removed from the list.
diff --git a/tests/dags/test_imports.py b/tests/dags/test_imports.py
new file mode 100644
index 0000000000000..43be6fc08e4aa
--- /dev/null
+++ b/tests/dags/test_imports.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# fmt: off
+
+# this file contains sample code that only needs to pass the lexer
+# it is "badly" formatted on purpose to test edge cases.
+
+from __future__ import annotations
+
+# multiline import
+import \
+    datetime, \
+enum,time
+"""
+import airflow.in_comment
+"""
+# from import
+from airflow.utils import file
+# multiline airflow import
+import airflow.decorators, airflow.models\
+, airflow.sensors
+
+if prod:
+    import airflow.if_branch
+else:
+    import airflow.else_branch
+
+def f():
+    # local import
+    import airflow.local_import
+
+# fmt: on
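The new test DAG pins down the scanner's contract: only top-level import statements count. Because `_find_imported_modules` iterates `ast.Module.body` without recursing, imports nested under an `if`, inside a function, or inside a string never surface. A small illustrative check of that behaviour (the sample source here is mine, not the test file):

```python
import ast

# Sample source mirroring the nesting cases in tests/dags/test_imports.py.
source = """
import airflow.models

if True:
    import airflow.if_branch

def f():
    import airflow.local_import
"""

# ast.Module.body holds only top-level statements; the imports inside the
# If and FunctionDef nodes are invisible to a shallow, non-recursive scan.
found = [
    alias.name
    for stmt in ast.parse(source).body
    if isinstance(stmt, ast.Import)
    for alias in stmt.names
]
print(found)  # ['airflow.models']
```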
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 4870a2aa132e9..5c78524d67867 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3014,6 +3014,7 @@ def test_list_py_file_paths(self):
             "test_ignore_this.py",
             "test_invalid_param.py",
             "test_nested_dag.py",
+            "test_imports.py",
             "__init__.py",
         }
         for root, _, files in os.walk(TEST_DAG_FOLDER):
diff --git a/tests/utils/test_file.py b/tests/utils/test_file.py
index e5e51f7b207d9..448ddf31c7a9c 100644
--- a/tests/utils/test_file.py
+++ b/tests/utils/test_file.py
@@ -188,3 +188,29 @@ def test_might_contain_dag(self):
 
         # With safe_mode is False, the user defined callable won't be invoked
         assert file_utils.might_contain_dag(file_path=file_path_with_dag, safe_mode=False)
+
+    def test_get_modules(self):
+        file_path = os.path.join(TEST_DAGS_FOLDER, "test_imports.py")
+
+        modules = list(file_utils.iter_airflow_imports(file_path))
+
+        assert len(modules) == 4
+        assert "airflow.utils" in modules
+        assert "airflow.decorators" in modules
+        assert "airflow.models" in modules
+        assert "airflow.sensors" in modules
+        # this one is a local import, we don't want it.
+        assert "airflow.local_import" not in modules
+        # this one is in a comment, we don't want it
+        assert "airflow.in_comment" not in modules
+        # we don't want imports under conditions
+        assert "airflow.if_branch" not in modules
+        assert "airflow.else_branch" not in modules
+
+    def test_get_modules_from_invalid_file(self):
+        file_path = os.path.join(TEST_DAGS_FOLDER, "README.md")  # just getting a non-python file
+
+        # should not error
+        modules = list(file_utils.iter_airflow_imports(file_path))
+
+        assert len(modules) == 0
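Finally, the helper is usable on its own, and the invalid-file test fixes its failure mode: unreadable or unparsable files yield nothing rather than raising. A quick usage sketch (the DAG path is hypothetical):

```python
from airflow.utils.file import iter_airflow_imports

# Yields dotted names such as "airflow.decorators"; a README.md or a file
# with a syntax error yields nothing, per test_get_modules_from_invalid_file.
for module in iter_airflow_imports("dags/my_dag.py"):
    print(module)
```

Deployments that need an airflow module freshly imported on every parse can opt out by setting `parsing_pre_import_modules = False` in the `[scheduler]` section, or via Airflow's standard environment-variable override, `AIRFLOW__SCHEDULER__PARSING_PRE_IMPORT_MODULES`.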