Skip to content
2 changes: 2 additions & 0 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,8 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
},
synchronize_session=False,
)
for ti in executable_tis:
ti.emit_state_change_metric(State.QUEUED)

for ti in executable_tis:
make_transient(ti)
Expand Down
48 changes: 48 additions & 0 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1352,7 +1352,9 @@ def check_and_change_state_before_execution(

if not test_mode:
session.add(Log(State.RUNNING, self))

self.state = State.RUNNING
self.emit_state_change_metric(State.RUNNING)
self.external_executor_id = external_executor_id
self.end_date = None
if not test_mode:
Expand Down Expand Up @@ -1392,6 +1394,52 @@ def _log_state(self, lead_msg: str = "") -> None:
self._date_or_empty("end_date"),
)

def emit_state_change_metric(self, new_state: TaskInstanceState):
"""
Sends a time metric representing how much time a given state transition took.
The previous state and metric name is deduced from the state the task was put in.

:param new_state: The state that has just been set for this task.
We do not use `self.state`, because sometimes the state is updated directly in the DB and not in
the local TaskInstance object.
Supported states: QUEUED and RUNNING
"""
if self.end_date:
# if the task has an end date, it means that this is not its first round.
# we send the state transition time metric only on the first try, otherwise it gets more complex.
return

# switch on state and deduce which metric to send
if new_state == State.RUNNING:
metric_name = "queued_duration"
if self.queued_dttm is None:
# this should not really happen except in tests or rare cases,
# but we don't want to create errors just for a metric, so we just skip it
self.log.warning(
"cannot record %s for task %s because previous state change time has not been saved",
metric_name,
self.task_id,
)
return
timing = (timezone.utcnow() - self.queued_dttm).total_seconds()
elif new_state == State.QUEUED:
metric_name = "scheduled_duration"
if self.start_date is None:
# same comment as above
self.log.warning(
"cannot record %s for task %s because previous state change time has not been saved",
metric_name,
self.task_id,
)
return
timing = (timezone.utcnow() - self.start_date).total_seconds()
else:
raise NotImplementedError("no metric emission setup for state %s", new_state)

# send metric twice, once (legacy) with tags in the name and once with tags as tags
Stats.timing(f"dag.{self.dag_id}.{self.task_id}.{metric_name}", timing)
Stats.timing(f"task.{metric_name}", timing, tags={"task_id": self.task_id, "dag_id": self.dag_id})
Comment on lines +1440 to +1441
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


# Ensure we unset next_method and next_kwargs to ensure that any
# retries don't re-use them.
def clear_next_method_args(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ Timers
Name Description
=================================================== ========================================================================
``dagrun.dependency-check.<dag_id>`` Milliseconds taken to check DAG dependencies
``dag.<dag_id>.<task_id>.duration`` Seconds taken to finish a task
``dag.<dag_id>.<task_id>.duration`` Seconds taken to run a task
``dag.<dag_id>.<task_id>.scheduled_duration`` Seconds a task spends in the Scheduled state, before being Queued
``dag.<dag_id>.<task_id>.queued_duration`` Seconds a task spends in the Queued state, before being Running
Comment on lines +167 to +169
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Non-blocking) Just curious, are these lines manual changes or from some automated change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added them manually, there is nothing enforcing sync between this doc and the code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, that was what I thought. I went looking for something like this when I started on the OTel work and didn't find it. That's buried pretty deep. 👍

``dag_processing.last_duration.<dag_file>`` Seconds taken to load the given DAG file
``dagrun.duration.success.<dag_id>`` Seconds taken for a DagRun to reach success state
``dagrun.duration.failed.<dag_id>`` Milliseconds taken for a DagRun to reach failed state
Expand Down