diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index a90fda69ff476..c882804778cf6 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -417,9 +417,6 @@ def string_lower_type(val): ), ) -# list_tasks -ARG_TREE = Arg(("-t", "--tree"), help="Tree view", action="store_true") - # tasks_run # This is a hidden option -- not meant for users to set or know about ARG_SHUT_DOWN_LOGGING = Arg( @@ -1324,7 +1321,7 @@ class GroupCommand(NamedTuple): name="list", help="List the tasks within a DAG", func=lazy_load_command("airflow.cli.commands.task_command.task_list"), - args=(ARG_DAG_ID, ARG_TREE, ARG_SUBDIR, ARG_VERBOSE), + args=(ARG_DAG_ID, ARG_SUBDIR, ARG_VERBOSE), ), ActionCommand( name="clear", diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py index ffb08ff66d186..bdec53e1ffc80 100644 --- a/airflow/cli/commands/task_command.py +++ b/airflow/cli/commands/task_command.py @@ -547,11 +547,8 @@ def task_state(args) -> None: def task_list(args, dag: DAG | None = None) -> None: """List the tasks within a DAG at the command line.""" dag = dag or get_dag(args.subdir, args.dag_id) - if args.tree: - dag.tree_view() - else: - tasks = sorted(t.task_id for t in dag.tasks) - print("\n".join(tasks)) + tasks = sorted(t.task_id for t in dag.tasks) + print("\n".join(tasks)) class _SupportedDebugger(Protocol): diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 96d6216c5da49..bd1679e177b42 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -39,7 +39,6 @@ Callable, Collection, Container, - Generator, Iterable, Iterator, List, @@ -2224,28 +2223,6 @@ def pickle(self, session=NEW_SESSION) -> DagPickle: return dp - def tree_view(self) -> None: - """Print an ASCII tree representation of the DAG.""" - for tmp in self._generate_tree_view(): - print(tmp) - - def _generate_tree_view(self) -> Generator[str, None, None]: - def get_downstream(task, level=0) -> Generator[str, None, None]: - yield (" " * level * 4) + str(task) - level += 1 - for tmp_task in sorted(task.downstream_list, key=lambda x: x.task_id): - yield from get_downstream(tmp_task, level) - - for t in sorted(self.roots, key=lambda x: x.task_id): - yield from get_downstream(t) - - def get_tree_view(self) -> str: - """Return an ASCII tree representation of the DAG.""" - rst = "" - for tmp in self._generate_tree_view(): - rst += tmp + "\n" - return rst - @property def task(self) -> TaskDecoratorCollection: from airflow.decorators import task diff --git a/newsfragments/41964.significant.rst b/newsfragments/41964.significant.rst new file mode 100644 index 0000000000000..918587ae6ca21 --- /dev/null +++ b/newsfragments/41964.significant.rst @@ -0,0 +1,6 @@ +``--tree`` flag for ``airflow tasks list`` command removed + +The format of the output with that flag can be expensive to generate and extremely large, depending on the DAG. +``airflow dag show`` is a better way to visualize the relationship of tasks in a DAG. + +``DAG.tree_view`` and ``DAG.get_tree_view`` have also been removed. diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py index 5dcdb32dc7856..aa428c92486ec 100644 --- a/tests/cli/commands/test_task_command.py +++ b/tests/cli/commands/test_task_command.py @@ -115,9 +115,6 @@ def test_cli_list_tasks(self): args = self.parser.parse_args(["tasks", "list", dag_id]) task_command.task_list(args) - args = self.parser.parse_args(["tasks", "list", "example_bash_operator", "--tree"]) - task_command.task_list(args) - @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_test(self): """Test the `airflow test` command""" diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 804f32ae6e333..17e9c1b165a9f 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -25,10 +25,8 @@ import re import warnings import weakref -from contextlib import redirect_stdout from datetime import timedelta from importlib import reload -from io import StringIO from pathlib import Path from typing import TYPE_CHECKING from unittest import mock @@ -1288,34 +1286,6 @@ def test_leaves(self): assert set(dag.leaves) == {op4, op5} - def test_tree_view(self): - """Verify correctness of dag.tree_view().""" - with DAG("test_dag", schedule=None, start_date=DEFAULT_DATE) as dag: - op1_a = EmptyOperator(task_id="t1_a") - op1_b = EmptyOperator(task_id="t1_b") - op2 = EmptyOperator(task_id="t2") - op3 = EmptyOperator(task_id="t3") - op1_b >> op2 - op1_a >> op2 >> op3 - - with redirect_stdout(StringIO()) as stdout: - dag.tree_view() - stdout = stdout.getvalue() - - stdout_lines = stdout.splitlines() - assert "t1_a" in stdout_lines[0] - assert "t2" in stdout_lines[1] - assert "t3" in stdout_lines[2] - assert "t1_b" in stdout_lines[3] - assert dag.get_tree_view() == ( - "\n" - " \n" - " \n" - "\n" - " \n" - " \n" - ) - def test_duplicate_task_ids_not_allowed_with_dag_context_manager(self): """Verify tasks with Duplicate task_id raises error""" with DAG("test_dag", schedule=None, start_date=DEFAULT_DATE) as dag: