Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 0 additions & 105 deletions airflow/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,108 +15,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os

import rich_click as click

from airflow import settings
from airflow.utils.cli import ColorMode
from airflow.utils.timezone import parse as parsedate

BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ

click_color = click.option(
'--color',
type=click.Choice([ColorMode.ON, ColorMode.OFF, ColorMode.AUTO]),
default=ColorMode.AUTO,
help="Do emit colored output (default: auto)",
)
click_conf = click.option(
'-c', '--conf', help="JSON string that gets pickled into the DagRun's conf attribute"
)
click_daemon = click.option(
"-D", "--daemon", 'daemon_', is_flag=True, help="Daemonize instead of running in the foreground"
)
click_dag_id = click.argument("dag_id", help="The id of the dag")
click_dag_id_opt = click.option("-d", "--dag-id", help="The id of the dag")
click_debug = click.option(
"-d", "--debug", is_flag=True, help="Use the server that ships with Flask in debug mode"
)
click_dry_run = click.option(
'-n',
'--dry-run',
is_flag=True,
default=False,
help="Perform a dry run for each task. Only renders Template Fields for each task, nothing else",
)
click_end_date = click.option(
"-e",
"--end-date",
type=parsedate,
help="Override end_date YYYY-MM-DD",
)
click_execution_date = click.argument("execution_date", help="The execution date of the DAG", type=parsedate)
click_execution_date_or_run_id = click.argument(
"execution_date_or_run_id", help="The execution_date of the DAG or run_id of the DAGRun"
)
click_log_file = click.option(
"-l",
"--log-file",
metavar="LOG_FILE",
type=click.Path(exists=False, dir_okay=False, writable=True),
help="Location of the log file",
)
click_output = click.option(
"-o",
"--output",
type=click.Choice(["table", "json", "yaml", "plain"]),
default="table",
help="Output format.",
)
click_pid = click.option("--pid", metavar="PID", type=click.Path(exists=False), help="PID file location")
click_start_date = click.option(
"-s",
"--start-date",
type=parsedate,
help="Override start_date YYYY-MM-DD",
)
click_stderr = click.option(
"--stderr",
metavar="STDERR",
type=click.Path(exists=False, dir_okay=False, writable=True),
help="Redirect stderr to this file",
)
click_stdout = click.option(
"--stdout",
metavar="STDOUT",
type=click.Path(exists=False, dir_okay=False, writable=True),
help="Redirect stdout to this file",
)
click_subdir = click.option(
"-S",
"--subdir",
default='[AIRFLOW_HOME]/dags' if BUILD_DOCS else settings.DAGS_FOLDER,
type=click.Path(),
help=(
"File location or directory from which to look for the dag. "
"Defaults to '[AIRFLOW_HOME]/dags' where [AIRFLOW_HOME] is the "
"value you set for 'AIRFLOW_HOME' config you set in 'airflow.cfg' "
),
)
click_task_id = click.argument("task_id", help="The id of the task")
click_task_regex = click.option(
"-t", "--task-regex", help="The regex to filter specific task_ids to backfill (optional)"
)
click_verbose = click.option(
'-v', '--verbose', is_flag=True, default=False, help="Make logging output more verbose"
)
click_yes = click.option(
'-y', '--yes', is_flag=True, default=False, help="Do not prompt to confirm. Use with care!"
)


# https://click.palletsprojects.com/en/8.1.x/documentation/#help-parameter-customization
@click.group(context_settings={'help_option_names': ['-h', '--help']})
@click.pass_context
def airflow_cmd(ctx):
pass
23 changes: 0 additions & 23 deletions airflow/cli/__main__.py

This file was deleted.

71 changes: 25 additions & 46 deletions airflow/utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,14 @@

def _check_cli_args(args):
if not args:
raise ValueError(f"Args should be set: {args} [{type(args)}]")
raise ValueError("Args should be set")
if not isinstance(args[0], Namespace):
raise ValueError(
f"1st positional argument should be argparse.Namespace instance, but is {type(args[0])}"
)


def action_cli(func=None, check_db=True, check_cli_args=True):
def action_cli(func=None, check_db=True):
def action_logging(f: T) -> T:
"""
Decorates function to execute function at the same time submitting action_logging
Expand All @@ -75,14 +79,15 @@ def action_logging(f: T) -> T:
@functools.wraps(f)
def wrapper(*args, **kwargs):
"""
An wrapper for cli functions.
An wrapper for cli functions. It assumes to have Namespace instance
at 1st positional argument

