From 216f64691fd14b043e8960c16395143cc94095bf Mon Sep 17 00:00:00 2001 From: Eugene Kostieiev Date: Mon, 5 Dec 2022 15:43:06 +0200 Subject: [PATCH 1/3] Make BaseJob.most_recent_job method to favor "running" jobs compare to others. --- airflow/jobs/base_job.py | 15 +++++++++++++-- tests/jobs/test_base_job.py | 20 ++++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py index e3d79d67b914d..9af6ca4b19362 100644 --- a/airflow/jobs/base_job.py +++ b/airflow/jobs/base_job.py @@ -19,7 +19,7 @@ from time import sleep -from sqlalchemy import Column, Index, Integer, String +from sqlalchemy import Column, Index, Integer, String, or_ from sqlalchemy.exc import OperationalError from sqlalchemy.orm import backref, foreign, relationship from sqlalchemy.orm.session import make_transient @@ -125,12 +125,23 @@ def most_recent_job(cls, session=None) -> BaseJob | None: Return the most recent job of this type, if any, based on last heartbeat received. + Jobs in "running" state take precedence over others to make sure alive + job is returned if it is available. This method should be called on a subclass (i.e. on SchedulerJob) to return jobs of that type. :param session: Database session """ - return session.query(cls).order_by(cls.latest_heartbeat.desc()).limit(1).first() + return ( + session.query(cls) + .order_by( + # Make sure "running" jobs first in order. + or_(cls.state.is_(None), cls.state != State.RUNNING), + cls.latest_heartbeat.desc(), + ) + .limit(1) + .first() + ) def is_alive(self, grace_multiplier=2.1): """ diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py index 66715a9d635e3..6b0fbf7489f76 100644 --- a/tests/jobs/test_base_job.py +++ b/tests/jobs/test_base_job.py @@ -108,6 +108,26 @@ def test_most_recent_job(self): session.rollback() + def test_most_recent_job_running_precedence(self): + with create_session() as session: + old_running_state_job = MockJob(None, heartrate=10) + old_running_state_job.latest_heartbeat = timezone.utcnow() + old_running_state_job.state = State.RUNNING + new_failed_state_job = MockJob(None, heartrate=10) + new_failed_state_job.latest_heartbeat = timezone.utcnow() + new_failed_state_job.state = State.FAILED + new_null_state_job = MockJob(None, heartrate=10) + new_null_state_job.latest_heartbeat = timezone.utcnow() + new_null_state_job.state = None + session.add(old_running_state_job) + session.add(new_failed_state_job) + session.add(new_null_state_job) + session.flush() + + assert MockJob.most_recent_job(session=session) == old_running_state_job + + session.rollback() + def test_is_alive(self): job = MockJob(None, heartrate=10, state=State.RUNNING) assert job.is_alive() is True From 268a28414bcffe0a08b730e1443f09631152b00b Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Tue, 6 Dec 2022 12:10:22 +0800 Subject: [PATCH 2/3] English --- airflow/jobs/base_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py index 9af6ca4b19362..4bfeade621fb8 100644 --- a/airflow/jobs/base_job.py +++ b/airflow/jobs/base_job.py @@ -135,7 +135,7 @@ def most_recent_job(cls, session=None) -> BaseJob | None: return ( session.query(cls) .order_by( - # Make sure "running" jobs first in order. + # Put "running" jobs at the front. or_(cls.state.is_(None), cls.state != State.RUNNING), cls.latest_heartbeat.desc(), ) From 4d3b5bd98b065677bbce2e162842762a2f665af9 Mon Sep 17 00:00:00 2001 From: Eugene Kostieiev Date: Tue, 6 Dec 2022 10:35:24 +0200 Subject: [PATCH 3/3] Fix MsSQL order by expression issue. --- airflow/jobs/base_job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py index 4bfeade621fb8..d695615a5d8ba 100644 --- a/airflow/jobs/base_job.py +++ b/airflow/jobs/base_job.py @@ -19,7 +19,7 @@ from time import sleep -from sqlalchemy import Column, Index, Integer, String, or_ +from sqlalchemy import Column, Index, Integer, String, case from sqlalchemy.exc import OperationalError from sqlalchemy.orm import backref, foreign, relationship from sqlalchemy.orm.session import make_transient @@ -136,7 +136,7 @@ def most_recent_job(cls, session=None) -> BaseJob | None: session.query(cls) .order_by( # Put "running" jobs at the front. - or_(cls.state.is_(None), cls.state != State.RUNNING), + case({State.RUNNING: 0}, value=cls.state, else_=1), cls.latest_heartbeat.desc(), ) .limit(1)