[PD Disaggregation] Add device_id to distinguish the pipelines of sending kv signals in different services. #5508

juncaipeng wants to merge 1 commit into PaddlePaddle:develop from
Conversation

Thanks for your contribution!
```cpp
const paddle::Tensor &seq_lens_this_time_tensor,
const paddle::Tensor &seq_lens_decoder_tensor,
const int rank,
const int device_id,
```

Only `device_id` was added; the other changes are automatic formatting.
```cpp
#define MAX_BSZ 512
void GetOutputKVSignal(const paddle::Tensor& x,
                       int64_t rank_id,
                       int64_t device_id,
```

Only `device_id` was added; the other changes are automatic formatting.
Pull request overview

This PR adds a device_id parameter to distinguish KV signal communication pipelines for different services in the PD (Prefill-Decode) disaggregation feature. Multiple P services can now operate independently on different devices without pipeline conflicts.
Key Changes:
- Thread the `device_id` parameter through all attention backend layers (Flash, MLA, XPU, Append)
- Update the IPC message queue ID calculation to incorporate device_id: `msg_id = 1024 + 1000 * device_id + rank`
- Add the `device_id` parameter to C++ custom ops for both GPU and XPU platforms
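The queue-ID arithmetic above can be sketched in Python (the helper name `kv_signal_msg_id` is hypothetical; the PR implements this arithmetic in the C++ custom ops):

```python
def kv_signal_msg_id(device_id: int, rank: int) -> int:
    """Derive the IPC message-queue id for KV signals.

    Mirrors the arithmetic this PR introduces: services on different
    devices get disjoint queue ids as long as rank stays below 1000.
    """
    return 1024 + 1000 * device_id + rank

# Same rank, different devices -> different queues.
print(kv_signal_msg_id(0, 3))  # 1027
print(kv_signal_msg_id(1, 3))  # 2027
```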
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| fastdeploy/model_executor/layers/attention/*.py | Add device_id parameter to init_kv_signal_per_query calls in all attention backends |
| fastdeploy/model_executor/layers/attention/ops/init_kv_signal_per_query.py | Update function signature to accept device_id parameter |
| fastdeploy/cache_manager/cache_messager.py | Pass gpu_id to get_output_kv_signal and add debug logging |
| custom_ops/xpu_ops/src/ops/remote_cache_kv_ipc.h | Update msg_id calculation to include device_id |
| custom_ops/xpu_ops/src/ops/*.cc | Add device_id parameter to function signatures and pybind declarations |
| custom_ops/gpu_ops/remote_cache_kv_ipc.h | Update msg_id calculation to include device_id |
| custom_ops/gpu_ops/*.cc | Add device_id parameter, update pybind, and apply code formatting improvements |
```diff
 #include "paddle/extension.h"
-#include "remote_cache_kv_ipc.h"
 #include "paddle/phi/core/allocator.h"
 #include "paddle/phi/core/dense_tensor.h"
+#include "remote_cache_kv_ipc.h"
```

The include order has been changed: "remote_cache_kv_ipc.h" was moved after the paddle includes. While this follows the more standard convention of placing library headers before local headers, ensure it does not introduce compilation issues due to dependencies between headers.
```diff
 if (!inited) {
   // just init once
-  const int msg_id = 1024 + rank;
+  const int msg_id = 1024 + 1000 * device_id + rank;
```
The message queue ID calculation uses 1024 + 1000 * device_id + rank. This formula could lead to collisions if rank >= 1000 or if there are many devices. For example:
- device_id=0, rank=1024 would give msg_id=2048
- device_id=1, rank=24 would also give msg_id=2048
Consider using a safer calculation formula such as 1024 + device_id * 10000 + rank or documenting the valid ranges for device_id and rank to prevent collisions.
Suggested change:

```diff
-const int msg_id = 1024 + 1000 * device_id + rank;
+// Use a larger multiplier to avoid collisions between device_id and rank
+const int msg_id = 1024 + device_id * 10000 + rank;
```
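The collision is easy to demonstrate; a minimal Python sketch comparing the merged formula with the reviewer's suggestion (function names are illustrative only):

```python
def msg_id_merged(device_id: int, rank: int) -> int:
    # Formula as written in this PR
    return 1024 + 1000 * device_id + rank

def msg_id_suggested(device_id: int, rank: int) -> int:
    # Reviewer-suggested larger multiplier
    return 1024 + device_id * 10000 + rank

# Collision once rank crosses the multiplier boundary:
assert msg_id_merged(0, 1024) == msg_id_merged(1, 24) == 2048

# The larger multiplier keeps these apart (valid while rank < 10000):
assert msg_id_suggested(0, 1024) != msg_id_suggested(1, 24)
```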
```diff
 if (!inited) {
   // just init once
-  const int msg_id = 1024 + rank;
+  const int msg_id = 1024 + 1000 * device_id + rank;
```
The message queue ID calculation uses 1024 + 1000 * device_id + rank. This formula could lead to collisions if rank >= 1000 or if there are many devices. For example:
- device_id=0, rank=1024 would give msg_id=2048
- device_id=1, rank=24 would also give msg_id=2048
Consider using a safer calculation formula such as 1024 + device_id * 10000 + rank or documenting the valid ranges for device_id and rank to prevent collisions.
Suggested change:

```diff
-const int msg_id = 1024 + 1000 * device_id + rank;
+const int msg_id = 1024 + device_id * 10000 + rank;
```
```diff
 int64_t device_id,
 bool wait_flag) {
-  int msg_queue_id = 1024 + rank_id;
+  int msg_queue_id = 1024 + 1000 * device_id + rank_id;
```
The message queue ID calculation uses 1024 + 1000 * device_id + rank_id. This formula could lead to collisions if rank_id >= 1000 or if there are many devices. For example:
- device_id=0, rank_id=1024 would give msg_queue_id=2048
- device_id=1, rank_id=24 would also give msg_queue_id=2048
Consider using a safer calculation formula such as 1024 + device_id * 10000 + rank_id or documenting the valid ranges for device_id and rank_id to prevent collisions.
Suggested change:

```diff
-int msg_queue_id = 1024 + 1000 * device_id + rank_id;
+int msg_queue_id = 1024 + device_id * 10000 + rank_id;
```
```cpp
static struct msgdatakv msg_rcv;
static key_t key = ftok("/opt/", msg_queue_id);
static int msgid = msgget(key, IPC_CREAT | 0666);
int msg_queue_id = 1024 + 1000 * device_id + rank_id;
```
The message queue ID calculation uses 1024 + 1000 * device_id + rank_id. This formula could lead to collisions if rank_id >= 1000 or if there are many devices. For example:
- device_id=0, rank_id=1024 would give msg_queue_id=2048
- device_id=1, rank_id=24 would also give msg_queue_id=2048
Consider using a safer calculation formula such as 1024 + device_id * 10000 + rank_id or documenting the valid ranges for device_id and rank_id to prevent collisions.
Suggested change:

```diff
-int msg_queue_id = 1024 + 1000 * device_id + rank_id;
+int msg_queue_id = 1024 + device_id * 10000 + rank_id;
```
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@           Coverage Diff            @@
##           develop   #5508   +/-   ##
==========================================
  Coverage         ?   60.27%
==========================================
  Files            ?      329
  Lines            ?    41117
  Branches         ?     6261
==========================================
  Hits             ?    24782
  Misses           ?    14445
  Partials         ?     1890
```

Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
```diff
 while True:
     try:
-        get_output_kv_signal(kv_signal_data, self.rank_id, 0)  # wait_flag
+        get_output_kv_signal(kv_signal_data, self.rank_id, self.gpu_id, 0)  # wait_flag
```

A fairly niche issue: if gpu_id is passed here, it seems impossible to distinguish different P instances running on the same card.
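The limitation follows directly from the formula: two P instances sharing one GPU (same device_id, same rank) derive the same queue id. A sketch, with a hypothetical `instance_id` term showing one way they could be separated:

```python
def msg_id(device_id: int, rank: int) -> int:
    # Arithmetic as introduced by this PR
    return 1024 + 1000 * device_id + rank

# Two P instances on the same GPU with the same rank map to the
# same message queue, so their KV signals cannot be told apart:
assert msg_id(0, 0) == msg_id(0, 0)

# A hypothetical per-instance offset would disambiguate them:
def msg_id_with_instance(device_id: int, rank: int, instance_id: int) -> int:
    return 1024 + 100000 * instance_id + 1000 * device_id + rank

assert msg_id_with_instance(0, 0, 0) != msg_id_with_instance(0, 0, 1)
```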
refer to #5514
Motivation

Multiple P services need to use device_id to distinguish the pipelines for sending KV signals.

Modifications

Take device_id into account in the pipeline identifier.

Usage or Command

Unchanged.

Accuracy Tests

Covered by unit tests.
Checklist

- Add at least one tag in the PR title, chosen from: [FDConfig], [APIServer], [Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]
- Run pre-commit before commit.
- If the PR targets the release branch, make sure it has been submitted to the develop branch first, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.