diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py
index 9d1b6173fdd68..673f035533efc 100644
--- a/airflow/jobs/base_job.py
+++ b/airflow/jobs/base_job.py
@@ -19,7 +19,7 @@
 
 from time import sleep
 
-from sqlalchemy import Column, Index, Integer, String
+from sqlalchemy import Column, Index, Integer, String, case
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import backref, foreign, relationship
 from sqlalchemy.orm.session import make_transient
@@ -125,12 +125,24 @@ def most_recent_job(cls, session=None) -> BaseJob | None:
         """
         Return the most recent job of this type, if any, based on last heartbeat received.
 
+        Jobs in the "running" state take precedence over all others, so that an
+        alive job is returned whenever one is available.
+
         This method should be called on a subclass (i.e. on SchedulerJob) to
         return jobs of that type.
 
         :param session: Database session
         """
-        return session.query(cls).order_by(cls.latest_heartbeat.desc()).limit(1).first()
+        return (
+            session.query(cls)
+            .order_by(
+                # Sort "running" jobs first; any other state (including NULL) sorts after.
+                case({State.RUNNING: 0}, value=cls.state, else_=1),
+                cls.latest_heartbeat.desc(),
+            )
+            # first() already applies LIMIT 1, so no explicit limit() is needed.
+            .first()
+        )
 
     def is_alive(self, grace_multiplier=2.1):
         """
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index 66715a9d635e3..6b0fbf7489f76 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -108,6 +108,27 @@ def test_most_recent_job(self):
 
             session.rollback()
 
+    def test_most_recent_job_running_precedence(self):
+        """A running job is preferred over failed/stateless jobs created after it."""
+        with create_session() as session:
+            old_running_state_job = MockJob(None, heartrate=10)
+            old_running_state_job.latest_heartbeat = timezone.utcnow()
+            old_running_state_job.state = State.RUNNING
+            new_failed_state_job = MockJob(None, heartrate=10)
+            new_failed_state_job.latest_heartbeat = timezone.utcnow()
+            new_failed_state_job.state = State.FAILED
+            new_null_state_job = MockJob(None, heartrate=10)
+            new_null_state_job.latest_heartbeat = timezone.utcnow()
+            new_null_state_job.state = None
+            session.add(old_running_state_job)
+            session.add(new_failed_state_job)
+            session.add(new_null_state_job)
+            session.flush()
+
+            assert MockJob.most_recent_job(session=session) == old_running_state_job
+
+            session.rollback()
+
     def test_is_alive(self):
         job = MockJob(None, heartrate=10, state=State.RUNNING)
         assert job.is_alive() is True