diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index c05c446290d65..28b267a8632d3 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -915,7 +915,7 @@ class SchedulerJob(BaseJob): } heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC') - def __init__( + def __init__( self, dag_id=None, dag_ids=None, @@ -1106,10 +1106,10 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None): DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date) ) - .filter(or_(DR.run_id == None, # noqa: E711 pylint: disable=singleton-comparison + .filter(or_(DR.run_id.is_(None), not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%')))) .outerjoin(DM, DM.dag_id == TI.dag_id) - .filter(or_(DM.dag_id == None, # noqa: E711 pylint: disable=singleton-comparison + .filter(or_(DM.dag_id.is_(None), not_(DM.is_paused))) ) @@ -1117,11 +1117,11 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None): if states: if None in states: if all(x is None for x in states): - ti_query = ti_query.filter(TI.state == None) # noqa pylint: disable=singleton-comparison + ti_query = ti_query.filter(TI.state.is_(None)) else: not_none_states = [state for state in states if state] ti_query = ti_query.filter( - or_(TI.state == None, # noqa: E711 pylint: disable=singleton-comparison + or_(TI.state.is_(None), TI.state.in_(not_none_states)) ) else: @@ -1291,11 +1291,11 @@ def _change_state_for_executable_task_instances(self, task_instances, if acceptable_states: if None in acceptable_states: if all(x is None for x in acceptable_states): - ti_query = ti_query.filter(TI.state == None) # noqa pylint: disable=singleton-comparison + ti_query = ti_query.filter(TI.state.is_(None)) else: not_none_acceptable_states = [state for state in acceptable_states if state] ti_query = ti_query.filter( - or_(TI.state == None, # noqa pylint: disable=singleton-comparison + or_(TI.state.is_(None), TI.state.in_(not_none_acceptable_states)) ) else: