Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ def _merge_from(self, job: Job | JobPydantic | None):
def _heartrate(job_type: str) -> float:
if job_type == "TriggererJob":
return conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")
elif job_type == "SchedulerJob":
return conf.getfloat("scheduler", "SCHEDULER_HEARTBEAT_SEC")
else:
# Heartrate used to be hardcoded to scheduler, so in all other
# cases continue to use that value for back compat
Expand Down
1 change: 0 additions & 1 deletion airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
"""

job_type = "SchedulerJob"
heartrate: int = conf.getint("scheduler", "SCHEDULER_HEARTBEAT_SEC")

def __init__(
self,
Expand Down
9 changes: 6 additions & 3 deletions tests/jobs/test_base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,12 @@ def abort():
)
def test_heart_rate_after_fetched_from_db(self, job_runner, job_type, job_heartbeat_sec):
"""Ensure heartrate is set correctly after jobs are queried from the DB"""
with create_session() as session, conf_vars(
{(job_type.lower(), "job_heartbeat_sec"): job_heartbeat_sec}
):
if job_type == "scheduler":
config_name = "scheduler_heartbeat_sec"
else:
config_name = "job_heartbeat_sec"

with create_session() as session, conf_vars({(job_type.lower(), config_name): job_heartbeat_sec}):
job = Job()
job_runner(job=job)
session.add(job)
Expand Down
10 changes: 10 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,16 @@ def test_is_alive(self, configs):
not scheduler_job.is_alive()
), "Completed jobs even with recent heartbeat should not be alive"

@pytest.mark.parametrize(
"heartrate",
[10, 5],
)
def test_heartrate(self, heartrate):
with conf_vars({("scheduler", "scheduler_heartbeat_sec"): str(heartrate)}):
scheduler_job = Job(executor=self.null_exec)
_ = SchedulerJobRunner(job=scheduler_job)
assert scheduler_job.heartrate == heartrate

def run_single_scheduler_loop_with_no_dags(self, dags_folder):
"""
Utility function that runs a single scheduler loop without actually
Expand Down