feat: Add n>1 support to the frontend and the vLLM backend#6350
feat: Add n>1 support to the frontend and the vLLM backend#6350kornelcsernai-harmonic wants to merge 4 commits into
Conversation
|
👋 Hi kornelcsernai-harmonic! Thank you for contributing to ai-dynamo/dynamo. Just a reminder: The 🚀 |
|
This PR is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 5 days. |
bc3ad9b to
f5c9d22
Compare
Signed-off-by: Kornel Csernai <239206175+kornelcsernai-harmonic@users.noreply.github.com>
fc3953c to
f3ad4b6
Compare
WalkthroughThe pull request adds end-to-end support for the OpenAI Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/src/dynamo/vllm/handlers.py (1)
1577-1644:⚠️ Potential issue | 🟠 MajorGate token-mode usage until the final choice finishes.
completion_usageis attached on every per-choicefinish_reason; withn > 1, an early-finishing choice can report stale combined usage before other choices finish, and later choices can duplicate it. Mirror text mode and only attach usage whenfinished_choices ∪ {choice_index}reachesn.🐛 Proposed fix
output_tokens_per_choice: Dict[int, int] = {} finished_choices: set[int] = set() + n = getattr(sampling_params, "n", 1) or 1 async for res in gen: @@ if output.finish_reason: out["finish_reason"] = normalize_finish_reason( output.finish_reason ) - out[ - "completion_usage" - ] = BaseWorkerHandler._build_completion_usage( - request_output=res, - embedding_sequence_length=embedding_sequence_length, - ) + if len(finished_choices | {choice_index}) >= n: + out[ + "completion_usage" + ] = BaseWorkerHandler._build_completion_usage( + request_output=res, + embedding_sequence_length=embedding_sequence_length, + ) # Log completion with LoRA info (debug level to avoid log spam)
🧹 Nitpick comments (1)
tests/frontend/test_vllm.py (1)
509-517: Avoid+=string concatenation in the streaming loop.Collect fragments per choice and join only when needed.
🧹 Proposed fix
- content_per_choice: dict = {} + content_per_choice: dict[int, list[str]] = {} for chunk in chunks: for choice in chunk.get("choices", []): idx = choice["index"] all_indices.add(idx) delta_content = choice.get("delta", {}).get("content", "") if delta_content: - content_per_choice.setdefault(idx, "") - content_per_choice[idx] += delta_content + content_per_choice.setdefault(idx, []).append(delta_content) @@ # Verify each choice produced some content for i in range(5): - assert content_per_choice.get(i), f"Choice {i} has no content" + assert "".join(content_per_choice.get(i, [])), f"Choice {i} has no content"As per coding guidelines, avoid
+=string concatenation inside loops.Also applies to: 539-541
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/frontend/test_vllm.py` around lines 509 - 517, The loop in the streaming test uses in-loop string concatenation on content_per_choice (content_per_choice[idx] += delta_content) which is inefficient; instead collect fragments per choice into a list and join when needed: change content_per_choice to map indices to lists (append delta_content to content_per_choice[idx]) within the chunks/choice loop, and later (where code currently assumes a single string, e.g., the same pattern around lines ~539-541) join the list with ''.join(content_per_choice[idx]) before assertions or further processing; update any usages of content_per_choice to expect the joined string at consumption time.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/frontend/test_vllm.py`:
- Around line 483-502: Wrap the streaming POST call in a context manager to
ensure the socket is always closed: change the bare requests.post(...) that
assigns to response and the subsequent response.iter_lines(...) loop to use
"with requests.post(...) as response:" so the returned Response is closed even
if assertions or json decoding fail; locate this change around the Response
usage in tests/frontend/test_vllm.py where response is created and iterated with
response.iter_lines(decode_unicode=True).
- Line 466: The local import "import json" currently placed inside a function in
test_vllm.py must be moved to module scope; remove the function-local "import
json" and add "import json" with the other top-level imports at the top of the
file so all imports remain at module scope and adhere to the repo import rule.
---
Nitpick comments:
In `@tests/frontend/test_vllm.py`:
- Around line 509-517: The loop in the streaming test uses in-loop string
concatenation on content_per_choice (content_per_choice[idx] += delta_content)
which is inefficient; instead collect fragments per choice into a list and join
when needed: change content_per_choice to map indices to lists (append
delta_content to content_per_choice[idx]) within the chunks/choice loop, and
later (where code currently assumes a single string, e.g., the same pattern
around lines ~539-541) join the list with ''.join(content_per_choice[idx])
before assertions or further processing; update any usages of content_per_choice
to expect the joined string at consumption time.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: fd250d6b-394d-432b-9ca4-19b59148345d
📒 Files selected for processing (5)
components/src/dynamo/vllm/handlers.pylib/llm/src/backend.rslib/llm/src/migration.rslib/llm/src/protocols/openai/chat_completions/delta.rstests/frontend/test_vllm.py
| request, start_services: ServicePorts, predownload_models | ||
| ) -> None: | ||
| """Test that n=5 streaming returns 5 distinct choices with correct indices.""" | ||
| import json |
There was a problem hiding this comment.
Move the json import to module scope.
The new function-local import violates the repo’s import rule.
🧹 Proposed fix
from __future__ import annotations
+import json
import logging
import os
import shutil
@@
def test_multiple_choices_n5(
request, start_services: ServicePorts, predownload_models
) -> None:
"""Test that n=5 streaming returns 5 distinct choices with correct indices."""
- import json
payload = {As per coding guidelines, keep all imports at the top of the file (no imports inside functions/classes).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/frontend/test_vllm.py` at line 466, The local import "import json"
currently placed inside a function in test_vllm.py must be moved to module
scope; remove the function-local "import json" and add "import json" with the
other top-level imports at the top of the file so all imports remain at module
scope and adhere to the repo import rule.
| response = requests.post( | ||
| f"{base_url}/v1/chat/completions", | ||
| headers={"Content-Type": "application/json"}, | ||
| json=payload, | ||
| stream=True, | ||
| timeout=180, | ||
| ) | ||
| assert ( | ||
| response.status_code == 200 | ||
| ), f"Streaming request failed with status {response.status_code}: {response.text}" | ||
|
|
||
| # Parse SSE events | ||
| chunks = [] | ||
| for line in response.iter_lines(decode_unicode=True): | ||
| if not line or not line.startswith("data: "): | ||
| continue | ||
| data_str = line[len("data: ") :] | ||
| if data_str == "[DONE]": | ||
| break | ||
| chunks.append(json.loads(data_str)) |
There was a problem hiding this comment.
Close the streamed response with a context manager.
stream=True keeps the connection open while iterating; wrap the response in with so the socket is released even on assertion/JSON failures.
🧹 Proposed fix
- response = requests.post(
- f"{base_url}/v1/chat/completions",
- headers={"Content-Type": "application/json"},
- json=payload,
- stream=True,
- timeout=180,
- )
- assert (
- response.status_code == 200
- ), f"Streaming request failed with status {response.status_code}: {response.text}"
-
- # Parse SSE events
chunks = []
- for line in response.iter_lines(decode_unicode=True):
- if not line or not line.startswith("data: "):
- continue
- data_str = line[len("data: ") :]
- if data_str == "[DONE]":
- break
- chunks.append(json.loads(data_str))
+ with requests.post(
+ f"{base_url}/v1/chat/completions",
+ headers={"Content-Type": "application/json"},
+ json=payload,
+ stream=True,
+ timeout=180,
+ ) as response:
+ assert (
+ response.status_code == 200
+ ), f"Streaming request failed with status {response.status_code}: {response.text}"
+
+ # Parse SSE events
+ for line in response.iter_lines(decode_unicode=True):
+ if not line or not line.startswith("data: "):
+ continue
+ data_str = line[len("data: ") :]
+ if data_str == "[DONE]":
+ break
+ chunks.append(json.loads(data_str))As per coding guidelines, for tests, ensure no leaked file handles (always use with).
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| response = requests.post( | |
| f"{base_url}/v1/chat/completions", | |
| headers={"Content-Type": "application/json"}, | |
| json=payload, | |
| stream=True, | |
| timeout=180, | |
| ) | |
| assert ( | |
| response.status_code == 200 | |
| ), f"Streaming request failed with status {response.status_code}: {response.text}" | |
| # Parse SSE events | |
| chunks = [] | |
| for line in response.iter_lines(decode_unicode=True): | |
| if not line or not line.startswith("data: "): | |
| continue | |
| data_str = line[len("data: ") :] | |
| if data_str == "[DONE]": | |
| break | |
| chunks.append(json.loads(data_str)) | |
| chunks = [] | |
| with requests.post( | |
| f"{base_url}/v1/chat/completions", | |
| headers={"Content-Type": "application/json"}, | |
| json=payload, | |
| stream=True, | |
| timeout=180, | |
| ) as response: | |
| assert ( | |
| response.status_code == 200 | |
| ), f"Streaming request failed with status {response.status_code}: {response.text}" | |
| # Parse SSE events | |
| for line in response.iter_lines(decode_unicode=True): | |
| if not line or not line.startswith("data: "): | |
| continue | |
| data_str = line[len("data: ") :] | |
| if data_str == "[DONE]": | |
| break | |
| chunks.append(json.loads(data_str)) |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/frontend/test_vllm.py` around lines 483 - 502, Wrap the streaming POST
call in a context manager to ensure the socket is always closed: change the bare
requests.post(...) that assigns to response and the subsequent
response.iter_lines(...) loop to use "with requests.post(...) as response:" so
the returned Response is closed even if assertions or json decoding fail; locate
this change around the Response usage in tests/frontend/test_vllm.py where
response is created and iterated with response.iter_lines(decode_unicode=True).
|
Hi @kornelcsernai-harmonic |
Overview:
Adds support for generating multiple choices (n>1) in chat completions for vLLM.
Details:
Pass
nto vLLM and keep track of each decoder independently, handling stopping. Currently no request migration support.Report combined usage statistics.
Where should the reviewer start?
previous_text_per_choice,output_tokens_per_choice,finished_choices). Keeps track of finished decoders infinished_choices. Usage is attached only when all nchoices finish. Maps the n parameter through to vLLM's SamplingParams.
n=5Things to review: correctness, performance.
Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)
Summary by CodeRabbit
Release Notes
New Features
Tests