diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 40ac199b574e7..16f537f7626d5 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -288,24 +288,15 @@ def update_state(self, session=None): for t in unfinished_tasks) if unfinished_tasks: scheduleable_tasks = [ut for ut in unfinished_tasks if ut.state in SCHEDULEABLE_STATES] + self.log.debug( + "number of scheduleable tasks for %s: %s task(s)", + self, len(scheduleable_tasks)) + ready_tis, changed_tis = self._get_ready_tis(scheduleable_tasks, finished_tasks, session) + self.log.debug("ready tis length for %s: %s task(s)", self, len(ready_tis)) if none_depends_on_past and none_task_concurrency: # small speed up - self.log.debug( - "number of scheduleable tasks for %s: %s task(s)", - self, len(scheduleable_tasks)) - ready_tis, changed_tis = self._get_ready_tis(scheduleable_tasks, finished_tasks, session) - self.log.debug("ready tis length for %s: %s task(s)", self, len(ready_tis)) are_runnable_tasks = ready_tis or self._are_premature_tis( unfinished_tasks, finished_tasks, session) or changed_tis - else: - # slow path - for ti in scheduleable_tasks: - if ti.are_dependencies_met( - dep_context=DepContext(flag_upstream_failed=True), - session=session - ): - self.log.debug('Queuing task: %s', ti) - ready_tis.append(ti) duration = (timezone.utcnow() - start_dttm) Stats.timing("dagrun.dependency-check.{}".format(self.dag_id), duration)