From 83927ce19ae89ae7862333a0eae5c21f36e844a4 Mon Sep 17 00:00:00 2001 From: Shlomit-B Date: Sun, 3 Aug 2025 08:34:21 +0300 Subject: [PATCH] Add config checks to skip tracing and metrics when disabled --- .../src/airflow/jobs/scheduler_job_runner.py | 35 ++++++++++++++----- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index e641af6063841..9d586a8715b3f 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -721,6 +721,20 @@ def _process_task_event_logs(log_records: deque[Log], session: Session): objects = (log_records.popleft() for _ in range(len(log_records))) session.bulk_save_objects(objects=objects, preserve_order=False) + @staticmethod + def _is_metrics_enabled(): + return any( + [ + conf.getboolean("metrics", "statsd_datadog_enabled", fallback=False), + conf.getboolean("metrics", "statsd_on", fallback=False), + conf.getboolean("metrics", "otel_on", fallback=False), + ] + ) + + @staticmethod + def _is_tracing_enabled(): + return conf.getboolean("traces", "otel_on") + def _process_executor_events(self, executor: BaseExecutor, session: Session) -> int: return SchedulerJobRunner.process_executor_events( executor=executor, @@ -1188,15 +1202,17 @@ def _run_scheduler_loop(self) -> None: self._mark_backfills_complete, ) - timers.call_regular_interval( - conf.getfloat("scheduler", "pool_metrics_interval", fallback=5.0), - self._emit_pool_metrics, - ) + if self._is_metrics_enabled() or self._is_tracing_enabled(): + timers.call_regular_interval( + conf.getfloat("scheduler", "pool_metrics_interval", fallback=5.0), + self._emit_pool_metrics, + ) - timers.call_regular_interval( - conf.getfloat("scheduler", "running_metrics_interval", fallback=30.0), - self._emit_running_ti_metrics, - ) + if self._is_metrics_enabled(): + timers.call_regular_interval( + conf.getfloat("scheduler", "running_metrics_interval", fallback=30.0), + self._emit_running_ti_metrics, + ) timers.call_regular_interval( conf.getfloat("scheduler", "task_instance_heartbeat_timeout_detection_interval", fallback=10.0), @@ -1240,7 +1256,8 @@ def _run_scheduler_loop(self) -> None: ) with create_session() as session: - self._end_spans_of_externally_ended_ops(session) + if self._is_tracing_enabled(): + self._end_spans_of_externally_ended_ops(session) # This will schedule for as many executors as possible. num_queued_tis = self._do_scheduling(session)