From f6245324557cc88cd22a32a3837a497e5c52db01 Mon Sep 17 00:00:00 2001 From: ltd0924 <32387785+ltd0924@users.noreply.github.com> Date: Thu, 21 Aug 2025 16:38:32 +0800 Subject: [PATCH 1/6] Update expert_service.py --- fastdeploy/engine/expert_service.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/fastdeploy/engine/expert_service.py b/fastdeploy/engine/expert_service.py index 63b1b15beba..8b4874c46b1 100644 --- a/fastdeploy/engine/expert_service.py +++ b/fastdeploy/engine/expert_service.py @@ -59,8 +59,8 @@ def __init__(self, cfg, local_data_parallel_id): self.cfg.disaggregate_info = None self.scheduler = cfg.scheduler_config.scheduler() - - self.scheduler.reset_nodeid(f"{self.scheduler.infer.nodeid}_{local_data_parallel_id!s}") + if cfg.scheduler_config.name == "splitwise": + self.scheduler.reset_nodeid(f"{self.scheduler.infer.nodeid}_{local_data_parallel_id!s}") self.cfg.parallel_config.local_data_parallel_id = local_data_parallel_id @@ -143,11 +143,11 @@ def start(self, ipc_signal_suffix, local_data_parallel_id): self.token_processor.run() self.cfg.init_cache_info() - - role = self.cfg.splitwise_role - host_ip = self.cfg.host_ip - disaggregate = self.cfg.disaggregate_info - self.scheduler.start(role, host_ip, disaggregate) + if cfg.scheduler_config.name == "splitwise": + role = self.cfg.splitwise_role + host_ip = self.cfg.host_ip + disaggregate = self.cfg.disaggregate_info + self.scheduler.start(role, host_ip, disaggregate) self.cfg.print() console_logger.info(f"Worker processes are launched with {time.time() - start_time} seconds.") @@ -363,6 +363,10 @@ def start_expert_service(cfg, local_data_parallel_id, ipc_signal_suffix): expert_service = ExpertService(cfg, local_data_parallel_id) try: expert_service.start(ipc_signal_suffix, local_data_parallel_id) - expert_service.split_connector.start_receiver() + if cfg.splitwise_role != "mixed": + expert_service.split_connector.start_receiver() + else: + while True: + time.sleep(100) except Exception as e: llm_logger.exception(f"Expert service failed to start: {e}") From cae84d46a2e3baf6a820230449963056c0c37651 Mon Sep 17 00:00:00 2001 From: ltd0924 <32387785+ltd0924@users.noreply.github.com> Date: Thu, 21 Aug 2025 16:42:05 +0800 Subject: [PATCH 2/6] Update engine.py --- fastdeploy/engine/engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index 1ae1af568fe..a7d7c383aba 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -123,7 +123,7 @@ def __init__(self, cfg): cfg.max_num_seqs, cfg, cfg.tensor_parallel_size, cfg.splitwise_role ) - os.environ["INFERENCE_MSG_QUEUE_ID"] = str(self.cfg.engine_worker_queue_port) + os.environ["INFERENCE_MSG_QUEUE_ID"] = str(self.cfg.engine_worker_queue_port + self.cfg.worker_num_per_node * self.cfg.node_rank) self.split_connector = SplitwiseConnector(cfg, self.scheduler, self.engine_worker_queue, self.resource_manager) From 8553f20aa71e23f49abb67c831271d144e3f1b89 Mon Sep 17 00:00:00 2001 From: ltd0924 <32387785+ltd0924@users.noreply.github.com> Date: Thu, 21 Aug 2025 16:45:25 +0800 Subject: [PATCH 3/6] Update engine.py --- fastdeploy/engine/engine.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index a7d7c383aba..261bfffcbb6 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -123,8 +123,10 @@ def __init__(self, cfg): cfg.max_num_seqs, cfg, cfg.tensor_parallel_size, cfg.splitwise_role ) - os.environ["INFERENCE_MSG_QUEUE_ID"] = str(self.cfg.engine_worker_queue_port + self.cfg.worker_num_per_node * self.cfg.node_rank) - + os.environ["INFERENCE_MSG_QUEUE_ID"] = str( + self.cfg.engine_worker_queue_port + self.cfg.worker_num_per_node * self.cfg.node_rank + ) + self.split_connector = SplitwiseConnector(cfg, self.scheduler, self.engine_worker_queue, self.resource_manager) self.token_processor = TokenProcessor( From 41b71c78e46f7dc15aaf99f9da9119e6904856e3 Mon Sep 17 00:00:00 2001 From: ltd0924 <32387785+ltd0924@users.noreply.github.com> Date: Thu, 21 Aug 2025 16:54:54 +0800 Subject: [PATCH 4/6] Update engine.py --- fastdeploy/engine/engine.py | 1 + 1 file changed, 1 insertion(+) diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index 261bfffcbb6..da00ba5ec1d 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -135,6 +135,7 @@ def __init__(self, cfg): engine_worker_queue=self.engine_worker_queue, split_connector=self.split_connector, ) + self.token_processor.set_resource_manager(self.resource_manager) self.is_started = False From 1f58f8d56a22e8a5e81c3d88016c7eb791edd768 Mon Sep 17 00:00:00 2001 From: ltd0924 <32387785+ltd0924@users.noreply.github.com> Date: Thu, 21 Aug 2025 16:55:24 +0800 Subject: [PATCH 5/6] Update expert_service.py --- fastdeploy/engine/expert_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fastdeploy/engine/expert_service.py b/fastdeploy/engine/expert_service.py index 8b4874c46b1..0032780b967 100644 --- a/fastdeploy/engine/expert_service.py +++ b/fastdeploy/engine/expert_service.py @@ -143,7 +143,7 @@ def start(self, ipc_signal_suffix, local_data_parallel_id): self.token_processor.run() self.cfg.init_cache_info() - if cfg.scheduler_config.name == "splitwise": + if self.cfg.scheduler_config.name == "splitwise": role = self.cfg.splitwise_role host_ip = self.cfg.host_ip disaggregate = self.cfg.disaggregate_info From 7ef21d37663cd72b50a51f9975f6524c81373a4b Mon Sep 17 00:00:00 2001 From: ltd0924 <32387785+ltd0924@users.noreply.github.com> Date: Thu, 21 Aug 2025 16:57:41 +0800 Subject: [PATCH 6/6] Update engine.py --- fastdeploy/engine/engine.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index da00ba5ec1d..3b1b1e0ce9a 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -126,7 +126,6 @@ def __init__(self, cfg): os.environ["INFERENCE_MSG_QUEUE_ID"] = str( self.cfg.engine_worker_queue_port + self.cfg.worker_num_per_node * self.cfg.node_rank ) - self.split_connector = SplitwiseConnector(cfg, self.scheduler, self.engine_worker_queue, self.resource_manager) self.token_processor = TokenProcessor( @@ -135,7 +134,6 @@ def __init__(self, cfg): engine_worker_queue=self.engine_worker_queue, split_connector=self.split_connector, ) - self.token_processor.set_resource_manager(self.resource_manager) self.is_started = False