From a1c49a1875bd087d3ea541f5ec5c6f6e157fdac8 Mon Sep 17 00:00:00 2001 From: liyonghua0910 Date: Fri, 8 Aug 2025 05:34:00 +0000 Subject: [PATCH 1/3] [fix] fix terminal hangs when worker process is dead --- fastdeploy/engine/engine.py | 11 ++++++++++ fastdeploy/entrypoints/openai/api_server.py | 23 +++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index 44466797a3d..de73974ad59 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -964,7 +964,9 @@ def _exit_sub_services(self): exit sub services """ self.running = False + llm_logger.info("Engine shut down, exiting sub services...") + # Clear cache manager process if hasattr(self, "cache_manager_processes"): self.resource_manager.cache_manager.shm_cache_task_flag_broadcast.clear() self.resource_manager.cache_manager.cache_ready_signal.clear() @@ -974,6 +976,8 @@ def _exit_sub_services(self): os.killpg(p.pid, signal.SIGTERM) except Exception as e: print(f"Error extracting file: {e}") + + # Clear signals self.worker_ready_signal.clear() self.exist_task_signal.clear() self.exist_swapped_task_signal.clear() @@ -982,15 +986,22 @@ def _exit_sub_services(self): if hasattr(self, "get_profile_block_num_signal"): self.get_profile_block_num_signal.clear() self.model_weights_status_signal.clear() + + # Clear worker process if hasattr(self, "worker_proc") and self.worker_proc is not None: try: os.killpg(self.worker_proc.pid, signal.SIGTERM) except Exception as e: print(f"Error extracting sub services: {e}") + # Clear engine worker queue self.engine_worker_queue.cleanup() + + # Clear zmq server if hasattr(self, "zmq_server") and self.zmq_server is not None: self.zmq_server.close() + + # Clear DP processes if hasattr(self, "dp_processed"): for p in self.dp_processed: p.join() diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py index 0f9467c5498..241580329cd 100644 --- 
a/fastdeploy/entrypoints/openai/api_server.py +++ b/fastdeploy/entrypoints/openai/api_server.py @@ -15,6 +15,7 @@ """ import os +import signal import threading import time from contextlib import asynccontextmanager @@ -392,6 +393,27 @@ def launch_controller_server(): time.sleep(1) +def launch_worker_monitor(): + """ + Detect whether worker process is alive. If not, stop the API server by triggering llm_engine. + """ + + def _monitor(): + global llm_engine + while True: + if hasattr(llm_engine, "worker_proc") and llm_engine.worker_proc.poll() is not None: + console_logger.error( + f"Worker process has died in the background (code={llm_engine.worker_proc.returncode}). API server is forced to stop." + ) + os.kill(os.getpid(), signal.SIGINT) + break + time.sleep(1) + + worker_monitor_thread = threading.Thread(target=_monitor, daemon=True) + worker_monitor_thread.start() + time.sleep(1) + + def main(): """main函数""" @@ -399,6 +421,7 @@ def main(): if load_engine() is None: return + launch_worker_monitor() launch_controller_server() launch_metrics_server() launch_api_server() From 75967fc991498e313fd8daaa1e4f505992643565 Mon Sep 17 00:00:00 2001 From: liyonghua0910 Date: Mon, 25 Aug 2025 10:45:49 +0000 Subject: [PATCH 2/3] [chore] change sleep time of monitor --- fastdeploy/entrypoints/openai/api_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py index 6a01e2f9655..341217ca579 100644 --- a/fastdeploy/entrypoints/openai/api_server.py +++ b/fastdeploy/entrypoints/openai/api_server.py @@ -523,7 +523,7 @@ def _monitor(): ) os.kill(os.getpid(), signal.SIGINT) break - time.sleep(1) + time.sleep(5) worker_monitor_thread = threading.Thread(target=_monitor, daemon=True) worker_monitor_thread.start() From 604805cd6d6e3ef72fbedc756395ebcadbd91fcb Mon Sep 17 00:00:00 2001 From: liyonghua0910 Date: Mon, 20 Oct 2025 18:20:44 +0800 Subject: [PATCH 3/3] [chore] remove 
redundant comments --- fastdeploy/engine/engine.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index 8efe488d5cb..1cc2cfb8ae2 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -388,7 +388,6 @@ def _exit_sub_services(self): self.running = False llm_logger.info("Engine shut down, exiting sub services...") - # Clear cache manager process if hasattr(self, "cache_manager_processes"): self.engine.resource_manager.cache_manager.shm_cache_task_flag_broadcast.clear() self.engine.resource_manager.cache_manager.cache_ready_signal.clear() @@ -407,7 +406,6 @@ def _exit_sub_services(self): if hasattr(self, "get_profile_block_num_signal"): self.get_profile_block_num_signal.clear() - # Clear worker process if hasattr(self, "worker_proc") and self.worker_proc is not None: try: pgid = os.getpgid(self.worker_proc.pid) @@ -415,11 +413,9 @@ def _exit_sub_services(self): except Exception as e: console_logger.error(f"Error extracting sub services: {e}, {str(traceback.format_exc())}") - # Clear zmq server if hasattr(self, "zmq_server") and self.zmq_server is not None: self.zmq_server.close() - # Clear DP processes if hasattr(self, "dp_processed"): for p in self.dp_processed: console_logger.info(f"Waiting for worker {p.pid} to exit")