
[PD Disaggregation] support DP via v1 router and decouple DP and EP#5197

Merged
Jiang-Jia-Jun merged 11 commits into PaddlePaddle:develop from liyonghua0910:develop+router_v1_dp
Dec 4, 2025

Conversation

liyonghua0910 (Collaborator) commented Nov 24, 2025

Motivation

Support running DP parallelism for non-EP models with the Router under V1 scheduling.

Modifications

Modified the parts of the logic where EP and DP were coupled.

Usage or Command

The service can be launched with the Router following the example in examples/splitwise/start_v1_dp2.sh:

bash examples/splitwise/start_v1_dp2.sh

Accuracy Tests

(screenshot of accuracy test results)

Checklist

  • Add at least one tag in the PR title.
    • Tag list: [[FDConfig],[APIServer],[Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You can add new tags based on the PR content, but the semantics must be clear.
  • Format your code, run pre-commit before commit.
  • Add unit tests. Please write the reason in this PR if no unit tests.
  • Provide accuracy results.
  • If the current PR is submitting to the release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

paddle-bot bot commented Nov 24, 2025

Thanks for your contribution!

codecov-commenter commented Nov 24, 2025

Codecov Report

❌ Patch coverage is 10.00000% with 63 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (develop@209006e). Learn more about missing BASE report.

Files with missing lines (patch %, missing lines):
fastdeploy/splitwise/splitwise_connector.py 8.33% 21 Missing and 1 partial ⚠️
fastdeploy/engine/common_engine.py 0.00% 17 Missing and 1 partial ⚠️
fastdeploy/cache_manager/cache_messager.py 14.28% 6 Missing ⚠️
fastdeploy/engine/expert_service.py 0.00% 4 Missing ⚠️
fastdeploy/router/router.py 0.00% 4 Missing ⚠️
fastdeploy/worker/worker_process.py 0.00% 2 Missing and 2 partials ⚠️
fastdeploy/inter_communicator/zmq_server.py 0.00% 3 Missing ⚠️
...stdeploy/inter_communicator/engine_worker_queue.py 50.00% 1 Missing ⚠️
fastdeploy/output/token_processor.py 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             develop    #5197   +/-   ##
==========================================
  Coverage           ?   59.72%           
==========================================
  Files              ?      325           
  Lines              ?    40285           
  Branches           ?     6100           
==========================================
  Hits               ?    24059           
  Misses             ?    14337           
  Partials           ?     1889           
Flag Coverage Δ
GPU 59.72% <10.00%> (?)

Flags with carried forward coverage won't be shown.
  args = parse_args()
  rank_id = args.rank + args.local_data_parallel_id * args.mp_num
- logger = get_logger("cache_messager", f"cache_messager_rank{rank_id}.log")
+ logger = get_logger("cache_messager", f"cache_messager_tprank{args.rank}.log")
Collaborator:

When multi_api_server launches multiple DPs, do the logs for different TP ranks end up in different directories?

Collaborator (author):

Each DP has its own directory, and within each DP the cache messagers are launched per TP rank, so each DP's directory contains logs split into tp0, tp1, and so on.
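The rank arithmetic in the snippet above can be sketched standalone (hypothetical helper; the real code lives in fastdeploy/cache_manager/cache_messager.py):

```python
# Sketch of the global-rank formula shown above:
#   rank_id = tp_rank + local_data_parallel_id * mp_num
def global_rank(tp_rank: int, dp_id: int, mp_num: int) -> int:
    return tp_rank + dp_id * mp_num

# With 2 DPs and mp_num (TP degree) = 2, ranks lay out contiguously per DP:
for dp_id in range(2):
    for tp_rank in range(2):
        print(f"dp{dp_id}/tp{tp_rank} -> global rank {global_rank(tp_rank, dp_id, 2)}")
# dp0/tp0 -> 0, dp0/tp1 -> 1, dp1/tp0 -> 2, dp1/tp1 -> 3
```

Since each DP already gets its own log directory, the filename only needs the local TP rank, which is why the suffix switched from the global rank_id to args.rank.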

  + f" --speculative_config '{self.speculative_config.to_json_string()}'"
  + (" --create_cache_tensor" if create_cache_tensor else "")
- + f" >{log_dir}/launch_cache_transfer_manager_{int(device_ids[i])}.log 2>&1"
+ + f" >{log_dir}/launch_cache_transfer_manager_tprank{i}.log 2>&1"
Collaborator:

Same as above.

Collaborator (author):

This replaces the physical device_id with the logical local_tp_rank index in the log filename suffix, consistent with the change above.

time.sleep(0.005)
self.llm_logger.info(f"ask D resource for req_id: {task.request_id}")
self.llm_logger.debug(f"P has allocated resources for request: {task.request_id}")
self.llm_logger.debug(f"P is asking D to allocate resource for request: {task.request_id}")
Collaborator:

This file adds quite a few new log lines; please trim them down a bit.


  env = os.environ.copy()
- env["FD_LOG_DIR"] = env.get("FD_LOG_DIR", "log") + f"/log_{i}"
+ env["FD_LOG_DIR"] = env.get("FD_LOG_DIR", "log") + f"/server_{i}"
Collaborator:

Won't this change leave others suddenly unable to find the log directory? log_xx is quite intuitive; avoid renaming it unless necessary.

- self.cfg.parallel_config.enable_expert_parallel
- and self.cfg.parallel_config.data_parallel_size > 1
- ):
+ elif self.cfg.parallel_config.data_parallel_size > 1:
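The decoupling above can be sketched as a standalone branch selection (hypothetical names mirroring cfg.parallel_config, not the actual FastDeploy API):

```python
# Sketch of how the output-reading path is chosen after this change:
# pure DP no longer requires expert parallelism to be enabled.
from dataclasses import dataclass

@dataclass
class ParallelConfig:
    enable_expert_parallel: bool
    data_parallel_size: int

def select_output_path(cfg: ParallelConfig) -> str:
    if cfg.enable_expert_parallel and cfg.data_parallel_size > 1:
        return "ep_dp"    # EP + DP: previously the only multi-DP path
    elif cfg.data_parallel_size > 1:
        return "dp_only"  # pure DP: branch made reachable by this PR
    return "single"

print(select_output_path(ParallelConfig(False, 2)))  # dp_only
```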
Collaborator:

Under multi-DP, is the get_output_ep op still reused to read the output?

Collaborator (author):

get_output_ep differs from get_output in that the early return for rank > 0 is removed, so every DP reads tokens from its own message queue. I feel this fits the DP concept better than EP.
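The behavior described above can be illustrated with a small sketch (illustrative names, using stdlib queues rather than the real custom op):

```python
# Each DP rank drains its own output queue; there is no rank > 0 early return.
import queue

def make_dp_queues(dp_size: int) -> list:
    """One token queue per DP rank (stand-in for per-DP message queues)."""
    return [queue.Queue() for _ in range(dp_size)]

def get_output_dp(queues: list, dp_rank: int) -> list:
    """Every DP rank reads from its own queue, unlike a get_output-style
    function that would return immediately for rank > 0."""
    out = []
    q = queues[dp_rank]
    while not q.empty():
        out.append(q.get())
    return out

qs = make_dp_queues(2)
qs[0].put(11)
qs[1].put(22)
print(get_output_dp(qs, 1))  # [22]
```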


  async def handle_splitwise_request(self, request_data: dict, endpoint_name: str):
- logger.debug(f"Received request: {request_data}")
+ logger.info(f"Received request: {request_data}")
Collaborator:

Suggest changing this to debug; info can log just the request_id, otherwise the log grows too large.

Collaborator (author):

This is the router's log, and no request id has been assigned yet, so I'll change it back to debug.

  task.disaggregate_info["cache_info"]["rdma"]["current_id"] = current_id
  task.disaggregate_info["role"] = "decode"
- self.logger.debug(f"send task to coupled instance, {addr}, {task}")
+ self.logger.info(f"send_splitwise_tasks: protocol=rdma, addr={addr}, task={task}")
Collaborator:

The task is too large; change this to debug.

juncaipeng (Collaborator) previously approved these changes Dec 4, 2025:
LGTM

juncaipeng (Collaborator) left a comment:

CI has 2-GPU machines and the 0.3B model can be used, so a P dp2 + D dp2 CI unit test can be added; with num_block specified manually, all four services can be started. CI has passed, so this PR can be merged first and the test added in another PR.

Copilot AI (Contributor) left a comment:
Pull request overview

This PR enables Data Parallel (DP) support via the v1 router for non-Expert Parallel (EP) models in PD disaggregation mode, decoupling DP and EP logic to allow DP to work independently of EP.

Key Changes:

  • Moved DP configuration logic outside EP-specific conditions in worker initialization
  • Updated logging throughout to distinguish between DP and EP contexts
  • Fixed device and port indexing to use local_data_parallel_id for DP-specific operations

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 10 comments.

Summary per file:
fastdeploy/worker/worker_process.py: Decouples DP configuration from EP, moves the local_data_parallel_id calculation outside the EP condition
fastdeploy/splitwise/splitwise_connector.py: Updates logging and device/port indexing to use DP-specific IDs instead of assuming EP
fastdeploy/router/router.py: Adds debug logging and fixes port type casting
fastdeploy/output/token_processor.py: Removes the EP condition for DP output handling
fastdeploy/inter_communicator/zmq_server.py: Adds ZMQ error handling
fastdeploy/inter_communicator/engine_worker_queue.py: Adds debug logging for task operations
fastdeploy/engine/expert_service.py: Makes launched_expert_service_signal EP-specific
fastdeploy/engine/common_engine.py: Updates logging to use DP-specific naming and improves log messages
fastdeploy/config.py: Adds a data_parallel_rank field and fixes pd_comm_port indexing
fastdeploy/cache_manager/prefix_cache_manager.py: Updates log file naming from device ID to TP rank
fastdeploy/cache_manager/cache_transfer_manager.py: Updates log file naming and removes an unused rank_id calculation
fastdeploy/cache_manager/cache_messager.py: Updates log file naming, improves logging, removes unused rank_id
examples/splitwise/utils.sh: Adds port checking and health monitoring utilities
examples/splitwise/start_v1_dp2.sh: New example script for v1 DP deployment
custom_ops/gpu_ops/get_output.cc: Reformats indentation to match project style

  check_ports() {
      for port in "$@"; do
-         if ss -tuln | grep -q ":$port "; then
+         if $(is_port_free $port); then
Copilot AI commented Dec 4, 2025:

The logic in check_ports function is inverted. Currently, it returns an error when ports are free instead of when they are in use.

The function is_port_free returns 0 (success/true in bash) when a port is free, and 1 (failure/false) when it's occupied. However, the condition if $(is_port_free $port) treats return code 0 as true, so it prints the error message when the port is free, which is backwards.

Fix by inverting the condition:

if ! is_port_free $port; then
    echo "❌ Port $port is already in use"
    return 1
fi
Suggested change:
- if $(is_port_free $port); then
+ if ! is_port_free $port; then
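The corrected logic can be cross-checked with a small Python sketch (illustrative only, not part of the PR): a port counts as free if we can bind to it, and check_ports fails when any port is already in use.

```python
# Illustrative port-availability check mirroring the bash semantics:
# is_port_free succeeds (True) when the port is free, and check_ports
# passes only when every port in the list is free.
import socket

def is_port_free(port: int) -> bool:
    """True if binding to the port on localhost succeeds."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            s.bind(("127.0.0.1", port))
            return True
        except OSError:
            return False

def check_ports(ports) -> bool:
    """Fail (False) if any port is already in use -- note the non-inverted
    use of is_port_free here, matching the suggested bash fix."""
    return all(is_port_free(p) for p in ports)
```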

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@Jiang-Jia-Jun Jiang-Jia-Jun merged commit f4119d5 into PaddlePaddle:develop Dec 4, 2025
14 of 17 checks passed
liyonghua0910 added a commit to liyonghua0910/FastDeploy that referenced this pull request Dec 5, 2025
…addlePaddle#5197)

* [fix] support DP via v1 router and decouple DP and EP

* [fix] fix scripts

* [fix] reset model path

* [fix] dp use get_output_ep, fix router port type, update scripts

* [merge] merge with latest code

* [chore] remove some debug log

* [fix] fix code style check

* [fix] fix test_multi_api_server for log_dir name

* [chore] reduce logs

* Apply suggestions from code review

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

4 participants