From 67b463794355960a4d0e8e94eea4c4bcc5e4adf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Wed, 12 Apr 2023 13:38:25 -0700 Subject: [PATCH 1/7] add a metric for task adoption time --- airflow/models/taskinstance.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index c2ed7b786864c..3fe64a5b6f0d1 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1330,6 +1330,18 @@ def check_and_change_state_before_execution( if not test_mode: session.add(Log(State.RUNNING, self)) + + if not self.end_date: + # if the task has an end date, it means that this is not its first round. + # we send the adoption time metric only on the first try, otherwise it gets more complex. + adoption_time = (timezone.utcnow() - self.start_date).total_seconds() + Stats.timing(f"dag.{self.task.dag_id}.{self.task.task_id}.adoption_time", adoption_time) + Stats.timing( + "task.adoption_time", + adoption_time, + tags={"task_id": self.task.task_id, "dag_id": self.task.dag_id}, + ) + self.state = State.RUNNING self.external_executor_id = external_executor_id self.end_date = None From 1ec9c3bf15b8065045fba3097b4e445c7dbec40c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Wed, 12 Apr 2023 14:41:47 -0700 Subject: [PATCH 2/7] update doc accordingly --- airflow/config_templates/config.yml | 4 ++-- airflow/config_templates/default_airflow.cfg | 4 ++-- .../logging-monitoring/metrics.rst | 4 +++- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 29e02df9b3b6b..005f9c2b04070 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2052,8 +2052,8 @@ celery: Time in seconds after which adopted tasks which are queued in celery are assumed to be stalled, and are automatically rescheduled. 
This setting does the same thing as ``stalled_task_timeout`` but applies specifically to adopted tasks only. When set to 0, the ``stalled_task_timeout`` setting - also applies to adopted tasks. To calculate adoption time, subtract the - :ref:`task duration` from the task's :ref:`landing time`. + also applies to adopted tasks. + The task adoption time is sent as the ``dag.<dag_id>.<task_id>.adoption_time`` metric. version_added: 2.0.0 type: integer example: ~ diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 257536cc038fd..c1d15c1e76b34 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -1036,8 +1036,8 @@ task_track_started = True # Time in seconds after which adopted tasks which are queued in celery are assumed to be stalled, # and are automatically rescheduled. This setting does the same thing as ``stalled_task_timeout`` but # applies specifically to adopted tasks only. When set to 0, the ``stalled_task_timeout`` setting -# also applies to adopted tasks. To calculate adoption time, subtract the -# :ref:`task duration` from the task's :ref:`landing time`. +# also applies to adopted tasks. +# The task adoption time is sent as the ``dag.<dag_id>.<task_id>.adoption_time`` metric. 
task_adoption_timeout = 600 # Time in seconds after which tasks queued in celery are assumed to be stalled, and are automatically diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst index 650e31bdb070c..de6285546b726 100644 --- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst +++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst @@ -165,7 +165,9 @@ Timers Name Description =================================================== ======================================================================== ``dagrun.dependency-check.<dag_id>`` Milliseconds taken to check DAG dependencies -``dag.<dag_id>.<task_id>.duration`` Seconds taken to finish a task +``dag.<dag_id>.<task_id>.duration`` Seconds taken to run a task +``dag.<dag_id>.<task_id>.adoption_time`` Seconds a task spends in the Scheduled and Queued states -- + i.e. the time it takes for a task that is ready to be run to actually start running. 
``dag_processing.last_duration.<dag_file>`` Seconds taken to load the given DAG file ``dagrun.duration.success.<dag_id>`` Seconds taken for a DagRun to reach success state ``dagrun.duration.failed.<dag_id>`` Milliseconds taken for a DagRun to reach failed state From 160b246b18fa289aefaa8ef2f3773679af13ceec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Fri, 21 Apr 2023 16:25:27 -0700 Subject: [PATCH 3/7] split metric in two --- airflow/config_templates/config.yml | 3 +- airflow/config_templates/default_airflow.cfg | 3 +- airflow/jobs/scheduler_job_runner.py | 5 ++- airflow/models/taskinstance.py | 41 ++++++++++++++----- .../logging-monitoring/metrics.rst | 4 +- 5 files changed, 40 insertions(+), 16 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 005f9c2b04070..521fcf3b3fbfd 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2053,7 +2053,8 @@ celery: and are automatically rescheduled. This setting does the same thing as ``stalled_task_timeout`` but applies specifically to adopted tasks only. When set to 0, the ``stalled_task_timeout`` setting also applies to adopted tasks. - The task adoption time is sent as the ``dag.<dag_id>.<task_id>.adoption_time`` metric. + To calculate adoption time, add the ``dag.<dag_id>.<task_id>.scheduled_duration`` and + ``dag.<dag_id>.<task_id>.queued_duration`` metrics. version_added: 2.0.0 type: integer example: ~ diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index c1d15c1e76b34..516c04851634a 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -1037,7 +1037,8 @@ task_track_started = True # and are automatically rescheduled. This setting does the same thing as ``stalled_task_timeout`` but # applies specifically to adopted tasks only. When set to 0, the ``stalled_task_timeout`` setting # also applies to adopted tasks. 
-# The task adoption time is sent as the ``dag.<dag_id>.<task_id>.adoption_time`` metric. +# To calculate adoption time, add the ``dag.<dag_id>.<task_id>.scheduled_duration`` and +# ``dag.<dag_id>.<task_id>.queued_duration`` metrics. task_adoption_timeout = 600 # Time in seconds after which tasks queued in celery are assumed to be stalled, and are automatically diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 2373c974f7271..552c7598ef54b 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -532,16 +532,19 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) - # set TIs to queued state filter_for_tis = TI.filter_for_tis(executable_tis) + now = timezone.utcnow() session.query(TI).filter(filter_for_tis).update( # TODO[ha]: should we use func.now()? How does that work with DB timezone # on mysql when it's not UTC? { TI.state: TaskInstanceState.QUEUED, - TI.queued_dttm: timezone.utcnow(), + TI.queued_dttm: now, TI.queued_by_job_id: self.job.id, }, synchronize_session=False, ) + for ti in executable_tis: + ti.emit_state_change_metric(State.QUEUED) for ti in executable_tis: make_transient(ti) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 3fe64a5b6f0d1..d935b1774a6c3 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1331,18 +1331,8 @@ def check_and_change_state_before_execution( if not test_mode: session.add(Log(State.RUNNING, self)) - if not self.end_date: - # if the task has an end date, it means that this is not its first round. - # we send the adoption time metric only on the first try, otherwise it gets more complex. 
- adoption_time = (timezone.utcnow() - self.start_date).total_seconds() - Stats.timing(f"dag.{self.task.dag_id}.{self.task.task_id}.adoption_time", adoption_time) - Stats.timing( - "task.adoption_time", - adoption_time, - tags={"task_id": self.task.task_id, "dag_id": self.task.dag_id}, - ) - self.state = State.RUNNING + self.emit_state_change_metric(State.RUNNING) self.external_executor_id = external_executor_id self.end_date = None if not test_mode: @@ -1382,6 +1372,35 @@ def _log_state(self, lead_msg: str = "") -> None: self._date_or_empty("end_date"), ) + def emit_state_change_metric(self, new_state: TaskInstanceState): + """ + Sends a time metric logging how much time a given state transition took. + The previous state and metric name is deduced from the state the task was put in. + + :param new_state: The state that has just been set for this task. + We do not use `self.state`, because sometimes the state is updated directly in the DB and not in + the local TaskInstance object. + Supported states: QUEUED and RUNNING + """ + if self.end_date: + # if the task has an end date, it means that this is not its first round. + # we send the state transition time metric only on the first try, otherwise it gets more complex. + return + + # switch on state and deduce which metric to send + if new_state == State.RUNNING: + metric_name = "queued_duration" + timing = (timezone.utcnow() - self.queued_dttm).total_seconds() + elif new_state == State.QUEUED: + metric_name = "scheduled_duration" + timing = (timezone.utcnow() - self.start_date).total_seconds() + else: + raise NotImplementedError + + # send metric twice, once (legacy) with tags in the name and once with tags as tags + Stats.timing(f"dag.{self.dag_id}.{self.task_id}.{metric_name}", timing) + Stats.timing(f"task.{metric_name}", timing, tags={"task_id": self.task_id, "dag_id": self.dag_id}) + # Ensure we unset next_method and next_kwargs to ensure that any # retries don't re-use them. 
def clear_next_method_args(self) -> None: diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst index de6285546b726..7568df8e49086 100644 --- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst +++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst @@ -166,8 +166,8 @@ Name Description =================================================== ======================================================================== ``dagrun.dependency-check.<dag_id>`` Milliseconds taken to check DAG dependencies ``dag.<dag_id>.<task_id>.duration`` Seconds taken to run a task -``dag.<dag_id>.<task_id>.adoption_time`` Seconds a task spends in the Scheduled and Queued states -- - i.e. the time it takes for a task that is ready to be run to actually start running. +``dag.<dag_id>.<task_id>.scheduled_duration`` Seconds a task spends in the Scheduled state, before being Queued +``dag.<dag_id>.<task_id>.queued_duration`` Seconds a task spends in the Queued state, before being Running ``dag_processing.last_duration.<dag_file>`` Seconds taken to load the given DAG file ``dagrun.duration.success.<dag_id>`` Seconds taken for a DagRun to reach success state ``dagrun.duration.failed.<dag_id>`` Milliseconds taken for a DagRun to reach failed state From bcf115bd44da5f644f0b08102c4dd474fe15dbd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Fri, 21 Apr 2023 16:39:22 -0700 Subject: [PATCH 4/7] I didn't mean to commit this before --- airflow/jobs/scheduler_job_runner.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 552c7598ef54b..f22861f46be12 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -532,13 +532,12 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) - # set TIs to queued state filter_for_tis = 
TI.filter_for_tis(executable_tis) - now = timezone.utcnow() session.query(TI).filter(filter_for_tis).update( # TODO[ha]: should we use func.now()? How does that work with DB timezone # on mysql when it's not UTC? { TI.state: TaskInstanceState.QUEUED, - TI.queued_dttm: now, + TI.queued_dttm: timezone.utcnow(), TI.queued_by_job_id: self.job.id, }, synchronize_session=False, From 5c58f89e69addf56dd9bf017576ee1287a24adf0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Mon, 24 Apr 2023 11:59:41 -0700 Subject: [PATCH 5/7] add some guardrails when we compute time spent --- airflow/models/taskinstance.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index b3f2d532a54a1..bab549199c4bc 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1407,9 +1407,26 @@ def emit_state_change_metric(self, new_state: TaskInstanceState): # switch on state and deduce which metric to send if new_state == State.RUNNING: metric_name = "queued_duration" + if self.queued_dttm is None: + # this should not really happen except in tests or rare cases, + # but we don't want to create errors just for a metric, so we just skip it + self.log.warning( + "cannot record %s for task %s because previous state change time has not been saved", + metric_name, + self.task_id, + ) + return timing = (timezone.utcnow() - self.queued_dttm).total_seconds() elif new_state == State.QUEUED: metric_name = "scheduled_duration" + if self.start_date is None: + # same comment as above + self.log.warning( + "cannot record %s for task %s because previous state change time has not been saved", + metric_name, + self.task_id, + ) + return timing = (timezone.utcnow() - self.start_date).total_seconds() else: raise NotImplementedError From e4c640bfd7b35d4143c6d202a49bb96e88fed8e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= <114772123+vandonr-amz@users.noreply.github.com> 
Date: Thu, 27 Apr 2023 11:31:27 -0700 Subject: [PATCH 6/7] reword comment Co-authored-by: Niko Oliveira --- airflow/models/taskinstance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index bab549199c4bc..47629643fa972 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1391,7 +1391,7 @@ def _log_state(self, lead_msg: str = "") -> None: def emit_state_change_metric(self, new_state: TaskInstanceState): """ - Sends a time metric logging how much time a given state transition took. + Sends a time metric representing how much time a given state transition took. The previous state and metric name is deduced from the state the task was put in. :param new_state: The state that has just been set for this task. From 72bbdbf4e28cacd277bae5d468b8ef1dc26daa2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Thu, 27 Apr 2023 11:38:55 -0700 Subject: [PATCH 7/7] add a bit more info on notimplementedexception --- airflow/models/taskinstance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 47629643fa972..018a5ec735466 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1429,7 +1429,7 @@ def emit_state_change_metric(self, new_state: TaskInstanceState): return timing = (timezone.utcnow() - self.start_date).total_seconds() else: - raise NotImplementedError + raise NotImplementedError(f"no metric emission setup for state {new_state}") # send metric twice, once (legacy) with tags in the name and once with tags as tags Stats.timing(f"dag.{self.dag_id}.{self.task_id}.{metric_name}", timing)