[train] Support packing for CUDA IPC transfer with new inference codepath#1557
Conversation
Code Review
This pull request optimizes the weight transfer process by packing tensors within a chunk into a single contiguous CUDA buffer, which reduces the overhead of managing multiple IPC handles. The review feedback identifies a security risk in using pickle for deserialization and suggests refactoring the manual unpacking logic to reuse existing components. Further recommendations include replacing magic indices with comments and using consistent utility functions for GPU identification to improve maintainability.
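The packing scheme described above can be sketched as follows. This is a minimal illustration using flat Python lists in place of CUDA tensors; `pack_tensors` and its return format are hypothetical stand-ins, not the PR's actual API:

```python
def pack_tensors(named_tensors):
    """Pack flat tensors into one contiguous buffer, recording the metadata
    (names, shapes, element counts) the receiver needs to rebuild them.

    `named_tensors` is an iterable of (name, shape, flat_data) triples.
    """
    names, shapes, sizes, buffer = [], [], [], []
    for name, shape, data in named_tensors:
        names.append(name)
        shapes.append(shape)
        sizes.append(len(data))
        buffer.extend(data)  # concatenate into one contiguous region
    # One buffer means one IPC handle per chunk instead of one per tensor.
    return {"names": names, "shapes": shapes, "sizes": sizes}, buffer
```

The key point is that only the single `buffer` crosses the IPC boundary; the metadata dict travels out of band and drives the unpacking loop on the receiver.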
```python
# --- unpack SkyRL packed CUDA IPC format ---
import base64
import pickle

import torch

names = update_info["names"]
shapes = update_info["shapes"]
sizes = update_info["sizes"]
pickled = update_info["ipc_handles_pickled"]
handles = pickle.loads(base64.b64decode(pickled))

device_index = torch.cuda.current_device()
physical_gpu_id = str(torch.cuda.get_device_properties(device_index).uuid)
if physical_gpu_id not in handles:
    raise ValueError(f"IPC handle not found for GPU UUID {physical_gpu_id}. Available: {list(handles)}")
func, args = handles[physical_gpu_id]
# Remap the device index to the LOCAL current device.
list_args = list(args)
list_args[6] = device_index  # args[6] is the device index in the IPC rebuild args
packed_tensor = func(*list_args)

# Slice the packed buffer back into individual named tensors.
weights: list[tuple[str, torch.Tensor]] = []
offset = 0
for name, shape, size in zip(names, shapes, sizes):
    weights.append((name, packed_tensor[offset : offset + size].view(*shape)))
    offset += size
```
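The slice-and-view loop at the end rebuilds each tensor by advancing an offset through the flat buffer. A pure-Python analogue of that arithmetic, with no torch dependency (`unpack` and `reshape` are hypothetical helpers for illustration):

```python
import math

def unpack(buffer, names, shapes, sizes):
    """Rebuild (name, nested-list) pairs from a flat buffer, mirroring the
    offset/view loop above: each tensor occupies `size` consecutive elements."""

    def reshape(vals, dims):
        # Recursively nest a flat slice into lists matching `dims`.
        if len(dims) <= 1:
            return list(vals)
        step = math.prod(dims[1:])
        return [reshape(vals[i * step : (i + 1) * step], dims[1:]) for i in range(dims[0])]

    weights, offset = [], 0
    for name, shape, size in zip(names, shapes, sizes):
        flat = buffer[offset : offset + size]
        offset += size
        weights.append((name, reshape(flat, list(shape))))
    return weights
```

Because offsets are derived purely from the `sizes` list, sender and receiver must agree on tensor order; the metadata lists are positional.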
This block manually implements the logic for unpacking the packed CUDA IPC format and rebuilding the tensors. This logic is already present in CudaIpcWeightTransferReceiver.receive_weights (in cuda_ipc_strategy.py). To improve maintainability and reduce duplication, consider refactoring this method to leverage the existing weight_transfer_engine (which is the receiver) to handle the unpacking of update_info. This would centralize the IPC handling logic and make it easier to update in the future.
The old codepath will be removed soon.
What does this PR do?
Support for CUDA IPC-based weight transfer in the new inference codepath was added in #1512, but it sent tensors one at a time. This PR packs the tensors in a chunk into a single contiguous buffer before transfer.
Test Plan
I manually ran FSDP and Megatron colocated weight sync tests and they pass:
uv run --isolated --extra megatron --extra dev -- pytest -s -vvv tests/backends/skyrl_train/gpu/gpu_ci/test_megatron_worker.py::test_megatron_policy_weight_sync[colocate_all]

uv run --isolated --extra fsdp --extra dev -- pytest -s -vv tests/backends/skyrl_train/gpu/gpu_ci/test_policy_local_engines_e2e.py::test_policy_local_engines_e2e[colocate_nccl_fsdp2_vllm]