Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,14 @@ repos:
files: \.py$
exclude: ^tests/.*\.py$|^airflow/_vendor/.*$
pass_filenames: true
- id: pylint
require_serial: true # Pylint tests should be run in one chunk to detect all cycles
- id: pylint-tests
name: Run pylint for tests
language: system
entry: "./scripts/ci/pre_commit_pylint_tests.sh"
files: ^tests/.*\.py$
pass_filenames: true
require_serial: false # For tests, it's perfectly OK to run pylint in parallel
- id: flake8
name: Run flake8
language: system
Expand Down
4 changes: 2 additions & 2 deletions BREEZE.rst
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ This is the current syntax for `./breeze <./breeze>`_:
-S, --static-check <STATIC_CHECK>
Run selected static checks for currently changed files. You should specify static check that
you would like to run or 'all' to run all checks. One of
[ all all-but-pylint check-apache-license check-executables-have-shebangs check-hooks-apply check-merge-conflict check-xml debug-statements doctoc detect-private-key end-of-file-fixer flake8 forbid-tabs insert-license lint-dockerfile mixed-line-ending mypy pylint setup-order shellcheck].
[ all all-but-pylint check-apache-license check-executables-have-shebangs check-hooks-apply check-merge-conflict check-xml debug-statements doctoc detect-private-key end-of-file-fixer flake8 forbid-tabs insert-license lint-dockerfile mixed-line-ending mypy pylint pylint-test setup-order shellcheck].
You can pass extra arguments including options to to the pre-commit framework as
<EXTRA_ARGS> passed after --. For example:

Expand All @@ -886,7 +886,7 @@ This is the current syntax for `./breeze <./breeze>`_:
-F, --static-check-all-files <STATIC_CHECK>
Run selected static checks for all applicable files. You should specify static check that
you would like to run or 'all' to run all checks. One of
[ all all-but-pylint check-apache-license check-executables-have-shebangs check-hooks-apply check-merge-conflict check-xml debug-statements doctoc detect-private-key end-of-file-fixer flake8 forbid-tabs insert-license lint-dockerfile mixed-line-ending mypy pylint setup-order shellcheck].
[ all all-but-pylint check-apache-license check-executables-have-shebangs check-hooks-apply check-merge-conflict check-xml debug-statements doctoc detect-private-key end-of-file-fixer flake8 forbid-tabs insert-license lint-dockerfile mixed-line-ending mypy pylint pylint-test setup-order shellcheck].
You can pass extra arguments including options to the pre-commit framework as
<EXTRA_ARGS> passed after --. For example:

Expand Down
4 changes: 3 additions & 1 deletion CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,9 @@ image built locally):
----------------------------------- ---------------------------------------------------------------- ------------
``pydevd`` Check for accidentally commited pydevd statements.
----------------------------------- ---------------------------------------------------------------- ------------
``pylint`` Runs pylint. *
``pylint`` Runs pylint for main code. *
----------------------------------- ---------------------------------------------------------------- ------------
``pylint-tests`` Runs pylint for tests. *
----------------------------------- ---------------------------------------------------------------- ------------
``python-no-log-warn`` Checks if there are no deprecate log warn.
----------------------------------- ---------------------------------------------------------------- ------------
Expand Down
5 changes: 5 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ assists users migrating to a new version.

## Airflow Master

### Removal of airflow.AirflowMacroPlugin class

The class was there in airflow package but it has not been used (apparently since 2015).
It has been removed.

### Changes to settings

CONTEXT_MANAGER_DAG was removed from settings. It's role has been taken by `DagContext` in
Expand Down
21 changes: 3 additions & 18 deletions airflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,8 @@

settings.initialize()

login = None # type: Optional[Callable]
from airflow.plugins_manager import integrate_plugins

from airflow import executors
from airflow import hooks
from airflow import macros
from airflow import operators
from airflow import sensors
login: Optional[Callable] = None


class AirflowMacroPlugin:
# pylint: disable=missing-docstring
def __init__(self, namespace):
self.namespace = namespace


