Skip to content

Conversation

@gongshaotian
Copy link
Collaborator

@gongshaotian gongshaotian commented Nov 21, 2025

Motivation

In RL training tasks, we have enabled recording of routing information during the inference process and directly utilized the experts selected during inference in the training process to alleviate the consistency issue between training and inference.
image

Origin PR: #5321

Modifications

Add RoutingReplayManager and RoutingStore class to support routing replay

  • RoutingStore: The client used for interacting with Routing Store has two routing storage modes based on File System and RDMA(WIP).
  • RoutingReplayManager: Implement request level routing table management within FastDeploy.

Model support:

  • EB5
  • EB4.5
  • GLM

Usage or Command

Add a startup switch for route replay:


    --routing-replay-config '{"enable_routing_replay":true, "routing_store_type":"local", "local_store_dir":"xxx/routing_replay_output"}'

Accuracy Tests

Checklist

  • Add at least a tag in the PR title.
    • Tag list: [[FDConfig],[APIServer],[Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You can add new tags based on the PR content, but the semantics must be clear.
  • Format your code, run pre-commit before commit.
  • Add unit tests. Please write the reason in this PR if no unit tests.
  • Provide accuracy results.
  • If the current PR is submitting to the release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

@paddle-bot
Copy link

paddle-bot bot commented Nov 21, 2025

Thanks for your contribution!

@gongshaotian gongshaotian changed the title [Cherry-Pick][RL] Support R3 [Cherry-Pick][RL] Support Rollout Routing Replay Nov 21, 2025
@gongshaotian
Copy link
Collaborator Author

gongshaotian commented Dec 4, 2025

Develop分支PR已提交,代码在RL环境中验证通过,申请skip @YuanRisheng

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds support for Rollout Routing Replay (R3) in RL training scenarios by recording expert routing decisions during inference and replaying them during training. This helps maintain consistency between the inference and training phases.

Key changes:

  • Introduces RoutingReplayManager for request-level routing table management and RoutingStore classes for persistent storage (local file system and RDMA placeholder)
  • Adds configuration support via RoutingReplayConfig with command-line arguments
  • Integrates routing capture hooks across all MOE backend implementations

Reviewed changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated 11 comments.

Show a summary per file
File Description
fastdeploy/config.py Adds RoutingReplayConfig class with configuration options for routing replay storage
fastdeploy/engine/args_utils.py Adds CLI argument parsing and config creation for routing replay functionality
fastdeploy/engine/engine.py Passes routing replay config to worker processes
fastdeploy/worker/worker_process.py Adds routing_replay_config argument and initializes RoutingReplayConfig
fastdeploy/rl/rollout_config.py Adds routing_replay_config parameter to rollout configuration
fastdeploy/model_executor/forward_meta.py Adds routing_replay_table field to ForwardMeta for passing table buffer
fastdeploy/model_executor/layers/moe/routing_indices_cache.py New file implementing RoutingReplayManager, RoutingStore classes, and Triton kernel for routing storage
fastdeploy/model_executor/layers/moe/moe.py Adds enable_routing_replay flag and integrates hookfunc for routing capture
fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py Adds topk_ids_hookfunc parameter to base MOE method signatures
fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py Adds topk_ids_hookfunc parameter and hook invocation for Cutlass backend
fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py Adds topk_ids_hookfunc parameter and hook invocation for DeepGemm backend
fastdeploy/model_executor/layers/moe/fused_moe_marlin_backend.py Adds topk_ids_hookfunc parameter and hook invocation for Marlin backend
fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py Adds topk_ids_hookfunc parameter and hook invocation for Triton backend
fastdeploy/model_executor/layers/moe/fused_moe_wint2_backend.py Adds topk_ids_hookfunc parameter and hook invocation for Wint2 backend
fastdeploy/model_executor/layers/moe/fused_moe_xpu_backend.py Adds topk_ids_hookfunc parameter to XPU backend methods
fastdeploy/model_executor/layers/backends/metax/moe/fused_moe_triton_metax_backend.py Adds topk_ids_hookfunc parameter and hook invocation for Metax backend
fastdeploy/model_executor/layers/backends/gcu/moe/fused_moe_method_gcu_backend.py Adds topk_ids_hookfunc parameter to GCU backend methods
fastdeploy/model_executor/layers/backends/dcu/fused_moe_triton_backends.py Adds topk_ids_hookfunc parameter and hook invocation for DCU backend
Comments suppressed due to low confidence (2)

fastdeploy/model_executor/layers/moe/routing_indices_cache.py:326

        return RoutingStoreRDMA(fd_config=fd_config)

fastdeploy/config.py:1177

  • Unnecessary 'pass' statement.
        pass

self.routing_store.clear(rollout_id=rollout_id, layer_idx=layer_idx)

def get_request_from_store(self, request_id: str) -> List[paddle.Tensor]:
"""Get the routing indices of the reuest from store"""
Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in docstring: "reuest" should be "request".

Suggested change
"""Get the routing indices of the reuest from store"""
"""Get the routing indices of the request from store"""

Copilot uses AI. Check for mistakes.
Comment on lines +227 to +233
def split_request_id(self, request_id: str):
"""Split the request id to get rollout id"""
chat_type, tmp_str = request_id.split("-", 1)
assert chat_type == "chatcmpl"
reversed_tmp_str = tmp_str[::-1].split("-", 5)
rollout_id = reversed_tmp_str[-1][::-1]
return rollout_id
Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The split_request_id method lacks documentation explaining the expected format of request_id and what the returned rollout_id represents. The implementation uses specific parsing logic (reversing, splitting by "-", expecting "chatcmpl" prefix) that is not obvious from the docstring. Consider adding details like:

def split_request_id(self, request_id: str) -> str:
    """
    Split the request id to get rollout id.
    
    Expects request_id in the format: "chatcmpl-{rollout_id}-{suffix}-{suffix}-{suffix}-{suffix}-{suffix}"
    where the rollout_id is extracted by reversing the string, splitting on the last 5 hyphens,
    and returning the remaining prefix.
    
    Args:
        request_id: The full request identifier
        
    Returns:
        The extracted rollout_id portion
    """

Copilot uses AI. Check for mistakes.
Comment on lines +102 to +103
topk_ids_all = paddle.zeros([token_num_per_rank * tp_size, topk_ids.shape[1]], dtype=topk_ids.dtype)
paddle.distributed.all_gather(topk_ids_all, topk_ids, tp_group)
Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The paddle.distributed.all_gather call is incorrect. According to PaddlePaddle's API, all_gather expects a list of tensors as the first argument, not a single tensor. The correct usage should be:

topk_ids_list = [paddle.zeros_like(topk_ids) for _ in range(tp_size)]
paddle.distributed.all_gather(topk_ids_list, topk_ids, tp_group)
topk_ids_all = paddle.concat(topk_ids_list, axis=0)
topk_ids = topk_ids_all[: batch_id_per_token.shape[0], :]

or use paddle.distributed.all_gather_object if that's the intended API.

Suggested change
topk_ids_all = paddle.zeros([token_num_per_rank * tp_size, topk_ids.shape[1]], dtype=topk_ids.dtype)
paddle.distributed.all_gather(topk_ids_all, topk_ids, tp_group)
topk_ids_list = [paddle.zeros_like(topk_ids) for _ in range(tp_size)]
paddle.distributed.all_gather(topk_ids_list, topk_ids, tp_group)
topk_ids_all = paddle.concat(topk_ids_list, axis=0)

Copilot uses AI. Check for mistakes.
Comment on lines +312 to +314
shutil.rmtree(file_path)


Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using shutil.rmtree() on line 312 assumes all items in local_store_dir are directories, but os.listdir() returns both files and directories. If any files exist directly in local_store_dir, this will cause an error. Consider checking if the path is a directory or file:

for file_name in os.listdir(self.local_store_dir):
    file_path = os.path.join(self.local_store_dir, file_name)
    if os.path.isdir(file_path):
        shutil.rmtree(file_path)
    else:
        os.remove(file_path)
Suggested change
shutil.rmtree(file_path)
if os.path.isdir(file_path):
shutil.rmtree(file_path)
else:
os.remove(file_path)

Copilot uses AI. Check for mistakes.
elif fd_config.routing_replay_config.routing_store_type == "rdma":
return RoutingStoreRDMA(fd_config=fd_config)
else:
raise ValueError("Invalid store type")
Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message "Invalid store type" is not very helpful. It should include what the invalid value was and what the valid options are. Consider:

raise ValueError(f"Invalid routing_store_type '{fd_config.routing_replay_config.routing_store_type}'. Valid options are: 'local', 'rdma'")
Suggested change
raise ValueError("Invalid store type")
raise ValueError(
f"Invalid routing_store_type '{fd_config.routing_replay_config.routing_store_type}'. Valid options are: 'local', 'rdma'"
)

Copilot uses AI. Check for mistakes.
eplb_args[k] = v
return EPLBConfig(eplb_args)

def create_routing_repaly_config(self) -> RoutingReplayConfig:
Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in method name: "repaly" should be "replay". This should be create_routing_replay_config to match the naming convention used elsewhere (e.g., routing_replay_config attribute).

Suggested change
def create_routing_repaly_config(self) -> RoutingReplayConfig:
def create_routing_replay_config(self) -> RoutingReplayConfig:

Copilot uses AI. Check for mistakes.
Comment on lines +318 to +319
def __init__(self) -> None:
super().__init__()
Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RoutingStoreRDMA.__init__() method is missing the required fd_config parameter. It should accept fd_config: FDConfig as a parameter like its parent class and sibling class RoutingStoreLocal. This will cause a runtime error when instantiating RoutingStoreRDMA on line 326.

Suggested fix:

def __init__(self, fd_config: FDConfig) -> None:
    super().__init__(fd_config=fd_config)
Suggested change
def __init__(self) -> None:
super().__init__()
def __init__(self, fd_config: FDConfig) -> None:
super().__init__(fd_config=fd_config)

Copilot uses AI. Check for mistakes.
self.local_store_dir: str = "./routing_replay_output"

# RDMA routing store
pass
Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pass statement here serves no purpose and should be removed. If this is a placeholder for future RDMA configuration attributes, consider adding a comment instead:

# RDMA routing store
# TODO: Add RDMA-specific configuration parameters
Suggested change
pass
# TODO: Add RDMA-specific configuration parameters here

Copilot uses AI. Check for mistakes.
"--routing_replay_config",
type=json.loads,
default=None,
help="Configation of Rollout Routing Replay.",
Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in help text: "Configation" should be "Configuration".

Suggested change
help="Configation of Rollout Routing Replay.",
help="Configuration of Rollout Routing Replay.",

Copilot uses AI. Check for mistakes.
self._clear_table_slot(batch_id)

def put_table_to_store(self):
"""Put the routin table"""
Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in docstring: "routin" should be "routing".

Suggested change
"""Put the routin table"""
"""Put the routing table"""

Copilot uses AI. Check for mistakes.
@yuanlehome yuanlehome merged commit fbed0ef into PaddlePaddle:feature/experimental_feature_20250908 Dec 4, 2025
20 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants