diff --git a/airflow/jobs.py b/airflow/jobs.py index 9e68fad79785c..1431f2ce3a421 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -772,7 +772,7 @@ def update_import_errors(session, dagbag): session.commit() @provide_session - def create_dag_run(self, dag, session=None): + def create_dag_run(self, dag, session=None, dry_run=False): """ This method checks whether a new DagRun needs to be created for a DAG based on scheduling interval. @@ -867,7 +867,6 @@ def create_dag_run(self, dag, session=None): "Dag start date: %s. Next run date: %s", dag.start_date, next_run_date ) - # don't ever schedule in the future if next_run_date > timezone.utcnow(): return @@ -892,6 +891,11 @@ def create_dag_run(self, dag, session=None): if next_run_date and min_task_end_date and next_run_date > min_task_end_date: return + # Don't really schedule the job, we are interested in its next run date + # as calculated by the scheduler + if dry_run is True: + return next_run_date + if next_run_date and period_end and period_end <= timezone.utcnow(): next_run = dag.create_dagrun( run_id=DagRun.ID_PREFIX + next_run_date.isoformat(), diff --git a/airflow/models.py b/airflow/models.py index e2699a53038db..50d562526f956 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -3732,6 +3732,32 @@ def latest_execution_date(self, session=None): ).scalar() return execution_date + @property + @provide_session + def schedulable_at(self, session=None): + """ + Returns the earliest time at which the DAG will next be eligible for + execution by the scheduler + """ + from airflow.jobs import SchedulerJob + scheduler = SchedulerJob() + next_run_date = scheduler.create_dag_run(self, dry_run=True) + if next_run_date: + period_end = self.following_schedule(next_run_date) + return period_end + + @property + def scheduled_in(self): + """ + Returns a human readable duration before the next schedulable time + of the DAG + """ + import pendulum + diff = self.schedulable_at - pendulum.now() + if diff.in_seconds() <= 0: + return "overdue" + return diff + @property def subdags(self): """ diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index b4c09de0b3e44..1f33831b462ad 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -39,6 +39,11 @@

ROOT: {{ root }} {% endif %}

+

+ + scheduled in: {{ dag.scheduled_in }} + +

schedule: {{ dag.schedule_interval }} diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html index 0a7a6ec2a4791..52a751bcc0955 100644 --- a/airflow/www/templates/airflow/dags.html +++ b/airflow/www/templates/airflow/dags.html @@ -59,6 +59,9 @@

DAGs

+ Scheduled in + + Links @@ -134,7 +137,16 @@

DAGs

- + + + {% if dag_id in webserver_dags %} +
+ {{ dag.scheduled_in }} + + {% endif %} + + + {% if dag %} diff --git a/tests/models.py b/tests/models.py index d4cb7383861c0..d9d37ce92cb1c 100644 --- a/tests/models.py +++ b/tests/models.py @@ -454,6 +454,31 @@ def jinja_udf(name): result = task.render_template('', "{{ 'world' | hello}}", dict()) self.assertEqual(result, 'Hello world') + def test_schedulable_at(self): + """ + [AIRFLOW-1424] test the 'schedulable_after' DAG property. + """ + test_dag_id = 'test_get_num_task_instances_dag' + test_task_id = 'task_1' + + test_dag = DAG(dag_id=test_dag_id, start_date=DEFAULT_DATE, + schedule_interval=datetime.timedelta(days=1)) + DummyOperator(task_id=test_task_id, dag=test_dag) + self.assertEqual(test_dag.schedulable_at, + DEFAULT_DATE + datetime.timedelta(days=1)) + + def test_scheduled_in_overdue(self): + """ + [AIRFLOW-1424] test the 'scheduled_in' DAG property. + """ + test_dag_id = 'test_get_num_task_instances_dag' + test_task_id = 'task_1' + + test_dag = DAG(dag_id=test_dag_id, start_date=DEFAULT_DATE, + schedule_interval=datetime.timedelta(days=1)) + DummyOperator(task_id=test_task_id, dag=test_dag) + self.assertEqual(test_dag.scheduled_in, "overdue") + def test_cycle(self): # test empty dag = DAG(