Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ repos:
entry: ruff --fix --no-update-check --force-exclude
additional_dependencies: ['ruff==0.0.226']
files: \.pyi?$
exclude: ^.*/.*_vendor/
exclude: ^.*/.*_vendor/|^tests/dags/test_imports.py
- repo: https://github.com/asottile/blacken-docs
rev: 1.13.0
hooks:
Expand Down Expand Up @@ -905,7 +905,7 @@ repos:
language: python
entry: ./scripts/ci/pre_commit/pre_commit_mypy.py --namespace-packages
files: \.py$
exclude: ^.*/.*_vendor/|^airflow/migrations|^airflow/providers|^dev|^docs|^provider_packages|^tests/providers|^tests/system/providers
exclude: ^.*/.*_vendor/|^airflow/migrations|^airflow/providers|^dev|^docs|^provider_packages|^tests/providers|^tests/system/providers|^tests/dags/test_imports.py
require_serial: true
additional_dependencies: ['rich>=12.4.4', 'inputimeout', 'pyyaml']
- id: mypy-providers
Expand Down
11 changes: 10 additions & 1 deletion airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2344,6 +2344,16 @@ scheduler:
version_added: 2.0.0
type: boolean
default: "True"
parsing_pre_import_modules:
description: |
The scheduler reads dag files to extract the airflow modules that are going to be used,
and imports them ahead of time to avoid having to re-do it for each parsing process.
This flag can be set to False to disable this behavior in case an airflow module needs to be freshly
imported each time (at the cost of increased DAG parsing time).
version_added: 2.6.0
type: boolean
example: ~
default: "True"
parsing_processes:
description: |
The scheduler can run multiple processes in parallel to parse dags.
Expand All @@ -2363,7 +2373,6 @@ scheduler:
same host. This is useful when running with Scheduler in HA mode where each scheduler can
parse different DAG files.
* ``alphabetical``: Sort by filename

version_added: 2.1.0
type: string
example: ~
Expand Down
6 changes: 6 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,12 @@ max_dagruns_per_loop_to_schedule = 20
# dags in some circumstances
schedule_after_task_execution = True

# The scheduler reads dag files to extract the airflow modules that are going to be used,
# and imports them ahead of time to avoid having to re-do it for each parsing process.
# This flag can be set to False to disable this behavior in case an airflow module needs to be freshly
# imported each time (at the cost of increased DAG parsing time).
parsing_pre_import_modules = True

# The scheduler can run multiple processes in parallel to parse dags.
# This defines how many processes will run.
parsing_processes = 2
Expand Down
19 changes: 19 additions & 0 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import importlib
import logging
import multiprocessing
import os
Expand Down Expand Up @@ -50,6 +51,7 @@
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.email import get_email_address_list, send_email
from airflow.utils.file import iter_airflow_imports
from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter, set_context
from airflow.utils.mixins import MultiprocessingStartMethodMixin
from airflow.utils.session import NEW_SESSION, provide_session
Expand Down Expand Up @@ -187,6 +189,23 @@ def _handle_dag_file_processing():

def start(self) -> None:
"""Launch the process and start processing the DAG."""
if conf.getboolean("scheduler", "parsing_pre_import_modules", fallback=True):
# Read the file to pre-import airflow modules used.
# This prevents them from being re-imported from zero in each "processing" process
# and saves CPU time and memory.
for module in iter_airflow_imports(self.file_path):
try:
importlib.import_module(module)
except Exception as e:
# only log as warning because an error here is not preventing anything from working, and
# if it's serious, it's going to be surfaced to the user when the dag is actually parsed.
self.log.warning(
"Error when trying to pre-import module '%s' found in %s: %s",
module,
self.file_path,
e,
)

context = self._get_multiprocessing_context()

_parent_channel, _child_channel = context.Pipe(duplex=False)
Expand Down
21 changes: 21 additions & 0 deletions airflow/utils/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

import ast
import io
import logging
import os
Expand Down Expand Up @@ -371,3 +372,23 @@ def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.Zi
content = dag_file.read()
content = content.lower()
return all(s in content for s in (b"dag", b"airflow"))


