From 18daed215e0753455439dc89bc3a74192563138e Mon Sep 17 00:00:00 2001 From: Ultrabug Date: Thu, 20 Jul 2017 22:12:48 +0200 Subject: [PATCH 1/5] [AIRFLOW-1424] make the next execution date of DAGs visible The scheduler's DAG run creation logic can be tricky and one is easily confused with the start_date + interval and period end scheduling way of thinking. It would ease airflow's usage to add a *next execution* field to DAGs so that we can very easily see the (un)famous *period end* after which the scheduler will create a new DAG run for our workflows. These patches are a simple way to implement this on the DAG model and make use of this in the interface. --- airflow/models.py | 31 +++++++++++++++++++++++++ airflow/www/templates/airflow/dag.html | 5 ++++ airflow/www/templates/airflow/dags.html | 14 ++++++++++- tests/models.py | 17 ++++++++++++++ 4 files changed, 66 insertions(+), 1 deletion(-) diff --git a/airflow/models.py b/airflow/models.py index 32ad144a22bb5..ef05a9425d93b 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -3055,6 +3055,37 @@ def latest_execution_date(self): session.close() return execution_date + @property + def next_run_date(self): + """ + Returns the next run date for which the dag will be scheduled + """ + next_run_date = None + if not self.latest_execution_date: + # First run + task_start_dates = [t.start_date for t in self.tasks] + if task_start_dates: + next_run_date = self.normalize_schedule(min(task_start_dates)) + else: + next_run_date = self.following_schedule(self.latest_execution_date) + return next_run_date + + @property + def next_execution_date(self): + """ + Returns the next execution date at which the dag will be scheduled by + the scheduler (period end) + """ + if self.schedule_interval is None: + return None + else: + execution_date = None + if self.schedule_interval == '@once': + execution_date = self.next_run_date + elif self.next_run_date: + execution_date = self.following_schedule(self.next_run_date) + return 
execution_date + @property def subdags(self): """ diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index 706ed329d58b9..1c059b06ef97c 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -39,6 +39,11 @@

ROOT: {{ root }} {% endif %}

+

+ + next execution date: {{ dag.next_execution_date }} + +

schedule: {{ dag.schedule_interval }} diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html index 513e4aa317b8a..1bfeb688dc304 100644 --- a/airflow/www/templates/airflow/dags.html +++ b/airflow/www/templates/airflow/dags.html @@ -47,6 +47,9 @@

DAGs

+ Next Execution + + Links @@ -113,7 +116,16 @@

DAGs

- + + + {% if dag_id in webserver_dags %} +
+ {{ dag.next_execution_date }} + + {% endif %} + + + {% if dag %} diff --git a/tests/models.py b/tests/models.py index 400c659a1ea89..08237ce99cdeb 100644 --- a/tests/models.py +++ b/tests/models.py @@ -289,6 +289,23 @@ def jinja_udf(name): result = task.render_template('', "{{ 'world' | hello}}", dict()) self.assertEqual(result, 'Hello world') + def test_next_run_date_and_next_execution_date(self): + """ + [AIRFLOW-1424] test the 'next_run_date' and 'next_execution_date' DAG + properties used in the interface to give more visibility to the + scheduler's execution time of DAGs. + """ + test_dag_id = 'test_get_num_task_instances_dag' + test_task_id = 'task_1' + + test_dag = DAG(dag_id=test_dag_id, start_date=DEFAULT_DATE, + schedule_interval=datetime.timedelta(days=1)) + self.assertEqual(test_dag.next_run_date, None) + + test_task = DummyOperator(task_id=test_task_id, dag=test_dag) + self.assertEqual(test_dag.next_run_date, DEFAULT_DATE) + self.assertEqual(test_dag.next_execution_date, + DEFAULT_DATE + datetime.timedelta(days=1)) class DagStatTest(unittest.TestCase): def test_dagstats_crud(self): From da9b738bf3f2bebe5249efddf4402e30ca060e79 Mon Sep 17 00:00:00 2001 From: Ultrabug Date: Sun, 4 Nov 2018 14:58:38 +0100 Subject: [PATCH 2/5] leverage next_execution_date property in the cli --- airflow/bin/cli.py | 13 ++----------- tests/models.py | 3 +-- 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index cfc6c6b8d625e..134eb0aa87f9e 100644 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -600,18 +600,9 @@ def next_execution(args): if dag.is_paused: print("[INFO] Please be reminded this DAG is PAUSED now.") - - if dag.latest_execution_date: - next_execution_dttm = dag.following_schedule(dag.latest_execution_date) - - if next_execution_dttm is None: - print("[WARN] No following schedule can be found. 
" + - "This DAG may have schedule interval '@once' or `None`.") - - print(next_execution_dttm) - else: + if dag.next_execution_date is None: print("[WARN] Only applicable when there is execution record found for the DAG.") - print(None) + print(dag.next_execution_date) @cli_utils.action_logging diff --git a/tests/models.py b/tests/models.py index ffedfa9443030..c020ee0cf96d3 100644 --- a/tests/models.py +++ b/tests/models.py @@ -454,7 +454,6 @@ def jinja_udf(name): result = task.render_template('', "{{ 'world' | hello}}", dict()) self.assertEqual(result, 'Hello world') - def test_next_run_date_and_next_execution_date(self): """ [AIRFLOW-1424] test the 'next_run_date' and 'next_execution_date' DAG @@ -468,7 +467,7 @@ def test_next_run_date_and_next_execution_date(self): schedule_interval=datetime.timedelta(days=1)) self.assertEqual(test_dag.next_run_date, None) - test_task = DummyOperator(task_id=test_task_id, dag=test_dag) + DummyOperator(task_id=test_task_id, dag=test_dag) self.assertEqual(test_dag.next_run_date, DEFAULT_DATE) self.assertEqual(test_dag.next_execution_date, DEFAULT_DATE + datetime.timedelta(days=1)) From f9e734b63f3970e44595dddcbbf447c634abdf06 Mon Sep 17 00:00:00 2001 From: Ultrabug Date: Tue, 6 Nov 2018 16:53:02 +0100 Subject: [PATCH 3/5] reuse the scheduler logic to determine next_execution_date --- airflow/jobs.py | 7 +++++-- airflow/models.py | 30 +++++------------------------- 2 files changed, 10 insertions(+), 27 deletions(-) diff --git a/airflow/jobs.py b/airflow/jobs.py index 9e68fad79785c..7b3e79ed090f0 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -772,7 +772,7 @@ def update_import_errors(session, dagbag): session.commit() @provide_session - def create_dag_run(self, dag, session=None): + def create_dag_run(self, dag, session=None, dry_run=False): """ This method checks whether a new DagRun needs to be created for a DAG based on scheduling interval. 
@@ -867,7 +867,6 @@ def create_dag_run(self, dag, session=None): "Dag start date: %s. Next run date: %s", dag.start_date, next_run_date ) - # don't ever schedule in the future if next_run_date > timezone.utcnow(): return @@ -892,6 +891,10 @@ def create_dag_run(self, dag, session=None): if next_run_date and min_task_end_date and next_run_date > min_task_end_date: return + # Don't really schedule the job, we are interested in its next execution date + if dry_run is True: + return next_run_date + if next_run_date and period_end and period_end <= timezone.utcnow(): next_run = dag.create_dagrun( run_id=DagRun.ID_PREFIX + next_run_date.isoformat(), diff --git a/airflow/models.py b/airflow/models.py index 77d5e2712807e..195894804628b 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -3733,35 +3733,15 @@ def latest_execution_date(self, session=None): return execution_date @property - def next_run_date(self): - """ - Returns the next run date for which the dag will be scheduled - """ - next_run_date = None - if not self.latest_execution_date: - # First run - task_start_dates = [t.start_date for t in self.tasks] - if task_start_dates: - next_run_date = self.normalize_schedule(min(task_start_dates)) - else: - next_run_date = self.following_schedule(self.latest_execution_date) - return next_run_date - - @property - def next_execution_date(self): + @provide_session + def next_execution_date(self, session=None): """ Returns the next execution date at which the dag will be scheduled by the scheduler (period end) """ - if self.schedule_interval is None: - return None - else: - execution_date = None - if self.schedule_interval == '@once': - execution_date = self.next_run_date - elif self.next_run_date: - execution_date = self.following_schedule(self.next_run_date) - return execution_date + from airflow.jobs import SchedulerJob + scheduler = SchedulerJob() + return scheduler.create_dag_run(self, dry_run=True) @property def subdags(self): From 
e79fdaa9e380f7947369658eef3b297719097f52 Mon Sep 17 00:00:00 2001 From: Ultrabug Date: Wed, 7 Nov 2018 17:08:41 +0100 Subject: [PATCH 4/5] fix naming and logic as discussed by @ashb --- airflow/jobs.py | 3 ++- airflow/models.py | 23 +++++++++++++++++++---- airflow/www/templates/airflow/dag.html | 2 +- airflow/www/templates/airflow/dags.html | 10 +++++----- tests/models.py | 23 +++++++++++++++-------- 5 files changed, 42 insertions(+), 19 deletions(-) diff --git a/airflow/jobs.py b/airflow/jobs.py index 7b3e79ed090f0..1431f2ce3a421 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -891,7 +891,8 @@ def create_dag_run(self, dag, session=None, dry_run=False): if next_run_date and min_task_end_date and next_run_date > min_task_end_date: return - # Don't really schedule the job, we are interested in its next execution date + # Don't really schedule the job, we are interested in its next run date + # as calculated by the scheduler if dry_run is True: return next_run_date diff --git a/airflow/models.py b/airflow/models.py index 195894804628b..50d562526f956 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -3734,14 +3734,29 @@ def latest_execution_date(self, session=None): @property @provide_session - def next_execution_date(self, session=None): + def schedulable_at(self, session=None): """ - Returns the next execution date at which the dag will be scheduled by - the scheduler (period end) + Returns the earliest time at which the DAG will next be eligible for + execution by the scheduler """ from airflow.jobs import SchedulerJob scheduler = SchedulerJob() - return scheduler.create_dag_run(self, dry_run=True) + next_run_date = scheduler.create_dag_run(self, dry_run=True) + if next_run_date: + period_end = self.following_schedule(next_run_date) + return period_end + + @property + def scheduled_in(self): + """ + Returns a human readable duration before the next schedulable time + of the DAG + """ + import pendulum + diff = self.schedulable_at - pendulum.now() + if 
diff.in_seconds() <= 0: + return "overdue" + return diff @property def subdags(self): diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index 48b7ccac9b2f6..1f33831b462ad 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -41,7 +41,7 @@

- next execution date: {{ dag.next_execution_date }} + scheduled in: {{ dag.scheduled_in }}

diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html index 72730e4e14e11..52a751bcc0955 100644 --- a/airflow/www/templates/airflow/dags.html +++ b/airflow/www/templates/airflow/dags.html @@ -59,8 +59,8 @@

DAGs

- Next Execution - + Scheduled in + Links @@ -137,11 +137,11 @@

DAGs

- + {% if dag_id in webserver_dags %} - - {{ dag.next_execution_date }} + + {{ dag.scheduled_in }} {% endif %} diff --git a/tests/models.py b/tests/models.py index c020ee0cf96d3..d9d37ce92cb1c 100644 --- a/tests/models.py +++ b/tests/models.py @@ -454,24 +454,31 @@ def jinja_udf(name): result = task.render_template('', "{{ 'world' | hello}}", dict()) self.assertEqual(result, 'Hello world') - def test_next_run_date_and_next_execution_date(self): + def test_schedulable_at(self): """ - [AIRFLOW-1424] test the 'next_run_date' and 'next_execution_date' DAG - properties used in the interface to give more visibility to the - scheduler's execution time of DAGs. + [AIRFLOW-1424] test the 'schedulable_at' DAG property. """ test_dag_id = 'test_get_num_task_instances_dag' test_task_id = 'task_1' test_dag = DAG(dag_id=test_dag_id, start_date=DEFAULT_DATE, schedule_interval=datetime.timedelta(days=1)) - self.assertEqual(test_dag.next_run_date, None) - DummyOperator(task_id=test_task_id, dag=test_dag) - self.assertEqual(test_dag.next_run_date, DEFAULT_DATE) - self.assertEqual(test_dag.next_execution_date, + self.assertEqual(test_dag.schedulable_at, DEFAULT_DATE + datetime.timedelta(days=1)) + def test_scheduled_in_overdue(self): + """ + [AIRFLOW-1424] test the 'scheduled_in' DAG property. + """ + test_dag_id = 'test_get_num_task_instances_dag' + test_task_id = 'task_1' + + test_dag = DAG(dag_id=test_dag_id, start_date=DEFAULT_DATE, + schedule_interval=datetime.timedelta(days=1)) + DummyOperator(task_id=test_task_id, dag=test_dag) + self.assertEqual(test_dag.scheduled_in, "overdue") + def test_cycle(self): # test empty dag = DAG( From 7bea66e689bce9197ec87b14e80561bb0aae0c8d Mon Sep 17 00:00:00 2001 From: Ultrabug Date: Wed, 7 Nov 2018 17:11:06 +0100 Subject: [PATCH 5/5] Revert "leverage next_execution_date property in the cli" This reverts commit da9b738bf3f2bebe5249efddf4402e30ca060e79. 
--- airflow/bin/cli.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 134eb0aa87f9e..cfc6c6b8d625e 100644 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -600,9 +600,18 @@ def next_execution(args): if dag.is_paused: print("[INFO] Please be reminded this DAG is PAUSED now.") - if dag.next_execution_date is None: + + if dag.latest_execution_date: + next_execution_dttm = dag.following_schedule(dag.latest_execution_date) + + if next_execution_dttm is None: + print("[WARN] No following schedule can be found. " + + "This DAG may have schedule interval '@once' or `None`.") + + print(next_execution_dttm) + else: print("[WARN] Only applicable when there is execution record found for the DAG.") - print(dag.next_execution_date) + print(None) @cli_utils.action_logging