operators._integrate_plugins() # pylint: disable=protected-access
sensors._integrate_plugins() # pylint: disable=protected-access
hooks._integrate_plugins() # pylint: disable=protected-access
executors._integrate_plugins() # pylint: disable=protected-access
macros._integrate_plugins() # pylint: disable=protected-access
integrate_plugins()
2 changes: 1 addition & 1 deletion airflow/cli/commands/flower_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import daemon
from daemon.pidfile import TimeoutPIDLockFile

from airflow import conf
from airflow.configuration import conf
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations, sigint_handler

Expand Down
2 changes: 1 addition & 1 deletion airflow/cli/commands/serve_logs_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"""Serve logs command"""
import os

from airflow import conf
from airflow.configuration import conf
from airflow.utils import cli as cli_utils


Expand Down
4 changes: 2 additions & 2 deletions airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from contextlib import redirect_stderr, redirect_stdout

from airflow import DAG, AirflowException, conf, jobs, settings
from airflow.executors import get_default_executor
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models import DagPickle, TaskInstance
from airflow.ti_deps.dep_context import SCHEDULER_QUEUED_DEPS, DepContext
from airflow.utils import cli as cli_utils, db
Expand Down Expand Up @@ -69,7 +69,7 @@ def _run(args, dag, ti):
print(e)
raise e

executor = get_default_executor()
executor = ExecutorLoader.get_default_executor()
executor.start()
print("Sending to executor.")
executor.queue_task_instance(
Expand Down
3 changes: 2 additions & 1 deletion airflow/cli/commands/worker_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import daemon
from daemon.pidfile import TimeoutPIDLockFile

from airflow import conf, settings
from airflow import settings
from airflow.configuration import conf
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations, setup_logging, sigint_handler

Expand Down
84 changes: 1 addition & 83 deletions airflow/executors/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand All @@ -16,84 +14,4 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# pylint: disable=missing-docstring

import sys
from typing import Optional

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
from airflow.executors.local_executor import LocalExecutor
from airflow.executors.sequential_executor import SequentialExecutor
from airflow.utils.log.logging_mixin import LoggingMixin

DEFAULT_EXECUTOR = None # type: Optional[BaseExecutor]


def _integrate_plugins():
"""Integrate plugins to the context."""
from airflow.plugins_manager import executors_modules
for executors_module in executors_modules:
sys.modules[executors_module.__name__] = executors_module
globals()[executors_module._name] = executors_module # pylint: disable=protected-access


def get_default_executor():
"""Creates a new instance of the configured executor if none exists and returns it"""
global DEFAULT_EXECUTOR # pylint: disable=global-statement

if DEFAULT_EXECUTOR is not None:
return DEFAULT_EXECUTOR

executor_name = conf.get('core', 'EXECUTOR')

DEFAULT_EXECUTOR = _get_executor(executor_name)

log = LoggingMixin().log
log.info("Using executor %s", executor_name)

return DEFAULT_EXECUTOR


class Executors:
LocalExecutor = "LocalExecutor"
SequentialExecutor = "SequentialExecutor"
CeleryExecutor = "CeleryExecutor"
DaskExecutor = "DaskExecutor"
KubernetesExecutor = "KubernetesExecutor"


def _get_executor(executor_name):
"""
Creates a new instance of the named executor.
In case the executor name is not know in airflow,
look for it in the plugins
"""
if executor_name == Executors.LocalExecutor:
return LocalExecutor()
elif executor_name == Executors.SequentialExecutor:
return SequentialExecutor()
elif executor_name == Executors.CeleryExecutor:
from airflow.executors.celery_executor import CeleryExecutor
return CeleryExecutor()
elif executor_name == Executors.DaskExecutor:
from airflow.executors.dask_executor import DaskExecutor
return DaskExecutor()
elif executor_name == Executors.KubernetesExecutor:
from airflow.executors.kubernetes_executor import KubernetesExecutor
return KubernetesExecutor()
else:
# Loading plugins
_integrate_plugins()
executor_path = executor_name.split('.')
if len(executor_path) != 2:
raise AirflowException(
"Executor {0} not supported: "
"please specify in format plugin_module.executor".format(executor_name))

if executor_path[0] in globals():
return globals()[executor_path[0]].__dict__[executor_path[1]]()
else:
raise AirflowException("Executor {0} not supported.".format(executor_name))
"""Executors."""
Loading