VER-306: Fix crash caused by out-of-memory in Stage 3 machine #69
quancao-ea merged 4 commits into main
Conversation
Implement SearXNG web search and URL content extraction functionality to enable web-based information gathering in Stage 3 processing. These tools provide asynchronous web search capabilities and HTML-to-markdown conversion for content extraction.
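The HTML-to-markdown extraction step can be illustrated with a minimal, dependency-free sketch. The real `web_url_read` presumably uses a full HTML-to-markdown converter; the `_TextExtractor` class and `html_to_text` function below are hypothetical names, and the sketch only strips tags and skips script/style content:

```python
from html.parser import HTMLParser


class _TextExtractor(HTMLParser):
    # Minimal illustration of the HTML-to-text step: collect visible text,
    # skipping anything inside <script> or <style> elements.
    def __init__(self):
        super().__init__()
        self.parts = []
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        if not self._skip_depth and data.strip():
            self.parts.append(data.strip())


def html_to_text(html: str) -> str:
    """Extract readable text from an HTML document, one block per line."""
    parser = _TextExtractor()
    parser.feed(html)
    return "\n".join(parser.parts)
```

A proper markdown converter would additionally preserve links, headings, and lists; this sketch shows only the core "drop the markup, keep the content" idea.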
Migrate the Stage 3 processing pipeline from synchronous to asynchronous execution with enhanced web search capabilities. Replace the Gemini CLI and Google Search grounding with direct SDK web search tools. Key changes:
- Convert executor and flow to the async/await pattern
- Replace the CLI and Google Search with custom searxng_web_search/web_url_read tools
- Add a dedicated constants module for model configuration
- Simplify error handling with a unified fallback strategy
- Pass a gemini_client instance instead of an API key
- Improve memory efficiency with streaming operations
Walkthrough
This PR converts Stage 3 to async execution, introduces SDK-based web-search grounding using SearXNG and URL reading, centralizes a shared GenAI client (removing direct API-key passing), adds web tooling and model constants, and refactors executors, flows, and tasks to use the new async flow.
Sequence Diagram(s)
sequenceDiagram
participant Flow as in_depth_analysis (Flow)
participant Task as process_snippet (Task)
participant Executor as Stage3Executor.run_async()
participant GenAI as GenAI Client (SDK)
participant Files as GenAI Files API
participant Search as searxng_web_search (Web Tools)
participant WebRead as web_url_read (Web Tools)
Flow->>Task: await process_snippet(gemini_client, snippet)
Task->>Executor: await run_async(gemini_client, model, audio_file)
Executor->>GenAI: upload file (aio)
GenAI->>Files: store file -> returns file_id
Executor->>GenAI: aio.models.generate_content (with automatic_function_calling)
GenAI->>Search: call searxng_web_search(query)
Search-->>GenAI: results
GenAI->>WebRead: call web_url_read(url)
WebRead-->>GenAI: markdown content
GenAI-->>Executor: analysis + grounding
Executor->>Files: delete(file_id)
Executor-->>Task: analysis result
Task-->>Flow: final result
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 1 passed, ❌ 2 failed (1 warning, 1 inconclusive)
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request addresses a critical out-of-memory issue in the Stage 3 processing machine by overhauling how it interacts with the Gemini API and external web resources. The primary change involves migrating the entire Stage 3 pipeline to an asynchronous architecture.
Code Review
This pull request is a significant and well-executed refactoring of the Stage 3 processing pipeline to address out-of-memory issues. By moving from a synchronous, subprocess-based approach to a fully asynchronous implementation using asyncio and the Google GenAI async SDK, the code is now more efficient, scalable, and robust. The introduction of web_tools.py for handling web requests is a clean separation of concerns. The overall changes are excellent. I have one high-severity suggestion regarding exception handling to make the service more manageable by avoiding a broad BaseException catch.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/processing_pipeline/stage_3/executors.py`:
- Around line 60-64: The processing wait loop using
uploaded_audio_file.state.name == "PROCESSING" can hang indefinitely; modify the
logic around that loop (the block updating uploaded_audio_file and calling
gemini_client.files.get) to enforce a maximum wait: introduce a timeout
parameter (e.g., max_wait_seconds or deadline) and break/raise once elapsed,
checking elapsed time each iteration (or use asyncio.wait_for around a helper
coroutine) and surface a clear error or change state if the timeout is reached;
update references in the same function where uploaded_audio_file and
gemini_client.files.get are used so the loop exits deterministically on timeout.
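A sketch of the suggested fix, under the assumption of a small helper wrapped around the polling loop. The names `wait_until_active`, `get_file`, and `max_wait_seconds` are illustrative, not from the PR; the real executor compares `uploaded_audio_file.state.name`:

```python
import asyncio
import time


async def wait_until_active(get_file, file_id, max_wait_seconds=120, poll_interval=2.0):
    """Poll an async `get_file` until the file leaves PROCESSING, but give up
    after `max_wait_seconds` instead of looping forever."""
    deadline = time.monotonic() + max_wait_seconds
    uploaded = await get_file(file_id)
    while uploaded.state == "PROCESSING":
        if time.monotonic() >= deadline:
            # Surface a clear error so the task fails deterministically
            # rather than hanging the Stage 3 worker.
            raise TimeoutError(
                f"File {file_id} still PROCESSING after {max_wait_seconds}s"
            )
        await asyncio.sleep(poll_interval)
        uploaded = await get_file(file_id)
    return uploaded
```

Wrapping a plain polling coroutine in `asyncio.wait_for(..., timeout=max_wait_seconds)` would achieve the same bound with less bookkeeping.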
In `@src/processing_pipeline/stage_3/web_tools.py`:
- Around line 48-51: searxng_web_search lacks a timeout and can hang; mirror
web_url_read by adding a 10-second aiohttp timeout. Update the call that opens
the client/session (in searxng_web_search) to pass
timeout=aiohttp.ClientTimeout(total=10) either on ClientSession(...) or on
session.get(...), so response.raise_for_status() and await response.json() are
bounded by 10s; keep existing SSL connector usage
(aiohttp.TCPConnector(ssl=_ssl_context)) when adding the timeout.
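A sketch of the suggested change, assuming aiohttp and a SearXNG instance that supports `format=json`; the `base_url` parameter is illustrative, and the SSL-connector detail from the real module is omitted for brevity:

```python
import aiohttp


async def searxng_web_search(query: str, base_url: str) -> dict:
    # Bound the whole request (connect + read) to 10 seconds,
    # mirroring the timeout already used by web_url_read.
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(
            f"{base_url}/search", params={"q": query, "format": "json"}
        ) as response:
            response.raise_for_status()
            return await response.json()
```

Passing the timeout on `ClientSession(...)` applies it to every request made through the session; passing it on `session.get(...)` instead would scope it to that one call.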
- Line 8: SEARXNG_URL currently defaults to an empty string which leads to
requests to "/search" and confusing failures; update the module to validate the
configuration and fail fast: change the default to None (or keep env lookup) and
either (a) raise a clear ValueError during import if SEARXNG_URL is falsy, or
(b) add a guard at the start of searxng_web_search(...) that checks "if not
SEARXNG_URL: raise ValueError('SEARXNG_URL environment variable is not set')" so
callers get an explicit error instead of silent broken requests; reference
SEARXNG_URL and searxng_web_search to locate where to add the validation.
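Option (b) from the comment could be sketched as follows; `require_searxng_url` is a hypothetical helper name introduced here so the guard is testable on its own:

```python
import os

# Default to None rather than "" so a missing value is detectable.
SEARXNG_URL = os.environ.get("SEARXNG_URL")


def require_searxng_url() -> str:
    # Called at the start of searxng_web_search(...) so callers get an
    # explicit error instead of silent broken requests to "/search".
    if not SEARXNG_URL:
        raise ValueError("SEARXNG_URL environment variable is not set")
    return SEARXNG_URL
```

Raising during import (option (a)) fails even faster, but makes the module impossible to import in environments, such as unit tests, that never call the search tool.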
📒 Files selected for processing (6)
- requirements.txt
- src/processing_pipeline/stage_3/constants.py
- src/processing_pipeline/stage_3/executors.py
- src/processing_pipeline/stage_3/flows.py
- src/processing_pipeline/stage_3/tasks.py
- src/processing_pipeline/stage_3/web_tools.py
- Narrow BaseException to Exception to avoid catching SystemExit/KeyboardInterrupt
- Validate SEARXNG_URL is set
- Add 10s HTTP timeout to searxng_web_search (matching web_url_read)
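The first of these fixes can be sketched as below; `process_snippet_safely` is an illustrative wrapper, not code from the PR:

```python
import asyncio


async def process_snippet_safely(coro):
    """Run a snippet-processing coroutine with a unified fallback.

    Catching Exception (not BaseException) lets SystemExit and
    KeyboardInterrupt propagate, so the worker can still be shut down
    cleanly instead of swallowing the interrupt as a processing error.
    """
    try:
        return await coro
    except Exception as exc:
        return {"error": str(exc)}
```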
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/processing_pipeline/stage_3/tasks.py`:
- Around line 220-223: Add a safe import for ExceptionGroup and use the backport
on older interpreters: in the modules that reference ExceptionGroup (the
exception-handling block building error_message), wrap the import with a
try/except to import builtins.ExceptionGroup and fall back to the exceptiongroup
backport if ImportError; alternatively, if the project requires Python 3.11+,
ensure pyproject.toml specifies that and add the exceptiongroup backport to
dev/test requirements so imports succeed during development.
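The guarded import could look like this; `format_errors` is a hypothetical stand-in for the block that builds `error_message`, and the fallback assumes the `exceptiongroup` backport package is installed on interpreters older than 3.11:

```python
import sys

try:
    ExceptionGroup  # built-in since Python 3.11
except NameError:  # Python <= 3.10
    from exceptiongroup import ExceptionGroup  # backport package


def format_errors(exc: BaseException) -> str:
    # Unwrap an ExceptionGroup into its component messages;
    # plain exceptions fall through to str().
    if isinstance(exc, ExceptionGroup):
        return "; ".join(str(e) for e in exc.exceptions)
    return str(exc)
```

If the project instead pins Python 3.11+ in pyproject.toml, the try/except can be dropped and the name used directly.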
📒 Files selected for processing (2)
- src/processing_pipeline/stage_3/tasks.py
- src/processing_pipeline/stage_3/web_tools.py
✅ Files skipped from review due to trivial changes (1)
- src/processing_pipeline/stage_3/web_tools.py