diff --git a/airflow/jobs.py b/airflow/jobs.py index c07c4115629d3..95804350cd315 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1057,6 +1057,10 @@ def _process_dags(self, dagbag, dags, tis_out): """ for dag in dags: dag = dagbag.get_dag(dag.dag_id) + if dag.reached_max_runs: + self.logger.info("Not processing DAG {} since its max runs has been reached" + .format(dag.dag_id)) + continue if dag.is_paused: self.logger.info("Not processing DAG {} since it's paused" .format(dag.dag_id)) diff --git a/airflow/models.py b/airflow/models.py index 182f7cc364018..4ccf2a1baa3eb 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -2879,6 +2879,15 @@ def subdags(self): l += task.subdag.subdags return l + @property + def reached_max_runs(self): + active_runs = DagRun.find( + dag_id=self.dag_id, + state=State.RUNNING, + external_trigger=False + ) + return len(active_runs) >= self.max_active_runs + def resolve_template_files(self): for t in self.tasks: t.resolve_template_files() diff --git a/tests/jobs.py b/tests/jobs.py index e86b9dadd18af..351268a59293a 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -20,6 +20,7 @@ import datetime import logging import os +import time import unittest from airflow import AirflowException, settings @@ -609,14 +610,23 @@ def test_scheduler_verify_max_active_runs(self): session.commit() session.close() - scheduler = SchedulerJob() - dag.clear() + scheduler = SchedulerJob(dag.dag_id, + run_duration=1) dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) - dr = scheduler.create_dag_run(dag) - self.assertIsNone(dr) + dr2 = scheduler.create_dag_run(dag) + self.assertIsNone(dr2) + + dag.clear() + + dag.max_active_runs = 0 + scheduler.run() + + session = settings.Session() + self.assertEqual( + len(session.query(TI).filter(TI.dag_id == dag.dag_id).all()), 0) def test_scheduler_fail_dagrun_timeout(self): """