From 939222bc64bdfa6418589c653ff0b2f75858bea4 Mon Sep 17 00:00:00 2001 From: Valentyn Tymofieiev Date: Tue, 10 Jun 2025 18:15:25 -0700 Subject: [PATCH] Allow only one thread at a time to start the VLLM server. --- .../ml/inference/vllm_inference.py | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/vllm_inference.py b/sdks/python/apache_beam/ml/inference/vllm_inference.py index c4d5aa2dcd37..0bb6ccd6108e 100644 --- a/sdks/python/apache_beam/ml/inference/vllm_inference.py +++ b/sdks/python/apache_beam/ml/inference/vllm_inference.py @@ -114,28 +114,30 @@ def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]): self._server_started = False self._server_process = None self._server_port: int = -1 + self._server_process_lock = threading.RLock() self.start_server() def start_server(self, retries=3): - if not self._server_started: - server_cmd = [ - sys.executable, - '-m', - 'vllm.entrypoints.openai.api_server', - '--model', - self._model_name, - '--port', - '{{PORT}}', - ] - for k, v in self._vllm_server_kwargs.items(): - server_cmd.append(f'--{k}') - # Only add values for commands with value part. - if v is not None: - server_cmd.append(v) - self._server_process, self._server_port = start_process(server_cmd) - - self.check_connectivity(retries) + with self._server_process_lock: + if not self._server_started: + server_cmd = [ + sys.executable, + '-m', + 'vllm.entrypoints.openai.api_server', + '--model', + self._model_name, + '--port', + '{{PORT}}', + ] + for k, v in self._vllm_server_kwargs.items(): + server_cmd.append(f'--{k}') + # Only add values for commands with value part. + if v is not None: + server_cmd.append(v) + self._server_process, self._server_port = start_process(server_cmd) + + self.check_connectivity(retries) def get_server_port(self) -> int: if not self._server_started: