From c0a19f2efdb046bd880bfccad7fc13bbac98f48f Mon Sep 17 00:00:00 2001 From: Ultrabug Date: Tue, 18 Jul 2017 23:10:36 +0200 Subject: [PATCH] [AIRFLOW-1424] add a next_scheduler_run property to DAGs The scheduler's DAG run creation logic can be tricky and one is easily confused with the start_date + interval and period end scheduling way of thinking. It would ease airflow's usage to add a *next_scheduler_run* property to DAGs so that we can very easily see the (in)famous *period end* after which the scheduler will create a new DAG run for our workflows. These patches add such a field to the *dag* table and the DagModel property + scheduler logic to update it when calculated. --- airflow/jobs.py | 7 ++++ ...452_add_next_scheduler_run_field_to_dag.py | 37 +++++++++++++++++++ airflow/models.py | 12 ++++++ tests/jobs.py | 29 ++++++++++++++- 4 files changed, 83 insertions(+), 2 deletions(-) create mode 100644 airflow/migrations/versions/258d7101c452_add_next_scheduler_run_field_to_dag.py diff --git a/airflow/jobs.py b/airflow/jobs.py index 6b63df0694491..e1fc1232af78c 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -872,6 +872,13 @@ def create_dag_run(self, dag, session=None): elif next_run_date: period_end = dag.following_schedule(next_run_date) + # update the DAG's next scheduler run property so users can get + # a clear view of the next expected run period of the job + if dag.next_scheduler_run != period_end: + session.query(models.DagModel).filter_by( + dag_id=dag.dag_id).update( + {models.DagModel.next_scheduler_run : period_end}) + # Don't schedule a dag beyond its end_date (as specified by the dag param) if next_run_date and dag.end_date and next_run_date > dag.end_date: return diff --git a/airflow/migrations/versions/258d7101c452_add_next_scheduler_run_field_to_dag.py b/airflow/migrations/versions/258d7101c452_add_next_scheduler_run_field_to_dag.py new file mode 100644 index 0000000000000..f8302e6749269 --- /dev/null +++ 
b/airflow/migrations/versions/258d7101c452_add_next_scheduler_run_field_to_dag.py @@ -0,0 +1,37 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""[AIRFLOW-1424] add next_scheduler_run field to dag + +Revision ID: 258d7101c452 +Revises: cc1e65623dc7 +Create Date: 2017-07-18 11:37:53.890462 + +""" + +# revision identifiers, used by Alembic. +revision = '258d7101c452' +down_revision = 'cc1e65623dc7' +branch_labels = None +depends_on = None + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.add_column('dag', sa.Column('next_scheduler_run', sa.DateTime(), nullable=True)) + + +def downgrade(): + op.drop_column('dag', 'next_scheduler_run') diff --git a/airflow/models.py b/airflow/models.py index 32ad144a22bb5..4196dc6ab7cd9 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -2634,6 +2634,8 @@ class DagModel(Base): fileloc = Column(String(2000)) # String representing the owners owners = Column(String(2000)) + # Next time the scheduler will run this DAG + next_scheduler_run = Column(DateTime) def __repr__(self): return "".format(self=self) @@ -3008,6 +3010,16 @@ def is_paused(self, session=None): DagModel.dag_id == self.dag_id) return qry.value('is_paused') + @property + @provide_session + def next_scheduler_run(self, session=None): + """ + Returns the next scheduler run datetime stored for this DAG + """ + qry = session.query(DagModel).filter( + DagModel.dag_id == self.dag_id) + return qry.value('next_scheduler_run') + @provide_session def 
get_active_runs(self, session=None): """ diff --git a/tests/jobs.py b/tests/jobs.py index e987e0cd6c802..e17cbeb5259b6 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -755,7 +755,7 @@ def test_find_executable_task_instances_backfill_nodagrun(self): res_keys = map(lambda x: x.key, res) self.assertIn(ti_no_dagrun.key, res_keys) self.assertIn(ti_with_dagrun.key, res_keys) - + def test_find_executable_task_instances_pool(self): dag_id = 'SchedulerJobTest.test_find_executable_task_instances_pool' task_id_1 = 'dummy' @@ -2285,7 +2285,7 @@ def test_list_py_file_paths(self): for file_path in list_py_file_paths(TEST_DAGS_FOLDER): detected_files.append(file_path) self.assertEqual(sorted(detected_files), sorted(expected_files)) - + def test_reset_orphaned_tasks_nothing(self): """Try with nothing. """ scheduler = SchedulerJob(**self.default_scheduler_args) @@ -2499,3 +2499,28 @@ def test_reset_orphaned_tasks_with_orphans(self): self.assertEqual(state, ti.state) session.close() + + def test_scheduler_updates_next_scheduler_run_dag_property(self): + """ + [AIRFLOW-1424] Test if a scheduler run updates the 'next_scheduler_run' + property of the DAG. + """ + dag = DAG( + dag_id='test_scheduler_process_execute_task', + start_date=DEFAULT_DATE) + dag_task1 = DummyOperator( + task_id='dummy', + dag=dag, + owner='airflow') + + session = settings.Session() + orm_dag = DagModel(dag_id=dag.dag_id) + session.merge(orm_dag) + session.commit() + session.close() + + scheduler = SchedulerJob() + dag.clear() + dr = scheduler.create_dag_run(dag) + self.assertIsNotNone(dr) + self.assertIsNotNone(dag.next_scheduler_run)