44 changes: 17 additions & 27 deletions airflow/cli/commands/dag_command.py
@@ -27,7 +27,6 @@

from graphviz.dot import Dot
from sqlalchemy.orm import Session
from sqlalchemy.sql.functions import func

from airflow import settings
from airflow.api.client import get_current_api_client
@@ -38,6 +37,7 @@
from airflow.models import DagBag, DagModel, DagRun, TaskInstance
from airflow.models.dag import DAG
from airflow.models.serialized_dag import SerializedDagModel
from airflow.timetables.base import DataInterval
from airflow.utils import cli as cli_utils, timezone
from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning
from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
@@ -296,36 +296,26 @@ def dag_next_execution(args):
print("[INFO] Please be reminded this DAG is PAUSED now.", file=sys.stderr)

with create_session() as session:
max_date_subq = (
session.query(func.max(DagRun.execution_date).label("max_date"))
.filter(DagRun.dag_id == dag.dag_id)
.subquery()
)
max_date_run: DagRun | None = (
session.query(DagRun)
.filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date == max_date_subq.c.max_date)
.one_or_none()
)

if max_date_run is None:
print("[WARN] Only applicable when there is execution record found for the DAG.", file=sys.stderr)
last_parsed_dag: DagModel = session.query(DagModel).filter(DagModel.dag_id == dag.dag_id).one()

def print_execution_interval(interval: DataInterval | None):
if interval is None:
print(
"[WARN] No following schedule can be found. "
"This DAG may have schedule interval '@once' or `None`.",
file=sys.stderr,
)
print(None)
return
print(interval.start.isoformat())

next_info = dag.next_dagrun_info(dag.get_run_data_interval(max_date_run), restricted=False)
if next_info is None:
print(
"[WARN] No following schedule can be found. "
"This DAG may have schedule interval '@once' or `None`.",
file=sys.stderr,
)
print(None)
return
next_interval = dag.get_next_data_interval(last_parsed_dag)
print_execution_interval(next_interval)

print(next_info.logical_date.isoformat())
for _ in range(1, args.num_executions):
next_info = dag.next_dagrun_info(next_info.data_interval, restricted=False)
print(next_info.logical_date.isoformat())
for i in range(1, args.num_executions):
next_info = dag.next_dagrun_info(next_interval, restricted=False)
next_interval = None if next_info is None else next_info.data_interval
print_execution_interval(next_interval)


@cli_utils.action_cli
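Review note: the rewrite above drops the MAX(execution_date) subquery over past DagRuns and instead derives upcoming runs from the DagModel row the scheduler keeps current, folding the timetable forward for additional executions. Below is a minimal sketch of the two APIs involved, assuming an initialized Airflow metadata DB with a parsed DAG; "demo_dag" is a hypothetical dag_id.

from airflow.models import DagBag, DagModel
from airflow.utils.session import create_session

dag = DagBag().get_dag("demo_dag")  # hypothetical dag_id, assumed already parsed
with create_session() as session:
    dag_model = session.query(DagModel).filter(DagModel.dag_id == dag.dag_id).one()

# First upcoming interval comes from fields the scheduler maintains on
# DagModel (e.g. next_dagrun), so no DagRun history is required.
interval = dag.get_next_data_interval(dag_model)

# Later intervals are projected by the timetable from the previous one;
# restricted=False ignores start_date/end_date bounds, as in the command.
for _ in range(3):
    if interval is None:  # schedule='@once' or None: no further runs
        print(None)
        break
    print(interval.start.isoformat())
    info = dag.next_dagrun_info(interval, restricted=False)
    interval = None if info is None else info.data_interval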
116 changes: 63 additions & 53 deletions tests/cli/commands/test_dag_command.py
@@ -28,6 +28,7 @@

import pendulum
import pytest
import time_machine

from airflow import settings
from airflow.cli import cli_parser
@@ -37,7 +38,6 @@
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType
from tests.models import TEST_DAGS_FOLDER
from tests.test_utils.config import conf_vars
@@ -237,7 +237,6 @@ def test_backfill(self, mock_run):

@mock.patch("airflow.cli.commands.dag_command.get_dag")
def test_backfill_fails_without_loading_dags(self, mock_get_dag):

cli_args = self.parser.parse_args(["dags", "backfill", "example_bash_operator"])

with pytest.raises(AirflowException):
@@ -384,71 +383,82 @@ def test_cli_backfill_depends_on_past_backwards(self, mock_run):
disable_retry=False,
)

def test_next_execution(self):
dag_ids = [
"example_bash_operator", # schedule='0 0 * * *'
"latest_only", # schedule=timedelta(hours=4)
"example_python_operator", # schedule=None
"example_xcom", # schedule="@once"
def test_next_execution(self, tmp_path):
dag_test_list = [
("future_schedule_daily", "timedelta(days=5)", "'0 0 * * *'", "True"),
("future_schedule_every_4_hours", "timedelta(days=5)", "timedelta(hours=4)", "True"),
("future_schedule_once", "timedelta(days=5)", "'@once'", "True"),
("future_schedule_none", "timedelta(days=5)", "None", "True"),
("past_schedule_once", "timedelta(days=-5)", "'@once'", "True"),
("past_schedule_daily", "timedelta(days=-5)", "'0 0 * * *'", "True"),
("past_schedule_daily_catchup_false", "timedelta(days=-5)", "'0 0 * * *'", "False"),
]

# Delete DagRuns
with create_session() as session:
dr = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
dr.delete(synchronize_session=False)

# Test None output
args = self.parser.parse_args(["dags", "next-execution", dag_ids[0]])
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
dag_command.dag_next_execution(args)
out = temp_stdout.getvalue()
# `next_execution` function is inapplicable if no execution record found
# It prints `None` in such cases
assert "None" in out

# The details below is determined by the schedule_interval of example DAGs
now = DEFAULT_DATE
expected_output = [
(now + timedelta(days=1)).isoformat(),
(now + timedelta(hours=4)).isoformat(),
"None",
"None",
]
expected_output_2 = [
(now + timedelta(days=1)).isoformat() + os.linesep + (now + timedelta(days=2)).isoformat(),
(now + timedelta(hours=4)).isoformat() + os.linesep + (now + timedelta(hours=8)).isoformat(),
"None",
"None",
]

for i, dag_id in enumerate(dag_ids):
dag = self.dagbag.dags[dag_id]
# Create a DagRun for each DAG, to prepare for next step
dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=now,
start_date=now,
state=State.FAILED,
for f in dag_test_list:
file_content = os.linesep.join(
[
"from airflow import DAG",
"from airflow.operators.empty import EmptyOperator",
"from datetime import timedelta; from pendulum import today",
f"dag = DAG('{f[0]}', start_date=today() + {f[1]}, schedule={f[2]}, catchup={f[3]})",
"task = EmptyOperator(task_id='empty_task',dag=dag)",
]
)
dag_file = tmp_path / f"{f[0]}.py"
dag_file.write_text(file_content)

with time_machine.travel(DEFAULT_DATE):
clear_db_dags()
self.dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
self.dagbag.sync_to_db()

default_run = DEFAULT_DATE
future_run = default_run + timedelta(days=5)
past_run = default_run + timedelta(days=-5)

expected_output = {
"future_schedule_daily": (
future_run.isoformat(),
future_run.isoformat() + os.linesep + (future_run + timedelta(days=1)).isoformat(),
),
"future_schedule_every_4_hours": (
future_run.isoformat(),
future_run.isoformat() + os.linesep + (future_run + timedelta(hours=4)).isoformat(),
),
"future_schedule_once": (future_run.isoformat(), future_run.isoformat() + os.linesep + "None"),
"future_schedule_none": ("None", "None"),
"past_schedule_once": (past_run.isoformat(), "None"),
"past_schedule_daily": (
past_run.isoformat(),
past_run.isoformat() + os.linesep + (past_run + timedelta(days=1)).isoformat(),
),
"past_schedule_daily_catchup_false": (
(default_run - timedelta(days=1)).isoformat(),
(default_run - timedelta(days=1)).isoformat() + os.linesep + default_run.isoformat(),
),
}

for dag_id in expected_output:
# Test num-executions = 1 (default)
args = self.parser.parse_args(["dags", "next-execution", dag_id])
args = self.parser.parse_args(["dags", "next-execution", dag_id, "-S", str(tmp_path)])
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
dag_command.dag_next_execution(args)
out = temp_stdout.getvalue()
assert expected_output[i] in out
assert expected_output[dag_id][0] in out

# Test num-executions = 2
args = self.parser.parse_args(["dags", "next-execution", dag_id, "--num-executions", "2"])
args = self.parser.parse_args(
["dags", "next-execution", dag_id, "--num-executions", "2", "-S", str(tmp_path)]
)
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
dag_command.dag_next_execution(args)
out = temp_stdout.getvalue()
assert expected_output_2[i] in out
assert expected_output[dag_id][1] in out

# Clean up before leaving
with create_session() as session:
dr = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
dr.delete(synchronize_session=False)
# Rebuild Test DB for other tests
clear_db_dags()
TestCliDags.dagbag = DagBag(include_examples=True)
TestCliDags.dagbag.sync_to_db()

@conf_vars({("core", "load_examples"): "true"})
def test_cli_report(self):
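Review note: each tuple in dag_test_list is rendered through the template in the test_next_execution loop above. Substituting the first entry by hand, the generated future_schedule_daily.py would read as follows (an illustration, not a file committed by the PR):

# future_schedule_daily.py as produced by the test's file_content template
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import timedelta; from pendulum import today
dag = DAG('future_schedule_daily', start_date=today() + timedelta(days=5), schedule='0 0 * * *', catchup=True)
task = EmptyOperator(task_id='empty_task',dag=dag)

With time_machine.travel(DEFAULT_DATE) pinning the clock, the test then expects `airflow dags next-execution future_schedule_daily -S <tmp_path>` to print the interval start five days ahead, and `--num-executions 2` to append the following day's interval, matching expected_output.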