diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 4a9a1f2c12d74..ef891fba62832 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -27,7 +27,6 @@
 
 from graphviz.dot import Dot
 from sqlalchemy.orm import Session
-from sqlalchemy.sql.functions import func
 
 from airflow import settings
 from airflow.api.client import get_current_api_client
@@ -38,6 +37,7 @@
 from airflow.models import DagBag, DagModel, DagRun, TaskInstance
 from airflow.models.dag import DAG
 from airflow.models.serialized_dag import SerializedDagModel
+from airflow.timetables.base import DataInterval
 from airflow.utils import cli as cli_utils, timezone
 from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning
 from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
@@ -296,36 +296,26 @@ def dag_next_execution(args):
         print("[INFO] Please be reminded this DAG is PAUSED now.", file=sys.stderr)
 
     with create_session() as session:
-        max_date_subq = (
-            session.query(func.max(DagRun.execution_date).label("max_date"))
-            .filter(DagRun.dag_id == dag.dag_id)
-            .subquery()
-        )
-        max_date_run: DagRun | None = (
-            session.query(DagRun)
-            .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date == max_date_subq.c.max_date)
-            .one_or_none()
-        )
-
-        if max_date_run is None:
-            print("[WARN] Only applicable when there is execution record found for the DAG.", file=sys.stderr)
-            print(None)
-            return
-
-        next_info = dag.next_dagrun_info(dag.get_run_data_interval(max_date_run), restricted=False)
-        if next_info is None:
-            print(
-                "[WARN] No following schedule can be found. "
-                "This DAG may have schedule interval '@once' or `None`.",
-                file=sys.stderr,
-            )
-            print(None)
-            return
-
-        print(next_info.logical_date.isoformat())
-        for _ in range(1, args.num_executions):
-            next_info = dag.next_dagrun_info(next_info.data_interval, restricted=False)
-            print(next_info.logical_date.isoformat())
+        last_parsed_dag: DagModel = session.query(DagModel).filter(DagModel.dag_id == dag.dag_id).one()
+
+        def print_execution_interval(interval: DataInterval | None):
+            if interval is None:
+                print(
+                    "[WARN] No following schedule can be found. "
+                    "This DAG may have schedule interval '@once' or `None`.",
+                    file=sys.stderr,
+                )
+                print(None)
+                return
+            print(interval.start.isoformat())
+
+        next_interval = dag.get_next_data_interval(last_parsed_dag)
+        print_execution_interval(next_interval)
+
+        for _ in range(1, args.num_executions):
+            next_info = dag.next_dagrun_info(next_interval, restricted=False)
+            next_interval = None if next_info is None else next_info.data_interval
+            print_execution_interval(next_interval)
 
 
 @cli_utils.action_cli
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 81d62699ba42f..6b4e2187fadcf 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -28,6 +28,7 @@
 
 import pendulum
 import pytest
+import time_machine
 
 from airflow import settings
 from airflow.cli import cli_parser
@@ -37,7 +38,6 @@
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils import timezone
 from airflow.utils.session import create_session
-from airflow.utils.state import State
 from airflow.utils.types import DagRunType
 from tests.models import TEST_DAGS_FOLDER
 from tests.test_utils.config import conf_vars
@@ -237,7 +237,6 @@ def test_backfill(self, mock_run):
 
     @mock.patch("airflow.cli.commands.dag_command.get_dag")
     def test_backfill_fails_without_loading_dags(self, mock_get_dag):
-
         cli_args = self.parser.parse_args(["dags", "backfill", "example_bash_operator"])
 
         with pytest.raises(AirflowException):
@@ -384,71 +383,82 @@ def test_cli_backfill_depends_on_past_backwards(self, mock_run):
             disable_retry=False,
         )
 
