From d4e58d1bc6c85dc30802431dc87b600e032028e5 Mon Sep 17 00:00:00 2001 From: Julia Hsieh Date: Fri, 26 Aug 2016 16:11:25 -0700 Subject: [PATCH] Move check for max_runs to only affect creation of new DagRun --- airflow/jobs.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/airflow/jobs.py b/airflow/jobs.py index 0791ff5cb2224..df9a492ae59e7 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1072,10 +1072,6 @@ def _process_dags(self, dagbag, dags, tis_out): """ for dag in dags: dag = dagbag.get_dag(dag.dag_id) - if dag.reached_max_runs: - self.logger.info("Not processing DAG {} since its max runs has been reached" - .format(dag.dag_id)) - continue if dag.is_paused: self.logger.info("Not processing DAG {} since it's paused" .format(dag.dag_id)) @@ -1088,9 +1084,14 @@ def _process_dags(self, dagbag, dags, tis_out): self.logger.info("Processing {}".format(dag.dag_id)) - dag_run = self.create_dag_run(dag) - if dag_run: - self.logger.info("Created {}".format(dag_run)) + if dag.reached_max_runs: + self.logger.info("Not creating new DagRun for {} since its max runs has been reached" + .format(dag.dag_id)) + else: + dag_run = self.create_dag_run(dag) + if dag_run: + self.logger.info("Created {}".format(dag_run)) + self._process_task_instances(dag, tis_out) self.manage_slas(dag)