Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,13 @@ def create_dag_run(self, dag, session=None):
elif next_run_date:
period_end = dag.following_schedule(next_run_date)

# update the DAG's next scheduler run property so users can get
# a clear view of the next expected run period of the job
if dag.next_scheduler_run != period_end:
session.query(models.DagModel).filter_by(
dag_id=dag.dag_id).update(
{models.DagModel.next_scheduler_run : period_end})

# Don't schedule a dag beyond its end_date (as specified by the dag param)
if next_run_date and dag.end_date and next_run_date > dag.end_date:
return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""[AIRFLOW-1424] add next_scheduler_run field to dag

Revision ID: 258d7101c452
Revises: cc1e65623dc7
Create Date: 2017-07-18 11:37:53.890462

"""

# revision identifiers, used by Alembic.
revision = '258d7101c452'
down_revision = 'cc1e65623dc7'
branch_labels = None
depends_on = None

from alembic import op
import sqlalchemy as sa


def upgrade():
op.add_column('dag', sa.Column('next_scheduler_run', sa.DateTime(), nullable=True))


def downgrade():
op.drop_column('dag', 'next_scheduler_run')
12 changes: 12 additions & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2634,6 +2634,8 @@ class DagModel(Base):
fileloc = Column(String(2000))
# String representing the owners
owners = Column(String(2000))
# Next time the scheduler will run this DAG
next_scheduler_run = Column(DateTime)

def __repr__(self):
return "<DAG: {self.dag_id}>".format(self=self)
Expand Down Expand Up @@ -3008,6 +3010,16 @@ def is_paused(self, session=None):
DagModel.dag_id == self.dag_id)
return qry.value('is_paused')

@property
@provide_session
def next_scheduler_run(self, session=None):
"""
Returns a boolean indicating whether this DAG is paused
"""
qry = session.query(DagModel).filter(
DagModel.dag_id == self.dag_id)
return qry.value('next_scheduler_run')

@provide_session
def get_active_runs(self, session=None):
"""
Expand Down
29 changes: 27 additions & 2 deletions tests/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ def test_find_executable_task_instances_backfill_nodagrun(self):
res_keys = map(lambda x: x.key, res)
self.assertIn(ti_no_dagrun.key, res_keys)
self.assertIn(ti_with_dagrun.key, res_keys)

def test_find_executable_task_instances_pool(self):
dag_id = 'SchedulerJobTest.test_find_executable_task_instances_pool'
task_id_1 = 'dummy'
Expand Down Expand Up @@ -2285,7 +2285,7 @@ def test_list_py_file_paths(self):
for file_path in list_py_file_paths(TEST_DAGS_FOLDER):
detected_files.append(file_path)
self.assertEqual(sorted(detected_files), sorted(expected_files))

def test_reset_orphaned_tasks_nothing(self):
"""Try with nothing. """
scheduler = SchedulerJob(**self.default_scheduler_args)
Expand Down Expand Up @@ -2499,3 +2499,28 @@ def test_reset_orphaned_tasks_with_orphans(self):
self.assertEqual(state, ti.state)

session.close()

def test_scheduler_updates_next_scheduler_run_dag_property(self):
"""
[AIRFLOW-1424] Test if a scheduler run updates the 'next_scheduler_run'
property of the DAG.
"""
dag = DAG(
dag_id='test_scheduler_process_execute_task',
start_date=DEFAULT_DATE)
dag_task1 = DummyOperator(
task_id='dummy',
dag=dag,
owner='airflow')

session = settings.Session()
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
session.commit()
session.close()

scheduler = SchedulerJob()
dag.clear()
dr = scheduler.create_dag_run(dag)
self.assertIsNotNone(dr)
self.assertIsNotNone(dag.next_scheduler_run)