3 changes: 3 additions & 0 deletions fastdeploy/engine/engine.py
@@ -386,6 +386,7 @@ def _exit_sub_services(self):
        exit sub services
        """
        self.running = False
        llm_logger.info("Engine shut down, exiting sub services...")

        if hasattr(self, "cache_manager_processes"):
            self.engine.resource_manager.cache_manager.shm_cache_task_flag_broadcast.clear()
@@ -404,6 +405,7 @@ def _exit_sub_services(self):

if hasattr(self, "get_profile_block_num_signal"):
self.get_profile_block_num_signal.clear()

if hasattr(self, "worker_proc") and self.worker_proc is not None:
try:
pgid = os.getpgid(self.worker_proc.pid)
@@ -413,6 +415,7 @@ def _exit_sub_services(self):

if hasattr(self, "zmq_server") and self.zmq_server is not None:
self.zmq_server.close()

if hasattr(self, "dp_processed"):
for p in self.dp_processed:
console_logger.info(f"Waiting for worker {p.pid} to exit")
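The hunk above is collapsed right after `pgid = os.getpgid(self.worker_proc.pid)`; the hidden lines presumably signal the worker's whole process group. A minimal standalone sketch of that POSIX shutdown pattern (the `start_new_session=True` launch flag and the SIGTERM choice are assumptions for illustration, not taken from this PR):

import os
import signal
import subprocess
import time

# Assumption: the worker is launched in its own process group (session), so the
# worker and any children it forks can be signalled together.
worker_proc = subprocess.Popen(["sleep", "60"], start_new_session=True)

# On shutdown, resolve the group id from the child's pid and signal the group.
pgid = os.getpgid(worker_proc.pid)
os.killpg(pgid, signal.SIGTERM)

# poll() returns None while the child runs and its exit code once it has died.
time.sleep(1)
print("worker exit code:", worker_proc.poll())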
23 changes: 23 additions & 0 deletions fastdeploy/entrypoints/openai/api_server.py
@@ -17,6 +17,7 @@
import asyncio
import json
import os
import signal
import threading
import time
import traceback
@@ -650,6 +651,27 @@ def launch_controller_server():
    time.sleep(1)


def launch_worker_monitor():
    """
    Detect whether the worker process is alive. If it has died, stop the API server by sending SIGINT to the current process.
    """

    def _monitor():
        global llm_engine
        while True:
            if hasattr(llm_engine, "worker_proc") and llm_engine.worker_proc.poll() is not None:
                console_logger.error(
                    f"Worker process died in the background (exit code={llm_engine.worker_proc.returncode}). Forcing the API server to stop."
                )
                os.kill(os.getpid(), signal.SIGINT)
                break
            time.sleep(5)

    worker_monitor_thread = threading.Thread(target=_monitor, daemon=True)
    worker_monitor_thread.start()
    time.sleep(1)
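
A self-contained way to exercise the watchdog pattern above outside FastDeploy (the `FakeEngine` stand-in, the one-second child, and the 0.5 s poll interval are illustrative assumptions): spawn a short-lived child, arm the monitor thread, and watch SIGINT interrupt the blocked main thread, much as it would interrupt the real server's event loop.

import os
import signal
import subprocess
import threading
import time

class FakeEngine:
    """Stand-in for llm_engine: just holds a handle to a child process."""

    def __init__(self):
        # POSIX-only demo child that exits after one second.
        self.worker_proc = subprocess.Popen(["sleep", "1"])

engine = FakeEngine()

def _monitor():
    while True:
        # poll() is None while the child runs, and its exit code afterwards.
        if engine.worker_proc.poll() is not None:
            print(f"worker died (code={engine.worker_proc.returncode}); sending SIGINT")
            os.kill(os.getpid(), signal.SIGINT)
            break
        time.sleep(0.5)

threading.Thread(target=_monitor, daemon=True).start()

try:
    signal.pause()  # block like a server loop until a signal arrives
except KeyboardInterrupt:
    print("SIGINT received, shutting down cleanly")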


def main():
    """Main entry point."""
    if args.local_data_parallel_id == 0:
@@ -663,6 +685,7 @@ def main():
console_logger.info(f"Launching chat completion service at http://{args.host}:{args.port}/v1/chat/completions")
console_logger.info(f"Launching completion service at http://{args.host}:{args.port}/v1/completions")

launch_worker_monitor()
launch_controller_server()
launch_metrics_server()
launch_api_server()