From 133efbe98157e1a96176b6cb283a7c451b8f6ba5 Mon Sep 17 00:00:00 2001
From: kevin
Date: Fri, 28 Nov 2025 19:09:33 +0800
Subject: [PATCH 1/8] fix mm to_dict bug

---
 fastdeploy/engine/request.py | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py
index 04efe90e1ee..9b26f8f48b9 100644
--- a/fastdeploy/engine/request.py
+++ b/fastdeploy/engine/request.py
@@ -16,6 +16,7 @@
 
 from __future__ import annotations
 
+import copy
 import time
 import traceback
 from dataclasses import asdict, dataclass, fields
@@ -269,6 +270,21 @@ def __eq__(self, other):
 
     def to_dict(self) -> dict:
         """convert Request into a serializable dict"""
+        multimodal_inputs = copy.deepcopy(self.multimodal_inputs)
+        if (
+            isinstance(multimodal_inputs, dict)
+            and isinstance(multimodal_inputs.get("mm_positions"), list)
+            and len(multimodal_inputs["mm_positions"]) > 0
+        ):
+            # entries in mm_positions may be ImagePosition dataclasses; convert them to plain dicts
+            try:
+                for i, mm_pos in enumerate(multimodal_inputs["mm_positions"]):
+                    multimodal_inputs["mm_positions"][i] = (
+                        asdict(mm_pos) if isinstance(mm_pos, ImagePosition) else mm_pos
+                    )
+            except Exception as e:
+                data_processor_logger.error(f"Convert ImagePosition to dict error: {e}, {traceback.format_exc()}")
+
         data = {
             "request_id": self.request_id,
             "prompt": self.prompt,
@@ -282,7 +298,7 @@ def to_dict(self) -> dict:
             "arrival_time": self.arrival_time,
             "preprocess_start_time": self.preprocess_start_time,
             "preprocess_end_time": self.preprocess_end_time,
-            "multimodal_inputs": self.multimodal_inputs,
+            "multimodal_inputs": multimodal_inputs,
             "multimodal_data": self.multimodal_data,
             "disable_chat_template": self.disable_chat_template,
             "disaggregate_info": self.disaggregate_info,

From bda1e7037f1f12b53bfe6bd739473c8507930d41 Mon Sep 17 00:00:00 2001
From: kevin
Date: Mon, 1 Dec 2025 21:12:30 +0800
Subject: [PATCH 2/8] pd support async download

---
 fastdeploy/engine/common_engine.py            | 40 +++++++++++++++++---
 .../engine/sched/resource_manager_v1.py       |  8 ++--
 2 files changed, 38 insertions(+), 10 deletions(-)

diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index b7a96489fba..1bf27b5c035 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -718,6 +718,10 @@ def _fetch_request():
                 self.llm_logger.debug(f"get tasks from {type(self.scheduler)}: {tasks}")
 
             if self.cfg.scheduler_config.splitwise_role != "mixed":
+                if self.cfg.scheduler_config.splitwise_role == "prefill":
+                    for task in tasks:
+                        # kick off the async feature download for prefill tasks
+                        self.resource_manager.apply_async_preprocess(task)
                 need_delete_tasks = []
                 if envs.FD_OFFLINE_PERF_TEST_FOR_PD:
                     for task in tasks:
@@ -770,15 +774,39 @@ def _fetch_request():
                     self.split_connector.send_cache_info_to_messager(tasks, 0)
                     # ensure cache tasks has sent to cache_messager
                     need_check_req_ids = [task.request_id for task in tasks]
+                    finished_ids, delete_tasks_list = [], []
                     while need_check_req_ids:
-                        req_ids = self.engine_worker_queue.get_finished_add_cache_task_req()
-                        self.llm_logger.info(f"get_finished_add_cache_task_req: {req_ids}")
-                        if req_ids:
-                            for req_id in req_ids:
-                                assert req_id in need_check_req_ids
-                                need_check_req_ids.remove(req_id)
+                        finished_ids.extend(self.engine_worker_queue.get_finished_add_cache_task_req())
+                        self.llm_logger.info(f"get_finished_add_cache_task_req: {finished_ids}")
+                        if finished_ids:
+                            for task in tasks:
+                                result = self.resource_manager.waiting_async_process(task)
+                                if result is None:
+                                    # async preprocess failed: report the error once, then drop the task
+                                    if task.request_id in need_check_req_ids:
+                                        self.scheduler.put_results(
+                                            [
+                                                RequestOutput(
+                                                    request_id=task.request_id,
+                                                    finished=True,
+                                                    error_code=task.error_code,
+                                                    error_msg=task.error_message,
+                                                )
+                                            ]
+                                        )
+                                        delete_tasks_list.append(task)
+                                        need_check_req_ids.remove(task.request_id)
+                                elif result is False:
+                                    if task.request_id in finished_ids:
+                                        need_check_req_ids.remove(task.request_id)
+                                        finished_ids.remove(task.request_id)
+                        else:
+                            time.sleep(0.001)
+
+                    for tmp_task in delete_tasks_list:
+                        tasks.remove(tmp_task)
+                        # release the resources the dropped task holds on the prefill instance
+                        self.resource_manager.pre_recycle_resource(tmp_task.request_id)
             # Fetch requests and add them to the scheduling queue
             if tasks:
                 for task in tasks:
diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py
index fbe30d24bcf..12ae16e86ff 100644
--- a/fastdeploy/engine/sched/resource_manager_v1.py
+++ b/fastdeploy/engine/sched/resource_manager_v1.py
@@ -653,7 +653,7 @@ def _allocate_decode_and_extend():
             ):
                 break
             if request.status == RequestStatus.WAITING:
-                result = self._waiting_async_process(request)
+                result = self.waiting_async_process(request)
                 if result is None:
                     error_reqs.append((request.request_id, request.error_message))
                     self.waiting.popleft()
@@ -761,7 +761,7 @@ def _allocate_decode_and_extend():
 
         return scheduled_reqs, error_reqs
 
-    def _waiting_async_process(self, request: Request) -> None:
+    def waiting_async_process(self, request: Request) -> bool | None:
         """
        Check if async preprocessing is complete for a request.
        Args:
@@ -780,7 +780,7 @@
             request.async_process_futures = []
             return False
 
-    def _apply_async_preprocess(self, request: Request) -> None:
+    def apply_async_preprocess(self, request: Request) -> None:
         request.async_process_futures.append(self.async_preprocess_pool.submit(self._download_features, request))
 
     def _has_features_info(self, task):
@@ -903,7 +903,7 @@ def get_prefix_cached_blocks(self, request: Request):
 
     def add_request(self, request: Request) -> None:
         with self.lock:
-            self._apply_async_preprocess(request)
+            self.apply_async_preprocess(request)
             self.waiting.append(request)
             self.requests[request.request_id] = request

From 59ef434665d93980c74793c14e164f0bd7d7f6b3 Mon Sep 17 00:00:00 2001
From: kevin
Date: Wed, 3 Dec 2025 11:07:57 +0800
Subject: [PATCH 3/8] allow prefix caching with speculative decoding

---
 fastdeploy/engine/args_utils.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py
index 77def0feb4d..3b5ed41d9d4 100644
--- a/fastdeploy/engine/args_utils.py
+++ b/fastdeploy/engine/args_utils.py
@@ -486,8 +486,6 @@ def __post_init__(self):
             self.tokenizer = self.model
         if self.splitwise_role == "decode":
             self.enable_prefix_caching = False
-        if self.speculative_config is not None:
-            self.enable_prefix_caching = False
         if not current_platform.is_cuda() and not current_platform.is_xpu() and not current_platform.is_intel_hpu():
             self.enable_prefix_caching = False
         # if self.dynamic_load_weight:

From dea58b8cc5701c765f60a76146fe72c08a66f5d8 Mon Sep 17 00:00:00 2001
From: kevin
Date: Wed, 3 Dec 2025 11:39:33 +0800
Subject: [PATCH 4/8] update tests for renamed async preprocess helpers

---
 tests/v1/test_resource_manager_v1.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/tests/v1/test_resource_manager_v1.py b/tests/v1/test_resource_manager_v1.py
index 6d2ae88e950..038a18b403e 100644
--- a/tests/v1/test_resource_manager_v1.py
+++ b/tests/v1/test_resource_manager_v1.py
@@ -54,7 +54,7 @@ def setUp(self):
 
     def test_waiting_async_process_no_futures(self):
        """Test when there are no async process futures"""
futures""" - result = self.manager._waiting_async_process(self.request) + result = self.manager.waiting_async_process(self.request) self.assertFalse(result) def test_waiting_async_process_future_done_no_error(self): @@ -63,7 +63,7 @@ def test_waiting_async_process_future_done_no_error(self): future.set_result(True) self.request.async_process_futures = [future] - result = self.manager._waiting_async_process(self.request) + result = self.manager.waiting_async_process(self.request) self.assertFalse(result) self.assertEqual(len(self.request.async_process_futures), 0) @@ -74,7 +74,7 @@ def test_waiting_async_process_future_done_with_error(self): self.request.async_process_futures = [future] self.request.error_message = "Download failed" - result = self.manager._waiting_async_process(self.request) + result = self.manager.waiting_async_process(self.request) self.assertIsNone(result) def test_waiting_async_process_future_not_done(self): @@ -82,7 +82,7 @@ def test_waiting_async_process_future_not_done(self): future = concurrent.futures.Future() self.request.async_process_futures = [future] - result = self.manager._waiting_async_process(self.request) + result = self.manager.waiting_async_process(self.request) self.assertTrue(result) self.assertEqual(len(self.request.async_process_futures), 1) @@ -90,7 +90,7 @@ def test_apply_async_preprocess(self): """Test applying async preprocess""" with patch.object(self.manager.async_preprocess_pool, "submit") as mock_submit: mock_submit.return_value = "mock_future" - self.manager._apply_async_preprocess(self.request) + self.manager.apply_async_preprocess(self.request) mock_submit.assert_called_once_with(self.manager._download_features, self.request) self.assertEqual(len(self.request.async_process_futures), 1) From 6e883150cd5730780d702a8982850bd9e6d57e93 Mon Sep 17 00:00:00 2001 From: kevin Date: Wed, 3 Dec 2025 13:16:45 +0800 Subject: [PATCH 5/8] update log --- fastdeploy/engine/common_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index 1bf27b5c035..db981b518d2 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -777,7 +777,7 @@ def _fetch_request(): finished_ids, delete_tasks_list = [], [] while need_check_req_ids: finished_ids.extend(self.engine_worker_queue.get_finished_add_cache_task_req()) - self.llm_logger.info(f"get_finished_add_cache_task_req: {finished_ids}") + self.llm_logger.debug(f"get_finished_add_cache_task_req: {finished_ids}") if finished_ids: for task in tasks: result = self.resource_manager.waiting_async_process(task) From 4f1a015c8a33bbf8bcb9df2ee4d661560dbe7761 Mon Sep 17 00:00:00 2001 From: kevin Date: Wed, 3 Dec 2025 13:39:44 +0800 Subject: [PATCH 6/8] Revert "update log" This reverts commit 6e883150cd5730780d702a8982850bd9e6d57e93. 
---
 fastdeploy/engine/common_engine.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index db981b518d2..1bf27b5c035 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -777,7 +777,7 @@ def _fetch_request():
                     finished_ids, delete_tasks_list = [], []
                     while need_check_req_ids:
                         finished_ids.extend(self.engine_worker_queue.get_finished_add_cache_task_req())
-                        self.llm_logger.debug(f"get_finished_add_cache_task_req: {finished_ids}")
+                        self.llm_logger.info(f"get_finished_add_cache_task_req: {finished_ids}")
                         if finished_ids:
                             for task in tasks:
                                 result = self.resource_manager.waiting_async_process(task)

From 549f8ef6c6af7c94c3888731a43a5aaf8a9b1adf Mon Sep 17 00:00:00 2001
From: kevin
Date: Thu, 4 Dec 2025 13:05:29 +0800
Subject: [PATCH 7/8] support pickling Request objects that carry async futures

---
 fastdeploy/engine/request.py | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py
index 9b26f8f48b9..4e3d2b04eee 100644
--- a/fastdeploy/engine/request.py
+++ b/fastdeploy/engine/request.py
@@ -183,6 +183,22 @@ def __init__(
         self.error_message = None
         self.error_code = None
 
+    def __getstate__(self):
+        """
+        Custom getstate method for pickle support.
+        Replaces unpicklable attributes with safe defaults before serializing __dict__.
+        """
+        # Build a copy of __dict__ with unpicklable attributes replaced
+        filtered_dict = {}
+        for key, value in self.__dict__.items():
+            # Future objects are not picklable; ship an empty list in their place
+            if key == "async_process_futures":
+                filtered_dict[key] = []
+            else:
+                filtered_dict[key] = value
+
+        return filtered_dict
+
     @classmethod
     def from_dict(cls, d: dict):
         data_processor_logger.debug(f"{d}")

From d3c7fb030151b08c0f682980ce29d6b5701a585b Mon Sep 17 00:00:00 2001
From: kevin
Date: Fri, 5 Dec 2025 13:47:36 +0800
Subject: [PATCH 8/8] fix mtp bug

---
 fastdeploy/spec_decode/mtp.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/fastdeploy/spec_decode/mtp.py b/fastdeploy/spec_decode/mtp.py
index 611c3ab5f93..420f1472105 100644
--- a/fastdeploy/spec_decode/mtp.py
+++ b/fastdeploy/spec_decode/mtp.py
@@ -176,9 +176,7 @@ def initialize_kv_cache(self, main_model_num_blocks, profile: bool = False):
         if kv_cache_quant_type == "block_wise_fp8":
             kv_cache_scale_shape = [key_cache_shape[0], key_cache_shape[1], key_cache_shape[2]]
         local_rank = self.local_rank % self.parallel_config.tensor_parallel_size
-        if not profile and (
-            self.cache_config.enable_prefix_caching or self.scheduler_config.splitwise_role != "mixed"
-        ):
+        if not profile and self.scheduler_config.splitwise_role != "mixed":
             cache_kvs_list = []
             for i in range(
                 self.num_main_model_layers,
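
Notes on the series.

Note 1 (PATCH 1): `to_dict` deep-copies `multimodal_inputs` before converting `mm_positions` entries, so the live `Request` keeps its dataclass objects while the serialized payload gets plain dicts. A minimal standalone sketch of that pattern; the `offset`/`length` fields on `ImagePosition` are assumed here for illustration and are not taken from the FastDeploy definition:

```python
import copy
from dataclasses import asdict, dataclass, is_dataclass


@dataclass
class ImagePosition:
    # field names are assumptions for this sketch, not FastDeploy's actual schema
    offset: int
    length: int


def serializable_mm_inputs(multimodal_inputs: dict) -> dict:
    """Deep-copy first so the caller's dict keeps its dataclass instances."""
    out = copy.deepcopy(multimodal_inputs)
    positions = out.get("mm_positions")
    if isinstance(positions, list):
        # dataclass instances become plain dicts; anything else passes through
        out["mm_positions"] = [asdict(p) if is_dataclass(p) else p for p in positions]
    return out


mm = {"mm_positions": [ImagePosition(offset=3, length=64), {"offset": 70, "length": 16}]}
print(serializable_mm_inputs(mm))
# {'mm_positions': [{'offset': 3, 'length': 64}, {'offset': 70, 'length': 16}]}
```

Without the deep copy, mutating `mm_positions` in place would silently strip the dataclasses from the request that is still being scheduled.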
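Note 2 (PATCH 2): the engine loop relies on `waiting_async_process` being tri-state: `None` means the async preprocess failed and the task should be dropped, `True` means the download futures are still running, `False` means everything finished and the request can proceed. A toy sketch of that contract, using invented stand-in names (`ToyRequest`, the polling loop) rather than FastDeploy's real types:

```python
import concurrent.futures
import time


class ToyRequest:
    """Stand-in for fastdeploy.engine.request.Request (illustrative only)."""

    def __init__(self, request_id: str):
        self.request_id = request_id
        self.error_message = None
        self.async_process_futures = []


def waiting_async_process(request: ToyRequest):
    """Mirror of the tri-state contract: None = error, True = pending, False = done."""
    if request.error_message is not None:
        return None
    if any(not f.done() for f in request.async_process_futures):
        return True
    request.async_process_futures = []  # done: clear the futures
    return False


pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)
req = ToyRequest("req-0")
req.async_process_futures.append(pool.submit(time.sleep, 0.05))

while (state := waiting_async_process(req)) is True:
    time.sleep(0.01)  # poll until the simulated download completes
print(state)  # False -> safe to schedule
pool.shutdown()
```

Polling a future list instead of blocking on it keeps the fetch loop free to report errors and recycle prefill resources for tasks whose downloads fail, which is exactly what the `delete_tasks_list` handling in PATCH 2 does.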
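Note 3 (PATCH 7): `concurrent.futures.Future` objects hold thread locks and cannot be pickled, so any `Request` carrying live download futures would break the IPC queues. `__getstate__` swaps them for an empty list before serialization. A self-contained sketch of the same idea with a toy class (not the real `Request`):

```python
import concurrent.futures
import pickle


class Job:
    """Toy stand-in for Request: plain data plus live futures."""

    def __init__(self, request_id: str):
        self.request_id = request_id
        self.async_process_futures = [concurrent.futures.Future()]

    def __getstate__(self):
        # Futures wrap _thread.RLock and fail to pickle,
        # so ship an empty list in their place (same idea as PATCH 7)
        state = dict(self.__dict__)
        state["async_process_futures"] = []
        return state


job = Job("req-1")
restored = pickle.loads(pickle.dumps(job))
assert restored.request_id == "req-1"
assert restored.async_process_futures == []  # futures were dropped in transit
```

Dropping the futures is safe here because they only matter on the prefill side that created them; the receiving process re-derives completion state rather than resuming the futures.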