def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]:
    """Yield the dotted names of every module imported at the top level of *module*.

    Only statements directly in the module body are inspected, so imports
    nested in functions, classes or conditional branches are not reported.
    """
    for statement in module.body:
        if isinstance(statement, ast.Import):
            # ``import a, b`` names several modules in a single statement.
            yield from (alias.name for alias in statement.names)
        elif isinstance(statement, ast.ImportFrom):
            # Relative imports (``from . import x``) carry module=None; skip them.
            if statement.module is not None:
                yield statement.module


def iter_airflow_imports(file_path: str) -> Generator[str, None, None]:
    """Find Airflow modules imported at the top level of the given file.

    Unreadable or unparsable files yield nothing rather than raising:
    callers use this as a best-effort pre-import hint only.
    """
    try:
        tree = ast.parse(Path(file_path).read_bytes())
    except (OSError, SyntaxError, UnicodeDecodeError):
        return
    yield from (name for name in _find_imported_modules(tree) if name.startswith("airflow."))
3 changes: 2 additions & 1 deletion tests/dag_processing/test_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ def test_max_runs_when_no_files(self):
parent_pipe.close()

@pytest.mark.backend("mysql", "postgres")
def test_start_new_processes_with_same_filepath(self):
@mock.patch("airflow.dag_processing.processor.iter_airflow_imports")
def test_start_new_processes_with_same_filepath(self, _):
"""
Test that when a processor already exist with a filepath, a new processor won't be created
with that filepath. The filepath will just be removed from the list.
Expand Down
47 changes: 47 additions & 0 deletions tests/dags/test_imports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# fmt: off

# this file contains sample code that only needs to pass the lexer
# it is "badly" formatted on purpose to test edge cases of the import scanner.

from __future__ import annotations

# multiline import using backslash continuations: all three modules must be found
import \
    datetime, \
    enum,time
"""
import airflow.in_comment
"""
# from import: only the source module ("airflow.utils") should be reported
from airflow.utils import file
# multiline airflow import: continuation plus several names in one statement
import airflow.decorators, airflow.models\
    , airflow.sensors

# NOTE: ``prod`` is intentionally undefined -- this file is only parsed, never executed.
# imports under conditional branches must NOT be reported
if prod:
    import airflow.if_branch
else:
    import airflow.else_branch

def f():
    # local (function-scope) import: must NOT be reported
    import airflow.local_import

# fmt: on
1 change: 1 addition & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3014,6 +3014,7 @@ def test_list_py_file_paths(self):
"test_ignore_this.py",
"test_invalid_param.py",
"test_nested_dag.py",
"test_imports.py",
"__init__.py",
}
for root, _, files in os.walk(TEST_DAG_FOLDER):
Expand Down
26 changes: 26 additions & 0 deletions tests/utils/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,29 @@ def test_might_contain_dag(self):

# With safe_mode is False, the user defined callable won't be invoked
assert file_utils.might_contain_dag(file_path=file_path_with_dag, safe_mode=False)

def test_get_modules(self):
    """Only top-level, unconditional airflow imports of the fixture are reported."""
    file_path = os.path.join(TEST_DAGS_FOLDER, "test_imports.py")

    modules = list(file_utils.iter_airflow_imports(file_path))

    # Exactly the four top-level, unconditional airflow imports -- no more, no less.
    assert len(modules) == 4
    for expected in ("airflow.utils", "airflow.decorators", "airflow.models", "airflow.sensors"):
        assert expected in modules

    # Ignored cases: a function-local import, an import inside a string
    # literal, and imports guarded by an if/else branch.
    for unexpected in (
        "airflow.local_import",
        "airflow.in_comment",
        "airflow.if_branch",
        "airflow.else_branch",
    ):
        assert unexpected not in modules

def test_get_modules_from_invalid_file(self):
    """A non-python file is handled gracefully: no error, no modules."""
    # README.md is just a convenient file that cannot be parsed as python.
    file_path = os.path.join(TEST_DAGS_FOLDER, "README.md")

    modules = list(file_utils.iter_airflow_imports(file_path))

    assert modules == []