From 2a979196bf19f866d952d563b19369d95491b426 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Sat, 14 Oct 2023 01:56:37 +0200 Subject: [PATCH 01/23] Refactor commands to unify daemon context handling --- airflow/cli/commands/celery_command.py | 87 ++++++------------ airflow/cli/commands/daemon_utils.py | 88 +++++++++++++++++++ airflow/cli/commands/dag_processor_command.py | 32 ++----- airflow/cli/commands/internal_api_command.py | 85 +++++++----------- airflow/cli/commands/kerberos_command.py | 27 +----- airflow/cli/commands/scheduler_command.py | 47 +++------- airflow/cli/commands/triggerer_command.py | 48 +++------- airflow/cli/commands/webserver_command.py | 86 +++++++----------- 8 files changed, 217 insertions(+), 283 deletions(-) create mode 100644 airflow/cli/commands/daemon_utils.py diff --git a/airflow/cli/commands/celery_command.py b/airflow/cli/commands/celery_command.py index eb53d6f60db68..837296e75fcbd 100644 --- a/airflow/cli/commands/celery_command.py +++ b/airflow/cli/commands/celery_command.py @@ -23,19 +23,18 @@ from contextlib import contextmanager from multiprocessing import Process -import daemon import psutil import sqlalchemy.exc from celery import maybe_patch_concurrency # type: ignore[attr-defined] from celery.app.defaults import DEFAULT_TASK_LOG_FMT from celery.signals import after_setup_logger -from daemon.pidfile import TimeoutPIDLockFile from lockfile.pidlockfile import read_pid_from_pidfile, remove_existing_pidfile from airflow import settings +from airflow.cli.commands.daemon_utils import run_command_with_daemon_mode from airflow.configuration import conf from airflow.utils import cli as cli_utils -from airflow.utils.cli import setup_locations, setup_logging +from airflow.utils.cli import setup_locations from airflow.utils.providers_configuration_loader import providers_configuration_loaded from airflow.utils.serve_logs import serve_logs @@ -68,28 +67,7 @@ def flower(args): if args.flower_conf: options.append(f"--conf={args.flower_conf}") - if args.daemon: - pidfile, stdout, stderr, _ = setup_locations( - process="flower", - pid=args.pid, - stdout=args.stdout, - stderr=args.stderr, - log=args.log_file, - ) - with open(stdout, "a") as stdout, open(stderr, "a") as stderr: - stdout.truncate(0) - stderr.truncate(0) - - ctx = daemon.DaemonContext( - pidfile=TimeoutPIDLockFile(pidfile, -1), - stdout=stdout, - stderr=stderr, - umask=int(settings.DAEMON_UMASK, 8), - ) - with ctx: - celery_app.start(options) - else: - celery_app.start(options) + run_command_with_daemon_mode(args, "flower", lambda: celery_app.start(options)) @contextmanager @@ -152,15 +130,6 @@ def worker(args): if autoscale is None and conf.has_option("celery", "worker_autoscale"): autoscale = conf.get("celery", "worker_autoscale") - # Setup locations - pid_file_path, stdout, stderr, log_file = setup_locations( - process=WORKER_PROCESS_NAME, - pid=args.pid, - stdout=args.stdout, - stderr=args.stderr, - log=args.log_file, - ) - if hasattr(celery_app.backend, "ResultSession"): # Pre-create the database tables now, otherwise SQLA via Celery has a # race condition where one of the subprocesses can die with "Table @@ -181,6 +150,10 @@ def worker(args): celery_log_level = conf.get("logging", "CELERY_LOGGING_LEVEL") if not celery_log_level: celery_log_level = conf.get("logging", "LOGGING_LEVEL") + + # Setup pid file location + worker_pid_file_path, _, _, _ = setup_locations(process=WORKER_PROCESS_NAME, pid=args.pid) + # Setup Celery worker options = [ "worker", @@ -195,7 +168,7 @@ def worker(args): 
"--loglevel", celery_log_level, "--pidfile", - pid_file_path, + worker_pid_file_path, ] if autoscale: options.extend(["--autoscale", autoscale]) @@ -214,33 +187,31 @@ def worker(args): # executed. maybe_patch_concurrency(["-P", pool]) - if args.daemon: - # Run Celery worker as daemon - handle = setup_logging(log_file) - - with open(stdout, "a") as stdout_handle, open(stderr, "a") as stderr_handle: - if args.umask: - umask = args.umask - else: - umask = conf.get("celery", "worker_umask", fallback=settings.DAEMON_UMASK) - - stdout_handle.truncate(0) - stderr_handle.truncate(0) - - daemon_context = daemon.DaemonContext( - files_preserve=[handle], - umask=int(umask, 8), - stdout=stdout_handle, - stderr=stderr_handle, - ) - with daemon_context, _serve_logs(skip_serve_logs): - celery_app.worker_main(options) + _, stdout, stderr, log_file = setup_locations( + process=WORKER_PROCESS_NAME, + stdout=args.stdout, + stderr=args.stderr, + log=args.log_file, + ) - else: - # Run Celery worker in the same process + def run_celery_worker(): with _serve_logs(skip_serve_logs): celery_app.worker_main(options) + if args.umask: + umask = args.umask + else: + umask = conf.get("celery", "worker_umask", fallback=settings.DAEMON_UMASK) + + run_command_with_daemon_mode( + args, + WORKER_PROCESS_NAME, + run_celery_worker, + should_setup_logging=True, + umask=umask, + pid_file=worker_pid_file_path, + ) + @cli_utils.action_cli @providers_configuration_loaded diff --git a/airflow/cli/commands/daemon_utils.py b/airflow/cli/commands/daemon_utils.py new file mode 100644 index 0000000000000..743716f31783f --- /dev/null +++ b/airflow/cli/commands/daemon_utils.py @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations
+
+import signal
+from argparse import Namespace
+from typing import Callable
+
+from daemon import daemon
+from daemon.pidfile import TimeoutPIDLockFile
+
+from airflow import settings
+from airflow.utils.cli import setup_locations, setup_logging, sigint_handler, sigquit_handler
+from airflow.utils.process_utils import check_if_pidfile_process_is_running
+
+
+def run_command_with_daemon_mode(
+    args: Namespace,
+    process_name: str,
+    callback: Callable,
+    should_setup_logging: bool = False,
+    umask: str = settings.DAEMON_UMASK,
+    pid_file: str = None,
+):
+    """
+    Parameters
+    ----------
+    args : Namespace
+        The set of arguments passed to the original CLI command
+    process_name : str
+        Process name used in naming log and PID files for the daemon
+    callback : Callable
+        The actual command to run with or without daemon context
+    should_setup_logging : bool
+        If true, then a log file handler for the daemon process will be created
+    umask : str
+        File access creation mask ("umask") to set for the process on daemon start
+    pid_file : str
+        If specified, this file path is used to store the daemon process PID.
+        If not specified, a file path is generated with the default pattern.
+    """
+    if args.daemon:
+        pid, stdout, stderr, log_file = setup_locations(
+            process=process_name, pid=args.pid, stdout=args.stdout, stderr=args.stderr, log=args.log_file
+        )
+        if pid_file:
+            pid = pid_file
+
+        # Check if the process is already running; if not but a pidfile exists, clean it up
+        check_if_pidfile_process_is_running(pid_file=pid, process_name=process_name)
+
+        files_preserve = []
+        if should_setup_logging:
+            handle = setup_logging(log_file)
+            files_preserve.append(handle)
+        with open(stdout, "a") as stdout_handle, open(stderr, "a") as stderr_handle:
+            stdout_handle.truncate(0)
+            stderr_handle.truncate(0)
+
+            ctx = daemon.DaemonContext(
+                pidfile=TimeoutPIDLockFile(pid, -1),
+                files_preserve=files_preserve,
+                stdout=stdout_handle,
+                stderr=stderr_handle,
+                umask=int(umask, 8),
+            )
+
+            with ctx:
+                callback()
+    else:
+        signal.signal(signal.SIGINT, sigint_handler)
+        signal.signal(signal.SIGTERM, sigint_handler)
+        signal.signal(signal.SIGQUIT, sigquit_handler)
+        callback()
diff --git a/airflow/cli/commands/dag_processor_command.py b/airflow/cli/commands/dag_processor_command.py
index cf880f6622e91..249fb422f9f09 100644
--- a/airflow/cli/commands/dag_processor_command.py
+++ b/airflow/cli/commands/dag_processor_command.py
@@ -21,16 +21,12 @@
 from datetime import timedelta
 from typing import Any
 
-import daemon
-from daemon.pidfile import TimeoutPIDLockFile
-
-from airflow import settings
+from airflow.cli.commands.daemon_utils import run_command_with_daemon_mode
 from airflow.configuration import conf
 from airflow.dag_processing.manager import DagFileProcessorManager
 from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
 from airflow.jobs.job import Job, run_job
 from airflow.utils import cli as cli_utils
-from airflow.utils.cli import setup_locations, setup_logging
 from airflow.utils.providers_configuration_loader import providers_configuration_loaded
 
 log = logging.getLogger(__name__)
@@ -66,23 +62,9 @@ def dag_processor(args):
 
     job_runner = _create_dag_processor_job_runner(args)
 
-    if args.daemon:
-        pid, stdout, stderr, log_file = setup_locations(
-            "dag-processor", args.pid, args.stdout, args.stderr, args.log_file
-        )
-        handle = setup_logging(log_file)
-        with open(stdout, "a") as stdout_handle, open(stderr, "a") as stderr_handle:
-            stdout_handle.truncate(0)
-
stderr_handle.truncate(0) - - ctx = daemon.DaemonContext( - pidfile=TimeoutPIDLockFile(pid, -1), - files_preserve=[handle], - stdout=stdout_handle, - stderr=stderr_handle, - umask=int(settings.DAEMON_UMASK, 8), - ) - with ctx: - run_job(job=job_runner.job, execute_callable=job_runner._execute) - else: - run_job(job=job_runner.job, execute_callable=job_runner._execute) + run_command_with_daemon_mode( + args, + "dag-processor", + lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute), + should_setup_logging=True, + ) diff --git a/airflow/cli/commands/internal_api_command.py b/airflow/cli/commands/internal_api_command.py index 73ed2e250130b..422ba0ef62e37 100644 --- a/airflow/cli/commands/internal_api_command.py +++ b/airflow/cli/commands/internal_api_command.py @@ -24,13 +24,10 @@ import sys import textwrap from contextlib import suppress -from pathlib import Path from tempfile import gettempdir from time import sleep -import daemon import psutil -from daemon.pidfile import TimeoutPIDLockFile from flask import Flask from flask_appbuilder import SQLA from flask_caching import Cache @@ -40,14 +37,14 @@ from airflow import settings from airflow.api_internal.internal_api_call import InternalApiConfig +from airflow.cli.commands.daemon_utils import run_command_with_daemon_mode from airflow.cli.commands.webserver_command import GunicornMonitor from airflow.configuration import conf from airflow.exceptions import AirflowConfigException from airflow.logging_config import configure_logging from airflow.models import import_all_models from airflow.utils import cli as cli_utils -from airflow.utils.cli import setup_locations, setup_logging -from airflow.utils.process_utils import check_if_pidfile_process_is_running +from airflow.utils.cli import setup_locations from airflow.utils.providers_configuration_loader import providers_configuration_loaded from airflow.www.extensions.init_dagbag import init_dagbag from airflow.www.extensions.init_jinja_globals import init_jinja_globals @@ -81,12 +78,6 @@ def internal_api(args): host=args.hostname, ) else: - pid_file, stdout, stderr, log_file = setup_locations( - "internal-api", args.pid, args.stdout, args.stderr, args.log_file - ) - - # Check if Internal APi is already running if not, remove old pidfile - check_if_pidfile_process_is_running(pid_file=pid_file, process_name="internal-api") log.info( textwrap.dedent( @@ -101,6 +92,8 @@ def internal_api(args): ) ) + gunicorn_master_pid_file, _, _, _ = setup_locations("internal-api-gunicorn-master") + run_args = [ sys.executable, "-m", @@ -116,7 +109,7 @@ def internal_api(args): "--name", "airflow-internal-api", "--pid", - pid_file, + gunicorn_master_pid_file, "--access-logfile", str(access_logfile), "--error-logfile", @@ -137,9 +130,7 @@ def internal_api(args): # then have a copy of the app run_args += ["--preload"] - gunicorn_master_proc: psutil.Process | None = None - - def kill_proc(signum, _): + def kill_proc(signum, gunicorn_master_proc): log.info("Received signal: %s. 
Closing gunicorn.", signum)
         gunicorn_master_proc.terminate()
         with suppress(TimeoutError):
@@ -148,14 +139,14 @@ def kill_proc(signum, _):
             gunicorn_master_proc.kill()
         sys.exit(0)
 
-    def monitor_gunicorn(gunicorn_master_pid: int):
+    def monitor_gunicorn(gunicorn_master_proc: psutil.Process | subprocess.Popen):
         # Register signal handlers
-        signal.signal(signal.SIGINT, kill_proc)
-        signal.signal(signal.SIGTERM, kill_proc)
+        signal.signal(signal.SIGINT, lambda signum, _: kill_proc(signum, gunicorn_master_proc))
+        signal.signal(signal.SIGTERM, lambda signum, _: kill_proc(signum, gunicorn_master_proc))
 
         # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
         GunicornMonitor(
-            gunicorn_master_pid=gunicorn_master_pid,
+            gunicorn_master_pid=gunicorn_master_proc.pid,
             num_workers_expected=num_workers,
             master_timeout=120,
             worker_refresh_interval=30,
@@ -163,45 +154,33 @@ def monitor_gunicorn(gunicorn_master_pid: int):
             reload_on_plugin_change=False,
         ).start()
 
+    def start_and_monitor_gunicorn(args):
+        if args.daemon:
+            subprocess.Popen(run_args, close_fds=True)
+
+            # Reading pid of gunicorn master as it will be different from
+            # the one of the process spawned above.
+            gunicorn_master_proc_pid = None
+            while not gunicorn_master_proc_pid:
+                sleep(0.1)
+                gunicorn_master_proc_pid = read_pid_from_pidfile(gunicorn_master_pid_file)
+
+            # Run Gunicorn monitor
+            gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
+            monitor_gunicorn(gunicorn_master_proc)
+        else:
+            with subprocess.Popen(run_args, close_fds=True) as gunicorn_master_proc:
+                monitor_gunicorn(gunicorn_master_proc)
+
     if args.daemon:
         # This makes possible errors get reported before daemonization
         os.environ["SKIP_DAGS_PARSING"] = "True"
-        app = create_app(None)
+        create_app(None)
         os.environ.pop("SKIP_DAGS_PARSING")
-        handle = setup_logging(log_file)
-
-        pid_path = Path(pid_file)
-        pidlock_path = pid_path.with_name(f"{pid_path.stem}-monitor{pid_path.suffix}")
-
-        with open(stdout, "a") as stdout, open(stderr, "a") as stderr:
-            stdout.truncate(0)
-            stderr.truncate(0)
-
-            ctx = daemon.DaemonContext(
-                pidfile=TimeoutPIDLockFile(pidlock_path, -1),
-                files_preserve=[handle],
-                stdout=stdout,
-                stderr=stderr,
-                umask=int(settings.DAEMON_UMASK, 8),
-            )
-            with ctx:
-                subprocess.Popen(run_args, close_fds=True)
-
-                # Reading pid of gunicorn main process as it will be different that
-                # the one of process spawned above.
- gunicorn_master_proc_pid = None - while not gunicorn_master_proc_pid: - sleep(0.1) - gunicorn_master_proc_pid = read_pid_from_pidfile(pid_file) - - # Run Gunicorn monitor - gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid) - monitor_gunicorn(gunicorn_master_proc.pid) - - else: - with subprocess.Popen(run_args, close_fds=True) as gunicorn_master_proc: - monitor_gunicorn(gunicorn_master_proc.pid) + run_command_with_daemon_mode( + args, "internal-api", lambda: start_and_monitor_gunicorn(args), should_setup_logging=True + ) def create_app(config=None, testing=False): diff --git a/airflow/cli/commands/kerberos_command.py b/airflow/cli/commands/kerberos_command.py index 4dd63d52ebdd5..bad0aa88a35de 100644 --- a/airflow/cli/commands/kerberos_command.py +++ b/airflow/cli/commands/kerberos_command.py @@ -17,13 +17,10 @@ """Kerberos command.""" from __future__ import annotations -import daemon -from daemon.pidfile import TimeoutPIDLockFile - from airflow import settings +from airflow.cli.commands.daemon_utils import run_command_with_daemon_mode from airflow.security import kerberos as krb from airflow.utils import cli as cli_utils -from airflow.utils.cli import setup_locations from airflow.utils.providers_configuration_loader import providers_configuration_loaded @@ -33,22 +30,6 @@ def kerberos(args): """Start a kerberos ticket renewer.""" print(settings.HEADER) - if args.daemon: - pid, stdout, stderr, _ = setup_locations( - "kerberos", args.pid, args.stdout, args.stderr, args.log_file - ) - with open(stdout, "a") as stdout_handle, open(stderr, "a") as stderr_handle: - stdout_handle.truncate(0) - stderr_handle.truncate(0) - - ctx = daemon.DaemonContext( - pidfile=TimeoutPIDLockFile(pid, -1), - stdout=stdout_handle, - stderr=stderr_handle, - umask=int(settings.DAEMON_UMASK, 8), - ) - - with ctx: - krb.run(principal=args.principal, keytab=args.keytab) - else: - krb.run(principal=args.principal, keytab=args.keytab) + run_command_with_daemon_mode( + args, "kerberos", lambda: krb.run(principal=args.principal, keytab=args.keytab) + ) diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py index fd25951ad3228..38cf2115dbc09 100644 --- a/airflow/cli/commands/scheduler_command.py +++ b/airflow/cli/commands/scheduler_command.py @@ -18,31 +18,33 @@ from __future__ import annotations import logging -import signal +from argparse import Namespace from contextlib import contextmanager from multiprocessing import Process -import daemon -from daemon.pidfile import TimeoutPIDLockFile - from airflow import settings from airflow.api_internal.internal_api_call import InternalApiConfig +from airflow.cli.commands.daemon_utils import run_command_with_daemon_mode from airflow.configuration import conf from airflow.executors.executor_loader import ExecutorLoader from airflow.jobs.job import Job, run_job from airflow.jobs.scheduler_job_runner import SchedulerJobRunner from airflow.utils import cli as cli_utils -from airflow.utils.cli import process_subdir, setup_locations, setup_logging, sigint_handler, sigquit_handler +from airflow.utils.cli import process_subdir from airflow.utils.providers_configuration_loader import providers_configuration_loaded from airflow.utils.scheduler_health import serve_health_check log = logging.getLogger(__name__) -def _run_scheduler_job(job_runner: SchedulerJobRunner, *, skip_serve_logs: bool) -> None: +def _run_scheduler_job(args) -> None: + job_runner = SchedulerJobRunner( + job=Job(), subdir=process_subdir(args.subdir), 
num_runs=args.num_runs, do_pickle=args.do_pickle + ) + ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor) InternalApiConfig.force_database_direct_access() enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK") - with _serve_logs(skip_serve_logs), _serve_health_check(enable_health_check): + with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check): try: run_job(job=job_runner.job, execute_callable=job_runner._execute) except Exception: @@ -51,38 +53,13 @@ def _run_scheduler_job(job_runner: SchedulerJobRunner, *, skip_serve_logs: bool) @cli_utils.action_cli @providers_configuration_loaded -def scheduler(args): +def scheduler(args: Namespace): """Start Airflow Scheduler.""" print(settings.HEADER) - job_runner = SchedulerJobRunner( - job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs, do_pickle=args.do_pickle + run_command_with_daemon_mode( + args, "scheduler", lambda: _run_scheduler_job(args), should_setup_logging=True ) - ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor) - - if args.daemon: - pid, stdout, stderr, log_file = setup_locations( - "scheduler", args.pid, args.stdout, args.stderr, args.log_file - ) - handle = setup_logging(log_file) - with open(stdout, "a") as stdout_handle, open(stderr, "a") as stderr_handle: - stdout_handle.truncate(0) - stderr_handle.truncate(0) - - ctx = daemon.DaemonContext( - pidfile=TimeoutPIDLockFile(pid, -1), - files_preserve=[handle], - stdout=stdout_handle, - stderr=stderr_handle, - umask=int(settings.DAEMON_UMASK, 8), - ) - with ctx: - _run_scheduler_job(job_runner, skip_serve_logs=args.skip_serve_logs) - else: - signal.signal(signal.SIGINT, sigint_handler) - signal.signal(signal.SIGTERM, sigint_handler) - signal.signal(signal.SIGQUIT, sigquit_handler) - _run_scheduler_job(job_runner, skip_serve_logs=args.skip_serve_logs) @contextmanager diff --git a/airflow/cli/commands/triggerer_command.py b/airflow/cli/commands/triggerer_command.py index 5ddb4e23b6633..96f68b860d316 100644 --- a/airflow/cli/commands/triggerer_command.py +++ b/airflow/cli/commands/triggerer_command.py @@ -17,21 +17,17 @@ """Triggerer command.""" from __future__ import annotations -import signal from contextlib import contextmanager from functools import partial from multiprocessing import Process from typing import Generator -import daemon -from daemon.pidfile import TimeoutPIDLockFile - from airflow import settings +from airflow.cli.commands.daemon_utils import run_command_with_daemon_mode from airflow.configuration import conf from airflow.jobs.job import Job, run_job from airflow.jobs.triggerer_job_runner import TriggererJobRunner from airflow.utils import cli as cli_utils -from airflow.utils.cli import setup_locations, setup_logging, sigint_handler, sigquit_handler from airflow.utils.providers_configuration_loader import providers_configuration_loaded from airflow.utils.serve_logs import serve_logs @@ -51,6 +47,12 @@ def _serve_logs(skip_serve_logs: bool = False) -> Generator[None, None, None]: sub_proc.terminate() +def triggerer_run(skip_serve_logs: bool, capacity: int, triggerer_heartrate: float): + with _serve_logs(skip_serve_logs): + triggerer_job_runner = TriggererJobRunner(job=Job(heartrate=triggerer_heartrate), capacity=capacity) + run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute) + + @cli_utils.action_cli @providers_configuration_loaded def triggerer(args): @@ -59,33 +61,9 @@ def triggerer(args): 
print(settings.HEADER) triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC") - if args.daemon: - pid, stdout, stderr, log_file = setup_locations( - "triggerer", args.pid, args.stdout, args.stderr, args.log_file - ) - handle = setup_logging(log_file) - with open(stdout, "a") as stdout_handle, open(stderr, "a") as stderr_handle: - stdout_handle.truncate(0) - stderr_handle.truncate(0) - - daemon_context = daemon.DaemonContext( - pidfile=TimeoutPIDLockFile(pid, -1), - files_preserve=[handle], - stdout=stdout_handle, - stderr=stderr_handle, - umask=int(settings.DAEMON_UMASK, 8), - ) - with daemon_context, _serve_logs(args.skip_serve_logs): - triggerer_job_runner = TriggererJobRunner( - job=Job(heartrate=triggerer_heartrate), capacity=args.capacity - ) - run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute) - else: - signal.signal(signal.SIGINT, sigint_handler) - signal.signal(signal.SIGTERM, sigint_handler) - signal.signal(signal.SIGQUIT, sigquit_handler) - with _serve_logs(args.skip_serve_logs): - triggerer_job_runner = TriggererJobRunner( - job=Job(heartrate=triggerer_heartrate), capacity=args.capacity - ) - run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute) + run_command_with_daemon_mode( + args, + "triggerer", + lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate), + should_setup_logging=True, + ) diff --git a/airflow/cli/commands/webserver_command.py b/airflow/cli/commands/webserver_command.py index beae141a6b2a8..5b0ccf303ed5a 100644 --- a/airflow/cli/commands/webserver_command.py +++ b/airflow/cli/commands/webserver_command.py @@ -25,23 +25,20 @@ import textwrap import time from contextlib import suppress -from pathlib import Path from time import sleep from typing import TYPE_CHECKING, NoReturn -import daemon import psutil -from daemon.pidfile import TimeoutPIDLockFile from lockfile.pidlockfile import read_pid_from_pidfile from airflow import settings +from airflow.cli.commands.daemon_utils import run_command_with_daemon_mode from airflow.configuration import conf from airflow.exceptions import AirflowException, AirflowWebServerTimeout from airflow.utils import cli as cli_utils -from airflow.utils.cli import setup_locations, setup_logging +from airflow.utils.cli import setup_locations from airflow.utils.hashlib_wrapper import md5 from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.process_utils import check_if_pidfile_process_is_running from airflow.utils.providers_configuration_loader import providers_configuration_loaded if TYPE_CHECKING: @@ -368,13 +365,6 @@ def webserver(args): ) else: - pid_file, stdout, stderr, log_file = setup_locations( - "webserver", args.pid, args.stdout, args.stderr, args.log_file - ) - - # Check if webserver is already running if not, remove old pidfile - check_if_pidfile_process_is_running(pid_file=pid_file, process_name="webserver") - print( textwrap.dedent( f"""\ @@ -388,6 +378,8 @@ def webserver(args): ) ) + gunicorn_master_pid_file, _, _, _ = setup_locations("webserver-gunicorn-master") + run_args = [ sys.executable, "-m", @@ -403,7 +395,7 @@ def webserver(args): "--name", "airflow-webserver", "--pid", - pid_file, + gunicorn_master_pid_file, "--config", "python:airflow.www.gunicorn_config", ] @@ -437,9 +429,7 @@ def webserver(args): # all writing to the database at the same time, we use the --preload option. 
run_args += ["--preload"]
 
-    gunicorn_master_proc: psutil.Process | subprocess.Popen
-
-    def kill_proc(signum: int, frame: types.FrameType | None) -> NoReturn:
+    def kill_proc(signum: int, gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn:
         log.info("Received signal: %s. Closing gunicorn.", signum)
         gunicorn_master_proc.terminate()
         with suppress(TimeoutError):
@@ -452,14 +442,14 @@ def kill_proc(signum: int, frame: types.FrameType | None) -> NoReturn:
             gunicorn_master_proc.kill()
         sys.exit(0)
 
-    def monitor_gunicorn(gunicorn_master_pid: int) -> NoReturn:
+    def monitor_gunicorn(gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn:
         # Register signal handlers
-        signal.signal(signal.SIGINT, kill_proc)
-        signal.signal(signal.SIGTERM, kill_proc)
+        signal.signal(signal.SIGINT, lambda signum, _: kill_proc(signum, gunicorn_master_proc))
+        signal.signal(signal.SIGTERM, lambda signum, _: kill_proc(signum, gunicorn_master_proc))
 
         # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
         GunicornMonitor(
-            gunicorn_master_pid=gunicorn_master_pid,
+            gunicorn_master_pid=gunicorn_master_proc.pid,
             num_workers_expected=num_workers,
             master_timeout=conf.getint("webserver", "web_server_master_timeout"),
             worker_refresh_interval=conf.getint("webserver", "worker_refresh_interval", fallback=30),
@@ -469,42 +459,30 @@ def monitor_gunicorn(gunicorn_master_pid: int) -> NoReturn:
             ),
         ).start()
 
+    def start_and_monitor_gunicorn(args):
+        if args.daemon:
+            subprocess.Popen(run_args, close_fds=True)
+
+            # Reading pid of gunicorn master as it will be different from
+            # the one of the process spawned above.
+            gunicorn_master_proc_pid = None
+            while not gunicorn_master_proc_pid:
+                sleep(0.1)
+                gunicorn_master_proc_pid = read_pid_from_pidfile(gunicorn_master_pid_file)
+
+            # Run Gunicorn monitor
+            gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
+            monitor_gunicorn(gunicorn_master_proc)
+        else:
+            with subprocess.Popen(run_args, close_fds=True) as gunicorn_master_proc:
+                monitor_gunicorn(gunicorn_master_proc)
+
     if args.daemon:
         # This makes possible errors get reported before daemonization
        os.environ["SKIP_DAGS_PARSING"] = "True"
-        app = create_app(None)
+        create_app(None)
         os.environ.pop("SKIP_DAGS_PARSING")
-        handle = setup_logging(log_file)
-
-        pid_path = Path(pid_file)
-        pidlock_path = pid_path.with_name(f"{pid_path.stem}-monitor{pid_path.suffix}")
-
-        with open(stdout, "a") as stdout, open(stderr, "a") as stderr:
-            stdout.truncate(0)
-            stderr.truncate(0)
-
-            ctx = daemon.DaemonContext(
-                pidfile=TimeoutPIDLockFile(pidlock_path, -1),
-                files_preserve=[handle],
-                stdout=stdout,
-                stderr=stderr,
-                umask=int(settings.DAEMON_UMASK, 8),
-            )
-            with ctx:
-                subprocess.Popen(run_args, close_fds=True)
-
-                # Reading pid of gunicorn master as it will be different that
-                # the one of process spawned above.
- gunicorn_master_proc_pid = None - while not gunicorn_master_proc_pid: - sleep(0.1) - gunicorn_master_proc_pid = read_pid_from_pidfile(pid_file) - - # Run Gunicorn monitor - gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid) - monitor_gunicorn(gunicorn_master_proc.pid) - - else: - with subprocess.Popen(run_args, close_fds=True) as gunicorn_master_proc: - monitor_gunicorn(gunicorn_master_proc.pid) + run_command_with_daemon_mode( + args, "webserver", lambda: start_and_monitor_gunicorn(args), should_setup_logging=True + ) From 81eb06514cfdbe962591bb7df47a340f4264a911 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Sat, 14 Oct 2023 20:12:22 +0200 Subject: [PATCH 02/23] fix static checks --- airflow/cli/commands/webserver_command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/cli/commands/webserver_command.py b/airflow/cli/commands/webserver_command.py index 5b0ccf303ed5a..b065981b399f2 100644 --- a/airflow/cli/commands/webserver_command.py +++ b/airflow/cli/commands/webserver_command.py @@ -42,7 +42,7 @@ from airflow.utils.providers_configuration_loader import providers_configuration_loaded if TYPE_CHECKING: - import types + pass log = logging.getLogger(__name__) From a386497155b7a6860c54803d0b9ff91a42d71cf2 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Sat, 14 Oct 2023 20:18:58 +0200 Subject: [PATCH 03/23] fix --- airflow/cli/commands/webserver_command.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/airflow/cli/commands/webserver_command.py b/airflow/cli/commands/webserver_command.py index b065981b399f2..589869863945a 100644 --- a/airflow/cli/commands/webserver_command.py +++ b/airflow/cli/commands/webserver_command.py @@ -26,7 +26,7 @@ import time from contextlib import suppress from time import sleep -from typing import TYPE_CHECKING, NoReturn +from typing import NoReturn import psutil from lockfile.pidlockfile import read_pid_from_pidfile @@ -41,9 +41,6 @@ from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.providers_configuration_loader import providers_configuration_loaded -if TYPE_CHECKING: - pass - log = logging.getLogger(__name__) From 68e36ecca4d674d8ed2dbaad0ebe3d018e17b579 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Sat, 14 Oct 2023 22:05:11 +0200 Subject: [PATCH 04/23] fix --- airflow/cli/commands/daemon_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/cli/commands/daemon_utils.py b/airflow/cli/commands/daemon_utils.py index 743716f31783f..f91fef93ea589 100644 --- a/airflow/cli/commands/daemon_utils.py +++ b/airflow/cli/commands/daemon_utils.py @@ -18,7 +18,7 @@ import signal from argparse import Namespace -from typing import Callable +from typing import Callable, Union from daemon import daemon from daemon.pidfile import TimeoutPIDLockFile @@ -34,7 +34,7 @@ def run_command_with_daemon_mode( callback: Callable, should_setup_logging: bool = False, umask: str = settings.DAEMON_UMASK, - pid_file: str = None, + pid_file: Union[str, None] = None, ): """ Parameters From c3a461821fc76d3d0097e8b3364ea696f6639c66 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Sun, 15 Oct 2023 08:52:31 +0200 Subject: [PATCH 05/23] fix --- airflow/cli/commands/daemon_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/cli/commands/daemon_utils.py b/airflow/cli/commands/daemon_utils.py index f91fef93ea589..d3c1d36877912 100644 --- a/airflow/cli/commands/daemon_utils.py +++ b/airflow/cli/commands/daemon_utils.py @@ -18,7 
+18,7 @@ import signal from argparse import Namespace -from typing import Callable, Union +from typing import Callable from daemon import daemon from daemon.pidfile import TimeoutPIDLockFile @@ -34,7 +34,7 @@ def run_command_with_daemon_mode( callback: Callable, should_setup_logging: bool = False, umask: str = settings.DAEMON_UMASK, - pid_file: Union[str, None] = None, + pid_file: str | None = None, ): """ Parameters From 1774776e85a2f095e3e4bdc26f2143fc429dc38b Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Sun, 15 Oct 2023 08:54:43 +0200 Subject: [PATCH 06/23] format --- airflow/cli/commands/daemon_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/cli/commands/daemon_utils.py b/airflow/cli/commands/daemon_utils.py index d3c1d36877912..0c0bb8935f79b 100644 --- a/airflow/cli/commands/daemon_utils.py +++ b/airflow/cli/commands/daemon_utils.py @@ -36,6 +36,7 @@ def run_command_with_daemon_mode( umask: str = settings.DAEMON_UMASK, pid_file: str | None = None, ): + """ Parameters ---------- From d4b608f6de548ac89a9b8f5334c451c59202a4cc Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Sun, 15 Oct 2023 08:58:19 +0200 Subject: [PATCH 07/23] format --- airflow/cli/commands/daemon_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/cli/commands/daemon_utils.py b/airflow/cli/commands/daemon_utils.py index 0c0bb8935f79b..d3c1d36877912 100644 --- a/airflow/cli/commands/daemon_utils.py +++ b/airflow/cli/commands/daemon_utils.py @@ -36,7 +36,6 @@ def run_command_with_daemon_mode( umask: str = settings.DAEMON_UMASK, pid_file: str | None = None, ): - """ Parameters ---------- From e8e7ddb5c985713dfc2857e03e4eb77ec397d9dc Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Sun, 15 Oct 2023 09:04:05 +0200 Subject: [PATCH 08/23] update inclusivity filter 'gunicorn master process' is a third-party concept --- .pre-commit-config.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ba5d0a4456077..a5d99d54f4a02 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -511,6 +511,7 @@ repos: ^airflow/api_connexion/openapi/v1.yaml$| ^airflow/auth/managers/fab/security_manager/| ^airflow/cli/commands/webserver_command.py$| + ^airflow/cli/commands/internal_api_command.py$| ^airflow/config_templates/| ^airflow/models/baseoperator.py$| ^airflow/operators/__init__.py$| From 6dfb72009ef0587e944cd490e28a56b4853db1d5 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Sun, 15 Oct 2023 11:23:41 +0200 Subject: [PATCH 09/23] format --- airflow/cli/commands/daemon_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/cli/commands/daemon_utils.py b/airflow/cli/commands/daemon_utils.py index d3c1d36877912..0c0bb8935f79b 100644 --- a/airflow/cli/commands/daemon_utils.py +++ b/airflow/cli/commands/daemon_utils.py @@ -36,6 +36,7 @@ def run_command_with_daemon_mode( umask: str = settings.DAEMON_UMASK, pid_file: str | None = None, ): + """ Parameters ---------- From 9c42a330659099790dc279acdbbf79cae4574bfc Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Sun, 15 Oct 2023 11:26:01 +0200 Subject: [PATCH 10/23] doc --- airflow/cli/commands/daemon_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/cli/commands/daemon_utils.py b/airflow/cli/commands/daemon_utils.py index 0c0bb8935f79b..53d3f8f5aa031 100644 --- a/airflow/cli/commands/daemon_utils.py +++ b/airflow/cli/commands/daemon_utils.py @@ -36,8 +36,8 @@ def run_command_with_daemon_mode( umask: str = settings.DAEMON_UMASK, 
pid_file: str | None = None, ): + """Run the command in daemon process if enabled or in this process if not. - """ Parameters ---------- args : Namespace From 06438cdf313d9806ce1d7a2849d4d2c760335916 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Sun, 15 Oct 2023 16:13:50 +0200 Subject: [PATCH 11/23] update tests --- airflow/cli/commands/daemon_utils.py | 6 ++-- tests/cli/commands/test_kerberos_command.py | 30 +++++++++----------- tests/cli/commands/test_webserver_command.py | 10 +++---- 3 files changed, 22 insertions(+), 24 deletions(-) diff --git a/airflow/cli/commands/daemon_utils.py b/airflow/cli/commands/daemon_utils.py index 53d3f8f5aa031..6c90d8ec3b402 100644 --- a/airflow/cli/commands/daemon_utils.py +++ b/airflow/cli/commands/daemon_utils.py @@ -64,10 +64,10 @@ def run_command_with_daemon_mode( # Check if the process is already running; if not but a pidfile exists, clean it up check_if_pidfile_process_is_running(pid_file=pid, process_name=process_name) - files_preserve = [] if should_setup_logging: - handle = setup_logging(log_file) - files_preserve.append(handle) + files_preserve = [setup_logging(log_file)] + else: + files_preserve = None with open(stdout, "a") as stdout_handle, open(stderr, "a") as stderr_handle: stdout_handle.truncate(0) stderr_handle.truncate(0) diff --git a/tests/cli/commands/test_kerberos_command.py b/tests/cli/commands/test_kerberos_command.py index 41dce045fa1a2..00d24ca944f0f 100644 --- a/tests/cli/commands/test_kerberos_command.py +++ b/tests/cli/commands/test_kerberos_command.py @@ -36,9 +36,9 @@ def test_run_command(self, mock_krb): kerberos_command.kerberos(args) mock_krb.run.assert_called_once_with(keytab="/tmp/airflow.keytab", principal="PRINCIPAL") - @mock.patch("airflow.cli.commands.kerberos_command.TimeoutPIDLockFile") - @mock.patch("airflow.cli.commands.kerberos_command.setup_locations") - @mock.patch("airflow.cli.commands.kerberos_command.daemon") + @mock.patch("airflow.cli.commands.daemon_utils.TimeoutPIDLockFile") + @mock.patch("airflow.cli.commands.daemon_utils.setup_locations") + @mock.patch("airflow.cli.commands.daemon_utils.daemon") @mock.patch("airflow.cli.commands.kerberos_command.krb") @conf_vars({("core", "executor"): "CeleryExecutor"}) def test_run_command_daemon(self, mock_krb, mock_daemon, mock_setup_locations, mock_pid_file): @@ -66,13 +66,14 @@ def test_run_command_daemon(self, mock_krb, mock_daemon, mock_setup_locations, m ] ) mock_open = mock.mock_open() - with mock.patch("airflow.cli.commands.kerberos_command.open", mock_open): + with mock.patch("airflow.cli.commands.daemon_utils.open", mock_open): kerberos_command.kerberos(args) mock_krb.run.assert_called_once_with(keytab="/tmp/airflow.keytab", principal="PRINCIPAL") - assert mock_daemon.mock_calls == [ + assert mock_daemon.mock_calls[:3] == [ mock.call.DaemonContext( pidfile=mock_pid_file.return_value, + files_preserve=None, stderr=mock_open.return_value, stdout=mock_open.return_value, umask=0o077, @@ -81,18 +82,15 @@ def test_run_command_daemon(self, mock_krb, mock_daemon, mock_setup_locations, m mock.call.DaemonContext().__exit__(None, None, None), ] - mock_setup_locations.assert_has_calls( - [ - mock.call( - "kerberos", - "/tmp/kerberos.pid", - "/tmp/kerberos-stdout.log", - "/tmp/kerberos-stderr.log", - "/tmp/kerberos.log", - ) - ] + assert mock_setup_locations.mock_calls[0] == mock.call( + process="kerberos", + pid="/tmp/kerberos.pid", + stdout="/tmp/kerberos-stdout.log", + stderr="/tmp/kerberos-stderr.log", + log="/tmp/kerberos.log", ) - 
mock_pid_file.assert_has_calls([mock.call(mock_setup_locations.return_value[0], -1)])
+
+        assert mock_pid_file.mock_calls[0] == mock.call(mock_setup_locations.return_value[0], -1)
         assert mock_open.mock_calls == [
             mock.call(mock_setup_locations.return_value[1], "a"),
             mock.call().__enter__(),
diff --git a/tests/cli/commands/test_webserver_command.py b/tests/cli/commands/test_webserver_command.py
index f325924bcfba4..a2aee71a4cce2 100644
--- a/tests/cli/commands/test_webserver_command.py
+++ b/tests/cli/commands/test_webserver_command.py
@@ -238,8 +238,8 @@ def test_cli_webserver_background(self, tmp_path):
             AIRFLOW__CORE__LOAD_EXAMPLES="False",
             AIRFLOW__WEBSERVER__WORKERS="1",
         ):
-            pidfile_webserver = tmp_path / "pidflow-webserver.pid"
-            pidfile_monitor = tmp_path / "pidflow-webserver-monitor.pid"
+            pidfile_webserver_gunicorn_master = tmp_path / "pidflow-webserver-gunicorn-master.pid"
+            pidfile_webserver_main = tmp_path / "pidflow-webserver.pid"
             stdout = tmp_path / "airflow-webserver.out"
             stderr = tmp_path / "airflow-webserver.err"
             logfile = tmp_path / "airflow-webserver.log"
@@ -252,7 +252,7 @@ def test_cli_webserver_background(self, tmp_path):
                 "webserver",
                 "--daemon",
                 "--pid",
-                os.fspath(pidfile_webserver),
+                os.fspath(pidfile_webserver_gunicorn_master),
                 "--stdout",
                 os.fspath(stdout),
                 "--stderr",
@@ -263,9 +263,9 @@ def test_cli_webserver_background(self, tmp_path):
             )
             assert proc.poll() is None
 
-            pid_monitor = self._wait_pidfile(pidfile_monitor)
+            pid_monitor = self._wait_pidfile(pidfile_webserver_main)
             console.print(f"[blue]Monitor started at {pid_monitor}")
-            pid_webserver = self._wait_pidfile(pidfile_webserver)
+            pid_webserver = self._wait_pidfile(pidfile_webserver_gunicorn_master)
             console.print(f"[blue]Webserver started at {pid_webserver}")
             console.print("[blue]Running airflow webserver process:")
             # Assert that the webserver and gunicorn processes are running (by name rather than pid).
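By this point in the series, every refactored CLI entrypoint reduces to a single call into the shared helper. A minimal sketch of the pattern, assuming a hypothetical "my-command" with a placeholder do_work body (neither appears in the patches; the helper is the one introduced in PATCH 01 and reads args.daemon plus the pid/stdout/stderr/log-file locations from args):

    from argparse import Namespace

    from airflow.cli.commands.daemon_utils import run_command_with_daemon_mode


    def do_work(args: Namespace) -> None:
        """Placeholder for the real command body (run_job(), krb.run(), celery_app.start(), ...)."""
        print(f"working, daemon={args.daemon}")


    def my_command(args: Namespace) -> None:
        # With --daemon the helper enters a DaemonContext (pidfile, umask,
        # stdout/stderr redirection); otherwise it installs the standard
        # SIGINT/SIGTERM/SIGQUIT handlers and runs the callback in this process.
        run_command_with_daemon_mode(
            args,
            "my-command",  # process name, used to derive pid/log file locations
            lambda: do_work(args),
            should_setup_logging=True,  # keep the daemon's log file handle open across the fork
        )

In the non-daemon path only args.daemon is consulted before the callback runs; in daemon mode the helper also reads args.pid, args.stdout, args.stderr, and args.log_file to place its files.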
From d65403d864737bf47c6ef53693b0b9cf23cc50fb Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Sun, 15 Oct 2023 16:25:11 +0200 Subject: [PATCH 12/23] fix tests --- airflow/cli/commands/internal_api_command.py | 2 +- airflow/cli/commands/webserver_command.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/airflow/cli/commands/internal_api_command.py b/airflow/cli/commands/internal_api_command.py index 422ba0ef62e37..2eb2978c033cf 100644 --- a/airflow/cli/commands/internal_api_command.py +++ b/airflow/cli/commands/internal_api_command.py @@ -92,7 +92,7 @@ def internal_api(args): ) ) - gunicorn_master_pid_file, _, _, _ = setup_locations("internal-api-gunicorn-master") + gunicorn_master_pid_file, _, _, _ = setup_locations("internal-api-gunicorn-master", pid=args.pid) run_args = [ sys.executable, diff --git a/airflow/cli/commands/webserver_command.py b/airflow/cli/commands/webserver_command.py index 589869863945a..6977b402b4fec 100644 --- a/airflow/cli/commands/webserver_command.py +++ b/airflow/cli/commands/webserver_command.py @@ -375,8 +375,7 @@ def webserver(args): ) ) - gunicorn_master_pid_file, _, _, _ = setup_locations("webserver-gunicorn-master") - + gunicorn_master_pid_file, _, _, _ = setup_locations("webserver-gunicorn-master", pid=args.pid) run_args = [ sys.executable, "-m", From 20c780eb83476c0edc4dc6b2473e9fd2f0121214 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Sun, 15 Oct 2023 21:36:57 +0200 Subject: [PATCH 13/23] fix test --- tests/cli/commands/test_celery_command.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/cli/commands/test_celery_command.py b/tests/cli/commands/test_celery_command.py index ae968f1171c9d..e6480212e07a8 100644 --- a/tests/cli/commands/test_celery_command.py +++ b/tests/cli/commands/test_celery_command.py @@ -266,9 +266,9 @@ def test_run_command(self, mock_celery_app): ] ) - @mock.patch("airflow.cli.commands.celery_command.TimeoutPIDLockFile") - @mock.patch("airflow.cli.commands.celery_command.setup_locations") - @mock.patch("airflow.cli.commands.celery_command.daemon") + @mock.patch("airflow.cli.commands.kerberos_command.TimeoutPIDLockFile") + @mock.patch("airflow.cli.commands.kerberos_command.setup_locations") + @mock.patch("airflow.cli.commands.kerberos_command.daemon") @mock.patch("airflow.providers.celery.executors.celery_executor.app") def test_run_command_daemon(self, mock_celery_app, mock_daemon, mock_setup_locations, mock_pid_file): mock_setup_locations.return_value = ( From 64986d8510f94fdc4492bfe6a8977284afa4801a Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Thu, 19 Oct 2023 22:30:02 +0200 Subject: [PATCH 14/23] fix test --- tests/cli/commands/test_celery_command.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/cli/commands/test_celery_command.py b/tests/cli/commands/test_celery_command.py index e6480212e07a8..584251267cd2f 100644 --- a/tests/cli/commands/test_celery_command.py +++ b/tests/cli/commands/test_celery_command.py @@ -266,9 +266,9 @@ def test_run_command(self, mock_celery_app): ] ) - @mock.patch("airflow.cli.commands.kerberos_command.TimeoutPIDLockFile") - @mock.patch("airflow.cli.commands.kerberos_command.setup_locations") - @mock.patch("airflow.cli.commands.kerberos_command.daemon") + @mock.patch("airflow.cli.commands.daemon_utils.TimeoutPIDLockFile") + @mock.patch("airflow.cli.commands.daemon_utils.setup_locations") + @mock.patch("airflow.cli.commands.daemon_utils.daemon") 
@mock.patch("airflow.providers.celery.executors.celery_executor.app") def test_run_command_daemon(self, mock_celery_app, mock_daemon, mock_setup_locations, mock_pid_file): mock_setup_locations.return_value = ( From 167070e0b7b3c21072366f63ec97b7d571421dd5 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Thu, 19 Oct 2023 23:45:14 +0200 Subject: [PATCH 15/23] change file naming --- airflow/cli/commands/daemon_utils.py | 2 +- airflow/cli/commands/internal_api_command.py | 15 +++++++++++---- airflow/cli/commands/webserver_command.py | 15 +++++++++++---- tests/cli/commands/test_webserver_command.py | 10 +++++----- 4 files changed, 28 insertions(+), 14 deletions(-) diff --git a/airflow/cli/commands/daemon_utils.py b/airflow/cli/commands/daemon_utils.py index 6c90d8ec3b402..2bbb0a458f0f3 100644 --- a/airflow/cli/commands/daemon_utils.py +++ b/airflow/cli/commands/daemon_utils.py @@ -56,7 +56,7 @@ def run_command_with_daemon_mode( """ if args.daemon: pid, stdout, stderr, log_file = setup_locations( - process=process_name, pid=args.pid, stdout=args.stdout, stderr=args.stderr, log=args.log_file + process=process_name, stdout=args.stdout, stderr=args.stderr, log=args.log_file ) if pid_file: pid = pid_file diff --git a/airflow/cli/commands/internal_api_command.py b/airflow/cli/commands/internal_api_command.py index 2eb2978c033cf..afdb55f775610 100644 --- a/airflow/cli/commands/internal_api_command.py +++ b/airflow/cli/commands/internal_api_command.py @@ -24,6 +24,7 @@ import sys import textwrap from contextlib import suppress +from pathlib import Path from tempfile import gettempdir from time import sleep @@ -92,7 +93,7 @@ def internal_api(args): ) ) - gunicorn_master_pid_file, _, _, _ = setup_locations("internal-api-gunicorn-master", pid=args.pid) + pid_file, _, _, _ = setup_locations("internal-api", pid=args.pid) run_args = [ sys.executable, @@ -109,7 +110,7 @@ def internal_api(args): "--name", "airflow-internal-api", "--pid", - gunicorn_master_pid_file, + pid_file, "--access-logfile", str(access_logfile), "--error-logfile", @@ -163,7 +164,7 @@ def start_and_monitor_gunicorn(args): gunicorn_master_proc_pid = None while not gunicorn_master_proc_pid: sleep(0.1) - gunicorn_master_proc_pid = read_pid_from_pidfile(gunicorn_master_pid_file) + gunicorn_master_proc_pid = read_pid_from_pidfile(pid_file) # Run Gunicorn monitor gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid) @@ -178,8 +179,14 @@ def start_and_monitor_gunicorn(args): create_app(None) os.environ.pop("SKIP_DAGS_PARSING") + pid_file_path = Path(pid_file) + monitor_pid_file = str(pid_file_path.with_name(f"{pid_file_path.stem}-monitor{pid_file_path.suffix}")) run_command_with_daemon_mode( - args, "internal-api", lambda: start_and_monitor_gunicorn(args), should_setup_logging=True + args, + "internal-api", + lambda: start_and_monitor_gunicorn(args), + should_setup_logging=True, + pid_file=monitor_pid_file, ) diff --git a/airflow/cli/commands/webserver_command.py b/airflow/cli/commands/webserver_command.py index 6977b402b4fec..0a313929f1097 100644 --- a/airflow/cli/commands/webserver_command.py +++ b/airflow/cli/commands/webserver_command.py @@ -25,6 +25,7 @@ import textwrap import time from contextlib import suppress +from pathlib import Path from time import sleep from typing import NoReturn @@ -375,7 +376,7 @@ def webserver(args): ) ) - gunicorn_master_pid_file, _, _, _ = setup_locations("webserver-gunicorn-master", pid=args.pid) + pid_file, _, _, _ = setup_locations("webserver", pid=args.pid) run_args = [ sys.executable, "-m", @@ 
-391,7 +392,7 @@ def webserver(args): "--name", "airflow-webserver", "--pid", - gunicorn_master_pid_file, + pid_file, "--config", "python:airflow.www.gunicorn_config", ] @@ -464,7 +465,7 @@ def start_and_monitor_gunicorn(args): gunicorn_master_proc_pid = None while not gunicorn_master_proc_pid: sleep(0.1) - gunicorn_master_proc_pid = read_pid_from_pidfile(gunicorn_master_pid_file) + gunicorn_master_proc_pid = read_pid_from_pidfile(pid_file) # Run Gunicorn monitor gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid) @@ -479,6 +480,12 @@ def start_and_monitor_gunicorn(args): create_app(None) os.environ.pop("SKIP_DAGS_PARSING") + pid_file_path = Path(pid_file) + monitor_pid_file = str(pid_file_path.with_name(f"{pid_file_path.stem}-monitor{pid_file_path.suffix}")) run_command_with_daemon_mode( - args, "webserver", lambda: start_and_monitor_gunicorn(args), should_setup_logging=True + args, + "webserver", + lambda: start_and_monitor_gunicorn(args), + should_setup_logging=True, + pid_file=monitor_pid_file, ) diff --git a/tests/cli/commands/test_webserver_command.py b/tests/cli/commands/test_webserver_command.py index a2aee71a4cce2..f325924bcfba4 100644 --- a/tests/cli/commands/test_webserver_command.py +++ b/tests/cli/commands/test_webserver_command.py @@ -238,8 +238,8 @@ def test_cli_webserver_background(self, tmp_path): AIRFLOW__CORE__LOAD_EXAMPLES="False", AIRFLOW__WEBSERVER__WORKERS="1", ): - pidfile_webserver_gunicorn_master = tmp_path / "pidflow-webserver-gunicorn-master.pid" - pidfile_webserver_main = tmp_path / "pidflow-webserver.pid" + pidfile_webserver = tmp_path / "pidflow-webserver.pid" + pidfile_monitor = tmp_path / "pidflow-webserver-monitor.pid" stdout = tmp_path / "airflow-webserver.out" stderr = tmp_path / "airflow-webserver.err" logfile = tmp_path / "airflow-webserver.log" @@ -252,7 +252,7 @@ def test_cli_webserver_background(self, tmp_path): "webserver", "--daemon", "--pid", - os.fspath(pidfile_webserver_gunicorn_master), + os.fspath(pidfile_webserver), "--stdout", os.fspath(stdout), "--stderr", @@ -263,9 +263,9 @@ def test_cli_webserver_background(self, tmp_path): ) assert proc.poll() is None - pid_monitor = self._wait_pidfile(pidfile_webserver_main) + pid_monitor = self._wait_pidfile(pidfile_monitor) console.print(f"[blue]Monitor started at {pid_monitor}") - pid_webserver = self._wait_pidfile(pidfile_webserver_gunicorn_master) + pid_webserver = self._wait_pidfile(pidfile_webserver) console.print(f"[blue]Webserver started at {pid_webserver}") console.print("[blue]Running airflow webserver process:") # Assert that the webserver and gunicorn processes are running (by name rather than pid). 
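The naming change above restores the original pidfile layout: in daemon mode the user-supplied --pid path is handed to gunicorn's own --pid flag for the master process, while the daemonized monitor derives a separate pidfile with a -monitor suffix. A small self-contained sketch of that derivation, mirroring the Path logic added to webserver_command.py and internal_api_command.py (the example path is illustrative):

    from pathlib import Path


    def monitor_pid_file_for(pid_file: str) -> str:
        # Mirrors the derivation in the patch:
        # "/run/airflow-webserver.pid" -> "/run/airflow-webserver-monitor.pid"
        path = Path(pid_file)
        return str(path.with_name(f"{path.stem}-monitor{path.suffix}"))


    assert monitor_pid_file_for("/run/airflow-webserver.pid") == "/run/airflow-webserver-monitor.pid"

This matches what test_cli_webserver_background expects: pidflow-webserver.pid for the gunicorn master and pidflow-webserver-monitor.pid for the monitor daemon.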
From 03099894105e60693a8034c57f1dfa3f4da81f71 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Fri, 20 Oct 2023 00:46:52 +0200 Subject: [PATCH 16/23] fix tests and static checks --- airflow/cli/commands/webserver_command.py | 1 - tests/cli/commands/test_kerberos_command.py | 1 - 2 files changed, 2 deletions(-) diff --git a/airflow/cli/commands/webserver_command.py b/airflow/cli/commands/webserver_command.py index 18b300e654d96..c0dc61d21e64c 100644 --- a/airflow/cli/commands/webserver_command.py +++ b/airflow/cli/commands/webserver_command.py @@ -40,7 +40,6 @@ from airflow.utils.cli import setup_locations from airflow.utils.hashlib_wrapper import md5 from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.process_utils import check_if_pidfile_process_is_running from airflow.utils.providers_configuration_loader import providers_configuration_loaded log = logging.getLogger(__name__) diff --git a/tests/cli/commands/test_kerberos_command.py b/tests/cli/commands/test_kerberos_command.py index 00d24ca944f0f..14eb1676bdc72 100644 --- a/tests/cli/commands/test_kerberos_command.py +++ b/tests/cli/commands/test_kerberos_command.py @@ -84,7 +84,6 @@ def test_run_command_daemon(self, mock_krb, mock_daemon, mock_setup_locations, m assert mock_setup_locations.mock_calls[0] == mock.call( process="kerberos", - pid="/tmp/kerberos.pid", stdout="/tmp/kerberos-stdout.log", stderr="/tmp/kerberos-stderr.log", log="/tmp/kerberos.log", From c43ec397ef71de8e7062d7d013ddbab7e74398d4 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Fri, 20 Oct 2023 08:29:56 +0200 Subject: [PATCH 17/23] format --- airflow/cli/commands/internal_api_command.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/cli/commands/internal_api_command.py b/airflow/cli/commands/internal_api_command.py index afdb55f775610..c196995bfbb99 100644 --- a/airflow/cli/commands/internal_api_command.py +++ b/airflow/cli/commands/internal_api_command.py @@ -79,7 +79,6 @@ def internal_api(args): host=args.hostname, ) else: - log.info( textwrap.dedent( f"""\ From 329ac99abde087ac4c17684aebda5556cb78af1b Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Fri, 20 Oct 2023 11:02:33 +0200 Subject: [PATCH 18/23] fix test --- tests/cli/commands/test_celery_command.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/cli/commands/test_celery_command.py b/tests/cli/commands/test_celery_command.py index 584251267cd2f..02f26d7f23f69 100644 --- a/tests/cli/commands/test_celery_command.py +++ b/tests/cli/commands/test_celery_command.py @@ -305,7 +305,7 @@ def test_run_command_daemon(self, mock_celery_app, mock_daemon, mock_setup_locat ] ) mock_open = mock.mock_open() - with mock.patch("airflow.cli.commands.celery_command.open", mock_open): + with mock.patch("airflow.cli.commands.daemon_utils.open", mock_open): celery_command.flower(args) mock_celery_app.start.assert_called_once_with( @@ -320,11 +320,12 @@ def test_run_command_daemon(self, mock_celery_app, mock_daemon, mock_setup_locat "--conf=flower_config", ] ) - assert mock_daemon.mock_calls == [ + assert mock_daemon.mock_calls[:3] == [ mock.call.DaemonContext( pidfile=mock_pid_file.return_value, - stderr=mock_open.return_value, + files_preserve=None, stdout=mock_open.return_value, + stderr=mock_open.return_value, umask=0o077, ), mock.call.DaemonContext().__enter__(), @@ -333,11 +334,10 @@ def test_run_command_daemon(self, mock_celery_app, mock_daemon, mock_setup_locat assert mock_setup_locations.mock_calls == [ mock.call( - log="/tmp/flower.log", - 
pid="/tmp/flower.pid", process="flower", - stderr="/tmp/flower-stderr.log", stdout="/tmp/flower-stdout.log", + stderr="/tmp/flower-stderr.log", + log="/tmp/flower.log", ) ] mock_pid_file.assert_has_calls([mock.call(mock_setup_locations.return_value[0], -1)]) From 845b97379265cee154b7bf0366b6243cec5cccf6 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Sat, 21 Oct 2023 14:41:00 +0200 Subject: [PATCH 19/23] reStructuredText --- airflow/cli/commands/daemon_utils.py | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/airflow/cli/commands/daemon_utils.py b/airflow/cli/commands/daemon_utils.py index 2bbb0a458f0f3..a41382be5fc3f 100644 --- a/airflow/cli/commands/daemon_utils.py +++ b/airflow/cli/commands/daemon_utils.py @@ -36,22 +36,14 @@ def run_command_with_daemon_mode( umask: str = settings.DAEMON_UMASK, pid_file: str | None = None, ): - """Run the command in daemon process if enabled or in this process if not. + """Run the command in a daemon process if daemon mode enabled or within this process if not. - Parameters - ---------- - args : Namespace - The set of arguments passed to the original CLI command - process_name : str - Process name used in naming log and PID files for the daemon - callback : Callable - The actual command to run with or without daemon context - should_setup_logging : bool - If true, then a log file handler for the daemon process will be created - umask : str - File access creation mask ("umask") to set for the process on daemon start - pid_file : str - If specified, this file path us used to store daemon process PID. + :param args: the set of arguments passed to the original CLI command + :param process_name: process name used in naming log and PID files for the daemon + :param callback: the actual command to run with or without daemon context + :param should_setup_logging: if true, then a log file handler for the daemon process will be created + :param umask: file access creation mask ("umask") to set for the process on daemon start + :param pid_file: if specified, this file path us used to store daemon process PID. If not specified, a file path is generated with the default pattern. 
""" if args.daemon: From 02488e01534256510fd5c168b2f28819c7617476 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Dyl=C4=85g?= Date: Mon, 23 Oct 2023 18:51:35 +0200 Subject: [PATCH 20/23] Update airflow/cli/commands/celery_command.py Co-authored-by: Jarek Potiuk --- airflow/cli/commands/celery_command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/cli/commands/celery_command.py b/airflow/cli/commands/celery_command.py index 837296e75fcbd..fa02ef2d00856 100644 --- a/airflow/cli/commands/celery_command.py +++ b/airflow/cli/commands/celery_command.py @@ -67,7 +67,7 @@ def flower(args): if args.flower_conf: options.append(f"--conf={args.flower_conf}") - run_command_with_daemon_mode(args, "flower", lambda: celery_app.start(options)) + run_command_with_daemon_option(args, "flower", lambda: celery_app.start(options)) @contextmanager From 617e39193f09cae3c7b1f6600d127c56c5baf351 Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Mon, 23 Oct 2023 18:57:51 +0200 Subject: [PATCH 21/23] rename command and use keyword-only params --- airflow/cli/commands/celery_command.py | 14 ++++++++------ airflow/cli/commands/daemon_utils.py | 3 ++- airflow/cli/commands/dag_processor_command.py | 10 +++++----- airflow/cli/commands/internal_api_command.py | 10 +++++----- airflow/cli/commands/kerberos_command.py | 8 +++++--- airflow/cli/commands/scheduler_command.py | 9 ++++++--- airflow/cli/commands/triggerer_command.py | 10 +++++----- airflow/cli/commands/webserver_command.py | 10 +++++----- 8 files changed, 41 insertions(+), 33 deletions(-) diff --git a/airflow/cli/commands/celery_command.py b/airflow/cli/commands/celery_command.py index fa02ef2d00856..5e3e01042a070 100644 --- a/airflow/cli/commands/celery_command.py +++ b/airflow/cli/commands/celery_command.py @@ -31,7 +31,7 @@ from lockfile.pidlockfile import read_pid_from_pidfile, remove_existing_pidfile from airflow import settings -from airflow.cli.commands.daemon_utils import run_command_with_daemon_mode +from airflow.cli.commands.daemon_utils import run_command_with_daemon_option from airflow.configuration import conf from airflow.utils import cli as cli_utils from airflow.utils.cli import setup_locations @@ -67,7 +67,9 @@ def flower(args): if args.flower_conf: options.append(f"--conf={args.flower_conf}") - run_command_with_daemon_option(args, "flower", lambda: celery_app.start(options)) + run_command_with_daemon_option( + args=args, process_name="flower", callback=lambda: celery_app.start(options) + ) @contextmanager @@ -203,10 +205,10 @@ def run_celery_worker(): else: umask = conf.get("celery", "worker_umask", fallback=settings.DAEMON_UMASK) - run_command_with_daemon_mode( - args, - WORKER_PROCESS_NAME, - run_celery_worker, + run_command_with_daemon_option( + args=args, + process_name=WORKER_PROCESS_NAME, + callback=run_celery_worker, should_setup_logging=True, umask=umask, pid_file=worker_pid_file_path, diff --git a/airflow/cli/commands/daemon_utils.py b/airflow/cli/commands/daemon_utils.py index a41382be5fc3f..9184b1f7db5c7 100644 --- a/airflow/cli/commands/daemon_utils.py +++ b/airflow/cli/commands/daemon_utils.py @@ -28,7 +28,8 @@ from airflow.utils.process_utils import check_if_pidfile_process_is_running -def run_command_with_daemon_mode( +def run_command_with_daemon_option( + *, args: Namespace, process_name: str, callback: Callable, diff --git a/airflow/cli/commands/dag_processor_command.py b/airflow/cli/commands/dag_processor_command.py index 249fb422f9f09..85bef2727d10b 100644 --- 
a/airflow/cli/commands/dag_processor_command.py +++ b/airflow/cli/commands/dag_processor_command.py @@ -21,7 +21,7 @@ from datetime import timedelta from typing import Any -from airflow.cli.commands.daemon_utils import run_command_with_daemon_mode +from airflow.cli.commands.daemon_utils import run_command_with_daemon_option from airflow.configuration import conf from airflow.dag_processing.manager import DagFileProcessorManager from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner @@ -62,9 +62,9 @@ def dag_processor(args): job_runner = _create_dag_processor_job_runner(args) - run_command_with_daemon_mode( - args, - "dag-processor", - lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute), + run_command_with_daemon_option( + args=args, + process_name="dag-processor", + callback=lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute), should_setup_logging=True, ) diff --git a/airflow/cli/commands/internal_api_command.py b/airflow/cli/commands/internal_api_command.py index c196995bfbb99..031cf63a23f0c 100644 --- a/airflow/cli/commands/internal_api_command.py +++ b/airflow/cli/commands/internal_api_command.py @@ -38,7 +38,7 @@ from airflow import settings from airflow.api_internal.internal_api_call import InternalApiConfig -from airflow.cli.commands.daemon_utils import run_command_with_daemon_mode +from airflow.cli.commands.daemon_utils import run_command_with_daemon_option from airflow.cli.commands.webserver_command import GunicornMonitor from airflow.configuration import conf from airflow.exceptions import AirflowConfigException @@ -180,10 +180,10 @@ def start_and_monitor_gunicorn(args): pid_file_path = Path(pid_file) monitor_pid_file = str(pid_file_path.with_name(f"{pid_file_path.stem}-monitor{pid_file_path.suffix}")) - run_command_with_daemon_mode( - args, - "internal-api", - lambda: start_and_monitor_gunicorn(args), + run_command_with_daemon_option( + args=args, + process_name="internal-api", + callback=lambda: start_and_monitor_gunicorn(args), should_setup_logging=True, pid_file=monitor_pid_file, ) diff --git a/airflow/cli/commands/kerberos_command.py b/airflow/cli/commands/kerberos_command.py index bad0aa88a35de..8d33e7f8efffb 100644 --- a/airflow/cli/commands/kerberos_command.py +++ b/airflow/cli/commands/kerberos_command.py @@ -18,7 +18,7 @@ from __future__ import annotations from airflow import settings -from airflow.cli.commands.daemon_utils import run_command_with_daemon_mode +from airflow.cli.commands.daemon_utils import run_command_with_daemon_option from airflow.security import kerberos as krb from airflow.utils import cli as cli_utils from airflow.utils.providers_configuration_loader import providers_configuration_loaded @@ -30,6 +30,8 @@ def kerberos(args): """Start a kerberos ticket renewer.""" print(settings.HEADER) - run_command_with_daemon_mode( - args, "kerberos", lambda: krb.run(principal=args.principal, keytab=args.keytab) + run_command_with_daemon_option( + args=args, + process_name="kerberos", + callback=lambda: krb.run(principal=args.principal, keytab=args.keytab), ) diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py index 38cf2115dbc09..fef0b97b2d111 100644 --- a/airflow/cli/commands/scheduler_command.py +++ b/airflow/cli/commands/scheduler_command.py @@ -24,7 +24,7 @@ from airflow import settings from airflow.api_internal.internal_api_call import InternalApiConfig -from airflow.cli.commands.daemon_utils import run_command_with_daemon_mode +from 
airflow.cli.commands.daemon_utils import run_command_with_daemon_option from airflow.configuration import conf from airflow.executors.executor_loader import ExecutorLoader from airflow.jobs.job import Job, run_job @@ -57,8 +57,11 @@ def scheduler(args: Namespace): """Start Airflow Scheduler.""" print(settings.HEADER) - run_command_with_daemon_mode( - args, "scheduler", lambda: _run_scheduler_job(args), should_setup_logging=True + run_command_with_daemon_option( + args=args, + process_name="scheduler", + callback=lambda: _run_scheduler_job(args), + should_setup_logging=True, ) diff --git a/airflow/cli/commands/triggerer_command.py b/airflow/cli/commands/triggerer_command.py index 96f68b860d316..3479480dbf8ac 100644 --- a/airflow/cli/commands/triggerer_command.py +++ b/airflow/cli/commands/triggerer_command.py @@ -23,7 +23,7 @@ from typing import Generator from airflow import settings -from airflow.cli.commands.daemon_utils import run_command_with_daemon_mode +from airflow.cli.commands.daemon_utils import run_command_with_daemon_option from airflow.configuration import conf from airflow.jobs.job import Job, run_job from airflow.jobs.triggerer_job_runner import TriggererJobRunner @@ -61,9 +61,9 @@ def triggerer(args): print(settings.HEADER) triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC") - run_command_with_daemon_mode( - args, - "triggerer", - lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate), + run_command_with_daemon_option( + args=args, + process_name="triggerer", + callback=lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate), should_setup_logging=True, ) diff --git a/airflow/cli/commands/webserver_command.py b/airflow/cli/commands/webserver_command.py index c0dc61d21e64c..4cb7939fd7fb5 100644 --- a/airflow/cli/commands/webserver_command.py +++ b/airflow/cli/commands/webserver_command.py @@ -33,7 +33,7 @@ from lockfile.pidlockfile import read_pid_from_pidfile from airflow import settings -from airflow.cli.commands.daemon_utils import run_command_with_daemon_mode +from airflow.cli.commands.daemon_utils import run_command_with_daemon_option from airflow.configuration import conf from airflow.exceptions import AirflowException, AirflowWebServerTimeout from airflow.utils import cli as cli_utils @@ -481,10 +481,10 @@ def start_and_monitor_gunicorn(args): pid_file_path = Path(pid_file) monitor_pid_file = str(pid_file_path.with_name(f"{pid_file_path.stem}-monitor{pid_file_path.suffix}")) - run_command_with_daemon_mode( - args, - "webserver", - lambda: start_and_monitor_gunicorn(args), + run_command_with_daemon_option( + args=args, + process_name="webserver", + callback=lambda: start_and_monitor_gunicorn(args), should_setup_logging=True, pid_file=monitor_pid_file, ) From b196afebe866c08d4d31269c28a3cee7567e2d7c Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Mon, 23 Oct 2023 19:07:03 +0200 Subject: [PATCH 22/23] missing types --- airflow/cli/commands/internal_api_command.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/cli/commands/internal_api_command.py b/airflow/cli/commands/internal_api_command.py index 031cf63a23f0c..e2d33e9a1a4c8 100644 --- a/airflow/cli/commands/internal_api_command.py +++ b/airflow/cli/commands/internal_api_command.py @@ -130,7 +130,7 @@ def internal_api(args): # then have a copy of the app run_args += ["--preload"] - def kill_proc(signum, gunicorn_master_proc): + def kill_proc(signum: int, gunicorn_master_proc: psutil.Process | subprocess.Popen): 
log.info("Received signal: %s. Closing gunicorn.", signum) gunicorn_master_proc.terminate() with suppress(TimeoutError): From b4a2f7bff6d391c073379f7a0e884098745378ba Mon Sep 17 00:00:00 2001 From: "daniel.dylag" Date: Mon, 23 Oct 2023 19:08:49 +0200 Subject: [PATCH 23/23] fix kill_proc copy&paste correct implementation from webserver_command.py --- airflow/cli/commands/internal_api_command.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/airflow/cli/commands/internal_api_command.py b/airflow/cli/commands/internal_api_command.py index e2d33e9a1a4c8..f558c89cab797 100644 --- a/airflow/cli/commands/internal_api_command.py +++ b/airflow/cli/commands/internal_api_command.py @@ -135,7 +135,11 @@ def kill_proc(signum: int, gunicorn_master_proc: psutil.Process | subprocess.Pop gunicorn_master_proc.terminate() with suppress(TimeoutError): gunicorn_master_proc.wait(timeout=30) - if gunicorn_master_proc.is_running(): + if isinstance(gunicorn_master_proc, subprocess.Popen): + still_running = gunicorn_master_proc.poll() is not None + else: + still_running = gunicorn_master_proc.is_running() + if still_running: gunicorn_master_proc.kill() sys.exit(0)