diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index 4dec5b982b2..1cc2cfb8ae2 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -386,6 +386,7 @@ def _exit_sub_services(self): exit sub services """ self.running = False + llm_logger.info("Engine shut down, exiting sub services...") if hasattr(self, "cache_manager_processes"): self.engine.resource_manager.cache_manager.shm_cache_task_flag_broadcast.clear() @@ -404,6 +405,7 @@ def _exit_sub_services(self): if hasattr(self, "get_profile_block_num_signal"): self.get_profile_block_num_signal.clear() + if hasattr(self, "worker_proc") and self.worker_proc is not None: try: pgid = os.getpgid(self.worker_proc.pid) @@ -413,6 +415,7 @@ def _exit_sub_services(self): if hasattr(self, "zmq_server") and self.zmq_server is not None: self.zmq_server.close() + if hasattr(self, "dp_processed"): for p in self.dp_processed: console_logger.info(f"Waiting for worker {p.pid} to exit") diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py index 766a07b6acf..f208b6634a4 100644 --- a/fastdeploy/entrypoints/openai/api_server.py +++ b/fastdeploy/entrypoints/openai/api_server.py @@ -17,6 +17,7 @@ import asyncio import json import os +import signal import threading import time import traceback @@ -650,6 +651,27 @@ def launch_controller_server(): time.sleep(1) +def launch_worker_monitor(): + """ + Detect whether worker process is alive. If not, stop the API serverby triggering llm_engine. + """ + + def _monitor(): + global llm_engine + while True: + if hasattr(llm_engine, "worker_proc") and llm_engine.worker_proc.poll() is not None: + console_logger.error( + f"Worker process has died in the background (code={llm_engine.worker_proc.returncode}). API server is forced to stop." + ) + os.kill(os.getpid(), signal.SIGINT) + break + time.sleep(5) + + worker_monitor_thread = threading.Thread(target=_monitor, daemon=True) + worker_monitor_thread.start() + time.sleep(1) + + def main(): """main函数""" if args.local_data_parallel_id == 0: @@ -663,6 +685,7 @@ def main(): console_logger.info(f"Launching chat completion service at http://{args.host}:{args.port}/v1/chat/completions") console_logger.info(f"Launching completion service at http://{args.host}:{args.port}/v1/completions") + launch_worker_monitor() launch_controller_server() launch_metrics_server() launch_api_server()