diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 76385a26aa41a..c7088412de220 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -626,6 +626,8 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) - }, synchronize_session=False, ) + for ti in executable_tis: + ti.emit_state_change_metric(State.QUEUED) for ti in executable_tis: make_transient(ti) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 9fc8761bba242..184072e795b58 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1352,7 +1352,9 @@ def check_and_change_state_before_execution( if not test_mode: session.add(Log(State.RUNNING, self)) + self.state = State.RUNNING + self.emit_state_change_metric(State.RUNNING) self.external_executor_id = external_executor_id self.end_date = None if not test_mode: @@ -1392,6 +1394,52 @@ def _log_state(self, lead_msg: str = "") -> None: self._date_or_empty("end_date"), ) + def emit_state_change_metric(self, new_state: TaskInstanceState): + """ + Sends a time metric representing how much time a given state transition took. + The previous state and metric name is deduced from the state the task was put in. + + :param new_state: The state that has just been set for this task. + We do not use `self.state`, because sometimes the state is updated directly in the DB and not in + the local TaskInstance object. + Supported states: QUEUED and RUNNING + """ + if self.end_date: + # if the task has an end date, it means that this is not its first round. + # we send the state transition time metric only on the first try, otherwise it gets more complex. + return + + # switch on state and deduce which metric to send + if new_state == State.RUNNING: + metric_name = "queued_duration" + if self.queued_dttm is None: + # this should not really happen except in tests or rare cases, + # but we don't want to create errors just for a metric, so we just skip it + self.log.warning( + "cannot record %s for task %s because previous state change time has not been saved", + metric_name, + self.task_id, + ) + return + timing = (timezone.utcnow() - self.queued_dttm).total_seconds() + elif new_state == State.QUEUED: + metric_name = "scheduled_duration" + if self.start_date is None: + # same comment as above + self.log.warning( + "cannot record %s for task %s because previous state change time has not been saved", + metric_name, + self.task_id, + ) + return + timing = (timezone.utcnow() - self.start_date).total_seconds() + else: + raise NotImplementedError("no metric emission setup for state %s", new_state) + + # send metric twice, once (legacy) with tags in the name and once with tags as tags + Stats.timing(f"dag.{self.dag_id}.{self.task_id}.{metric_name}", timing) + Stats.timing(f"task.{metric_name}", timing, tags={"task_id": self.task_id, "dag_id": self.dag_id}) + # Ensure we unset next_method and next_kwargs to ensure that any # retries don't re-use them. def clear_next_method_args(self) -> None: diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst index 3d00ebbae93c3..630a7ab12800c 100644 --- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst +++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst @@ -164,7 +164,9 @@ Timers Name Description =================================================== ======================================================================== ``dagrun.dependency-check.`` Milliseconds taken to check DAG dependencies -``dag...duration`` Seconds taken to finish a task +``dag...duration`` Seconds taken to run a task +``dag...scheduled_duration`` Seconds a task spends in the Scheduled state, before being Queued +``dag...queued_duration`` Seconds a task spends in the Queued state, before being Running ``dag_processing.last_duration.`` Seconds taken to load the given DAG file ``dagrun.duration.success.`` Seconds taken for a DagRun to reach success state ``dagrun.duration.failed.`` Milliseconds taken for a DagRun to reach failed state