[Feat][Router] Add disaggregated prefill orchestrated routing #777
Conversation
Summary of Changes: Hello @yahavb, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a significant enhancement to the vLLM router by adding support for router-orchestrated disaggregated prefill. This new routing logic allows the system to separate the compute-intensive prefill phase from the memory-bound decode phase, with the router intelligently managing the flow of requests and KV cache metadata between dedicated prefill and decode pods. This change improves resource utilization, enables independent scaling of prefill and decode components, and offers greater flexibility for various backend implementations beyond LMCache.
Code Review
This pull request introduces a new routing logic for disaggregated prefill, which is a valuable feature. The overall structure is good, but there are several areas for improvement regarding correctness, performance, and maintainability. Key issues include duplicated and dead code for the orchestration logic, a deviation from the proposal where max_tokens=1 is not set for prefill requests, inefficient creation of aiohttp.ClientSession for each request, and incorrect handling of streaming responses which buffers the entire response in memory. I've provided specific comments and suggestions to address these points, which should lead to a more robust, performant, and maintainable implementation.
```python
async def handle_orchestrated_request(
    self,
    endpoints: List[EndpointInfo],
    request_json: Dict,
    request_path: str,
    aiohttp_client,
):
    """
    Orchestrate the full Prefill → Decode flow.

    Args:
        endpoints: List of available endpoints
        request_json: The original request body
        request_path: The API path (e.g., /v1/chat/completions)
        aiohttp_client: The aiohttp client session for making HTTP requests

    Returns:
        An async generator that yields the streaming response from decode
    """
    import aiohttp
    import json

    prefiller_endpoints, decoder_endpoints = self._find_endpoints(endpoints)

    # Select endpoints (simple first-available for now, can add load balancing later)
    prefill_url = prefiller_endpoints[0].url
    decode_url = decoder_endpoints[0].url

    request_id = str(uuid.uuid4())
    logger.info(f"[{request_id}] Starting orchestrated disaggregated inference")
    logger.info(f"[{request_id}] Prefill endpoint: {prefill_url}")
    logger.info(f"[{request_id}] Decode endpoint: {decode_url}")

    # Step 1: Send request to Prefill
    prefill_api_url = f"{prefill_url}{request_path}"
    logger.info(f"[{request_id}] Sending prefill request to {prefill_api_url}")

    try:
        async with aiohttp.ClientSession() as session:
            # Call Prefill
            async with session.post(
                prefill_api_url,
                json=request_json,
                headers={"Content-Type": "application/json", "X-Request-ID": request_id},
                timeout=aiohttp.ClientTimeout(total=300),  # 5 min timeout for prefill
            ) as prefill_resp:
                if prefill_resp.status != 200:
                    error_text = await prefill_resp.text()
                    logger.error(f"[{request_id}] Prefill failed with status {prefill_resp.status}: {error_text}")
                    yield json.dumps({"error": f"Prefill failed: {error_text}"}).encode()
                    return

                prefill_data = await prefill_resp.json()
                logger.info(f"[{request_id}] Prefill completed successfully")
                logger.debug(f"[{request_id}] Prefill response: {prefill_data}")

            # Step 2: Add prefill metadata and send to Decode
            decode_request = request_json.copy()
            decode_request["disagg_prefill_resp"] = prefill_data

            decode_api_url = f"{decode_url}{request_path}"
            logger.info(f"[{request_id}] Sending decode request to {decode_api_url}")

            # Check if streaming is requested
            is_streaming = request_json.get("stream", False)

            async with session.post(
                decode_api_url,
                json=decode_request,
                headers={"Content-Type": "application/json", "X-Request-ID": request_id},
                timeout=aiohttp.ClientTimeout(total=600),  # 10 min timeout for decode
            ) as decode_resp:
                if decode_resp.status != 200:
                    error_text = await decode_resp.text()
                    logger.error(f"[{request_id}] Decode failed with status {decode_resp.status}: {error_text}")
                    yield json.dumps({"error": f"Decode failed: {error_text}"}).encode()
                    return

                # Stream the decode response back to client
                if is_streaming:
                    async for chunk in decode_resp.content.iter_any():
                        if chunk:
                            yield chunk
                else:
                    response_data = await decode_resp.read()
                    yield response_data

            logger.info(f"[{request_id}] Decode completed successfully")

    except aiohttp.ClientError as e:
        logger.error(f"[{request_id}] HTTP error during orchestrated request: {e}")
        yield json.dumps({"error": f"HTTP error: {str(e)}"}).encode()
    except Exception as e:
        logger.error(f"[{request_id}] Unexpected error during orchestrated request: {e}")
        yield json.dumps({"error": f"Unexpected error: {str(e)}"}).encode()
```
This handle_orchestrated_request method appears to be dead code. The actual orchestration logic is implemented in route_orchestrated_disaggregated_request in src/vllm_router/services/request_service/request.py.
This duplicated logic can lead to confusion and maintenance issues. Additionally, this method contains some anti-patterns:
- Local imports of `aiohttp` and `json`.
- It accepts an `aiohttp_client` argument but creates a new `aiohttp.ClientSession` instead of using the provided one.

It's recommended to remove this method to keep the codebase clean and maintain a single source of truth for the orchestration logic.
Fixed in commit 2e4d2d2. Removed the entire handle_orchestrated_request method. The orchestration logic now lives only in route_orchestrated_disaggregated_request() in request.py.
```python
async with aiohttp.ClientSession() as session:
    # Send to Prefill
    async with session.post(
        prefill_api_url,
        json=request_json,
```
There are two issues with the prefill request handling:
1. `max_tokens` not set: The proposal document specifies sending the prefill request with `max_tokens=1` to optimize the prefill step. The current implementation sends the original request, which could lead to unnecessary token generation.
2. Inefficient session creation: A new `aiohttp.ClientSession` is created for each request. This is inefficient; the application's shared client session at `request.app.state.aiohttp_client_wrapper` should be used instead.

Please address both points for a more correct and performant implementation.
```python
prefill_request_json = request_json.copy()
prefill_request_json["max_tokens"] = 1

async with request.app.state.aiohttp_client_wrapper() as session:
    # Send to Prefill
    async with session.post(
        prefill_api_url,
        json=prefill_request_json,
```
Fixed in commit 2e4d2d2:
- Prefill request now uses `max_tokens=1` (and `max_completion_tokens=1` if present)
- Changed from `aiohttp.ClientSession()` to `request.app.state.aiohttp_client_wrapper()`
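For reference, a minimal sketch of the described change (illustrative; the exact code in commit 2e4d2d2 may differ):

```python
# Cap token generation during prefill; the decode service produces the real output.
prefill_request = request_json.copy()
prefill_request["max_tokens"] = 1
if "max_completion_tokens" in prefill_request:
    prefill_request["max_completion_tokens"] = 1
```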
```python
# For streaming, collect all chunks first, then yield
chunks = []
async for chunk in decode_resp.content.iter_any():
    if chunk:
        chunks.append(chunk)

logger.info(f"[{request_id}] Decode streaming completed, collected {len(chunks)} chunks")

async def generate_stream():
    for chunk in chunks:
        yield chunk
```
The current implementation for handling streaming responses collects all chunks into a list before sending them to the client. This approach buffers the entire response in memory, which defeats the purpose of streaming and can lead to high memory consumption and increased time to first token for the client.
The response chunks should be yielded to the client as soon as they are received from the decode service.
Note that with this change, the completion log on lines 508-510 will still be executed before the stream is fully sent. For correct logging, you might consider moving the completion log inside the generator, within a finally block, to ensure it's executed after the stream is consumed.
```python
# For streaming, yield chunks as they arrive
async def generate_stream():
    async for chunk in decode_resp.content.iter_any():
        if chunk:
            yield chunk
```
Fixed in commit 2e4d2d2. Streaming now yields chunks as they arrive, using a generator with a `finally` block for completion logging:

```python
async def generate_stream():
    try:
        async for chunk in decode_resp.content.iter_any():
            if chunk:
                yield chunk
    finally:
        logger.info(f"[{request_id}] Orchestrated streaming request completed...")
```
```diff
 elif isinstance(
-    request.app.state.router, (KvawareRouter, PrefixAwareRouter, SessionRouter)
+    request.app.state.router, (KvawareRouter, PrefixAwareRouter, SessionRouter, DisaggregatedPrefillOrchestratedRouter)
```
DisaggregatedPrefillOrchestratedRouter is already handled in a dedicated if block earlier in this function (lines 185-189). Including it here is redundant and this branch will never be taken for this router type. Please remove it from this isinstance check to improve clarity.
```diff
-    request.app.state.router, (KvawareRouter, PrefixAwareRouter, SessionRouter, DisaggregatedPrefillOrchestratedRouter)
+    request.app.state.router, (KvawareRouter, PrefixAwareRouter, SessionRouter)
```
Fixed in commit 2e4d2d2. Removed DisaggregatedPrefillOrchestratedRouter from the tuple - it's already handled by the dedicated if block earlier.
```python
router = request.app.state.router
prefill_labels = router.prefill_model_labels
decode_labels = router.decode_model_labels

prefiller_endpoints = [e for e in endpoints if e.model_label in prefill_labels]
decoder_endpoints = [e for e in endpoints if e.model_label in decode_labels]

if not prefiller_endpoints:
    logger.error(f"[{request_id}] No prefill endpoints found with labels {prefill_labels}")
    return JSONResponse(
        status_code=503,
        content={"error": f"No prefill endpoints available with labels {prefill_labels}"},
        headers={"X-Request-Id": request_id},
    )

if not decoder_endpoints:
    logger.error(f"[{request_id}] No decode endpoints found with labels {decode_labels}")
    return JSONResponse(
        status_code=503,
        content={"error": f"No decode endpoints available with labels {decode_labels}"},
        headers={"X-Request-Id": request_id},
    )
```
The logic for finding prefill and decode endpoints is duplicated here. The DisaggregatedPrefillOrchestratedRouter class already has a _find_endpoints method that encapsulates this logic, including error handling.
To avoid code duplication and adhere to better design principles (keeping routing logic within the router), you should call the router's method here.
This also applies to the error handling for when endpoints are not found.
```python
router = request.app.state.router
try:
    prefiller_endpoints, decoder_endpoints = router._find_endpoints(endpoints)
except ValueError as e:
    logger.error(f"[{request_id}] Endpoint discovery failed: {e}")
    return JSONResponse(
        status_code=503,
        content={"error": str(e)},
        headers={"X-Request-Id": request_id},
    )
```
Fixed in commit 2e4d2d2. Now using router._find_endpoints(endpoints) with try/except ValueError for error handling.
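For reference, a minimal sketch of what such a `_find_endpoints` helper could look like, inferred from the label-matching logic shown in the diff above (the actual method in `routing_logic.py` may differ):

```python
from typing import List, Tuple

class DisaggregatedPrefillOrchestratedRouter:
    def _find_endpoints(
        self, endpoints: List["EndpointInfo"]
    ) -> Tuple[List["EndpointInfo"], List["EndpointInfo"]]:
        """Split endpoints into prefill and decode pools by model label."""
        prefill = [e for e in endpoints if e.model_label in self.prefill_model_labels]
        decode = [e for e in endpoints if e.model_label in self.decode_model_labels]
        if not prefill:
            raise ValueError(
                f"No prefill endpoints available with labels {self.prefill_model_labels}"
            )
        if not decode:
            raise ValueError(
                f"No decode endpoints available with labels {self.decode_model_labels}"
            )
        return prefill, decode
```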
| "Content-Type": "application/json", | ||
| "X-Request-Id": request_id, | ||
| }, | ||
| timeout=aiohttp.ClientTimeout(total=300) |
Acknowledged. Keeping hardcoded timeouts (300s prefill, 600s decode) for now as reasonable defaults.
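For context, a hypothetical way to surface these timeouts as configuration rather than constants (the argument names here are illustrative, not from the PR):

```python
import aiohttp

class DisaggregatedPrefillOrchestratedRouter:
    def __init__(self, prefill_timeout_s: float = 300.0, decode_timeout_s: float = 600.0):
        # Defaults mirror the PR's hardcoded values (5 min prefill, 10 min decode).
        self.prefill_timeout = aiohttp.ClientTimeout(total=prefill_timeout_s)
        self.decode_timeout = aiohttp.ClientTimeout(total=decode_timeout_s)
```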
Implements support for disaggregated prefill as outlined in the 2025 Q1 roadmap. This enables prefill/decode disaggregation with router-orchestrated KV cache transfer. Closes vllm-project#26 Signed-off-by: Yahav <yahavb@amazon.com>
Force-pushed from aa54dff to 4ac2c59
…mpatibility Signed-off-by: Yahav <yahavb@amazon.com>
…rchestrated routing
- Remove dead code (handle_orchestrated_request method in routing_logic.py)
- Fix prefill request to use max_tokens=1 per proposal spec
- Use shared aiohttp client instead of creating new session per request
- Fix streaming to yield chunks immediately (true streaming)
- Remove redundant isinstance check for DisaggregatedPrefillOrchestratedRouter
- Use router's _find_endpoints method to avoid code duplication

Signed-off-by: Yahav <yahavb@amazon.com>
Implements support for disaggregated prefill as outlined in the 2025 Q1 roadmap. This enables prefill/decode disaggregation with router-orchestrated KV cache transfer.
Closes #26
Summary
This PR implements disaggregated prefill routing - a feature listed in the 2025 Q1 roadmap. It adds a new routing algorithm `disaggregated_prefill_orchestrated` that enables prefill/decode disaggregation with router-orchestrated KV cache transfer.

See full proposal: proposals/disaggregated-prefill-orchestrated-routing.md
Motivation
This complements LMCache-based disaggregated inference by supporting backends with custom `kv_connector` implementations:

| Approach | KV Transfer | Use Case |
|----------|-------------|----------|
| LMCache-based DI | LMCache + NIXL | GPU clusters with LMCache |
| Router-orchestrated DI (this PR) | vLLM native `kv_transfer_config` | Any backend with `kv_connector` |

Changes
| File | Change |
|------|--------|
| `routing_logic.py` | New `DisaggregatedPrefillOrchestratedRouter` class |
| `parser.py` | New `--prefill-model-labels`, `--decode-model-labels` arguments |
| `request.py` | New `route_orchestrated_disaggregated_request()` function |

Usage
```bash
python -m vllm_router.app \
  --routing-logic=disaggregated_prefill_orchestrated \
  --service-discovery=k8s \
  --k8s-label-selector="app in (prefill,decode)" \
  --prefill-model-labels=prefill \
  --decode-model-labels=decode
```
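For illustration, a client request through the router is an ordinary OpenAI-compatible call; the prefill/decode orchestration is transparent to the caller. The host, port, and model name below are placeholders:

```python
import json
import urllib.request

# Placeholder router address and model name; substitute your deployment's values.
payload = {
    "model": "facebook/opt-125m",
    "messages": [{"role": "user", "content": "Hello!"}],
    "stream": False,
}
req = urllib.request.Request(
    "http://localhost:8000/v1/chat/completions",
    data=json.dumps(payload).encode(),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    # The router performs prefill, attaches the prefill metadata to the decode
    # request (as "disagg_prefill_resp"), and returns the decode response here.
    body = json.load(resp)
    print(body["choices"][0]["message"]["content"])
```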
Tested

✅ End-to-end tested with prefill/decode pods on EKS
FIX #26
Checklist:
- The PR title starts with the appropriate prefix(es) ([Feat][Router])
- Pass all the pre-commit checks (uv run pre-commit run --all-files)
- Sign off commits with -s when doing git commit
- Classify the PR with the appropriate prefixes, such as [Bugfix], [Feat], and [CI].

Detailed Checklist (Click to Expand)
Thank you for your contribution to production-stack! Before submitting the pull request, please ensure the PR meets the following criteria. This helps us maintain the code quality and improve the efficiency of the review process.
PR Title and Classification
Please try to classify PRs for easy understanding of the type of changes. The PR title is prefixed appropriately to indicate the type of change. Please use one of the following:
- `[Bugfix]` for bug fixes.
- `[CI/Build]` for build or continuous integration improvements.
- `[Doc]` for documentation fixes and improvements.
- `[Feat]` for new features in the cluster (e.g., autoscaling, disaggregated prefill, etc.).
- `[Router]` for changes to the `vllm_router` (e.g., routing algorithm, router observability, etc.).
- `[Misc]` for PRs that do not fit the above categories. Please use this sparingly.

Note: If the PR spans more than one category, please include all relevant prefixes.
Code Quality
The PR needs to meet the following code quality standards:
- Use `pre-commit` to format your code. See `README.md` for installation.

DCO and Signed-off-by
When contributing changes to this project, you must agree to the DCO. Commits must include a `Signed-off-by:` header which certifies agreement with the terms of the DCO.

Using `-s` with `git commit` will automatically add this header.

What to Expect for the Reviews
We aim to address all PRs in a timely manner. If no one reviews your PR within 5 days, please @-mention one of YuhanLiu11, Shaoting-Feng, or ApostaC.