:param args: Positional argument.
:param args: Positional argument. It assumes to have Namespace instance
at 1st positional argument
:param kwargs: A passthrough keyword argument
"""
if check_cli_args:
_check_cli_args(args)
metrics = _build_metrics(f.__name__, args, kwargs)
_check_cli_args(args)
metrics = _build_metrics(f.__name__, args[0])
cli_action_loggers.on_pre_execution(**metrics)
try:
# Check and run migrations if necessary
Expand All @@ -106,16 +111,15 @@ def wrapper(*args, **kwargs):
return action_logging


def _build_metrics(func_name, args, kwargs):
def _build_metrics(func_name, namespace):
"""
Builds metrics dict from function args
If the first item in args is a Namespace instance, it assumes that it
optionally contains "dag_id", "task_id", and "execution_date".
It assumes that function arguments is from airflow.bin.cli module's function
and has Namespace instance where it optionally contains "dag_id", "task_id",
and "execution_date".

:param func_name: name of function
:param args: Arguments from wrapped function, possibly including the Namespace instance from
argparse as the first argument
:param kwargs: Keyword arguments from wrapped function
:param namespace: Namespace instance from argparse
:return: dict with metrics
"""
from airflow.models import Log
Expand All @@ -142,7 +146,11 @@ def _build_metrics(func_name, args, kwargs):
'user': getuser(),
}

tmp_dic = vars(args[0]) if (args and isinstance(args[0], Namespace)) else kwargs
if not isinstance(namespace, Namespace):
raise ValueError(
f"namespace argument should be argparse.Namespace instance, but is {type(namespace)}"
)
tmp_dic = vars(namespace)
metrics['dag_id'] = tmp_dic.get('dag_id')
metrics['task_id'] = tmp_dic.get('task_id')
metrics['execution_date'] = tmp_dic.get('execution_date')
Expand Down Expand Up @@ -298,13 +306,11 @@ class ColorMode:
AUTO = "auto"


def should_use_colors(args_or_color):
def should_use_colors(args) -> bool:
"""Processes arguments and decides whether to enable color in output"""
# args.color is from argparse, Click CLI will pass in the color directly
color = args_or_color.color if hasattr(args_or_color, 'color') else args_or_color
if color == ColorMode.ON:
if args.color == ColorMode.ON:
return True
if color == ColorMode.OFF:
if args.color == ColorMode.OFF:
return False
return is_terminal_support_colors()

Expand Down Expand Up @@ -332,30 +338,3 @@ def _wrapper(*args, **kwargs):
logging.disable(logging.NOTSET)

return cast(T, _wrapper)


def suppress_logs_and_warning_click_compatible(f: T) -> T:
"""
Click compatible version of suppress_logs_and_warning.
Place after click_verbose decorator.

Decorator to suppress logging and warning messages
in cli functions.
"""

@functools.wraps(f)
def _wrapper(*args, **kwargs):
if kwargs.get("verbose"):
f(*args, **kwargs)
else:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
logging.disable(logging.CRITICAL)
try:
f(*args, **kwargs)
finally:
# logging output again depends on the effective
# levels of individual loggers
logging.disable(logging.NOTSET)

return cast(T, _wrapper)
2 changes: 0 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ install_requires =
python-nvd3>=0.15.0
python-slugify>=5.0
rich>=12.4.4
rich-click>=1.3.1
setproctitle>=1.1.8
sqlalchemy>=1.4
sqlalchemy_jsonfield>=1.0
Expand Down Expand Up @@ -175,7 +174,6 @@ generated=
[options.entry_points]
console_scripts=
airflow=airflow.__main__:main
airflow-ng=airflow.cli.__main__:airflow_cmd

[bdist_wheel]
python-tag=py3
Expand Down
4 changes: 2 additions & 2 deletions tests/utils/test_cli_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def test_metrics_build(self):
func_name = 'test'
exec_date = datetime.utcnow()
namespace = Namespace(dag_id='foo', task_id='bar', subcommand='test', execution_date=exec_date)
metrics = cli._build_metrics(func_name, [namespace], {})
metrics = cli._build_metrics(func_name, namespace)

expected = {
'user': os.environ.get('USER'),
Expand Down Expand Up @@ -132,7 +132,7 @@ def test_cli_create_user_supplied_password_is_masked(self, given_command, expect
exec_date = datetime.utcnow()
namespace = Namespace(dag_id='foo', task_id='bar', subcommand='test', execution_date=exec_date)
with mock.patch.object(sys, "argv", args):
metrics = cli._build_metrics(args[1], [namespace], {})
metrics = cli._build_metrics(args[1], namespace)

assert metrics.get('start_datetime') <= datetime.utcnow()

Expand Down