-    def test_next_execution(self):
-        dag_ids = [
-            "example_bash_operator",  # schedule='0 0 * * *'
-            "latest_only",  # schedule=timedelta(hours=4)
-            "example_python_operator",  # schedule=None
-            "example_xcom",  # schedule="@once"
+    def test_next_execution(self, tmp_path):
+        dag_test_list = [
+            ("future_schedule_daily", "timedelta(days=5)", "'0 0 * * *'", "True"),
+            ("future_schedule_every_4_hours", "timedelta(days=5)", "timedelta(hours=4)", "True"),
+            ("future_schedule_once", "timedelta(days=5)", "'@once'", "True"),
+            ("future_schedule_none", "timedelta(days=5)", "None", "True"),
+            ("past_schedule_once", "timedelta(days=-5)", "'@once'", "True"),
+            ("past_schedule_daily", "timedelta(days=-5)", "'0 0 * * *'", "True"),
+            ("past_schedule_daily_catchup_false", "timedelta(days=-5)", "'0 0 * * *'", "False"),
         ]
 
-        # Delete DagRuns
-        with create_session() as session:
-            dr = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
-            dr.delete(synchronize_session=False)
-
-        # Test None output
-        args = self.parser.parse_args(["dags", "next-execution", dag_ids[0]])
-        with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
-            dag_command.dag_next_execution(args)
-            out = temp_stdout.getvalue()
-        # `next_execution` function is inapplicable if no execution record found
-        # It prints `None` in such cases
-        assert "None" in out
-
-        # The details below is determined by the schedule_interval of example DAGs
-        now = DEFAULT_DATE
-        expected_output = [
-            (now + timedelta(days=1)).isoformat(),
-            (now + timedelta(hours=4)).isoformat(),
-            "None",
-            "None",
-        ]
-        expected_output_2 = [
-            (now + timedelta(days=1)).isoformat() + os.linesep + (now + timedelta(days=2)).isoformat(),
-            (now + timedelta(hours=4)).isoformat() + os.linesep + (now + timedelta(hours=8)).isoformat(),
-            "None",
-            "None",
-        ]
-
-        for i, dag_id in enumerate(dag_ids):
-            dag = self.dagbag.dags[dag_id]
-            # Create a DagRun for each DAG, to prepare for next step
-            dag.create_dagrun(
-                run_type=DagRunType.SCHEDULED,
-                execution_date=now,
-                start_date=now,
-                state=State.FAILED,
+        for f in dag_test_list:
+            file_content = os.linesep.join(
+                [
+                    "from airflow import DAG",
+                    "from airflow.operators.empty import EmptyOperator",
+                    "from datetime import timedelta; from pendulum import today",
+                    f"dag = DAG('{f[0]}', start_date=today() + {f[1]}, schedule={f[2]}, catchup={f[3]})",
+                    "task = EmptyOperator(task_id='empty_task', dag=dag)",
+                ]
             )
+            dag_file = tmp_path / f"{f[0]}.py"
+            dag_file.write_text(file_content)
 
+        with time_machine.travel(DEFAULT_DATE):
+            clear_db_dags()
+            self.dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
+            self.dagbag.sync_to_db()
+
+        default_run = DEFAULT_DATE
+        future_run = default_run + timedelta(days=5)
+        past_run = default_run + timedelta(days=-5)
+
+        expected_output = {
+            "future_schedule_daily": (
+                future_run.isoformat(),
+                future_run.isoformat() + os.linesep + (future_run + timedelta(days=1)).isoformat(),
+            ),
+            "future_schedule_every_4_hours": (
+                future_run.isoformat(),
+                future_run.isoformat() + os.linesep + (future_run + timedelta(hours=4)).isoformat(),
+            ),
+            "future_schedule_once": (future_run.isoformat(), future_run.isoformat() + os.linesep + "None"),
+            "future_schedule_none": ("None", "None"),
+            "past_schedule_once": (past_run.isoformat(), "None"),
+            "past_schedule_daily": (
+                past_run.isoformat(),
+                past_run.isoformat() + os.linesep + (past_run + timedelta(days=1)).isoformat(),
+            ),
+            "past_schedule_daily_catchup_false": (
+                (default_run - timedelta(days=1)).isoformat(),
+                (default_run - timedelta(days=1)).isoformat() + os.linesep + default_run.isoformat(),
+            ),
+        }
+
+        for dag_id in expected_output:
             # Test num-executions = 1 (default)
-            args = self.parser.parse_args(["dags", "next-execution", dag_id])
+            args = self.parser.parse_args(["dags", "next-execution", dag_id, "-S", str(tmp_path)])
             with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
                 dag_command.dag_next_execution(args)
                 out = temp_stdout.getvalue()
-            assert expected_output[i] in out
+            assert expected_output[dag_id][0] in out
 
             # Test num-executions = 2
-            args = self.parser.parse_args(["dags", "next-execution", dag_id, "--num-executions", "2"])
+            args = self.parser.parse_args(
+                ["dags", "next-execution", dag_id, "--num-executions", "2", "-S", str(tmp_path)]
+            )
             with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
                 dag_command.dag_next_execution(args)
                 out = temp_stdout.getvalue()
-            assert expected_output_2[i] in out
+            assert expected_output[dag_id][1] in out
 
-        # Clean up before leaving
-        with create_session() as session:
-            dr = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
-            dr.delete(synchronize_session=False)
+        # Rebuild Test DB for other tests
+        clear_db_dags()
+        TestCliDags.dagbag = DagBag(include_examples=True)
+        TestCliDags.dagbag.sync_to_db()
 
     @conf_vars({("core", "load_examples"): "true"})
     def test_cli_report(self):