feat(multimodal): move MM routing into vLLM frontend processor #8065
- Add 18 unit tests for new multimodal routing modules:
  - test_routing_utils.py: block_mm_infos and routing_info_from_features
  - test_media_connector.py: ImageLoader LRU cache integration
  - test_mm_kwargs_transfer.py: metadata serialization and sender logic
- Move launch script to examples/backends/vllm/launch/agg_multimodal_router.sh (alongside existing agg_multimodal.sh)
- Rename _externally_processed -> externally_processed in handlers.py

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Krish Hung <krishung5@gmail.com>

- Remove examples/backends/vllm/mm_router_worker/ (replaced by frontend vLLM processor with in-process KV router)
- Remove tests/mm_router/test_vllm_mm_router_e2e.py (tested the removed mm_router_worker architecture)
- Update docs/features/multimodal/multimodal-kv-routing.md to describe the new frontend routing approach for vLLM, keeping TRT-LLM's mm_router_worker architecture as-is

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Krish Hung <krishung5@gmail.com>
Validates multimodal KV-aware routing end-to-end: Frontend (vLLM processor + KvRouter) → vLLM backend worker. Tests cover text-only overlap, repeated same/different images, staircase image counts, swapped image order, HTTP image URLs, and HTTP vs data URI parity.
The receiver stored all pickled kwargs items under the same dict key, so only the last image survived. vLLM then hit IndexError when accessing mm_kwargs for the 2nd/3rd image. Accumulate items as a list and build mm_kwargs with all of them.
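The overwrite bug described in this commit message can be illustrated with a minimal sketch. The function names and string payloads below are hypothetical stand-ins (the real receiver handles pickled kwargs items); only the `__pickled_kwargs_item__` key is taken from the review text.

```python
# Hypothetical stand-ins: plain strings replace the real pickled kwargs items.
PICKLED_KEY = "__pickled_kwargs_item__"


def collect_overwriting(specs):
    """Buggy shape: every item lands on the same key, so only the last survives."""
    results = {}
    for name, payload in specs:
        results[name] = payload
    return results


def collect_accumulating(specs):
    """Fixed shape: items sharing the pickled-item key are kept in a list."""
    results = {}
    for name, payload in specs:
        if name == PICKLED_KEY:
            results.setdefault(name, []).append(payload)
        else:
            results[name] = payload
    return results


specs = [(PICKLED_KEY, "image-0"), (PICKLED_KEY, "image-1"), (PICKLED_KEY, "image-2")]
buggy = collect_overwriting(specs)
fixed = collect_accumulating(specs)
```

With three images, `buggy` holds a single item while `fixed` holds all three, which is the difference between the `IndexError` and a complete `mm_kwargs`.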
Allows packing all workers onto GPU 0 for functional testing on single-GPU machines via SINGLE_GPU=true environment variable.
Walkthrough
Added new multimodal (MM) infrastructure (media connector, NIXL-based kwargs transfer, routing utilities) to support frontend-driven MM processing and KV cache routing. Removed the standalone MM router worker. Integrated MM routing into the frontend vLLM processor and backend handlers with NIXL tensor transfer support.
Changes
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~65 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 11
🧹 Nitpick comments (1)
components/src/dynamo/common/tests/multimodal/test_mm_kwargs_transfer.py (1)
79-99: Strengthen this sender test and remove the inline import.
This block only locks in the empty/skip branches, but the sender bug fixed in this PR was the multi-item overwrite path. Adding a two-feature `prepare()` case here would protect against that regression and let you reuse the existing module-scope `MmKwargsSender` import.
As per coding guidelines, "keep all imports at module top (flag any imports inside functions/classes)".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/src/dynamo/common/tests/multimodal/test_mm_kwargs_transfer.py` around lines 79 - 99, Update the tests to remove the inline import and add a new case that covers the multi-item overwrite path: delete the from-dynamo... import inside test_prepare_with_no_data_returns_none and rely on the module-level MmKwargsSender import, and add a new async test (e.g., test_prepare_with_two_features_preserves_both) which creates two MagicMock features with non-None data and modality="image", calls MmKwargsSender().prepare([feat1, feat2], modality="image"), and asserts that returned meta is not None and that both futures (or their corresponding entries) are present and distinct so the previous multi-item overwrite bug is prevented.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/src/dynamo/common/multimodal/media_connector.py`:
- Around line 75-80: The cache hit path in fetch_image returns the cached PIL
Image directly and ignores the requested image_mode, causing inconsistent
behavior versus a miss; update the cache-hit branch in fetch_image (use key =
image_url.lower(), self._image_loader._image_cache and cache[key]) to check the
requested image_mode and, if provided and different from cache[key].mode, return
a converted image (use PIL Image.convert) rather than the cached object (do not
mutate cache); otherwise return the cached image as before, and still call
cache.move_to_end(key) to preserve LRU behavior.
- Around line 96-103: The current blanket except in
DynamoMediaConnector.fetch_image_async hides real errors; change it to only
catch the expected unsupported-source exception(s) (e.g., catch ValueError from
ImageLoader as e) and perform the debug log + return await
super().fetch_image_async(image_url, image_mode=image_mode) inside that handler;
for any other exception, log it (including the exception) and re-raise so
transient fetch/decode errors are not swallowed. Ensure you reference the same
image_url slicing and the call to super().fetch_image_async when implementing
the narrowed except and re-raise behavior.
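The narrowed exception handling requested for `fetch_image_async` can be sketched as follows. Both classes and the `_load` helper are hypothetical stand-ins rather than the real connector classes, and treating `ValueError` as the "unsupported source" signal is an assumption taken from the comment's own example.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class BaseConnector:
    """Hypothetical stand-in for the upstream media connector."""

    async def fetch_image_async(self, image_url, *, image_mode="RGB"):
        return f"base:{image_url}:{image_mode}"


class CachingConnector(BaseConnector):
    """Hypothetical stand-in for the caching connector under review."""

    async def _load(self, image_url):
        # Assumption: an unsupported source surfaces as ValueError.
        if image_url.startswith("data:"):
            raise ValueError("unsupported source")
        if "broken" in image_url:
            raise OSError("decode failed")
        return f"cached:{image_url}"

    async def fetch_image_async(self, image_url, *, image_mode="RGB"):
        try:
            return await self._load(image_url)
        except ValueError as e:
            # Expected case: fall back to the parent implementation.
            logger.debug("falling back for %s: %s", image_url[:80], e)
            return await super().fetch_image_async(image_url, image_mode=image_mode)
        except Exception:
            # Unexpected case: log with traceback and re-raise, never swallow.
            logger.exception("image fetch failed for %s", image_url[:80])
            raise
```

Only the expected failure triggers the fallback; a decode or transport error propagates to the caller.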
In `@components/src/dynamo/common/multimodal/mm_kwargs_transfer.py`:
- Around line 200-223: The concurrent reads append pickled items in completion
order and can scramble the original metadata.tensor_specs order; modify the loop
that creates read tasks (around metadata.tensor_specs, _do_read, read_tasks, and
the "__pickled_kwargs_item__" handling) to capture the spec index and
pre-allocate a results list (or mapping) sized to metadata.tensor_specs for the
"__pickled_kwargs_item__" key, then assign each completed read into
results[name][index] instead of appending so items preserve the original spec
order; ensure non-pickled names continue to set results[name] = t as before and
then await asyncio.gather(*read_tasks).
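A minimal sketch of the order-preserving approach, assuming each spec is a `(name, ref)` pair and `read_one` is an async reader. Both are hypothetical simplifications of the real `tensor_specs` and RDMA read. Each pickled item gets a slot index at task-creation time, so completion order no longer matters.

```python
import asyncio

PICKLED_KEY = "__pickled_kwargs_item__"


async def read_all(specs, read_one):
    """Read every spec concurrently while keeping pickled items in spec order."""
    pickled_count = sum(1 for name, _ in specs if name == PICKLED_KEY)
    results = {PICKLED_KEY: [None] * pickled_count} if pickled_count else {}

    async def _do_read(name, ref, slot):
        value = await read_one(ref)
        if slot is None:
            results[name] = value  # non-pickled names keep a single value
        else:
            results[PICKLED_KEY][slot] = value  # write into the pre-allocated slot

    tasks, slot = [], 0
    for name, ref in specs:
        if name == PICKLED_KEY:
            tasks.append(_do_read(name, ref, slot))
            slot += 1
        else:
            tasks.append(_do_read(name, ref, None))
    await asyncio.gather(*tasks)
    return results


async def _slow_first(ref):
    # Later specs finish sooner, which would scramble an append-based result.
    delay, value = ref
    await asyncio.sleep(delay)
    return value


demo_specs = [
    (PICKLED_KEY, (0.03, "img-0")),
    (PICKLED_KEY, (0.02, "img-1")),
    (PICKLED_KEY, (0.01, "img-2")),
]
ordered = asyncio.run(read_all(demo_specs, _slow_first))
```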
In `@components/src/dynamo/common/multimodal/routing_utils.py`:
- Around line 47-54: The overlap check in constructing mm_objects uses an
inclusive end bound and should treat image_ranges as [img_start, img_end)
(exclusive end); change the condition in the list comprehension that builds
mm_objects (referencing mm_hashes, image_ranges, block_start, block_end,
img_start, img_end and the mm_objects variable) from "if block_end > img_start
and block_start <= img_end" to use an exclusive end comparison (e.g., ensure
block_start < img_end rather than <=) so a block starting exactly at img_end is
not considered overlapping; update the predicate accordingly to prevent
overstating cache overlap and misrouting.
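Assuming the image ranges are half-open `[img_start, img_end)` as the comment proposes, the corrected predicate could look like the sketch below. The function names are hypothetical; the variable names follow the comment.

```python
def overlaps(block_start, block_end, img_start, img_end):
    """True when [block_start, block_end) and [img_start, img_end) share a token."""
    return block_start < img_end and img_start < block_end


def mm_objects_for_block(block_start, block_end, mm_hashes, image_ranges):
    """Hashes of the images whose token range intersects the block."""
    return [
        mm_hash
        for mm_hash, (img_start, img_end) in zip(mm_hashes, image_ranges)
        if overlaps(block_start, block_end, img_start, img_end)
    ]
```

A block that starts exactly at `img_end` is no longer reported as overlapping, so `mm_objects_for_block(16, 32, ["a"], [(4, 16)])` yields an empty list.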
In `@components/src/dynamo/frontend/vllm_processor.py`:
- Around line 193-207: The code currently sets nixl_transferred=True whenever
nixl_meta is non-None, which can mark a partial or wrong-modality transfer as
successful; update the block handling the result of
self._mm_kwargs_sender.prepare(vllm_preproc.mm_features, modality="image") so
that you only mark nixl_transferred=True when (1) the returned nixl_meta
indicates the same supported modality you intended (not just any non-None), and
(2) every feature that had data in vllm_preproc.mm_features was actually
included in nixl_meta.tensor_specs (i.e., count/IDs of transferred tensors
equals the count/IDs of non-None features). If the check fails, leave URL
fallback enabled and do not set nixl_transferred; apply the same guarded logic
in the analogous handling around lines 404-410 to avoid dropping
multi_modal_data on partial/mismatched transfers.
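One way to express the guarded check is sketched below. `FakeFeature` and `FakeMeta` are hypothetical stand-ins, and both the `modality` field on the metadata and the one-spec-per-feature relationship are assumptions made for illustration, not confirmed details of the real metadata model.

```python
from dataclasses import dataclass, field


@dataclass
class FakeFeature:
    """Hypothetical stand-in for one entry of mm_features."""
    data: object
    modality: str = "image"


@dataclass
class FakeMeta:
    """Hypothetical stand-in for the transfer metadata."""
    modality: str
    tensor_specs: list = field(default_factory=list)


def transfer_is_complete(features, meta, expected_modality="image"):
    """Treat the transfer as successful only if it is complete and matches."""
    if meta is None or meta.modality != expected_modality:
        return False
    with_data = [f for f in features if f.data is not None]
    # Assumption: exactly one tensor spec is emitted per feature with data.
    return len(meta.tensor_specs) == len(with_data)
```

The caller would set `nixl_transferred` from this result and keep the URL fallback whenever it is false.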
In `@components/src/dynamo/vllm/handlers.py`:
- Around line 1265-1274: The current fallback uses request["token_ids"] when
expanded_token_ids is missing, which misaligns mm_placeholders and transferred
kwargs; instead, when extra_args.get("expanded_token_ids") is falsy, do not set
expanded_token_ids to request["token_ids"] and instead abort the NIXL fast-path
(e.g., return/raise a signal so the normal processor path rebuilds the prompt).
Update the logic around expanded_token_ids in the handler so that the code that
relies on mm_placeholders uses only truly expanded_token_ids and that absence of
expanded_token_ids triggers exiting the fast path rather than falling back to
request["token_ids"].
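The suggested behavior can be sketched as a small helper that returns `None` instead of falling back. The helper name and the assumption that `extra_args` is a key on the request dict are hypothetical.

```python
def select_token_ids_for_fast_path(request):
    """Return expanded token IDs, or None to signal 'leave the fast path'."""
    # Assumption: extra_args is carried on the request dict.
    extra_args = request.get("extra_args") or {}
    expanded = extra_args.get("expanded_token_ids")
    if not expanded:
        # Never substitute request["token_ids"]: the placeholders would not
        # line up with the transferred kwargs.
        return None
    return expanded
```

A `None` result tells the handler to take the normal processor path, which rebuilds the prompt itself.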
In `@docs/features/multimodal/multimodal-kv-routing.md`:
- Around line 86-94: Add a row for the SINGLE_GPU environment variable to the
"Key environment variables" table: document the variable name `SINGLE_GPU`, set
its default to `false` (or `0`) and add a short description like "Enable
single-GPU packing mode for the launcher (use on 1‑GPU machines); accepts
true/false or 1/0" so readers know how to enable the single‑GPU path and what
values are supported; ensure the description matches the launcher behavior for
SINGLE_GPU.
In `@examples/backends/vllm/launch/agg_multimodal_router.sh`:
- Around line 17-18: The script currently sets strict mode with "set -euo
pipefail" but lacks the repo-standard process-group cleanup; immediately after
that line add the top-level trap 'echo Cleaning up...; kill 0' EXIT so all child
processes (not just direct parents) are killed on Ctrl+C or exit; replace or
remove the ad-hoc PID-list cleanup logic later in the script (references to the
current PID kill block around lines 74-84) so the single trap handles cleanup
consistently.
- Around line 28-29: Replace hard-coded default ports by allocating free ports
at runtime: for HTTP_PORT, KV_EVENTS_PORT_BASE and any worker system ports in
this script (e.g., the variables at the other commented locations) use the
repo's alloc_port helper when the env var is unset (i.e., set
HTTP_PORT="${HTTP_PORT:-$(alloc_port)}" and similarly for KV_EVENTS_PORT_BASE
and worker port vars), preserving the ability for callers to override by
exporting env vars; keep BLOCK_SIZE as-is but update the port assignments around
the references to HTTP_PORT, KV_EVENTS_PORT_BASE and the worker ports so they
export the dynamically allocated values instead of static defaults.
- Around line 49-50: The summary prints health URLs using VLLM_SYSTEM_PORT_BASE
+ i but the actual workers are started on 18079 + i*2, so override of
VLLM_SYSTEM_PORT_BASE has no effect; fix by deriving worker ports from
VLLM_SYSTEM_PORT_BASE everywhere: replace literal 18079 in the worker launch
logic with $((VLLM_SYSTEM_PORT_BASE - 2)) (so
worker_port=$((VLLM_SYSTEM_PORT_BASE - 2 + i*2)) or equivalent), and update the
summary/banner health URL computation to use the same expression
($((VLLM_SYSTEM_PORT_BASE - 2 + i*2))) instead of VLLM_SYSTEM_PORT_BASE + i;
apply the same change to the other occurrences mentioned (around the other
summaries/prints at the noted sections).
In `@tests/mm_router/test_vllm_mm_router_e2e.py`:
- Around line 184-187: Replace the fixed time.sleep with a real readiness check:
after entering the VLLMWorkerProcess context, poll the worker's health endpoint
(use vllm_port) or call an existing readiness helper (e.g.,
VLLMWorkerProcess.wait_until_ready or a project helper like
wait_for_service_health) until it reports healthy, then start FrontendProcess;
if no helper exists, use the provided test fixtures
(runtime_services_dynamic_ports / start_services_with_http / ManagedProcess)
instead of sleeping to wait for the worker to be ready before creating
FrontendProcess.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: ca540856-1e39-4b32-bc13-36a2e7f1b14a
📒 Files selected for processing (18)
- components/src/dynamo/common/multimodal/media_connector.py
- components/src/dynamo/common/multimodal/mm_kwargs_transfer.py
- components/src/dynamo/common/multimodal/routing_utils.py
- components/src/dynamo/common/tests/multimodal/test_media_connector.py
- components/src/dynamo/common/tests/multimodal/test_mm_kwargs_transfer.py
- components/src/dynamo/common/tests/multimodal/test_routing_utils.py
- components/src/dynamo/frontend/vllm_processor.py
- components/src/dynamo/vllm/handlers.py
- docs/features/multimodal/multimodal-kv-routing.md
- examples/backends/vllm/launch/agg_multimodal_router.sh
- examples/backends/vllm/mm_router_worker/README.md
- examples/backends/vllm/mm_router_worker/__init__.py
- examples/backends/vllm/mm_router_worker/__main__.py
- examples/backends/vllm/mm_router_worker/handler.py
- examples/backends/vllm/mm_router_worker/launch.sh
- examples/backends/vllm/mm_router_worker/mm_processor.py
- examples/backends/vllm/mm_router_worker/mm_router_worker.py
- tests/mm_router/test_vllm_mm_router_e2e.py
💤 Files with no reviewable changes (7)
- examples/backends/vllm/mm_router_worker/__main__.py
- examples/backends/vllm/mm_router_worker/__init__.py
- examples/backends/vllm/mm_router_worker/README.md
- examples/backends/vllm/mm_router_worker/launch.sh
- examples/backends/vllm/mm_router_worker/handler.py
- examples/backends/vllm/mm_router_worker/mm_router_worker.py
- examples/backends/vllm/mm_router_worker/mm_processor.py
Adds --dyn-preprocess-workers N to parallelize process_inputs() across N worker processes, each with its own GIL. Workers register mm_kwargs with NIXL directly in their own address space — only lightweight metadata (~330 bytes) crosses the process boundary, while the backend reads the 3.7MB pickled tensors via RDMA from worker memory. Default is 0 (serial path unchanged). Activated with e.g. --dyn-preprocess-workers 4 or PREPROCESS_WORKERS=4 in the launch script.
Threads share memory — eliminates all inter-process overhead (pickle, pipe, per-worker NIXL connectors, worker globals). CPU-bound numpy/PIL operations in process_inputs() release the GIL, allowing threads to overlap. NIXL registration stays in the main async event loop.
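The thread-based variant described here can be sketched with the standard library. `process_inputs` below is a hypothetical stand-in for the real preprocessing call, and the pool size is arbitrary.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def process_inputs(request_id):
    """Hypothetical stand-in for CPU-bound preprocessing.

    Threads only overlap when the real work releases the GIL (as numpy/PIL
    calls do); this pure-Python body is just a placeholder.
    """
    return {"request_id": request_id, "num_tokens": sum(range(1000))}


async def preprocess_many(request_ids, max_workers=4):
    """Run preprocessing in threads while the event loop stays responsive."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [loop.run_in_executor(pool, process_inputs, rid) for rid in request_ids]
        # gather returns results in submission order.
        return await asyncio.gather(*futures)


results = asyncio.run(preprocess_many(["a", "b", "c"]))
```

Because threads share the address space, the results come back as ordinary Python objects with no pickling, and any follow-up registration can stay on the event loop.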
Register tensors directly with NIXL instead of pickling the entire MultiModalKwargsItem. Field metadata (type, batch_size, keep_on_cpu) is serialized as lightweight fields in TensorTransferSpec (~200 bytes) while tensor data transfers via RDMA zero-copy. This removes the 30-50ms pickle.dumps() per image that was the dominant NIXL overhead, reducing frontend per-request cost from ~50-70ms to ~5-10ms for the NIXL path.
This reverts commit 08c03b4.
Follow-up on furionw's review comment: the previous refactor unified the NIXL and SHM senders under MmKwargsSender(ABC), but vllm_processor still wrapped each `sender.prepare()` call with an NVTX annotation. Push the NVTX annotation into the base class via the template-method pattern. Subclasses declare their own class-level `_nvtx_label` / `_nvtx_color` attrs and implement `_prepare()`; the base's concrete `prepare()` wraps `_prepare()` with `_nvtx.annotate(...)` so the callsite in vllm_processor doesn't need to know which transport is in use.

- MmKwargsSender: add `_nvtx_label`/`_nvtx_color` class attrs, make `prepare()` concrete (template), add abstract `_prepare()`.
- MmKwargsNixlSender: override class attrs, rename `prepare` -> `_prepare`, drop the now-redundant outer `mm_nixl:sender_prepare` range.
- MmKwargsShmSender: override class attrs, rename `prepare` -> `_prepare`, drop the now-redundant outer `mm_shm:sender_prepare` range.
- vllm_processor: drop the `_nvtx.annotate(nvtx_label, color=nvtx_color)` wrap at the callsite; the sender owns it now.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
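The template-method shape described in this commit can be sketched as below. The `annotate` context manager is a hypothetical stand-in that records enter/exit events instead of emitting real NVTX ranges, the class names and colors are illustrative, and the methods are synchronous here for brevity even though the real `prepare()` may be async.

```python
from abc import ABC, abstractmethod
from contextlib import contextmanager

TRACE = []


@contextmanager
def annotate(label, color=None):
    """Hypothetical stand-in for an NVTX range; records push/pop events."""
    TRACE.append(("push", label, color))
    try:
        yield
    finally:
        TRACE.append(("pop", label, color))


class Sender(ABC):
    """Base class owns the annotation; subclasses own the transport."""

    _nvtx_label = "mm:sender_prepare"
    _nvtx_color = "gray"

    def prepare(self, features):
        # Template method: the wrapper is fixed, the body is delegated.
        with annotate(self._nvtx_label, color=self._nvtx_color):
            return self._prepare(features)

    @abstractmethod
    def _prepare(self, features):
        ...


class ShmSender(Sender):
    _nvtx_label = "mm_shm:sender_prepare"
    _nvtx_color = "green"

    def _prepare(self, features):
        return {"transport": "shm", "count": len(features)}


meta = ShmSender().prepare(["f0", "f1"])
```

The callsite only ever calls `prepare()`, so swapping transports changes the label without touching the caller.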
`prompt` is assigned from two branches with different types: the pre-rendered MultiModalInput dict on the fast path, and the TokensPrompt/EmbedsPrompt/None tuple from _build_prompt_from_request. mypy inferred the variable's type from the first assignment and flagged the tuple unpack as incompatible. Declare `prompt: Any` above the if/else so both branches type-check. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
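A reduced illustration of the typing fix, using hypothetical values in place of the real prompt types. Declaring the variable before the branches stops the type checker from pinning it to the first assignment's type.

```python
from typing import Any


def build_prompt(fast_path: bool) -> Any:
    # Declared up front so both branches type-check under mypy.
    prompt: Any
    if fast_path:
        # Fast path: a pre-rendered dict (placeholder contents).
        prompt = {"prompt_token_ids": [1, 2, 3]}
    else:
        # Slow path: tuple unpack from a hypothetical builder result.
        prompt, _embeds, _error = ([1, 2, 3], None, None)
    return prompt
```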
…fe-core # Conflicts: # components/src/dynamo/vllm/handlers.py
The cleanup trap on EXIT/INT/TERM does `kill 0`, which sends SIGTERM to every process in the current process group — including the script itself. When the script catches that SIGTERM it re-enters the trap, prints "Cleaning up..." again, and fires `kill 0` again, spinning in an infinite loop that ends in a segfault when bash trips over itself. Clear the trap inside the handler (`trap - EXIT INT TERM`) so the second SIGTERM is a no-op. Reproduces on the agg_multimodal_router.sh launch any time Ctrl-C or a parent signal kills the script. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Matches the existing MmKwargsNixlSender / MmKwargsShmSender / MmKwargsShmReceiver naming convention. Addresses review comment on PR #8065. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous _try_receive_mm_kwargs_nixl dispatched to _receive_mm_kwargs_shm internally, so its name was misleading and the two receive branches had substantial duplicate logic (unpickle + validate + build EngineInput + inject). Renames the entry point to _try_receive_mm_kwargs (transport-agnostic) and factors the shared post-receive flow into a single _receive_mm_kwargs helper parameterized by transport kind. The transport-specific bits (receiver acquisition, RDMA-read vs shm-open, metadata validation) are now small branches; everything downstream is shared. Addresses review comment on PR #8065. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…SHM wire

Three related changes that mirror the existing MmKwargsSender ABC pattern across the rest of the transfer layer, addressing follow-up review feedback on PR #8065.

1. Pydantic SHM metadata
   Added MmKwargsShmItem + MmKwargsShmTransferMetadata Pydantic models so the SHM wire format matches NIXL's MmKwargsTransferMetadata. Sender now emits metadata.model_dump(); receiver accepts the validated Pydantic object; handlers.py validates on entry.
2. MmKwargsReceiver ABC + NVTX template method
   Mirror of the existing MmKwargsSender ABC. Base class owns the per-request NVTX annotation via class-level _nvtx_label / _nvtx_color attrs; subclasses implement async _receive(metadata). Both MmKwargsNixlReceiver and MmKwargsShmReceiver now inherit. handlers.py's _receive_mm_kwargs drops its is_nixl / string-tag dispatch and takes a MmKwargsReceiver instance directly.
3. Sender _prepare() scaffold dedup
   The feature-iteration / mm_hash collection / None-data skip / pickle-dumps loop was duplicated across NIXL and SHM senders. Moved into MmKwargsSender.prepare() as a template method that delegates transport-specific work to two small hooks:
   - _encode_item(idx, pickled) -> (encoded_item, cleanup_item)
   - _assemble_extra_args(modality, encoded_items, mm_hashes)
   Each subclass now implements only its transport-unique bits.

Tested: 144/144 unit tests pass, 30/30 e2e tests pass (all 10 scenarios across shm / nixl / disabled transport modes).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
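For readers unfamiliar with the SHM transport this commit refers to, the following is a minimal sketch of a shared-memory handoff using Python's standard library. The function names and the metadata dict layout are hypothetical and do not reproduce the PR's Pydantic models; the `FileNotFoundError` branch stands in for the case where the segment is not visible to the receiver.

```python
import pickle
from multiprocessing import shared_memory


def shm_send(payload):
    """Write a pickled payload to a new segment; return it with small metadata."""
    data = pickle.dumps(payload)
    seg = shared_memory.SharedMemory(create=True, size=len(data))
    seg.buf[: len(data)] = data
    # Only the name and size cross the process boundary.
    return seg, {"shm_name": seg.name, "size": len(data)}


def shm_receive(meta):
    """Read the payload back, or return None if the segment is not visible."""
    try:
        seg = shared_memory.SharedMemory(name=meta["shm_name"])
    except FileNotFoundError:
        return None  # caller falls back to another path
    try:
        # The segment may be rounded up to a page, so slice by recorded size.
        return pickle.loads(bytes(seg.buf[: meta["size"]]))
    finally:
        seg.close()


seg, meta = shm_send({"pixel_values": [1, 2, 3]})
received = shm_receive(meta)
seg.close()
seg.unlink()
missing = shm_receive(meta)  # segment is gone, so this returns None
```

Since `pickle.loads` executes arbitrary code on untrusted input, a receiver would normally validate the unpickled type before using it.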
@furionw Thanks for the comments. I made some changes that I hope make the NIXL and SHM transfer paths more consolidated and structured. Let me know what you think!
High-level structure of the multimodal
Wire protocol
Both transports now use Pydantic
Sender side (frontend)
Concrete senders
Receiver side (backend)
Concrete receivers
Handler dispatch (vllm/handlers.py)
Add the vllm-project/vllm#39502 patch instructions to the MM KV routing guide so users who install Dynamo find the setup steps in the checked-in docs (previously only in the PR description). Clarify the Overview to cover both vLLM and TRT-LLM rather than sounding vLLM-only, and scope Transfer Mode Details to vLLM (TRT-LLM backends re-run their own preprocessing and are unaffected by DYNAMO_MM_TRANSFER). Drop the stale "Known Limitations" bullet that referenced the same upstream API — the new Prerequisites section is now the authoritative pointer. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…fe-core # Conflicts: # components/src/dynamo/vllm/handlers.py
Thanks for working on these! I just want to make sure you two are aware of this parallel work and coordinate.
Overview
Moves multimodal-aware KV routing from the separate MM Router Worker process into the frontend's vLLM processor, eliminating the extra network hop and redundant image processing for multimodal requests.
Architecture
The frontend runs the full vLLM `process_inputs()` pipeline, then transfers pre-processed `mm_kwargs` to the backend via shared memory. The backend skips image download and HF processing entirely.
Transfer Modes
Controlled by `DYNAMO_MM_TRANSFER` env var:
- `shm` (default)
- `nixl`
- `DYNAMO_DISABLE_NIXL_MM=1`
Fallback behavior: When SHM transfer fails (e.g., cross-node where `/dev/shm` is not shared), the backend falls back to downloading and processing images from URLs. The HF processor is not skipped in this case. A follow-up PR will explore making NIXL or TCP request plane the cross-node default.
Key Changes
Frontend (`vllm_processor.py`):
- `_prepare_mm_routing()` extracts mm_routing_info from vLLM's `process_inputs()` output and prepares SHM/NIXL transfer
- `DYNAMO_MM_TRANSFER` env var (`shm` default, `nixl` for cross-node)
- `MmKwargsSender` / `MmKwargsShmSender` (no lazy imports)
- `preprocess_workers != 0` guard restored (pool not supported for vllm processor)
- `mm_processor_kwargs` through both KV and non-KV router paths
Backend (`handlers.py`):
- `_try_receive_mm_kwargs_nixl()` receives pre-processed mm_kwargs via SHM or NIXL (mutually exclusive, determined by which metadata key is present in `extra_args`)
- `_receive_mm_kwargs_shm()` reads from shared memory (~2ms)
- `inject_into_mm_cache()` directly (public API from vLLM upstream)
- `isinstance(MultiModalKwargsItem)` validation on both NIXL and SHM pickle.loads paths
- `expanded_token_ids` missing (prevents placeholder misalignment)
- `mm_processor_kwargs` and `use_audio_in_video` support
Transfer (`mm_kwargs_transfer.py`):
- `MmKwargsShmSender` / `MmKwargsShmReceiver`: POSIX shared memory transfer
- `MmKwargsSender` / `MmKwargsReceiver`: NIXL RDMA with pre-registered descriptor pool
- `FileNotFoundError`, log other exceptions
- `_acquire_descriptor`: explicit `RuntimeError` for `None` `_data_ref` (not assert)
New files:
- `routing_utils.py`: model-agnostic `mm_features` to `block_mm_infos` conversion
- `media_connector.py`: `DynamoMediaConnector` wraps ImageLoader with LRU cache
Tests:
Upstream vLLM Dependency
This PR depends on vllm-project/vllm#39502, which exposes `InputProcessor.inject_into_mm_cache()` as a public API for injecting pre-processed mm_kwargs into the processor cache. Until merged, apply the patch:
Where should the reviewer start?
- `components/src/dynamo/frontend/vllm_processor.py`: main integration, `_prepare_mm_routing()` and transfer mode selection
- `components/src/dynamo/vllm/handlers.py`: backend SHM/NIXL receive and `inject_into_mm_cache()` call
- `components/src/dynamo/common/multimodal/mm_kwargs_transfer.py`: SHM and NIXL sender/receiver
- `tests/mm_router/test_vllm_mm_router_e2e.py`: 30 e2e tests (10 scenarios × 3 transfer modes)
- `components/src/dynamo/common/tests/multimodal/test_mm_kwargs_transfer.py`: 15 unit tests
Summary by CodeRabbit
New Features
Removals
Tests
Documentation