From 7aa538ead857a26ca3c43fa5b5b084e75f75c9c8 Mon Sep 17 00:00:00 2001 From: vincbeck Date: Wed, 15 May 2024 14:50:50 -0400 Subject: [PATCH 01/10] Add metrics about task CPU and memory usage --- airflow/task/task_runner/standard_task_runner.py | 16 ++++++++++++++++ .../logging-monitoring/metrics.rst | 2 ++ 2 files changed, 18 insertions(+) diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py index 00252acf428fa..1deded22111e4 100644 --- a/airflow/task/task_runner/standard_task_runner.py +++ b/airflow/task/task_runner/standard_task_runner.py @@ -21,6 +21,7 @@ import logging import os +import threading from typing import TYPE_CHECKING import psutil @@ -29,6 +30,7 @@ from airflow.api_internal.internal_api_call import InternalApiConfig from airflow.models.taskinstance import TaskReturnCode from airflow.settings import CAN_FORK +from airflow.stats import Stats from airflow.task.task_runner.base_task_runner import BaseTaskRunner from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager from airflow.utils.process_utils import reap_process_group, set_new_process_group @@ -53,6 +55,11 @@ def start(self): else: self.process = self._start_by_exec() + if self.process: + log_reader = threading.Thread(target=self._read_task_utilization) + log_reader.daemon = True + log_reader.start() + def _start_by_exec(self) -> psutil.Process: subprocess = self.run_command() self.process = psutil.Process(subprocess.pid) @@ -186,3 +193,12 @@ def get_process_pid(self) -> int: if self.process is None: raise RuntimeError("Process is not started yet") return self.process.pid + + def _read_task_utilization(self): + while True: + dag_id = self._task_instance.dag_id + task_id = self._task_instance.task_id + mem_usage = self.process.memory_percent() + cpu_usage = self.process.cpu_percent(interval=1) + Stats.gauge(f"task.mem_usage_percent.{dag_id}.{task_id}", mem_usage) + Stats.gauge(f"task.cpu_usage_percent.{dag_id}.{task_id}", cpu_usage) diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst index efe565094a648..f95c3a981cee6 100644 --- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst +++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst @@ -242,6 +242,8 @@ Name Description ``pool.scheduled_tasks`` Number of scheduled tasks in the pool. Metric with pool_name tagging. ``pool.starving_tasks.`` Number of starving tasks in the pool ``pool.starving_tasks`` Number of starving tasks in the pool. Metric with pool_name tagging. +``task.cpu_usage_percent..`` Percentage of CPU used by a task +``task.mem_usage_percent..`` Percentage of memory used by a task ``triggers.running.`` Number of triggers currently running for a triggerer (described by hostname) ``triggers.running`` Number of triggers currently running for a triggerer (described by hostname). Metric with hostname tagging. From 45215150b221e7de481d4be10d4bb507cec26627 Mon Sep 17 00:00:00 2001 From: vincbeck Date: Wed, 15 May 2024 15:22:51 -0400 Subject: [PATCH 02/10] Rename variable --- airflow/task/task_runner/standard_task_runner.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py index 1deded22111e4..f272ddae71ef3 100644 --- a/airflow/task/task_runner/standard_task_runner.py +++ b/airflow/task/task_runner/standard_task_runner.py @@ -56,9 +56,9 @@ def start(self): self.process = self._start_by_exec() if self.process: - log_reader = threading.Thread(target=self._read_task_utilization) - log_reader.daemon = True - log_reader.start() + resource_monitor = threading.Thread(target=self._read_task_utilization) + resource_monitor.daemon = True + resource_monitor.start() def _start_by_exec(self) -> psutil.Process: subprocess = self.run_command() From f031976106d7c9cd2b3cc0aaa14aae987c25997e Mon Sep 17 00:00:00 2001 From: vincbeck Date: Thu, 16 May 2024 11:44:14 -0400 Subject: [PATCH 03/10] Use `oneshot` --- .../task/task_runner/standard_task_runner.py | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py index f272ddae71ef3..021e5bba2e3fa 100644 --- a/airflow/task/task_runner/standard_task_runner.py +++ b/airflow/task/task_runner/standard_task_runner.py @@ -195,10 +195,18 @@ def get_process_pid(self) -> int: return self.process.pid def _read_task_utilization(self): + dag_id = self._task_instance.dag_id + task_id = self._task_instance.task_id + while True: - dag_id = self._task_instance.dag_id - task_id = self._task_instance.task_id - mem_usage = self.process.memory_percent() - cpu_usage = self.process.cpu_percent(interval=1) - Stats.gauge(f"task.mem_usage_percent.{dag_id}.{task_id}", mem_usage) - Stats.gauge(f"task.cpu_usage_percent.{dag_id}.{task_id}", cpu_usage) + try: + with self.process.oneshot(): + mem_usage = self.process.memory_percent() + cpu_usage = self.process.cpu_percent(interval=1) + + Stats.gauge(f"task.mem_usage_percent.{dag_id}.{task_id}", mem_usage) + Stats.gauge(f"task.cpu_usage_percent.{dag_id}.{task_id}", cpu_usage) + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + except AttributeError: + pass From f735ca57c6ce065a9d2cadefdc822cd2d2815508 Mon Sep 17 00:00:00 2001 From: vincbeck Date: Thu, 16 May 2024 11:50:37 -0400 Subject: [PATCH 04/10] Remove `_percent` --- airflow/task/task_runner/standard_task_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py index 021e5bba2e3fa..098d1a4b2b531 100644 --- a/airflow/task/task_runner/standard_task_runner.py +++ b/airflow/task/task_runner/standard_task_runner.py @@ -204,8 +204,8 @@ def _read_task_utilization(self): mem_usage = self.process.memory_percent() cpu_usage = self.process.cpu_percent(interval=1) - Stats.gauge(f"task.mem_usage_percent.{dag_id}.{task_id}", mem_usage) - Stats.gauge(f"task.cpu_usage_percent.{dag_id}.{task_id}", cpu_usage) + Stats.gauge(f"task.mem_usage.{dag_id}.{task_id}", mem_usage) + Stats.gauge(f"task.cpu_usage.{dag_id}.{task_id}", cpu_usage) except (psutil.NoSuchProcess, psutil.AccessDenied): pass except AttributeError: From 4b86598038010a999373da36666e186521011e20 Mon Sep 17 00:00:00 2001 From: vincbeck Date: Thu, 16 May 2024 13:05:53 -0400 Subject: [PATCH 05/10] Add unit tests --- tests/task/task_runner/test_standard_task_runner.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py index ab9b882e1f3c5..0a2ada733f674 100644 --- a/tests/task/task_runner/test_standard_task_runner.py +++ b/tests/task/task_runner/test_standard_task_runner.py @@ -96,8 +96,9 @@ def clean_listener_manager(self): yield get_listener_manager().clear() + @mock.patch.object(StandardTaskRunner, "_read_task_utilization") @patch("airflow.utils.log.file_task_handler.FileTaskHandler._init_file") - def test_start_and_terminate(self, mock_init): + def test_start_and_terminate(self, mock_init, mock_read_task_utilization): mock_init.return_value = "/tmp/any" Job = mock.Mock() Job.job_type = None @@ -131,6 +132,7 @@ def test_start_and_terminate(self, mock_init): assert not psutil.pid_exists(process.pid), f"{process} is still alive" assert task_runner.return_code() is not None + mock_read_task_utilization.assert_called() @pytest.mark.db_test def test_notifies_about_start_and_stop(self, tmp_path): @@ -260,8 +262,9 @@ def test_ol_does_not_block_xcoms(self, tmp_path): assert f.readline() == "on_task_instance_success\n" assert f.readline() == "listener\n" + @mock.patch.object(StandardTaskRunner, "_read_task_utilization") @patch("airflow.utils.log.file_task_handler.FileTaskHandler._init_file") - def test_start_and_terminate_run_as_user(self, mock_init): + def test_start_and_terminate_run_as_user(self, mock_init, mock_read_task_utilization): mock_init.return_value = "/tmp/any" Job = mock.Mock() Job.job_type = None @@ -296,6 +299,7 @@ def test_start_and_terminate_run_as_user(self, mock_init): assert not psutil.pid_exists(process.pid), f"{process} is still alive" assert task_runner.return_code() is not None + mock_read_task_utilization.assert_called() @propagate_task_logger() @patch("airflow.utils.log.file_task_handler.FileTaskHandler._init_file") From 5763140641b1c6f7cec2bd99b9f5351d220d2d0d Mon Sep 17 00:00:00 2001 From: vincbeck Date: Thu, 16 May 2024 15:36:18 -0400 Subject: [PATCH 06/10] Break out when process is done --- .../task/task_runner/standard_task_runner.py | 4 ++-- .../task_runner/test_standard_task_runner.py | 24 +++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py index 098d1a4b2b531..6ef69d60bb234 100644 --- a/airflow/task/task_runner/standard_task_runner.py +++ b/airflow/task/task_runner/standard_task_runner.py @@ -207,6 +207,6 @@ def _read_task_utilization(self): Stats.gauge(f"task.mem_usage.{dag_id}.{task_id}", mem_usage) Stats.gauge(f"task.cpu_usage.{dag_id}.{task_id}", cpu_usage) except (psutil.NoSuchProcess, psutil.AccessDenied): - pass + break except AttributeError: - pass + raise diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py index 0a2ada733f674..18807520208aa 100644 --- a/tests/task/task_runner/test_standard_task_runner.py +++ b/tests/task/task_runner/test_standard_task_runner.py @@ -448,6 +448,30 @@ def test_parsing_context(self): "_AIRFLOW_PARSING_CONTEXT_TASK_ID=task1\n" ) + @mock.patch("airflow.task.task_runner.standard_task_runner.Stats.gauge") + @patch("airflow.utils.log.file_task_handler.FileTaskHandler._init_file") + def test_read_task_utilization(self, mock_init, mock_stats): + mock_init.return_value = "/tmp/any" + Job = mock.Mock() + Job.job_type = None + Job.task_instance = mock.MagicMock() + Job.task_instance.task_id = "task_id" + Job.task_instance.dag_id = "dag_id" + Job.task_instance.run_as_user = None + Job.task_instance.command_as_list.return_value = [ + "airflow", + "tasks", + "run", + "test_on_kill", + "task1", + "2016-01-01", + ] + job_runner = LocalTaskJobRunner(job=Job, task_instance=Job.task_instance) + task_runner = StandardTaskRunner(job_runner) + task_runner.start() + task_runner._read_task_utilization() + assert mock_stats.call_count == 2 + @staticmethod def _procs_in_pgroup(pgid): for proc in psutil.process_iter(attrs=["pid", "name"]): From 487372b48cfe592b04bbcb9135bc6bbf0a2d1605 Mon Sep 17 00:00:00 2001 From: Vincent <97131062+vincbeck@users.noreply.github.com> Date: Fri, 17 May 2024 09:42:14 -0400 Subject: [PATCH 07/10] Update airflow/task/task_runner/standard_task_runner.py Co-authored-by: Andrey Anshin --- airflow/task/task_runner/standard_task_runner.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py index 6ef69d60bb234..6cab45f9ada46 100644 --- a/airflow/task/task_runner/standard_task_runner.py +++ b/airflow/task/task_runner/standard_task_runner.py @@ -206,7 +206,5 @@ def _read_task_utilization(self): Stats.gauge(f"task.mem_usage.{dag_id}.{task_id}", mem_usage) Stats.gauge(f"task.cpu_usage.{dag_id}.{task_id}", cpu_usage) - except (psutil.NoSuchProcess, psutil.AccessDenied): + except (psutil.NoSuchProcess, psutil.AccessDenied, AttributeError): break - except AttributeError: - raise From 44dc20accb49f0eaeb2217597a5882f7bccf7e67 Mon Sep 17 00:00:00 2001 From: vincbeck Date: Fri, 17 May 2024 09:47:57 -0400 Subject: [PATCH 08/10] Wrap up while in try expect --- airflow/task/task_runner/standard_task_runner.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py index 6cab45f9ada46..45c3363adc6e0 100644 --- a/airflow/task/task_runner/standard_task_runner.py +++ b/airflow/task/task_runner/standard_task_runner.py @@ -22,6 +22,7 @@ import logging import os import threading +import time from typing import TYPE_CHECKING import psutil @@ -198,13 +199,14 @@ def _read_task_utilization(self): dag_id = self._task_instance.dag_id task_id = self._task_instance.task_id - while True: - try: + try: + while True: with self.process.oneshot(): mem_usage = self.process.memory_percent() - cpu_usage = self.process.cpu_percent(interval=1) + cpu_usage = self.process.cpu_percent() Stats.gauge(f"task.mem_usage.{dag_id}.{task_id}", mem_usage) Stats.gauge(f"task.cpu_usage.{dag_id}.{task_id}", cpu_usage) - except (psutil.NoSuchProcess, psutil.AccessDenied, AttributeError): - break + time.sleep(1) + except (psutil.NoSuchProcess, psutil.AccessDenied, AttributeError): + return From 5ac55037763a9d78bd70cf7e6eccbe1495a82440 Mon Sep 17 00:00:00 2001 From: vincbeck Date: Fri, 17 May 2024 12:39:16 -0400 Subject: [PATCH 09/10] Fix unit test --- tests/task/task_runner/test_standard_task_runner.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py index 18807520208aa..381aefc7c17d5 100644 --- a/tests/task/task_runner/test_standard_task_runner.py +++ b/tests/task/task_runner/test_standard_task_runner.py @@ -28,6 +28,7 @@ import psutil import pytest +from airflow.exceptions import AirflowTaskTimeout from airflow.jobs.job import Job from airflow.jobs.local_task_job_runner import LocalTaskJobRunner from airflow.listeners.listener import get_listener_manager @@ -469,7 +470,11 @@ def test_read_task_utilization(self, mock_init, mock_stats): job_runner = LocalTaskJobRunner(job=Job, task_instance=Job.task_instance) task_runner = StandardTaskRunner(job_runner) task_runner.start() - task_runner._read_task_utilization() + try: + with timeout(1): + task_runner._read_task_utilization() + except AirflowTaskTimeout: + pass assert mock_stats.call_count == 2 @staticmethod From b5a9c41e94299ee3dd23e0c4d0c9ed9abf737c21 Mon Sep 17 00:00:00 2001 From: vincbeck Date: Tue, 21 May 2024 14:33:56 -0400 Subject: [PATCH 10/10] Add log and increase sleep time to 5 secs --- airflow/task/task_runner/standard_task_runner.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py index 45c3363adc6e0..5ecf1ad64cebb 100644 --- a/airflow/task/task_runner/standard_task_runner.py +++ b/airflow/task/task_runner/standard_task_runner.py @@ -207,6 +207,7 @@ def _read_task_utilization(self): Stats.gauge(f"task.mem_usage.{dag_id}.{task_id}", mem_usage) Stats.gauge(f"task.cpu_usage.{dag_id}.{task_id}", cpu_usage) - time.sleep(1) + time.sleep(5) except (psutil.NoSuchProcess, psutil.AccessDenied, AttributeError): + self.log.info("Process not found (most likely exited), stop collecting metrics") return