diff --git a/airflow/models.py b/airflow/models.py index 22e8d2596a95b..4a94215b92799 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -5252,7 +5252,9 @@ def verify_integrity(self, session=None): # check for missing tasks for task in six.itervalues(dag.task_dict): - if task.adhoc: + if task.adhoc or task.start_date > self.execution_date: + continue + if task.start_date > self.execution_date and not self.is_backfill: continue if task.task_id not in task_ids: diff --git a/tests/dags/test_scheduler_dags.py b/tests/dags/test_scheduler_dags.py index ae2bd202d9803..94e6f8216d226 100644 --- a/tests/dags/test_scheduler_dags.py +++ b/tests/dags/test_scheduler_dags.py @@ -17,18 +17,34 @@ # specific language governing permissions and limitations # under the License. -from datetime import datetime +from datetime import datetime, timedelta from airflow.models import DAG from airflow.operators.dummy_operator import DummyOperator -DEFAULT_DATE = datetime(2100, 1, 1) +DEFAULT_DATE = datetime(2016, 1, 1) # DAG tests backfill with pooled tasks # Previously backfill would queue the task but never run it dag1 = DAG( dag_id='test_start_date_scheduling', - start_date=datetime(2100, 1, 1)) + start_date=datetime.utcnow() + timedelta(days=1)) dag1_task1 = DummyOperator( task_id='dummy', dag=dag1, owner='airflow') + +dag2 = DAG( + dag_id='test_task_start_date_scheduling', + start_date=DEFAULT_DATE +) +dag2_task1 = DummyOperator( + task_id='dummy1', + dag=dag2, + owner='airflow', + start_date=DEFAULT_DATE + timedelta(days=3) +) +dag2_task2 = DummyOperator( + task_id='dummy2', + dag=dag2, + owner='airflow' +) diff --git a/tests/jobs.py b/tests/jobs.py index bb714bd201602..c23c6035c7bd3 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -2217,7 +2217,7 @@ def test_scheduler_start_date(self): dag_id = 'test_start_date_scheduling' dag = self.dagbag.get_dag(dag_id) dag.clear() - self.assertTrue(dag.start_date > DEFAULT_DATE) + self.assertTrue(dag.start_date > datetime.datetime.utcnow() ) scheduler = SchedulerJob(dag_id, num_runs=2) @@ -2252,6 +2252,27 @@ def test_scheduler_start_date(self): self.assertEqual( len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1) + def test_scheduler_task_start_date(self): + """ + Test that the scheduler respects task start dates that are different + from DAG start dates + """ + dag_id = 'test_task_start_date_scheduling' + dag = self.dagbag.get_dag(dag_id) + dag.clear() + scheduler = SchedulerJob(dag_id, + num_runs=2) + scheduler.run() + + session = settings.Session() + tiq = session.query(TI).filter(TI.dag_id == dag_id) + ti1s = tiq.filter(TI.task_id == 'dummy1').all() + ti2s = tiq.filter(TI.task_id == 'dummy2').all() + self.assertEqual(len(ti1s), 0) + self.assertEqual(len(ti2s), 2) + for t in ti2s: + self.assertEqual(t.state, State.SUCCESS) + def test_scheduler_multiprocessing(self): """ Test that the scheduler can successfully queue multiple dags in parallel