Feature/parallel chain processing #116
Conversation
This improves efficiency when processing I/O-bound workloads by:

- Adding a `CONSERVER_WORKERS` env var to spawn multiple worker processes that independently consume from Redis queues (Phase 1)
- Adding a `CONSERVER_PARALLEL_STORAGE` env var to enable concurrent writes to multiple storage backends using `ThreadPoolExecutor` (Phase 2)

Features:

- Workers are automatically restarted if they die unexpectedly
- Graceful shutdown signals all workers to complete current work
- Redis `BLPOP` provides atomic distribution of vCons to workers (see the sketch after this list)
- Single-worker mode (default) preserves original behavior
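A minimal sketch of that consumption model, assuming illustrative names throughout: `worker_loop`, `process_vcon`, and the queue names are placeholders, not the PR's actual identifiers.

```python
import signal

import redis

shutdown = False

def _handle_sigterm(signum, frame):
    # Flag is checked between BLPOPs so the worker finishes its current vCon
    global shutdown
    shutdown = True

def worker_loop(queue_names):
    # Each worker owns a fresh Redis connection. BLPOP pops atomically,
    # so every queued vCon UUID is handed to exactly one worker.
    signal.signal(signal.SIGTERM, _handle_sigterm)
    r = redis.Redis(decode_responses=True)
    while not shutdown:
        item = r.blpop(queue_names, timeout=5)
        if item is None:
            continue  # timed out: loop back and re-check the shutdown flag
        _queue, vcon_id = item
        process_vcon(vcon_id)  # hypothetical entry point into the chain pipeline
```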
Tests cover:

- Worker count configuration from environment variables
- Parallel storage enabled/disabled settings
- `ThreadPoolExecutor` behavior with multiple storage backends
- Parallel vs. sequential timing comparison
- Error handling when individual storage backends fail
- Signal handling for graceful shutdown
- Worker naming conventions and process management
Memory optimization improvements for multi-worker mode:

- Add `CONSERVER_START_METHOD` env var to choose fork/spawn/forkserver
  - "fork": copy-on-write memory sharing (Unix), lower memory
  - "spawn": fresh interpreter per worker, safer but higher memory
  - "forkserver": hybrid approach
  - Default: platform default (fork on Unix, spawn on Windows); see the sketch after this list
- Move module imports from main() to worker_loop()
  - Defers heavy imports until a worker starts
  - With "spawn", each worker only loads what it needs
  - Reduces base memory footprint significantly
- Add tests for start method configuration
- Update documentation in example_config.yml
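A sketch of how such a start-method knob can be wired up; `spawn_workers` and the env-var handling here are illustrative, not the PR's code.

```python
import multiprocessing
import os

def spawn_workers(n, target):
    # CONSERVER_START_METHOD picks how children are created; when unset,
    # fall back to the platform default (fork on Unix, spawn on Windows).
    method = os.environ.get("CONSERVER_START_METHOD")
    ctx = multiprocessing.get_context(method) if method else multiprocessing
    workers = [ctx.Process(target=target, name=f"worker-{i}") for i in range(n)]
    for w in workers:
        w.start()
    return workers
```

`multiprocessing.get_context()` raises `ValueError` for an unrecognized method name, which gives the env var basic validation for free.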
Document the new multi-worker and parallel storage capabilities:

- `02_CORE_FUNCTIONALITY.md`: add multi-worker architecture and parallel storage sections, update processing flow description
- `06_DEPLOYMENT_OPERATIONS.md`: add `CONSERVER_WORKERS`, `CONSERVER_PARALLEL_STORAGE`, and `CONSERVER_START_METHOD` env vars, update scaling strategies with memory optimization guidance
- `09_ARCHITECTURE_DESIGN.md`: update architecture diagram to show worker processes and parallel storage, add detailed scalability design section
- `10_ROADMAP_FUTURE.md`: update to v1.1, mark parallel processing features as completed
- `README.md`: add parallel processing to core capabilities, include worker configuration example
Cursor Bugbot has reviewed your changes and found 1 potential issue.
```python
logger.info("Loading required modules")
# Re-initialize Redis client in worker process
r = redis_mgr.get_client()
```
Redis client not re-initialized in forked workers
High Severity
The comment says "Re-initialize Redis client in worker process" but redis_mgr.get_client() simply returns the module-level singleton redis object created at import time. With the 'fork' start method (which the documentation explicitly recommends for "memory-efficient on Unix"), all worker processes inherit and share the parent's Redis connection. Redis connections are not fork-safe, and sharing them across processes can cause corrupted reads/writes, protocol errors, and deadlocks.
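One common remedy is to make the accessor fork-aware, rebuilding the client whenever the calling PID differs from the one that created it. This is a sketch, not the repo's actual `redis_mgr`:

```python
import os

import redis

class RedisMgr:
    # Illustrative fork-aware client cache: a forked worker sees a PID
    # mismatch and builds its own connection instead of reusing the
    # parent's socket.
    _client = None
    _owner_pid = None

    @classmethod
    def get_client(cls):
        pid = os.getpid()
        if cls._client is None or cls._owner_pid != pid:
            cls._client = redis.Redis()
            cls._owner_pid = pid
        return cls._client
```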
Pull request overview
This PR adds parallelism to the vCon server by introducing multi-process worker execution and optional concurrent storage writes, along with new configuration knobs and updated documentation.
Changes:
- Added multi-worker processing via `multiprocessing`, controlled by `CONSERVER_WORKERS` and `CONSERVER_START_METHOD`.
- Added optional parallel storage writes via `ThreadPoolExecutor`, controlled by `CONSERVER_PARALLEL_STORAGE`.
- Updated docs/config examples and added tests for the new settings and parallelism behavior.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 9 comments.
Summary per file:

| File | Description |
|---|---|
| `server/main.py` | Introduces worker process spawning/monitoring, signal handling, and parallel storage write path. |
| `server/settings.py` | Adds env-driven worker/parallel-storage/start-method settings. |
| `server/config.py` | Adds helper accessors/validation for new parallelism settings. |
| `server/tests/test_worker_config.py` | Adds tests intended to cover new worker/start-method/parallel-storage configuration. |
| `server/tests/test_parallel_processing.py` | Adds tests intended to cover parallel storage and worker-loop/shutdown behavior. |
| `example_config.yml` | Documents the new environment variables and their behavior. |
| `prod_mgt/README.md` | Updates roadmap highlights and adds worker configuration examples. |
| `prod_mgt/10_ROADMAP_FUTURE.md` | Updates "current state" to v1.1 and enumerates parallel-processing features. |
| `prod_mgt/09_ARCHITECTURE_DESIGN.md` | Updates architecture diagrams/sections to describe multi-worker + parallel storage. |
| `prod_mgt/06_DEPLOYMENT_OPERATIONS.md` | Adds deployment guidance and tuning notes for new env vars. |
| `prod_mgt/02_CORE_FUNCTIONALITY.md` | Updates core flow documentation to reflect multi-worker and parallel storage behavior. |
```python
from config import get_worker_count
importlib.reload(__import__('config'))
from config import get_worker_count

# Default should be 1
assert settings.CONSERVER_WORKERS == 1
```
Copilot AI · Jan 23, 2026
from config import get_worker_count is imported twice here but never used, which will fail flake8 (F401). Either remove these imports or assert on get_worker_count() (instead of only checking settings.CONSERVER_WORKERS).
Suggested change:

```python
import config
importlib.reload(config)
from config import get_worker_count

# Default should be 1
assert settings.CONSERVER_WORKERS == 1
assert get_worker_count() == 1
```
```python
# In main process, signal all workers to shut down
for worker in worker_processes:
    if worker.is_alive():
        logger.info("Signaling worker %s (PID %s) to shut down", worker.name, worker.pid)
        try:
            os.kill(worker.pid, signal.SIGTERM)
        except (ProcessLookupError, OSError):
```
Copilot AI · Jan 23, 2026
In fork-based workers, this signal_handler will inherit worker_processes from the parent (containing parent-created multiprocessing.Process objects). Calling Process.is_alive() from a non-parent process can raise AssertionError ("can only test a child process"), potentially crashing a worker during shutdown. Guard this loop so only the main process signals workers (e.g., check multiprocessing.current_process().name == "MainProcess" or compare against a stored main PID), or clear/avoid Process objects in child processes.
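A sketch of the suggested guard, assuming the PR's module-level `worker_processes` list; `MAIN_PID` is an illustrative name:

```python
import os
import signal

MAIN_PID = os.getpid()  # captured once, before any workers are forked

def signal_handler(signum, frame):
    # Forked children inherit worker_processes but are not the parent of
    # those Process objects, so only the main process may inspect them.
    if os.getpid() != MAIN_PID:
        raise SystemExit(0)
    for worker in worker_processes:
        if worker.is_alive():
            os.kill(worker.pid, signal.SIGTERM)
```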
```python
follower.start_followers()

if worker_count > 1:
    # Multi-worker mode: spawn worker processes
    logger.info("Starting %d worker processes", worker_count)
```
Copilot AI · Jan 23, 2026
follower.start_followers() starts threading.Timer threads (server/follower.py) before worker processes are spawned. If the start method is "fork" (default on Linux), forking after threads have started is unsafe and can deadlock/crash. Consider spawning worker processes first and only then starting followers in the parent, or enforce/auto-select a non-fork start method when followers are enabled.
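One way to respect that constraint, reusing the hypothetical `spawn_workers` and `worker_loop` names from the sketches above:

```python
# Fork first, while the parent process is still single-threaded ...
workers = spawn_workers(worker_count, worker_loop)
# ... and only then start the follower Timer threads in the parent.
follower.start_followers()
```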
```python
# Wait for all operations to complete and handle results
for future in as_completed(future_to_storage):
    storage_name = future_to_storage[future]
    try:
        future.result()
    except Exception as e:
        # Error already logged in _process_storage, but log again for parallel context
        logger.error(
            "Parallel storage operation failed for %s on vCon %s: %s",
            storage_name,
            self.vcon_id,
            str(e)
        )
```
Copilot AI · Jan 23, 2026
_process_storage() catches and logs all Exceptions and does not re-raise, so future.result() in _process_storage_parallel() will almost never raise; the surrounding try/except (and the comment about "already logged") is effectively dead code. Either have _process_storage return/raise so the parallel coordinator can aggregate failures, or remove the redundant exception handling here to avoid misleading control flow and double-logging.
Suggested change:

```python
# Wait for all operations to complete
for future in as_completed(future_to_storage):
    storage_name = future_to_storage[future]
    # Ensure any unexpected exceptions from storage tasks surface
    future.result()
```
```python
import pytest
from unittest.mock import patch, MagicMock
```
Copilot AI · Jan 23, 2026
Unused imports here (pytest, MagicMock) will fail flake8 (F401) based on the repo’s .flake8/pre-commit configuration. Please remove them or use them in the tests.
Suggested change:

```python
from unittest.mock import patch
```
```python
import os
import time
import pytest
from unittest.mock import patch, MagicMock, PropertyMock
```
Copilot AI · Jan 23, 2026
Unused imports (os, patch, MagicMock, PropertyMock) will trigger flake8 F401 and fail CI/pre-commit. Remove them or use them in assertions/mocks.
Suggested change:

```python
import time
import pytest
```
```python
# Parallel should be significantly faster (at least 2x for 4 backends)
# Each mock takes 0.1s, so sequential ~0.4s, parallel ~0.1s
assert parallel_time < sequential_time * 0.75, \
    f"Parallel ({parallel_time:.3f}s) should be faster than sequential ({sequential_time:.3f}s)"
```
Copilot AI · Jan 23, 2026
This test asserts a wall-clock speedup (parallel_time < sequential_time * 0.75) using real sleeps and ThreadPoolExecutor. That’s prone to flakiness on loaded CI (timing variance can invert or narrow the gap). Prefer a deterministic assertion (e.g., verify overlap/concurrency via timestamps in call_log, or mock out sleep/clock) rather than relying on absolute runtime comparisons.
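A deterministic alternative along those lines (an illustrative test; the backend names are placeholders): record per-call intervals and assert that at least two overlap, rather than comparing total runtimes.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def test_storage_calls_overlap():
    # Concurrency is proven by overlapping (start, end) intervals,
    # which stays stable even on a loaded CI machine.
    intervals = []

    def fake_backend(name):
        start = time.monotonic()
        time.sleep(0.05)  # stand-in for backend I/O latency
        intervals.append((start, time.monotonic()))

    with ThreadPoolExecutor(max_workers=4) as pool:
        list(pool.map(fake_backend, ["s3", "postgres", "mongo", "sftp"]))

    assert any(
        a[0] < b[1] and b[0] < a[1]
        for i, a in enumerate(intervals)
        for b in intervals[i + 1:]
    ), "expected at least two storage writes to run concurrently"
```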
```python
def test_get_start_method_validates_value(self):
    """Test that get_start_method validates the value."""
    from config import get_start_method
```
Copilot AI · Jan 23, 2026
from config import get_start_method at line 52 is unused (you re-import it later inside the patched env). This will trigger flake8 F401. Remove the unused import or restructure the test to import once and use that symbol.
Suggested change: delete the unused `from config import get_start_method` line.
```python
from config import get_start_method

# Valid values should work
with patch.dict(os.environ, {"CONSERVER_START_METHOD": "fork"}):
    import importlib
    import settings
    importlib.reload(settings)
    import config
    importlib.reload(config)
```
Copilot AI · Jan 23, 2026
Module 'config' is imported with both 'import' and 'import from'.
Module 'server.config' is imported with both 'import' and 'import from'.
Suggested change:

```python
# Valid values should work
with patch.dict(os.environ, {"CONSERVER_START_METHOD": "fork"}):
    import importlib
    import settings
    importlib.reload(settings)
    importlib.reload(__import__('config'))
```
This pull request introduces comprehensive support for parallel processing and storage in the vCon server, enabling higher throughput and improved scalability. The documentation, configuration files, and core server code have been updated to support and explain multi-worker processing (via the `CONSERVER_WORKERS` environment variable), concurrent storage writes (`CONSERVER_PARALLEL_STORAGE`), and flexible multiprocessing start methods (`CONSERVER_START_METHOD`). These changes are reflected in both user-facing documentation and internal configuration and control flow, including graceful shutdown and worker management.

Parallel Processing & Storage Support
- Parallel storage writes use `ThreadPoolExecutor`, allowing concurrent operations on multiple storage backends for significant throughput improvements. [1] [2] [3] [4] [5]

Configuration & Environment Variables
- New environment variables `CONSERVER_WORKERS`, `CONSERVER_PARALLEL_STORAGE`, and `CONSERVER_START_METHOD`, with detailed explanations and examples in `example_config.yml` and deployment docs. [1] [2] [3] [4]
- Helper functions in `server/config.py` to retrieve and validate these new configuration options.

Documentation & Roadmap Updates
Configuration & Usage Examples
These changes collectively enable the vCon server to handle higher loads with better resource utilization and provide clear guidance for users to deploy and scale the system efficiently.
Note

Introduces configurable parallelism across the processing pipeline and storage layer.
- Multiple worker processes (`CONSERVER_WORKERS`) that atomically consume via Redis `BLPOP`, with SIGTERM/SIGINT graceful shutdown and auto-restart monitoring; configurable multiprocessing start method (fork/spawn/forkserver).
- Optional parallel storage writes via `ThreadPoolExecutor`, falling back to sequential mode; errors isolated per backend.
- Helper accessors/validation in `server/config.py`; environment-driven flags in `server/settings.py`.

Written by Cursor Bugbot for commit 924f6eb.