feat: Add parallel LLM calls implementation (#276) #320
Conversation
Walkthrough

A new parallel LLM execution framework is introduced via `cortex/parallel_llm.py`, along with accompanying unit tests and documentation.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Caller
    participant Executor as ParallelLLMExecutor
    participant RateLimit as RateLimiter
    participant Router as LLMRouter
    participant ThreadPool as Thread Pool
    participant Results as Result Aggregator

    Caller->>Executor: execute_batch_async(queries)
    Executor->>Executor: create semaphore & tasks
    loop For each query
        Executor->>RateLimit: acquire()
        RateLimit-->>Executor: token available
        Executor->>ThreadPool: run_in_executor(router.complete)
        ThreadPool->>Router: complete(query)
        Router-->>ThreadPool: LLMResponse
        ThreadPool-->>Executor: response
        alt Success
            Executor->>Results: add ParallelResult
        else Failure & retry_enabled
            Executor->>Executor: exponential backoff
            Executor->>RateLimit: acquire()
            Executor->>ThreadPool: retry execute
        else Failure & no_retry
            Executor->>Results: add failed ParallelResult
        end
    end
    Executor->>Results: finalize BatchResult
    Results-->>Caller: BatchResult(aggregated stats)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (5 passed)
Pull request overview
This PR adds parallel/concurrent LLM API call support to Cortex Linux, enabling significant performance improvements for batch operations. The implementation introduces a new parallel_llm.py module with async-based concurrent execution, rate limiting, and automatic retry capabilities.
Key Changes:
- New `ParallelLLMExecutor` class for concurrent API calls with semaphore-based concurrency control
- Token bucket rate limiter to prevent API throttling
- Helper functions for common batch operations (package queries, error diagnosis, hardware checks); a minimal usage sketch follows this list
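A minimal usage sketch based on the API surface referenced in this review (`ParallelQuery(id=..., messages=...)`, `execute_batch`, `BatchResult.success_count`, `get_result`); exact signatures may differ in the merged module.

```python
from cortex.parallel_llm import ParallelLLMExecutor, ParallelQuery

# Build a small batch of independent queries.
queries = [
    ParallelQuery(id="q1", messages=[{"role": "user", "content": "Summarize package nginx"}]),
    ParallelQuery(id="q2", messages=[{"role": "user", "content": "Summarize package redis"}]),
]

# Concurrency and rate limits are configured on the executor.
executor = ParallelLLMExecutor(max_concurrent=5, requests_per_second=10.0)

# execute_batch blocks until every query has completed (or exhausted retries).
result = executor.execute_batch(queries)
print(f"{result.success_count}/{len(queries)} succeeded")
print(result.get_result("q1"))
```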
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 14 comments.
| File | Description |
|---|---|
| cortex/parallel_llm.py | New module implementing parallel LLM execution with rate limiting, retries, and result aggregation |
| tests/test_parallel_llm.py | Comprehensive unit tests covering parallel execution, rate limiting, callbacks, and helper functions |
| docs/PARALLEL_LLM_IMPLEMENTATION.md | Documentation with usage examples, configuration options, and performance benchmarks |
```python
from cortex.parallel_llm import ParallelLLMExecutor, ParallelQuery


def on_complete(result):
    status = "✓" if result.success else "✗"
    print(f"{status} {result.query_id} completed in {result.execution_time:.2f}s")


executor = ParallelLLMExecutor()
# Use execute_with_callback_async for progress tracking
```
Copilot AI (Dec 20, 2025):
The documentation example is incomplete and shows `# Use execute_with_callback_async for progress tracking` without providing the actual implementation. This will confuse users trying to understand how to use the callback feature. Complete the example by showing how to use `asyncio.run()` with `executor.execute_with_callback_async(queries, on_complete)`.
Suggested change:

```python
import asyncio
from cortex.parallel_llm import ParallelLLMExecutor, ParallelQuery


def on_complete(result):
    status = "✓" if result.success else "✗"
    print(f"{status} {result.query_id} completed in {result.execution_time:.2f}s")


async def run_with_progress():
    executor = ParallelLLMExecutor()
    queries = [
        ParallelQuery(query_id="gpu_check", prompt="Analyze GPU configuration and health."),
        ParallelQuery(query_id="cpu_check", prompt="Analyze CPU configuration and health."),
        ParallelQuery(query_id="ram_check", prompt="Analyze RAM configuration and health."),
    ]
    # Use execute_with_callback_async for progress tracking
    await executor.execute_with_callback_async(queries, on_complete)


asyncio.run(run_with_progress())
```
```python
from cortex.llm_router import TaskType


executor = ParallelLLMExecutor(max_concurrent=5, requests_per_second=10.0)
```
Copilot AI (Dec 20, 2025):
The example initializes `ParallelLLMExecutor` without providing a router, but the `router` parameter is shown as optional in the constructor documentation. When `router` is `None`, a new `LLMRouter()` is created. This should be explicitly mentioned in the usage example or shown as `executor = ParallelLLMExecutor(router=router, max_concurrent=5, requests_per_second=10.0)` to demonstrate proper initialization with an existing router instance.
Suggested change:

```python
from cortex.llm_router import LLMRouter, TaskType

router = LLMRouter()
executor = ParallelLLMExecutor(router=router, max_concurrent=5, requests_per_second=10.0)
```
```python
def test_multiple_rapid_acquires(self):
    """Test multiple rapid acquires work correctly."""
    limiter = RateLimiter(requests_per_second=10.0)

    async def run_test():
        for _ in range(5):
            await limiter.acquire()
        # Should have consumed 5 tokens
        self.assertLessEqual(limiter.tokens, 5.5)

    asyncio.run(run_test())
```
Copilot AI (Dec 20, 2025):
Missing test coverage for the rate limiter's token replenishment behavior. While tests verify token consumption, there's no test that verifies tokens are correctly replenished over time according to the configured rate. Add a test that waits for tokens to replenish and verifies the rate limiter allows requests after waiting.
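A hedged sketch of such a replenishment test, assuming the `RateLimiter` exposes the `tokens` attribute used in the surrounding tests and refills at `requests_per_second`:

```python
def test_token_replenishment(self):
    """Sketch: tokens should refill over time at the configured rate."""
    limiter = RateLimiter(requests_per_second=10.0)

    async def run_test():
        # Drain a handful of tokens first.
        for _ in range(5):
            await limiter.acquire()
        drained = limiter.tokens

        # Wait long enough for ~5 tokens to return (10/s * 0.5s), then acquire
        # once more so the limiter updates its bookkeeping.
        await asyncio.sleep(0.5)
        await limiter.acquire()
        self.assertGreater(limiter.tokens, drained)

    asyncio.run(run_test())
```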
```python
def test_concurrent_execution_time(self):
    """Test that parallel execution is faster than sequential."""
    import time

    delay_time = 0.1

    def slow_complete(*args, **kwargs):
        time.sleep(delay_time)
        return self.mock_response

    self.mock_router.complete.side_effect = slow_complete
    executor = ParallelLLMExecutor(
        router=self.mock_router,
        max_concurrent=5,
        requests_per_second=100.0,  # High rate to not limit
    )
    queries = [
        ParallelQuery(id=f"speed_{i}", messages=[{"role": "user", "content": f"Test {i}"}])
        for i in range(3)
    ]

    start = time.time()
    result = executor.execute_batch(queries)
    elapsed = time.time() - start

    # Parallel should complete faster than 3 * delay_time
    # Allow some overhead but should be significantly faster
    self.assertLess(elapsed, 3 * delay_time * 0.9)
    self.assertEqual(result.success_count, 3)
```
Copilot AI (Dec 20, 2025):
Missing test coverage for concurrent execution with actual rate limiting. While test_concurrent_execution_time tests parallelism with a high rate limit (100.0), there's no test that verifies rate limiting actually throttles requests when the limit is low. Add a test with a low rate limit that verifies requests are properly throttled over time.
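A hedged sketch of such a throttling test, reusing the mock router from this file's fixtures; the timing threshold assumes the token bucket starts with roughly `requests_per_second` tokens:

```python
def test_rate_limiting_throttles_requests(self):
    """Sketch: a low rate limit should spread a batch out over time."""
    import time

    self.mock_router.complete.return_value = self.mock_response
    executor = ParallelLLMExecutor(
        router=self.mock_router,
        max_concurrent=5,
        requests_per_second=2.0,  # low rate so throttling dominates
    )
    queries = [
        ParallelQuery(id=f"throttle_{i}", messages=[{"role": "user", "content": f"Test {i}"}])
        for i in range(5)
    ]

    start = time.monotonic()
    result = executor.execute_batch(queries)
    elapsed = time.monotonic() - start

    self.assertEqual(result.success_count, 5)
    # With ~2 initial tokens, the remaining 3 requests need roughly 1.5s of refills.
    self.assertGreaterEqual(elapsed, 1.0)
```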
```python
            async with self._semaphore:
                # Run sync router.complete in thread pool
                loop = asyncio.get_event_loop()
```
Copilot AI (Dec 20, 2025):
Potential resource leak with asyncio event loop. The code calls asyncio.get_event_loop() which may return a closed loop or create a new loop depending on the context. In Python 3.10+, this is deprecated in favor of asyncio.get_running_loop() which only works within an async context. Since this code is already in an async function, use asyncio.get_running_loop() instead to ensure you're getting the correct running loop and avoid deprecation warnings.
Suggested change:

```diff
- loop = asyncio.get_event_loop()
+ loop = asyncio.get_running_loop()
```
```python
    id: str
    messages: list[dict[str, str]]
    task_type: TaskType = TaskType.USER_CHAT
    force_provider: LLMProvider | None = None
```
Copilot AI (Dec 20, 2025):
The comment states "exponential backoff" but the implementation uses linear backoff. The comment should be corrected to "linear backoff" to match the actual behavior, or the implementation should be fixed to use true exponential backoff (see related bug comment).
```python
        start_time = time.time()

        try:
            await self.rate_limiter.acquire()

            async with self._semaphore:
                # Run sync router.complete in thread pool
                loop = asyncio.get_event_loop()
                response = await loop.run_in_executor(
                    None,
                    lambda: self.router.complete(
                        messages=query.messages,
                        task_type=query.task_type,
                        force_provider=query.force_provider,
                        temperature=query.temperature,
                        max_tokens=query.max_tokens,
                    ),
                )

            return ParallelResult(
                query_id=query.id,
                response=response,
                success=True,
                execution_time=time.time() - start_time,
```
Copilot AI (Dec 20, 2025):
Mixing time.time() and time.monotonic() for timing calculations. The code uses time.monotonic() for rate limiting (line 92, 98) but time.time() for execution timing (lines 146, 169, 184). For measuring elapsed time, time.monotonic() is preferred as it's not affected by system clock adjustments. Consider using time.monotonic() consistently for all execution time measurements.
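A small standalone sketch of the monotonic-clock suggestion (illustrative only, not the module's code):

```python
import time

# time.monotonic() is immune to system clock adjustments, so it is the safer
# choice for measuring elapsed execution time.
start = time.monotonic()
time.sleep(0.05)  # stand-in for the awaited router call
execution_time = time.monotonic() - start
print(f"execution_time: {execution_time:.3f}s")
```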
```python
    async def execute_with_callback_async(
        self,
        queries: list[ParallelQuery],
        on_complete: Callable[[ParallelResult], None] | None = None,
    ) -> BatchResult:
```
Copilot AI (Dec 20, 2025):
The method name execute_with_callback_async is inconsistent with the naming pattern of other methods. The other async method is named execute_batch_async (noun + async), but this is execute_with_callback_async (verb + prepositional phrase + async). For consistency, consider renaming to execute_batch_with_callback_async to maintain the parallel structure with execute_batch_async.
```python
            logger.warning(f"Query {query.id} failed (attempt {attempt + 1}): {e}")

            if self.retry_failed and attempt < self.max_retries:
                await asyncio.sleep(0.5 * (attempt + 1))  # exponential backoff
```
Copilot AI (Dec 20, 2025):
The exponential backoff implementation is incorrect. The current formula 0.5 * (attempt + 1) results in linear backoff (0.5s, 1.0s, 1.5s), not exponential backoff. For true exponential backoff, use a formula like 0.5 * (2 ** attempt) which would give delays of 0.5s, 1.0s, 2.0s.
Suggested change:

```diff
- await asyncio.sleep(0.5 * (attempt + 1))  # exponential backoff
+ await asyncio.sleep(0.5 * (2 ** attempt))  # exponential backoff
```
```python
import os
import sys
import unittest
from unittest.mock import MagicMock, Mock, patch
```
Copilot AI (Dec 20, 2025):
Import of 'MagicMock' is not used.
Import of 'patch' is not used.
Suggested change:

```diff
- from unittest.mock import MagicMock, Mock, patch
+ from unittest.mock import Mock
```
Actionable comments posted: 3
🧹 Nitpick comments (4)
cortex/parallel_llm.py (2)
233-244: Consider documenting that this cannot be called from within an existing event loop.
`asyncio.run()` will raise `RuntimeError` if called from within an async context. Consider adding a note in the docstring, or use `asyncio.get_event_loop().run_until_complete()` for broader compatibility. A guard sketch follows.
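A minimal sketch of one way to guard the synchronous wrapper against being called from inside a running loop; the helper name is illustrative, only `execute_batch_async` comes from this PR:

```python
import asyncio


def run_batch_blocking(executor, queries):
    """Illustrative helper: fail clearly instead of nesting event loops."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running, so asyncio.run() is safe here.
        return asyncio.run(executor.execute_batch_async(queries))
    raise RuntimeError(
        "Already inside an event loop; await executor.execute_batch_async(queries) instead."
    )
```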
270-271: Remove unused variable assignment.
`results = []` on line 271 is immediately overwritten by `results = await asyncio.gather(*tasks)` on line 280 and is never used.

🔎 Proposed fix
```diff
         start_time = time.time()
-        results = []

         async def execute_with_notify(query: ParallelQuery) -> ParallelResult:
```

tests/test_parallel_llm.py (1)
14-14: Consider using proper package structure instead of sys.path manipulation.
`sys.path.insert(0, ...)` is fragile. Consider using `pytest` with proper package configuration or `conftest.py` for path setup.

docs/PARALLEL_LLM_IMPLEMENTATION.md (1)
99-111: Incomplete callback example. The "With Progress Callback" example defines `on_complete` but doesn't show how to use it with `execute_with_callback_async`. Consider completing the example for clarity.

🔎 Suggested completion

```python
import asyncio

from cortex.parallel_llm import ParallelLLMExecutor, ParallelQuery


def on_complete(result):
    status = "✓" if result.success else "✗"
    print(f"{status} {result.query_id} completed in {result.execution_time:.2f}s")


async def run_with_callback():
    executor = ParallelLLMExecutor()
    queries = [
        ParallelQuery(id="q1", messages=[{"role": "user", "content": "Query 1"}]),
        ParallelQuery(id="q2", messages=[{"role": "user", "content": "Query 2"}]),
    ]
    return await executor.execute_with_callback_async(queries, on_complete)


asyncio.run(run_with_callback())
```
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- cortex/parallel_llm.py (1 hunks)
- docs/PARALLEL_LLM_IMPLEMENTATION.md (1 hunks)
- tests/test_parallel_llm.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Follow PEP 8 style guide
Type hints required in Python code
Docstrings required for all public APIs
Files:
- tests/test_parallel_llm.py
- cortex/parallel_llm.py
tests/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
Maintain >80% test coverage for pull requests
Files:
tests/test_parallel_llm.py
🧬 Code graph analysis (2)
tests/test_parallel_llm.py (2)
cortex/llm_router.py (3)
- LLMProvider (43-47), LLMResponse (51-60), TaskType (30-40)

cortex/parallel_llm.py (13)
- BatchResult (54-73), ParallelQuery (30-39), ParallelResult (43-50), RateLimiter (76-108), create_error_diagnosis_queries (329-360), create_hardware_check_queries (363-394), create_package_queries (297-326), get_result (64-69), successful_responses (71-73), acquire (95-108), execute_batch (233-243), execute_with_callback_async (245-294), execute_batch_async (187-231)
cortex/parallel_llm.py (2)
cortex/llm_router.py (4)
- LLMProvider (43-47), LLMResponse (51-60), LLMRouter (73-418), TaskType (30-40)

tests/test_parallel_llm.py (1)
- on_complete (319-320)
🪛 LanguageTool
docs/PARALLEL_LLM_IMPLEMENTATION.md
[grammar] ~131-~131: Use a hyphen to join words.
Context: ...s - Parallel: ~1.2s (accounting for rate limiting overhead) **Speedup: ~4x for 5...
(QB_NEW_EN_HYPHEN)
🪛 markdownlint-cli2 (0.18.1)
docs/PARALLEL_LLM_IMPLEMENTATION.md
133-133: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Build Package
- GitHub Check: Agent
🔇 Additional comments (17)
cortex/parallel_llm.py (7)
1-27: LGTM! Module structure follows best practices with appropriate imports and a comprehensive docstring.

29-74: LGTM! Data classes are well-structured with proper type hints and docstrings. The helper methods `get_result` and `successful_responses` are clear and functional.

111-143: LGTM! Executor initialization is well-structured with sensible defaults and proper documentation.

187-232: LGTM! The async batch execution is well-implemented with proper empty-input handling and statistics aggregation.

297-327: LGTM! Helper function is clean and follows the expected pattern with proper documentation.

329-361: LGTM! Error diagnosis query helper is well-structured with appropriate task type assignment.

363-394: LGTM! Hardware check query helper follows the established pattern consistently.
tests/test_parallel_llm.py (7)
29-54: LGTM! Tests for the `ParallelQuery` dataclass cover creation and metadata scenarios appropriately.

56-91: LGTM! Tests for `ParallelResult` cover both success and failure scenarios with appropriate assertions.

93-174: LGTM! `BatchResult` tests comprehensively cover statistics, lookup, and filtering functionality.

176-206: LGTM! Rate limiter tests cover initialization and token consumption. Consider adding a test for the waiting behavior when tokens are exhausted to improve coverage.

208-335: LGTM! Executor tests are comprehensive, covering initialization, execution paths, failure handling, retry logic, and callbacks. Good use of mocking for the router.

337-386: LGTM! Helper function tests verify correct ID generation, task type assignment, and custom template handling.

420-449: Timing-based test may be flaky in CI environments. The test relies on wall-clock timing, which can vary significantly under load. Consider increasing the tolerance or using a different approach (e.g., verifying that tasks ran concurrently by checking overlapping execution windows; a sketch follows).
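A hedged sketch of the overlapping-windows approach, using the same mock router fixtures as the existing tests:

```python
def test_tasks_overlap_in_time(self):
    """Sketch: prove concurrency by recording per-call execution windows."""
    import threading
    import time

    windows = []
    lock = threading.Lock()

    def slow_complete(*args, **kwargs):
        started = time.monotonic()
        time.sleep(0.1)
        with lock:
            windows.append((started, time.monotonic()))
        return self.mock_response

    self.mock_router.complete.side_effect = slow_complete
    executor = ParallelLLMExecutor(
        router=self.mock_router, max_concurrent=5, requests_per_second=100.0
    )
    queries = [
        ParallelQuery(id=f"overlap_{i}", messages=[{"role": "user", "content": f"Test {i}"}])
        for i in range(3)
    ]
    executor.execute_batch(queries)

    # If calls ran concurrently, at least one window starts before another ends.
    windows.sort()
    self.assertTrue(
        any(windows[i + 1][0] < windows[i][1] for i in range(len(windows) - 1))
    )
```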
docs/PARALLEL_LLM_IMPLEMENTATION.md (3)
36-36: Inconsistency with implementation: backoff is linear, not exponential. The documentation states "exponential backoff" but the implementation uses linear backoff (`0.5 * (attempt + 1)`). Update either the implementation or documentation to match.

127-134: LGTM! Performance section provides useful context. The static analysis hints about hyphenation and emphasis are stylistic and can be safely ignored.

135-150: LGTM! Documentation is comprehensive and provides clear guidance on testing and future directions.
```python
    async def acquire(self) -> None:
        """Wait until a request token is available."""
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_update
            self.tokens = min(self.rate, self.tokens + elapsed * self.rate)
            self.last_update = now

            if self.tokens < 1:
                wait_time = (1 - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                self.tokens = 0
            else:
                self.tokens -= 1
```
Lock held during sleep blocks concurrency.
The acquire() method holds _lock while sleeping (line 105), which blocks all other coroutines from acquiring tokens during the wait. This serializes waiting callers instead of allowing them to calculate and wait concurrently.
🔎 Proposed fix: release lock before sleeping
```diff
 async def acquire(self) -> None:
     """Wait until a request token is available."""
+    wait_time = 0.0
     async with self._lock:
         now = time.monotonic()
         elapsed = now - self.last_update
         self.tokens = min(self.rate, self.tokens + elapsed * self.rate)
         self.last_update = now
         if self.tokens < 1:
             wait_time = (1 - self.tokens) / self.rate
-            await asyncio.sleep(wait_time)
             self.tokens = 0
         else:
             self.tokens -= 1
+            return
+
+    if wait_time > 0:
+        await asyncio.sleep(wait_time)
```

🤖 Prompt for AI Agents
In cortex/parallel_llm.py around lines 95 to 109, the acquire() method currently
holds self._lock while awaiting asyncio.sleep, which blocks other coroutines;
modify it to compute now, elapsed, and tentative tokens while holding the lock,
calculate wait_time if tokens < 1, then release the lock before awaiting sleep;
after the sleep re-acquire the lock, recompute now/elapsed/tokens (or update
tokens using the elapsed since last_update), then decrement tokens by 1 and
update last_update; ensure all state mutations (tokens and last_update) occur
under the lock but the actual asyncio.sleep happens outside the locked section.
```python
                # Run sync router.complete in thread pool
                loop = asyncio.get_event_loop()
                response = await loop.run_in_executor(
```
🛠️ Refactor suggestion | 🟠 Major
Use asyncio.get_running_loop() instead of deprecated get_event_loop().
asyncio.get_event_loop() is deprecated in Python 3.10+ and will emit a deprecation warning when called from a coroutine.
🔎 Proposed fix
```diff
- loop = asyncio.get_event_loop()
+ loop = asyncio.get_running_loop()
```

🤖 Prompt for AI Agents
In cortex/parallel_llm.py around lines 152 to 154, the code calls
asyncio.get_event_loop() from within a coroutine which is deprecated; replace
that call with asyncio.get_running_loop() so the coroutine obtains the currently
running event loop (keeping the existing loop.run_in_executor(...) usage
unchanged) to avoid the deprecation warning and preserve behavior.
```python
            if self.retry_failed and attempt < self.max_retries:
                await asyncio.sleep(0.5 * (attempt + 1))  # exponential backoff
                return await self._execute_single(query, attempt + 1)
```
Backoff is linear, not exponential.
The delay 0.5 * (attempt + 1) produces linear backoff (0.5s, 1.0s, 1.5s). True exponential backoff would be 0.5 * (2 ** attempt) (0.5s, 1.0s, 2.0s).
Consider updating either the implementation or the docstring/comments to match.
🔎 Proposed fix for exponential backoff
```diff
 if self.retry_failed and attempt < self.max_retries:
-    await asyncio.sleep(0.5 * (attempt + 1))  # exponential backoff
+    await asyncio.sleep(0.5 * (2 ** attempt))  # exponential backoff
     return await self._execute_single(query, attempt + 1)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
if self.retry_failed and attempt < self.max_retries:
    await asyncio.sleep(0.5 * (2 ** attempt))  # exponential backoff
    return await self._execute_single(query, attempt + 1)
```
🤖 Prompt for AI Agents
In cortex/parallel_llm.py around lines 175 to 177, the retry delay uses 0.5 *
(attempt + 1) which yields linear backoff (0.5s, 1.0s, 1.5s); change it to
exponential backoff by using 0.5 * (2 ** attempt) so delays become 0.5s, 1.0s,
2.0s, etc., or alternatively update the surrounding comment/docstring to state
that the current behavior is linear backoff if you want to keep the existing
formula.



Related Issue
Closes #276
Summary
This implementation adds parallel/concurrent LLM API call support to Cortex Linux, enabling 2-3x speedup for batch operations. The previous architecture made sequential LLM calls, which was slow for batch workloads. The new module uses `asyncio` with semaphore-based concurrency control, as sketched below.
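A minimal sketch of the asyncio-plus-semaphore pattern the summary refers to; the function and names here are illustrative, not the module's actual API:

```python
import asyncio


async def run_with_limit(coros, max_concurrent: int = 5):
    """Run awaitables concurrently, but never more than max_concurrent at once."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(coro):
        async with semaphore:  # acquired per task, released on completion
            return await coro

    return await asyncio.gather(*(guarded(c) for c in coros))


async def fake_llm_call(i: int) -> str:
    await asyncio.sleep(0.1)  # stand-in for an API round trip
    return f"response {i}"


if __name__ == "__main__":
    print(asyncio.run(run_with_limit([fake_llm_call(i) for i in range(10)])))
```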
Checklist
- Tests pass (`pytest tests/`)
- Related issue: #276

Description

Summary by CodeRabbit
Release Notes
New Features
Documentation