Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions airflow/jobs/base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from time import sleep

from sqlalchemy import Column, Index, Integer, String
from sqlalchemy import Column, Index, Integer, String, case
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import backref, foreign, relationship
from sqlalchemy.orm.session import make_transient
Expand Down Expand Up @@ -125,12 +125,23 @@ def most_recent_job(cls, session=None) -> BaseJob | None:
"""
Return the most recent job of this type, if any, based on last heartbeat received.

Jobs in "running" state take precedence over others to make sure alive
job is returned if it is available.
This method should be called on a subclass (i.e. on SchedulerJob) to
return jobs of that type.

:param session: Database session
"""
return session.query(cls).order_by(cls.latest_heartbeat.desc()).limit(1).first()
return (
session.query(cls)
.order_by(
# Put "running" jobs at the front.
case({State.RUNNING: 0}, value=cls.state, else_=1),
cls.latest_heartbeat.desc(),
)
.limit(1)
.first()
)

def is_alive(self, grace_multiplier=2.1):
"""
Expand Down
20 changes: 20 additions & 0 deletions tests/jobs/test_base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,26 @@ def test_most_recent_job(self):

session.rollback()

def test_most_recent_job_running_precedence(self):
with create_session() as session:
old_running_state_job = MockJob(None, heartrate=10)
old_running_state_job.latest_heartbeat = timezone.utcnow()
old_running_state_job.state = State.RUNNING
new_failed_state_job = MockJob(None, heartrate=10)
new_failed_state_job.latest_heartbeat = timezone.utcnow()
new_failed_state_job.state = State.FAILED
new_null_state_job = MockJob(None, heartrate=10)
new_null_state_job.latest_heartbeat = timezone.utcnow()
new_null_state_job.state = None
session.add(old_running_state_job)
session.add(new_failed_state_job)
session.add(new_null_state_job)
session.flush()

assert MockJob.most_recent_job(session=session) == old_running_state_job

session.rollback()

def test_is_alive(self):
job = MockJob(None, heartrate=10, state=State.RUNNING)
assert job.is_alive() is True
Expand Down