151 changes: 0 additions & 151 deletions airflow/cli/cli_config.py
@@ -24,16 +24,13 @@
import json
import os
import textwrap
from argparse import ArgumentError
from typing import Callable, Iterable, NamedTuple, Union

import lazy_object_proxy

from airflow import settings
from airflow.cli.commands.legacy_commands import check_legacy_command
from airflow.configuration import conf
from airflow.executors.executor_constants import CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR
from airflow.executors.executor_loader import ExecutorLoader
from airflow.settings import _ENABLE_AIP_44
from airflow.utils.cli import ColorMode
from airflow.utils.module_loading import import_string
@@ -61,46 +58,6 @@ class DefaultHelpParser(argparse.ArgumentParser):

def _check_value(self, action, value):
"""Override _check_value and check conditionally added command."""
if action.dest == "subcommand" and value == "celery":
Review comment (Contributor Author): All of this was an attempt to handle the situation of using CLI commands with missing deps. Now only the configured executor's commands are available and we assume that the dependencies for that executor are installed (otherwise the user has much bigger issues to worry about). (A condensed sketch of the replacement mechanism follows this hunk.)

executor = conf.get("core", "EXECUTOR")
if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
executor_cls, _ = ExecutorLoader.import_executor_cls(executor)
classes = ()
try:
from airflow.providers.celery.executors.celery_executor import CeleryExecutor

classes += (CeleryExecutor,)
except ImportError:
message = (
"The celery subcommand requires that you pip install the celery module. "
"To do it, run: pip install 'apache-airflow[celery]'"
)
raise ArgumentError(action, message)
try:
from airflow.providers.celery.executors.celery_kubernetes_executor import (
CeleryKubernetesExecutor,
)

classes += (CeleryKubernetesExecutor,)
except ImportError:
pass
if not issubclass(executor_cls, classes):
message = (
f"celery subcommand works only with CeleryExecutor, CeleryKubernetesExecutor and "
f"executors derived from them, your current executor: {executor}, subclassed from: "
f'{", ".join([base_cls.__qualname__ for base_cls in executor_cls.__bases__])}'
)
raise ArgumentError(action, message)
if action.dest == "subcommand" and value == "kubernetes":
try:
import kubernetes.client # noqa: F401
except ImportError:
message = (
"The kubernetes subcommand requires that you pip install the kubernetes python client. "
"To do it, run: pip install 'apache-airflow[cncf.kubernetes]'"
)
raise ArgumentError(action, message)

if action.choices is not None and value not in action.choices:
check_legacy_command(action, value)
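For context, a condensed, hedged sketch of the mechanism that replaces this hard-coded check; the authoritative change is the cli_parser.py hunk later in this diff, and the import of core_commands from airflow.cli.cli_config is assumed from that hunk:

from airflow.cli.cli_config import core_commands
from airflow.executors.executor_loader import ExecutorLoader

# Start from the core commands only; executor-specific groups (celery,
# kubernetes, ...) are no longer registered unconditionally.
airflow_commands = list(core_commands)
try:
    # Ask the *configured* executor for the CLI commands it vends.
    executor_cls, _ = ExecutorLoader.import_default_executor_cls(validate=False)
    airflow_commands.extend(executor_cls.get_cli_commands())
except Exception:
    # Missing provider dependencies (or any other failure) must not
    # break parsing of the remaining commands.
    pass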

@@ -823,25 +780,6 @@ def string_lower_type(val):
action="store_true",
)

ARG_QUEUES = Arg(
("-q", "--queues"),
help="Comma delimited list of queues to serve",
default=conf.get("operators", "DEFAULT_QUEUE"),
)
ARG_CONCURRENCY = Arg(
("-c", "--concurrency"),
type=int,
help="The number of worker processes",
default=conf.getint("celery", "worker_concurrency"),
)
ARG_CELERY_HOSTNAME = Arg(
("-H", "--celery-hostname"),
help="Set the hostname of celery worker if you have multiple workers on a single machine",
)
ARG_UMASK = Arg(
("-u", "--umask"),
help="Set the umask of celery worker in daemon mode",
)
ARG_WITHOUT_MINGLE = Arg(
("--without-mingle",),
default=False,
@@ -855,34 +793,6 @@ def string_lower_type(val):
action="store_true",
)

# flower
ARG_BROKER_API = Arg(("-a", "--broker-api"), help="Broker API")
ARG_FLOWER_HOSTNAME = Arg(
("-H", "--hostname"),
default=conf.get("celery", "FLOWER_HOST"),
help="Set the hostname on which to run the server",
)
ARG_FLOWER_PORT = Arg(
("-p", "--port"),
default=conf.getint("celery", "FLOWER_PORT"),
type=int,
help="The port on which to run the server",
)
ARG_FLOWER_CONF = Arg(("-c", "--flower-conf"), help="Configuration file for flower")
ARG_FLOWER_URL_PREFIX = Arg(
("-u", "--url-prefix"),
default=conf.get("celery", "FLOWER_URL_PREFIX"),
help="URL prefix for Flower",
)
ARG_FLOWER_BASIC_AUTH = Arg(
("-A", "--basic-auth"),
default=conf.get("celery", "FLOWER_BASIC_AUTH"),
help=(
"Securing Flower with Basic Authentication. "
"Accepts user:password pairs separated by a comma. "
"Example: flower_basic_auth = user1:password1,user2:password2"
),
)
ARG_TASK_PARAMS = Arg(("-t", "--task-params"), help="Sends a JSON params dict to the task")
ARG_POST_MORTEM = Arg(
("-m", "--post-mortem"), action="store_true", help="Open debugger on uncaught exception"
@@ -1978,55 +1888,6 @@ class GroupCommand(NamedTuple):
),
)

CELERY_COMMANDS = (
ActionCommand(
name="worker",
help="Start a Celery worker node",
func=lazy_load_command("airflow.cli.commands.celery_command.worker"),
args=(
ARG_QUEUES,
ARG_CONCURRENCY,
ARG_CELERY_HOSTNAME,
ARG_PID,
ARG_DAEMON,
ARG_UMASK,
ARG_STDOUT,
ARG_STDERR,
ARG_LOG_FILE,
ARG_AUTOSCALE,
ARG_SKIP_SERVE_LOGS,
ARG_WITHOUT_MINGLE,
ARG_WITHOUT_GOSSIP,
ARG_VERBOSE,
),
),
ActionCommand(
name="flower",
help="Start a Celery Flower",
func=lazy_load_command("airflow.cli.commands.celery_command.flower"),
args=(
ARG_FLOWER_HOSTNAME,
ARG_FLOWER_PORT,
ARG_FLOWER_CONF,
ARG_FLOWER_URL_PREFIX,
ARG_FLOWER_BASIC_AUTH,
ARG_BROKER_API,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_LOG_FILE,
ARG_VERBOSE,
),
),
ActionCommand(
name="stop",
help="Stop the Celery worker gracefully",
func=lazy_load_command("airflow.cli.commands.celery_command.stop_worker"),
args=(ARG_PID, ARG_VERBOSE),
),
)

CONFIG_COMMANDS = (
ActionCommand(
name="get-value",
Expand Down Expand Up @@ -2109,9 +1970,6 @@ class GroupCommand(NamedTuple):
help="Manage DAGs",
subcommands=DAGS_COMMANDS,
),
GroupCommand(
name="kubernetes", help="Tools to help run the KubernetesExecutor", subcommands=KUBERNETES_COMMANDS
),
GroupCommand(
name="tasks",
help="Manage tasks",
@@ -2298,15 +2156,6 @@ class GroupCommand(NamedTuple):
func=lazy_load_command("airflow.cli.commands.plugins_command.dump_plugins"),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
GroupCommand(
name="celery",
help="Celery components",
description=(
"Start celery components. Works only when using CeleryExecutor. For more information, see "
"https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html"
),
subcommands=CELERY_COMMANDS,
),
ActionCommand(
name="standalone",
help="Run an all-in-one copy of Airflow",
18 changes: 18 additions & 0 deletions airflow/cli/cli_parser.py
@@ -24,6 +24,7 @@
from __future__ import annotations

import argparse
import logging
from argparse import Action
from functools import lru_cache
from typing import Iterable
@@ -41,10 +42,27 @@
core_commands,
)
from airflow.exceptions import AirflowException
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.helpers import partition

airflow_commands = core_commands

log = logging.getLogger(__name__)
try:
executor, _ = ExecutorLoader.import_default_executor_cls(validate=False)
airflow_commands.extend(executor.get_cli_commands())
except Exception:
Review comment (Contributor Author): Under no circumstances should the executor commands fail the parsing process, so we catch any Exception here.

executor_name = ExecutorLoader.get_default_executor_name()
log.exception("Failed to load CLI commands from executor: %s", executor_name)
log.error(
"Ensure all dependencies are met and try again. If using a Celery based executor install "
"a 3.3.0+ version of the Celery provider. If using a Kubernetes executor, install a "
"7.4.0+ version of the CNCF provider"
)
# Do not re-raise the exception since we want the CLI to still function for
# other commands.


ALL_COMMANDS_DICT: dict[str, CLICommand] = {sp.name: sp for sp in airflow_commands}


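To make the review comment's intent concrete, a small hedged sketch of the resulting behaviour (assumes a configured Airflow environment; get_parser is the existing entry point in this module):

from airflow.cli.cli_parser import get_parser

parser = get_parser()
# Core commands always parse, even if loading executor commands failed above.
args = parser.parse_args(["dags", "list"])
# Executor-vended groups (e.g. "celery") are present only when the configured
# executor's provider is importable; otherwise the error above is logged and
# those subcommands are simply absent.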
10 changes: 10 additions & 0 deletions airflow/executors/base_executor.py
@@ -27,6 +27,7 @@

import pendulum

from airflow.cli.cli_config import GroupCommand
from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.stats import Stats
@@ -479,3 +480,12 @@ def send_callback(self, request: CallbackRequest) -> None:
if not self.callback_sink:
raise ValueError("Callback sink is not ready.")
self.callback_sink.send(request)

@staticmethod
def get_cli_commands() -> list[GroupCommand]:
"""Vends CLI commands to be included in Airflow CLI.

Override this method to expose commands via Airflow CLI to manage this executor. This can
be commands to setup/teardown the executor, inspect state, etc.
"""
return []
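As an illustration of how an executor can use this hook, here is a minimal, hypothetical subclass. The executor class, the "my-executor" command group, and the my_provider.cli.status module path are invented for the example; GroupCommand, ActionCommand, ARG_VERBOSE, and lazy_load_command are assumed to be importable from airflow.cli.cli_config, as they are used elsewhere in this diff:

from __future__ import annotations

from airflow.cli.cli_config import ARG_VERBOSE, ActionCommand, GroupCommand, lazy_load_command
from airflow.executors.base_executor import BaseExecutor


class MyCustomExecutor(BaseExecutor):
    """Hypothetical executor, shown only to demonstrate get_cli_commands()."""

    @staticmethod
    def get_cli_commands() -> list[GroupCommand]:
        # cli_parser.py merges these commands into the Airflow CLI when this
        # executor is the configured one.
        return [
            GroupCommand(
                name="my-executor",
                help="Tools to help run MyCustomExecutor",
                subcommands=(
                    ActionCommand(
                        name="status",
                        help="Show the status of MyCustomExecutor workers",
                        func=lazy_load_command("my_provider.cli.status"),  # hypothetical path
                        args=(ARG_VERBOSE,),
                    ),
                ),
            )
        ]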
16 changes: 12 additions & 4 deletions airflow/executors/executor_loader.py
@@ -119,18 +119,24 @@ def load_executor(cls, executor_name: str) -> BaseExecutor:
return executor_cls()

@classmethod
def import_executor_cls(cls, executor_name: str) -> tuple[type[BaseExecutor], ConnectorSource]:
def import_executor_cls(
cls, executor_name: str, validate: bool = True
) -> tuple[type[BaseExecutor], ConnectorSource]:
"""
Imports the executor class.

Supports the same formats as ExecutorLoader.load_executor.

:param executor_name: Name of core executor or module path to provider provided as a plugin.
:param validate: Whether or not to validate the executor before returning

:return: executor class via executor_name and executor import source
"""

def _import_and_validate(path: str) -> type[BaseExecutor]:
executor = import_string(path)
cls.validate_database_executor_compatibility(executor)
if validate:
cls.validate_database_executor_compatibility(executor)
return executor

if executor_name in cls.executors:
@@ -151,14 +157,16 @@ def _import_and_validate(path: str) -> type[BaseExecutor]:
return _import_and_validate(executor_name), ConnectorSource.CUSTOM_PATH

@classmethod
def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSource]:
def import_default_executor_cls(cls, validate: bool = True) -> tuple[type[BaseExecutor], ConnectorSource]:
"""
Imports the default executor class.

:param validate: Whether or not to validate the executor before returning

:return: executor class and executor import source
"""
executor_name = cls.get_default_executor_name()
executor, source = cls.import_executor_cls(executor_name)
executor, source = cls.import_executor_cls(executor_name, validate=validate)
return executor, source

@classmethod
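A brief usage sketch of the new validate flag (assumes a configured Airflow environment; not part of the diff):

from airflow.executors.executor_loader import ExecutorLoader

# Default: the executor class is checked for database compatibility
# (validate_database_executor_compatibility) before being returned.
executor_cls, source = ExecutorLoader.import_default_executor_cls()

# The CLI parser only needs the class in order to call get_cli_commands(),
# so it skips that check, as shown in the cli_parser.py hunk above.
executor_cls, source = ExecutorLoader.import_default_executor_cls(validate=False)
print(executor_cls.__name__, source)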