[Optimize] Robust stability for PD deployment #5338
base: develop

Conversation
Pull request overview
This PR enhances the robustness and stability of PD (Prefill-Decode) disaggregated deployment by adding comprehensive error handling, health monitoring, and timeout mechanisms to prevent resource leaks and service disruptions.
Key Changes:
- Added communication status tracking for P-D and D-P message exchanges with proper error handling and retry logic
- Implemented health check mechanism for token processor to detect cache messenger process failures
- Added timeout-based resource reclamation in D instance to prevent block leakage when P fails to send first token
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 14 comments.
| File | Description |
|---|---|
| `fastdeploy/splitwise/splitwise_connector.py` | Modified message sending functions to return success status, configured ZMQ immediate mode to prevent message caching (see the sketch after this table), added error tracking |
| `fastdeploy/splitwise/internal_adapter_utils.py` | Added health check command handler to monitor token processor health status |
| `fastdeploy/output/token_processor.py` | Implemented health monitoring with timestamps tracking the batch processing lifecycle, repositioned resource manager logging |
| `fastdeploy/envs.py` | Added timeout configuration for the first token from P (300s) and token processor health check (120s), reduced prefill wait time from 30s to 5s |
| `fastdeploy/engine/sched/resource_manager_v1.py` | Added background thread to monitor and recycle preallocated resources that time out waiting for the first token, implemented duplicate request detection |
| `fastdeploy/engine/common_engine.py` | Added duplicate request ID validation, communication failure handling with error reporting to the scheduler, fixed error handling for D resource allocation failures |
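For readers unfamiliar with the ZMQ immediate-mode option mentioned for `splitwise_connector.py`, here is a minimal, self-contained pyzmq sketch of the idea; the socket type, address, and message are illustrative and not taken from the connector code:

```python
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.DEALER)
# With ZMQ_IMMEDIATE set, messages are only queued on completed connections,
# so sending to a peer that is down fails fast instead of piling up locally.
sock.setsockopt(zmq.IMMEDIATE, 1)
sock.connect("tcp://127.0.0.1:18901")  # illustrative address; nothing is listening
try:
    sock.send_json({"cmd": "ping"}, flags=zmq.NOBLOCK)
except zmq.Again:
    # No completed connection yet: the caller can retry or report the failure.
    pass
sock.close(0)
ctx.term()
```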
```python
                self._close_connection(addr)

        except Exception as e:
            self.logger.error(f"Message preparation failed: {e}")
```
Copilot AI (Dec 2, 2025):
When an exception occurs in the outer try block (line 176-177), the function returns is_successful which is still True. The function should set is_successful = False before returning, or return False explicitly when an exception is caught.
Suggested change:
```diff
             self.logger.error(f"Message preparation failed: {e}")
+            is_successful = False
```
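For reference, a standalone sketch of the success-flag pattern this suggestion completes; `send_message`, the `sock` argument, and the message loop are illustrative rather than the actual connector method:

```python
import logging

logger = logging.getLogger("splitwise_connector_sketch")

def send_message(sock, messages) -> bool:
    """Sketch: the outer handler must flip the flag, or failures look like successes."""
    is_successful = True
    try:
        for msg in messages:
            sock.send_json(msg)
    except Exception as e:
        logger.error(f"Message preparation failed: {e}")
        is_successful = False  # propagate the failure to the caller
    return is_successful
```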
```python
        try:
            with self.lock:
                need_recycle_request_ids = []
                for request_id, timestamp in self.preallocated_requests_timestamp.items():
```
Copilot AI (Dec 2, 2025):
Iterating over self.preallocated_requests_timestamp.items() while potentially modifying it in another thread can cause a RuntimeError ("dictionary changed size during iteration"). Even though this is inside a lock, the dictionary could be modified by other methods like add_prefilled_request (line 1082) after the lock is released but before the next iteration of the while loop. Consider using list(self.preallocated_requests_timestamp.items()) to create a snapshot for iteration.
Suggested change:
```diff
-                for request_id, timestamp in self.preallocated_requests_timestamp.items():
+                for request_id, timestamp in list(self.preallocated_requests_timestamp.items()):
```
| """ | ||
| with self.health_lock: | ||
| if self.timestamp_for_alive_after_handle_batch is None: # has entered handle batch | ||
| if time.time() - self.timestamp_for_alive_before_handle_batch > envs.FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT: |
Copilot AI (Dec 2, 2025):
The health check has a potential race condition. If timestamp_for_alive_before_handle_batch is None when checked, the subtraction time.time() - self.timestamp_for_alive_before_handle_batch will raise a TypeError. This can happen before the first batch is processed or when exiting batch processing. Add a null check: if self.timestamp_for_alive_before_handle_batch is not None and time.time() - self.timestamp_for_alive_before_handle_batch > ...
Suggested change:
```diff
-                if time.time() - self.timestamp_for_alive_before_handle_batch > envs.FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT:
+                if (
+                    self.timestamp_for_alive_before_handle_batch is not None
+                    and time.time() - self.timestamp_for_alive_before_handle_batch > envs.FD_TOKEN_PROCESSOR_HEALTH_TIMEOUT
+                ):
```
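The same guard, restated as a small self-contained helper with the two timestamps passed in explicitly (the function name and signature are illustrative):

```python
import time

def token_processor_is_healthy(ts_before_batch, ts_after_batch, timeout_s) -> bool:
    # ts_after_batch is None while a batch is being handled; ts_before_batch is
    # None until the first batch ever arrives, so check it before subtracting.
    if ts_after_batch is None:
        if ts_before_batch is not None and time.time() - ts_before_batch > timeout_s:
            return False  # stuck inside handle_batch for longer than the timeout
    return True
```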
```python
        for task in tasks:
            if self.resource_manager.has_existed_request(task.request_id):
                self.llm_logger.error(
                    f"request_id: {task.request_id} has been added to scheduler, recieved requests with same request_id."
```
Copilot AI (Dec 2, 2025):
Typo: "recieved" should be "received"
| f"request_id: {task.request_id} has been added to scheduler, recieved requests with same request_id." | |
| f"request_id: {task.request_id} has been added to scheduler, received requests with same request_id." |
```python
                is_successful = self.split_connector.send_splitwise_tasks([task], task.idx)
                if not is_successful:  # Send request for block ids to D failed
```
Copilot AI (Dec 2, 2025):
The return value type of send_splitwise_tasks is being checked as a boolean, but the function actually returns a dictionary. In Python, non-empty dictionaries are truthy, which means if not is_successful will be False even when all send operations failed. This check should verify that all operations succeeded, e.g., if not is_successful or not all(is_successful.values()):
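Spliced into the caller shown above, the suggested check might look like this (assuming `send_splitwise_tasks` keeps its per-request dictionary return):

```python
                is_successful = self.split_connector.send_splitwise_tasks([task], task.idx)
                # An empty dict or any per-request False means the block-id request
                # did not reach D, so treat both cases as a failure.
                if not is_successful or not all(is_successful.values()):
                    self.llm_logger.error(f"{task.request_id} send request for block ids to D failed.")
```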
```python
        for task in tasks:
            if self.resource_manager.has_existed_request(task.request_id):
                self.llm_logger.error(
                    f"request_id: {task.request_id} has been added to scheduler, recieved requests with same request_id."
```
Copilot AI (Dec 2, 2025):
Typo: "recieved" should be "received"
| f"request_id: {task.request_id} has been added to scheduler, recieved requests with same request_id." | |
| f"request_id: {task.request_id} has been added to scheduler, received requests with same request_id." |
```diff
@@ -150,7 +150,11 @@
     "FD_ENABLE_PDL": lambda: int(os.getenv("FD_ENABLE_PDL", "1")),
     # "Number of tokens in the group for Mixture of Experts (MoE) computation processing on HPU"
     "FD_HPU_CHUNK_SIZE": lambda: int(os.getenv("FD_HPU_CHUNK_SIZE", "64")),
```
Copilot AI (Dec 2, 2025):
[nitpick] The default value for FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS was changed from 30 seconds to 5 seconds. This is a significant behavior change that reduces the wait time by 83%. Consider documenting the rationale for this change in the PR description, as it may impact production deployments and timeout behavior.
| "FD_HPU_CHUNK_SIZE": lambda: int(os.getenv("FD_HPU_CHUNK_SIZE", "64")), | |
| "FD_HPU_CHUNK_SIZE": lambda: int(os.getenv("FD_HPU_CHUNK_SIZE", "64")), | |
| # [IMPORTANT] Default value changed from 30 seconds to 5 seconds (reduced by 83%). | |
| # Rationale: Shorter wait time improves responsiveness and aligns with observed resource decode times. | |
| # WARNING: This may impact production deployments and timeout behavior. Adjust via environment variable if needed. |
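If the shorter 5 s default turns out to be too aggressive for a given deployment, the previous behavior can be restored through the environment variable before the FastDeploy processes start, for example:

```python
import os

# Restore the pre-change 30 s wait for decode resources (sketch; in practice this
# is set in the launch environment rather than in library code).
os.environ["FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS"] = "30"
```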
```python
                    request = self.requests[request_id]
                    self.prerelease_resource(request)
```
Copilot AI (Dec 2, 2025):
Potential KeyError if request_id has already been removed from self.requests between the dictionary iteration and this access. The code should check if the request_id exists: if request_id in self.requests: request = self.requests[request_id]; self.prerelease_resource(request). Otherwise, the monitoring thread could crash when a request is removed by another thread.
Suggested change:
```diff
-                    request = self.requests[request_id]
-                    self.prerelease_resource(request)
+                    if request_id in self.requests:
+                        request = self.requests[request_id]
+                        self.prerelease_resource(request)
```
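Combining this guard with the earlier snapshot suggestion, one pass of the monitoring logic could look like the following standalone sketch; the function signature, the injected callbacks, and the removal of the timestamp entry are assumptions rather than the actual thread body:

```python
import time

def recycle_timed_out_preallocations(preallocated_ts, requests, prerelease, timeout_s, lock):
    # preallocated_ts: {request_id: timestamp}; requests: {request_id: request}.
    with lock:
        expired = [
            request_id
            for request_id, ts in list(preallocated_ts.items())  # snapshot for safe iteration
            if time.time() - ts > timeout_s
        ]
        for request_id in expired:
            if request_id in requests:  # may already be removed; avoid KeyError
                prerelease(requests[request_id])
            preallocated_ts.pop(request_id, None)  # drop the stale timestamp either way
```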
```diff
-                    self.split_connector.send_cache_info_to_prefill([task])
+                    send_info_to_p_successful = self.split_connector.send_cache_info_to_prefill([task])
+                    if send_info_to_p_successful is False:
+                        self.resource_manager.pre_recycle_resource(task.request_id)
```
Copilot AI (Dec 2, 2025):
Calling pre_recycle_resource without checking if the request was actually allocated. At this point (line 1182-1184), is_success is False, which means preallocate_resource_in_d was never called or failed. Recycling resources that were never allocated could lead to corruption of the resource manager state. This code block should only recycle if resources were actually preallocated.
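A sketch of the guard this comment asks for; `has_preallocated_resource` is an illustrative name for whatever check the resource manager can provide, not an existing API:

```python
                    send_info_to_p_successful = self.split_connector.send_cache_info_to_prefill([task])
                    if not send_info_to_p_successful:
                        # Only roll back blocks that were actually preallocated for this request.
                        if self.resource_manager.has_preallocated_resource(task.request_id):
                            self.resource_manager.pre_recycle_resource(task.request_id)
```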
```python
            splitwise_task_send_status[task.request_id] = is_successful
            task.disaggregate_info["cache_info"] = decode_diagg
            task.disaggregate_info["role"] = "prefill"
        return splitwise_task_send_status
```
Copilot AI (Dec 2, 2025):
The function returns a dictionary mapping request IDs to success status, but callers expect a boolean. This API design mismatch causes incorrect behavior - an empty dictionary evaluates to False, but a non-empty dictionary always evaluates to True regardless of success status. Consider either: (1) returning a single boolean value (e.g., all(splitwise_task_send_status.values()) if the dict is not empty, else True), or (2) updating all callers to properly handle the dictionary return type.
Suggested change:
```diff
-        return splitwise_task_send_status
+        return all(splitwise_task_send_status.values()) if splitwise_task_send_status else True
```
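Of the two options, returning `all(splitwise_task_send_status.values())` keeps the existing boolean-style callers working but discards per-request detail, while keeping the dictionary return and fixing the callers (as in the earlier `send_splitwise_tasks` comment) preserves which specific request failed; either way, the return type and every call site need to agree.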
```python
        need_delete_tasks = []
        if envs.FD_OFFLINE_PERF_TEST_FOR_PD:
            for task in tasks:
                if self.resource_manager.has_existed_request(task.request_id):
```
Suggest checking for this earlier, for example before the request is inserted into the scheduler.
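For illustration, a sketch of that earlier check, filtering duplicates before anything is handed to the scheduler; the `put_requests` call is an illustrative stand-in for the actual insertion path:

```python
        deduped_tasks = []
        for task in tasks:
            # Reject duplicates before they ever reach the scheduler.
            if self.resource_manager.has_existed_request(task.request_id):
                self.llm_logger.error(
                    f"request_id: {task.request_id} already exists, dropping duplicate request."
                )
                continue
            deduped_tasks.append(task)
        self.scheduler.put_requests(deduped_tasks)  # illustrative insertion call
```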
```python
                is_successful = self.split_connector.send_splitwise_tasks([task], task.idx)
                if not is_successful:  # Send request for block ids to D failed
                    self.llm_logger.error(f"{task.request_id} send request for block ids to D failed.")
                    self.scheduler.put_results(
```
These error messages reported to the scheduler could be extracted into a simple helper function, since the same pattern is used in several places.
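A sketch of the shared helper being suggested; the shape of the result object passed to `put_results` is not visible in this diff, so `_build_error_result` is a hypothetical stand-in for whatever the existing call sites construct:

```python
    def _report_failure_to_scheduler(self, request_id: str, error_msg: str) -> None:
        # One place to log a per-request failure and surface it to the scheduler,
        # instead of repeating the same lines at every call site.
        self.llm_logger.error(f"{request_id} {error_msg}")
        self.scheduler.put_results([self._build_error_result(request_id, error_msg)])
```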
```diff
                     self.llm_logger.info(f"Resource available, processing task {task.request_id}")
-                    self.split_connector.send_cache_info_to_prefill([task])
+                    send_info_to_p_successful = self.split_connector.send_cache_info_to_prefill([task])
+                    if send_info_to_p_successful is False:
```
Prefer a plain truthiness check here, e.g. `if not send_info_to_p_successful:`, rather than comparing with `is False`.
Thanks for your contribution! |
Motivation
Modifications
Usage or Command
Accuracy Tests
Checklist
- Add at least one tag to the PR title, chosen from: [FDConfig], [APIServer], [Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax].
- Run `pre-commit` before commit.
- If the PR targets the `release` branch, make sure the PR has been submitted to the `develop` branch first, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.