Skip to content

VER-300: Rebuild Stage 4 - Review with Knowledge Base Augmentation Agent#61

Merged
quancao-ea merged 11 commits intomainfrom
features/reenable-stage-4-review-with-knowledge-base-augmentation-agent
Feb 23, 2026
Merged

VER-300: Rebuild Stage 4 - Review with Knowledge Base Augmentation Agent#61
quancao-ea merged 11 commits intomainfrom
features/reenable-stage-4-review-with-knowledge-base-augmentation-agent

Conversation

@quancao-ea
Copy link
Copy Markdown
Collaborator

@quancao-ea quancao-ea commented Feb 13, 2026

Important

This PR revamps Stage 4 of the disinformation analysis pipeline by introducing a multi-agent review process, expanding the knowledge base schema, and updating prompts and models.

  • Stage 4 Overhaul:
    • Introduces multi-agent review pipeline with kb_researcher, web_researcher, reviewer, and kb_updater agents in agents.py.
    • Adds Stage4Executor in executor.py to manage the review process.
    • Updates flows.py to handle the new review process.
  • Knowledge Base:
    • Creates kb_entries, kb_entry_sources, kb_entry_embeddings, and kb_entry_snippet_usage tables in create_knowledge_base.sql.
    • Adds functions search_kb_entries and find_duplicate_kb_entries in SQL files.
    • Implements search_knowledge_base, upsert_knowledge_entry, and deactivate_knowledge_entry in tools.py.
  • Prompts and Models:
    • Adds prompt instructions for kb_researcher, web_researcher, reviewer, and kb_updater in prompts/stage_4/.
    • Updates output_schema.json to include thought_summaries.
    • Refactors models.py to include new data structures for Stage 4.
  • Miscellaneous:
    • Updates requirements.txt with new dependencies.
    • Refactors main.py and constants.py to support new Stage 4 logic.
    • Updates supabase_utils.py with new database interaction methods.

This description was created by Ellipsis for 137ba05. You can customize this summary. It will automatically update as commits are pushed.

Summary by CodeRabbit

  • New Features

    • Added a verifiable knowledge‑base with semantic search, deduplication, source tracking, and create/update/deactivate workflows.
    • Launched a multi‑agent Stage‑4 review pipeline (KB researcher, web researcher, reviewer, KB updater) with async end‑to‑end processing and grounding metadata.
  • Improvements

    • Reviewer output schema expanded with thought summaries and explicit verification status fields.
  • Bug Fixes

    • Review routing now uses confidence thresholds to determine manual review vs auto‑processing.
  • Chores

    • Updated dependencies and made Stage‑4 run counts configurable.

- Bump requests from 2.32.3 to 2.32.5
- Bump google-genai from 1.45.0 to 1.62.0
- Bump pydantic from 2.10.1 to 2.12.5
- Add google-adk 1.24.1
Implement database schema and search functions for fact-based knowledge
base to augment Stage 4 snippet reviews. Tables store verified facts
with embeddings, sources, and usage tracking. Functions enable semantic
search and deduplication using sub-vector HNSW indexing.
Add agents.py, executor.py, and tools.py to create a multi-agent
review system using Google ADK. The pipeline runs KB research and
web research in parallel, synthesizes findings to revise analysis,
and updates the knowledge base with newly verified facts.
Implement async KB-augmented review with researcher, web search,
and reviewer agents working in parallel. Add thought summaries
tracking and improve error handling.
Remove unnecessary alias from register field to use direct field name.
This simplifies the model definition and improves code clarity.
When skip_review=False, snippets with confidence_scores.overall >= 95
are now set to 'Ready for review' instead of 'Processed', enabling them
to flow into the Stage 4 agentic review pipeline. Default deployment
parameter changed from skip_review=True to skip_review=False.
@linear
Copy link
Copy Markdown

linear Bot commented Feb 13, 2026

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Feb 13, 2026

Walkthrough

Replaces monolithic Stage 4 with a multi-agent async review pipeline: adds agent prompts, new stage_4 package (agents/executor/flows/tasks/models/tools), KB DB schema and SQL functions, extends Supabase client for KB operations, updates entrypoints and constants, and adjusts schemas/prompts for review outputs.

Changes

Cohort / File(s) Summary
Stage 4 Prompt docs
prompts/stage_4/kb_researcher_instruction.md, prompts/stage_4/web_researcher_instruction.md, prompts/stage_4/reviewer_instruction.md, prompts/stage_4/kb_updater_instruction.md
Add four agent instruction documents defining roles, inputs, multi-step research/updater workflows, and strict structured output formats.
Output schema & system text
prompts/stage_4/output_schema.json, prompts/stage_4/system_instruction.md
Modify schema: remove is_convertible, add thought_summaries, add confidence_scores.verification_status enum, add validation_checklist.uncertain_claims_scored_low; remove prior system instruction block.
Stage 4 code split (monolith removed)
src/processing_pipeline/stage_4.py
Delete old monolithic Stage 4 implementation and its helpers/executor.
New Stage 4 package
src/processing_pipeline/stage_4/__init__.py, .../agents.py, .../executor.py, .../flows.py, .../tasks.py, .../models.py, .../tools.py
Introduce agent builder, pipeline orchestration (parallel research → review → KB update), Stage4Executor.run_async with plugin/session handling, async flows, snippet tasks, ReviewAnalysisOutput model, and KB tools (search/upsert/deactivate using embeddings + Supabase).
Supabase client & DB
src/processing_pipeline/supabase_utils.py, supabase/database/sql/create_knowledge_base.sql, supabase/database/sql/search_kb_entries.sql, supabase/database/sql/find_duplicate_kb_entries.sql
Add KB schema (entries, sources, embeddings, usage), RLS/policies, SQL search & dedupe functions, and many SupabaseClient KB APIs (search, dedupe, insert/supersede/deactivate, embedding management, usage recording).
Prompt registration & constants
src/scripts/import_prompts_to_db.py, src/processing_pipeline/constants.py
Replace single STAGE_4 mapping with four stage-specific keys; add PromptStage enum members for new stage IDs; add KB-related constants; remove old stage-4 helper accessors.
Integration & entrypoint changes
src/main.py, src/processing_pipeline/stage_4/__init__.py
Convert test_stage_4 to async and call Stage4Executor.run_async with prompt_versions and reviewer_model; re-export Stage4Executor and analysis_review.
Stage 3 / flow tweaks
src/processing_pipeline/stage_3/models.py, src/processing_pipeline/stage_3/tasks.py, src/processing_pipeline/main.py
Rename Language.register_→register; add CONFIDENCE_THRESHOLD gating and change review trigger logic so confidence influences READY_FOR_REVIEW vs PROCESSED; toggle skip_review behavior.
Dependencies & scripts
requirements.txt, scripts/start_processing.sh
Update/upgrade deps (google-genai, pydantic, add google-adk, urllib3), and make start_processing.sh use env-backed per-stage flow counts; Stage 4 started by default.
CLI/import mapping
src/scripts/import_prompts_to_db.py
PROMPT_MAPPING key/value shape updated to multiple lowercase prompt paths and per-agent stage keys.

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client / main.py
    participant Flow as Analysis Review Flow
    participant Executor as Stage4 Executor
    participant KBR as KB Researcher
    participant WR as Web Researcher
    participant AR as Analysis Reviewer
    participant KBU as KB Updater
    participant DB as Supabase DB

    Client->>Flow: analysis_review(snippet_ids / repeat)
    Flow->>DB: fetch ready-for-review snippet
    DB-->>Flow: snippet JSON
    Flow->>Executor: run_async(transcription, analysis_json, prompt_versions...)
    Executor->>Executor: build_review_pipeline()
    par Parallel Research
        Executor->>KBR: search_knowledge_base(claims)
        KBR->>DB: search_kb_entries(embedding...)
        DB-->>KBR: kb_research results
        KBR-->>Executor: kb_research
    and
        Executor->>WR: web research(claims)
        WR->>WR: external searches & validation
        WR-->>Executor: web_research
    end
    Executor->>AR: revised_analysis(kb_research, web_research)
    AR-->>Executor: revised_analysis JSON
    Executor->>KBU: upsert/deactivate KB entries
    KBU->>DB: insert_kb_entry / supersede / deactivate
    DB-->>KBU: confirmation
    KBU-->>Executor: kb_update_summary
    Executor-->>Flow: (revised_analysis, grounding_metadata)
    Flow->>DB: submit_snippet_review_result(...)
    DB-->>Flow: update complete
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • nhphong

Poem

🐇 I hopped through prompts and code today,
Four agents set to search and say —
KB facts dug up, web trails traced,
Reviews refined, old notes replaced.
Happy thumps — the warren's knowledge grows. 🌱

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 23.08% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly summarizes the main change: a Stage 4 rebuild introducing a multi-agent review pipeline with knowledge base augmentation capabilities.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch features/reenable-stage-4-review-with-knowledge-base-augmentation-agent

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 Pylint (4.0.4)
src/processing_pipeline/stage_4/tools.py

************* Module .pylintrc
.pylintrc:1:0: F0011: error while parsing the configuration: File contains no section headers.
file: '.pylintrc', line: 1
'disable=C0116\n' (config-parse-error)
[
{
"type": "convention",
"module": "src.processing_pipeline.stage_4.tools",
"obj": "",
"line": 1,
"column": 0,
"endLine": null,
"endColumn": null,
"path": "src/processing_pipeline/stage_4/tools.py",
"symbol": "line-too-long",
"message": "Line too long (138/100)",
"message-id": "C0301"
},
{
"type": "convention",
"module": "src.processing_pipeline.stage_4.tools",
"obj": "",
"line": 35,
"column": 0,
"endLine": null,
"endColumn": null,
"path": "src/processing_pipeline/stage_4/tools.py",
"symbol": "line-too-long",
"message": "Line too long (115/100)",
"message-id": "C0301"
},
{
"type": "convention",

... [truncated 7535 characters] ...

4.tools",
"obj": "upsert_knowledge_entry",
"line": 80,
"column": 0,
"endLine": 80,
"endColumn": 26,
"path": "src/processing_pipeline/stage_4/tools.py",
"symbol": "too-many-branches",
"message": "Too many branches (13/12)",
"message-id": "R0912"
},
{
"type": "refactor",
"module": "src.processing_pipeline.stage_4.tools",
"obj": "deactivate_knowledge_entry",
"line": 242,
"column": 4,
"endLine": 245,
"endColumn": 78,
"path": "src/processing_pipeline/stage_4/tools.py",
"symbol": "no-else-return",
"message": "Unnecessary "else" after "return", remove the "else" and de-indent the code inside it",
"message-id": "R1705"
}
]

src/processing_pipeline/constants.py

************* Module .pylintrc
.pylintrc:1:0: F0011: error while parsing the configuration: File contains no section headers.
file: '.pylintrc', line: 1
'disable=C0116\n' (config-parse-error)
[
{
"type": "convention",
"module": "src.processing_pipeline.constants",
"obj": "",
"line": 97,
"column": 0,
"endLine": null,
"endColumn": null,
"path": "src/processing_pipeline/constants.py",
"symbol": "line-too-long",
"message": "Line too long (101/100)",
"message-id": "C0301"
},
{
"type": "convention",
"module": "src.processing_pipeline.constants",
"obj": "",
"line": 101,
"column": 0,
"endLine": null,
"endColumn": null,
"path": "src/processing_pipeline/constants.py",
"symbol": "line-too-long",
"message": "Line too long (115/100)",
"message-id": "C0301"
},
{
"type": "convention",
"module

... [truncated 4614 characters] ...

ini_timestamped_transcription_generation_prompt",
"line": 67,
"column": 0,
"endLine": 67,
"endColumn": 58,
"path": "src/processing_pipeline/constants.py",
"symbol": "missing-function-docstring",
"message": "Missing function or method docstring",
"message-id": "C0116"
},
{
"type": "warning",
"module": "src.processing_pipeline.constants",
"obj": "get_gemini_timestamped_transcription_generation_prompt",
"line": 68,
"column": 11,
"endLine": 68,
"endColumn": 85,
"path": "src/processing_pipeline/constants.py",
"symbol": "unspecified-encoding",
"message": "Using open without explicitly specifying an encoding",
"message-id": "W1514"
}
]

src/processing_pipeline/stage_3/tasks.py

************* Module .pylintrc
.pylintrc:1:0: F0011: error while parsing the configuration: File contains no section headers.
file: '.pylintrc', line: 1
'disable=C0116\n' (config-parse-error)
[
{
"type": "convention",
"module": "src.processing_pipeline.stage_3.tasks",
"obj": "",
"line": 22,
"column": 0,
"endLine": null,
"endColumn": null,
"path": "src/processing_pipeline/stage_3/tasks.py",
"symbol": "line-too-long",
"message": "Line too long (180/100)",
"message-id": "C0301"
},
{
"type": "convention",
"module": "src.processing_pipeline.stage_3.tasks",
"obj": "",
"line": 106,
"column": 0,
"endLine": null,
"endColumn": null,
"path": "src/processing_pipeline/stage_3/tasks.py",
"symbol": "line-too-long",
"message": "Line too long (119/100)",
"message-id": "C0301"
},
{
"type": "convention",

... [truncated 10030 characters] ...

ule": "src.processing_pipeline.stage_3.tasks",
"obj": "process_snippet",
"line": 186,
"column": 0,
"endLine": 186,
"endColumn": 19,
"path": "src/processing_pipeline/stage_3/tasks.py",
"symbol": "too-many-positional-arguments",
"message": "Too many positional arguments (6/5)",
"message-id": "R0917"
},
{
"type": "warning",
"module": "src.processing_pipeline.stage_3.tasks",
"obj": "process_snippet",
"line": 224,
"column": 11,
"endLine": 224,
"endColumn": 20,
"path": "src/processing_pipeline/stage_3/tasks.py",
"symbol": "broad-exception-caught",
"message": "Catching too general exception Exception",
"message-id": "W0718"
}
]

Tip

Issue Planner is now in beta. Read the docs and try it out! Share your feedback on Discord.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@ellipsis-dev ellipsis-dev Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important

Looks good to me! 👍

Reviewed everything up to 137ba05 in 26 seconds. Click for details.
  • Reviewed 3213 lines of code in 25 files
  • Skipped 0 files when reviewing.
  • Skipped posting 0 draft comments. View those below.
  • Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.

Workflow ID: wflow_Pew9npXzPkuGP2iE

You can customize Ellipsis by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello @quancao-ea, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request represents a significant architectural overhaul of Stage 4 of the disinformation analysis pipeline. It transitions from a monolithic review process to a sophisticated multi-agent system. This new design integrates dedicated agents for researching existing knowledge, performing web-based fact-checking, and maintaining a persistent Knowledge Base of verified facts. The primary goal is to enhance the accuracy, consistency, and explainability of disinformation analysis by providing the main reviewer agent with comprehensive, evidence-backed insights, thereby improving the overall robustness and reliability of the review process.

Highlights

  • Multi-Agent Architecture: The Stage 4 review process has been rebuilt into a multi-agent system, introducing specialized Knowledge Base Researcher, Web Researcher, and Knowledge Base Updater agents.
  • Knowledge Base Integration: A new internal Knowledge Base (KB) has been implemented to store and retrieve verified facts, significantly augmenting the review process with structured, evidence-backed information.
  • Enhanced Reviewer Agent: The main Reviewer agent's instructions and output schema have been updated to leverage findings from the new research agents and to include a detailed 'thought_summaries' field for improved explainability.
  • Database Schema for KB: New database tables and functions were added to support the Knowledge Base, including entries, sources, embeddings, and usage tracking, enabling efficient storage and retrieval of verified facts.
  • Dependency Updates: Core Python dependencies, including 'requests', 'google-genai', and 'pydantic', were updated, and 'google-adk' was added to support the new agent framework.
Changelog
  • prompts/stage_4/kb_researcher_instruction.md
    • Added instructions for the new Knowledge Base Researcher agent, detailing its role, input, search strategy, and output format.
  • prompts/stage_4/kb_updater_instruction.md
    • Added instructions for the new Knowledge Base Updater agent, outlining its role in maintaining the KB, decision framework for creating/updating/deactivating entries, and output format.
  • prompts/stage_4/output_schema.json
    • Modified the output schema for Stage 4 analysis, removing the 'is_convertible' field, adding 'verification_status' and 'uncertain_claims_scored_low' to 'confidence_scores.analysis.validation_checklist', and introducing a 'thought_summaries' field.
  • prompts/stage_4/reviewer_instruction.md
    • Renamed from 'review_prompt.md' and extensively updated with new role definition, input structure, detailed tasks for reviewing against research findings, content preservation rules, confidence scoring guidelines, a required self-review process, and political leaning assessment.
  • prompts/stage_4/system_instruction.md
    • Removed the old system instruction file, as agent-specific instructions are now used.
  • prompts/stage_4/web_researcher_instruction.md
    • Added instructions for the new Web Researcher agent, defining its role, input, research strategy, source tier system, and output format.
  • requirements.txt
    • Updated 'requests' from '2.32.3' to '2.32.5', 'google-genai' from '1.45.0' to '1.62.0', 'pydantic' from '2.10.1' to '2.12.5', and added 'google-adk==1.24.1'.
  • src/main.py
    • Updated the 'test_stage_4' function to use the new asynchronous 'Stage4Executor.run_async' method, retrieve prompt versions from Supabase, and pass additional parameters like 'recorded_at' and 'current_time'.
  • src/processing_pipeline/constants.py
    • Expanded the 'PromptStage' enum to include specific stages for 'STAGE_4_KB_RESEARCHER', 'STAGE_4_WEB_RESEARCHER', 'STAGE_4_REVIEWER', and 'STAGE_4_KB_UPDATER', and removed deprecated Stage 4 prompt getter functions.
  • src/processing_pipeline/main.py
    • Modified the 'in_depth_analysis' deployment parameter 'skip_review' from 'True' to 'False' to enable the review process.
  • src/processing_pipeline/stage_3/models.py
    • Corrected a field name in the 'Language' model from 'register_' to 'register'.
  • src/processing_pipeline/stage_3/tasks.py
    • Adjusted the logic for setting 'ProcessingStatus' to 'READY_FOR_REVIEW' only if 'skip_review' is false and the 'overall' confidence score is 95 or higher, otherwise setting it to 'PROCESSED'.
  • src/processing_pipeline/stage_4.py
    • Removed the previous monolithic Stage 4 implementation file.
  • src/processing_pipeline/stage_4/init.py
    • Added an 'init.py' file to expose 'Stage4Executor' and 'analysis_review' from the new Stage 4 module structure.
  • src/processing_pipeline/stage_4/agents.py
    • Added a new file defining the multi-agent architecture for Stage 4 review, including 'LlmAgent' instances for KB Researcher, Web Researcher, Analysis Reviewer, and KB Updater, orchestrated by 'ParallelAgent' and 'SequentialAgent'.
  • src/processing_pipeline/stage_4/executor.py
    • Added a new file containing the 'Stage4Executor' class, responsible for running the agentic review pipeline asynchronously using 'google.adk', handling session state, and building grounding metadata.
  • src/processing_pipeline/stage_4/flows.py
    • Added a new Prefect flow definition for 'analysis_review', which orchestrates the execution of Stage 4 tasks, including fetching snippets, loading prompt versions, and processing snippets with the new agentic pipeline.
  • src/processing_pipeline/stage_4/models.py
    • Added a new Pydantic model 'ReviewAnalysisOutput' to define the structured output for the revised Stage 4 analysis, including the new 'thought_summaries' field.
  • src/processing_pipeline/stage_4/tasks.py
    • Added new task functions for Stage 4, including 'fetch_a_ready_for_review_snippet_from_supabase', 'fetch_a_specific_snippet_from_supabase', 'prepare_snippet_for_review', 'backup_snippet_analysis', 'submit_snippet_review_result', and 'process_snippet' (now asynchronous).
  • src/processing_pipeline/stage_4/tools.py
    • Added new file defining custom tools for interacting with the Knowledge Base: 'search_knowledge_base', 'upsert_knowledge_entry', and 'deactivate_knowledge_entry', including embedding generation for vector search.
  • src/processing_pipeline/supabase_utils.py
    • Updated the 'submit_snippet_review' method to accept 'reviewed_by' and 'thought_summaries' parameters, and added comprehensive new methods for managing Knowledge Base entries, sources, embeddings, and usage tracking in Supabase.
  • src/scripts/import_prompts_to_db.py
    • Updated the script to import prompts for the newly defined Stage 4 agents ('KB_RESEARCHER', 'WEB_RESEARCHER', 'REVIEWER', 'KB_UPDATER').
  • supabase/database/sql/create_knowledge_base.sql
    • Added a new SQL script to define the schema for the Knowledge Base, including tables for 'kb_entries', 'kb_entry_sources', 'kb_entry_embeddings', and 'kb_entry_snippet_usage', along with RLS policies and indexes.
  • supabase/database/sql/find_duplicate_kb_entries.sql
    • Added a new SQL function to find duplicate knowledge base entries based on embedding similarity for deduplication purposes.
  • supabase/database/sql/search_kb_entries.sql
    • Added a new SQL function to perform a two-stage sub-vector search for knowledge base entries, including filtering by categories and temporal relevance.
Activity
  • The pull request introduces a complete architectural shift for Stage 4 of the processing pipeline, moving from a single LLM-based review to a sophisticated multi-agent system that leverages external tools and a dedicated Knowledge Base.
  • This involves significant code restructuring, new prompt definitions for specialized agents, database schema changes for the KB, and updates to core dependencies.
  • The changes aim to enhance the accuracy and reliability of disinformation analysis by providing more robust research and verification capabilities.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request significantly overhauls the Stage 4 disinformation analysis pipeline, introducing a sophisticated multi-agent review process with modular agents like kb_researcher, web_researcher, reviewer, and kb_updater, and an enhanced Knowledge Base. While this is a major architectural improvement, several high-severity security vulnerabilities were identified that must be addressed before deployment. These include vulnerability to indirect prompt injection across the agent pipeline, a hardcoded session ID causing data leakage between concurrent processes, and potential for SSRF via web research tools, which could compromise the Knowledge Base and internal network security. Additionally, there are a few suggestions to improve maintainability and robustness.

Comment thread src/processing_pipeline/stage_4/executor.py Outdated
Comment on lines +112 to +116
{analysis_json}
```

### Transcription:
{transcription}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

The Stage 4 review pipeline is vulnerable to indirect prompt injection. Untrusted data from the radio transcription and Stage 3 analysis is directly embedded into the prompts of multiple LLM agents using simple string substitution (e.g., {transcription}, {analysis_json}). An attacker who can influence the content of the radio broadcast can inject malicious instructions into these fields to manipulate the agents' behavior. This could lead to the agents ignoring their safety guidelines, producing biased or incorrect analysis, or performing unauthorized actions via their tools, such as corrupting the internal Knowledge Base.

Remediation: Implement robust sanitization and delimiting for all untrusted data embedded in prompts. Use system-level instructions to explicitly warn agents about potential injection attempts in the input data. Consider using a more secure way to pass data to agents, such as separate data fields if supported by the framework.

),
timeout=60,
),
tool_filter=["searxng_web_search", "web_url_read"],
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

The web_researcher agent is equipped with the web_url_read tool, which allows it to fetch and read content from arbitrary URLs. The agent's research strategy is driven by untrusted input from the radio transcription and search results. If an attacker successfully performs a prompt injection, they can trick the agent into using the web_url_read tool to access internal network resources or sensitive metadata services (e.g., cloud instance metadata). Since the tool is executed in the local environment via npx, it may have access to services not exposed to the public internet.

Remediation: Implement a strict allow-list for domains that the web_researcher agent can access. Ensure the web_url_read tool (or the underlying MCP server) has built-in protections against SSRF, such as blocking requests to private IP ranges.

## Current Review Data

### Revised Analysis:
{revised_analysis}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

Similar to other agents in the pipeline, the kb_updater agent receives untrusted data ({revised_analysis}) directly embedded in its system instruction. This data is the output of the reviewer agent, which itself is influenced by untrusted transcription and web research data. This creates a chain of indirect prompt injection that could ultimately lead to the kb_updater agent making unauthorized or incorrect modifications to the Knowledge Base.

Remediation: Apply the same sanitization and delimiting strategies as recommended for the other agents.

)

status = ProcessingStatus.PROCESSED if skip_review else ProcessingStatus.READY_FOR_REVIEW
needs_review = not skip_review and analyzing_response["response"]["confidence_scores"]["overall"] >= 95
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The confidence score threshold 95 is used here as a magic number. To improve readability and maintainability, this value should be extracted into a named constant (e.g., MIN_CONFIDENCE_FOR_REVIEW). This would make this important business rule more explicit and easier to configure in a central place, such as processing_pipeline/constants.py.


results = supabase_client.search_kb_entries(
query_embedding=embedding,
match_threshold=0.3,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The match_threshold of 0.3 is a magic number. This value is critical for the knowledge base search logic. It should be defined as a named constant (e.g., KB_SEARCH_MATCH_THRESHOLD) for better readability and easier maintenance. A similar issue exists on line 143 with the similarity_threshold of 0.92 for deduplication.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

🤖 Fix all issues with AI agents
In `@requirements.txt`:
- Line 1: The requirements list currently pins requests==2.32.5 but does not
constrain urllib3, allowing resolution to vulnerable urllib3 versions; update
requirements.txt to add a pinned safe urllib3 (e.g., add a line
`urllib3>=2.6.3`) so that package resolution cannot pick urllib3<=2.6.2 and
thereby mitigate CVE-2026-21441; ensure the new urllib3 requirement sits
alongside the existing requests==2.32.5 entry so dependency resolution enforces
the minimum secure version.

In `@src/main.py`:
- Line 17: The assignment to os.environ["GOOGLE_API_KEY"] uses
os.environ.get("GOOGLE_GEMINI_PAID_KEY") which can be None and will raise
TypeError; change the logic in src/main.py to only set
os.environ["GOOGLE_API_KEY"] when os.environ.get("GOOGLE_GEMINI_PAID_KEY")
returns a non-None string (or use a fallback/default) — locate the assignment to
os.environ["GOOGLE_API_KEY"] and guard it with an existence check (or use
setdefault/populate with a default) so you never assign None into os.environ.

In `@src/processing_pipeline/constants.py`:
- Around line 43-46: Remove the unused enum member STAGE_4 from the constants.py
definitions: delete the line defining STAGE_4 so only the new members
(STAGE_4_KB_RESEARCHER, STAGE_4_WEB_RESEARCHER, STAGE_4_REVIEWER,
STAGE_4_KB_UPDATER) remain; verify there are no remaining references to STAGE_4
elsewhere (search for the identifier STAGE_4) and run tests or linting to ensure
no import or usage breakage after removal.

In `@src/processing_pipeline/stage_4/agents.py`:
- Around line 31-41: The code creates searxng_toolset with SEARXNG_URL
defaulting to an empty string which will produce a broken web researcher; modify
the logic around searxng_toolset (the
McpToolset/StdioConnectionParams/StdioServerParameters construction) to first
read and validate os.environ.get("SEARXNG_URL") into a variable (e.g.,
searxng_url) and if it is missing or empty raise/throw a clear config error (or
log and exit) instead of passing an empty string into env; only construct
McpToolset when searxng_url is non-empty so failures are fast and explicit.

In `@src/processing_pipeline/stage_4/flows.py`:
- Line 22: The assignment to os.environ["GOOGLE_API_KEY"] uses
os.environ.get("GOOGLE_GEMINI_PAID_KEY") which can be None and will raise when
assigned; update the code around the os.environ["GOOGLE_API_KEY"] assignment to
first read the value into a variable (e.g., gemini_key =
os.environ.get("GOOGLE_GEMINI_PAID_KEY")) and only set
os.environ["GOOGLE_API_KEY"] if gemini_key is not None (or provide a safe
default/raise a clear error), ensuring the check is done before assigning to
avoid a TypeError.

In `@src/processing_pipeline/stage_4/tasks.py`:
- Around line 48-63: The code calls supabase_client.get_audio_file_by_id which
can return None, then immediately uses audio_file.get(...) causing
AttributeError; update the logic in tasks.py around audio_file to check if
audio_file is None after get_audio_file_by_id (or validate it is a dict), and
handle the missing case by logging a clear error (or raising a descriptive
exception) and returning/aborting the task path instead of building metadata;
ensure the metadata construction (the block that sets
"location_city"/"location_state"/"radio_station_code"/"radio_station_name") only
executes when audio_file is present.
- Around line 104-111: The code in process_snippet currently calls
backup_snippet_analysis(supabase_client, snippet) then sets previous_analysis =
snippet when snippet["previous_analysis"] is falsy, which both redundantly backs
up the full snippet and uses the full object as previous_analysis; instead,
construct a minimal previous_analysis containing only the analysis-related
fields that prepare_snippet_for_review needs (translation, title, summary,
explanation, disinformation_categories, keywords_detected, language,
confidence_scores, political_leaning, recorded_at, audio_file, transcription,
context), set previous_analysis to that minimal dict, and call
backup_snippet_analysis(supabase_client, minimal_previous_analysis) if you still
need to persist a backup; update references to previous_analysis in
process_snippet accordingly so the minimal shape is used rather than the full
snippet.
- Around line 144-150: The except block in
src/processing_pipeline/stage_4/tasks.py assumes ExceptionGroup exists; make the
check version-safe by resolving ExceptionGroup at runtime (e.g.,
ExceptionGroupType = getattr(builtins, "ExceptionGroup", None) or similar) and
then using isinstance(e, ExceptionGroupType) only if ExceptionGroupType is not
None, falling back to treating e as a single exception otherwise; update the
error_msg construction logic around that check (the block that currently builds
error_msg and calls supabase_client.set_snippet_status) to use this safe check,
and if you intend to require Python 3.11+, also add python_requires = ">=3.11"
to pyproject.toml to make the requirement explicit.

In `@src/processing_pipeline/supabase_utils.py`:
- Around line 501-538: The supersede_kb_entry function performs multiple
dependent DB mutations without transactional guarantees, so failures mid-way can
leave inconsistent state; modify supersede_kb_entry to run the whole flow inside
an atomic DB transaction (via a Supabase RPC/SQL function or a client
transaction API) or, if RPC isn't possible, add try/except rollback logic: after
inserting the new row (insert into "kb_entries"), wrap copying sources
(insert_kb_entry_source calls), updating the old entry, and deleting the old
embedding in a single transactional call or, on any exception, delete the newly
inserted entry (using new_entry["id"]) and undo partial updates (e.g., restore
old entry status) and re-raise the error; use the existing helpers
get_kb_entry_by_id, insert_kb_entry_source, delete_kb_entry_embedding, and the
table update call to locate where to add transaction/rollback behavior.

In `@supabase/database/sql/find_duplicate_kb_entries.sql`:
- Around line 23-24: The inline comment after the WHERE clause is misleading:
the query is filtering with "WHERE kee.status = 'Processed'" but the comment
says "Search across ALL statuses for deduplication"; update that comment to
accurately state that the query restricts to status = 'Processed' (because
superseded/deactivated entries have embeddings removed so filtering by embedding
status is equivalent) and remove the contradictory "all statuses" phrasing so
readers understand the actual behavior of the filter in the
find_duplicate_kb_entries logic (reference kee.status = 'Processed' and the
surrounding WHERE clause).
🧹 Nitpick comments (6)
supabase/database/sql/create_knowledge_base.sql (1)

14-58: updated_at columns won't auto-update without a trigger.

Lines 17 and 89 define updated_at TIMESTAMPTZ NOT NULL DEFAULT now() on kb_entries and kb_entry_embeddings, but there's no CREATE TRIGGER ... BEFORE UPDATE to keep them current. The default only fires on INSERT. If application code always sets updated_at explicitly on updates this is harmless, but an auto-update trigger is the typical safeguard.

Example trigger (apply to both tables)
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = now();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER set_updated_at
    BEFORE UPDATE ON public.kb_entries
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();

CREATE TRIGGER set_updated_at
    BEFORE UPDATE ON public.kb_entry_embeddings
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();
prompts/stage_4/web_researcher_instruction.md (1)

102-102: Fenced code blocks lack a language specifier (linter warning).

The two template blocks at lines 102 and 131 don't specify a language, triggering MD040. Since these are structured-text templates rather than code, adding ```text would silence the linter without implying a programming language.

Also applies to: 131-131

src/processing_pipeline/stage_4/flows.py (2)

37-43: Minor: id shadows the Python built-in.

Using id as a loop variable shadows the built-in id() function. Consider renaming to snippet_id.

♻️ Proposed fix
-        for id in snippet_ids:
-            snippet = fetch_a_specific_snippet_from_supabase(supabase_client, id)
+        for snippet_id in snippet_ids:
+            snippet = fetch_a_specific_snippet_from_supabase(supabase_client, snippet_id)

44-60: Polling loop never refreshes prompt versions.

If repeat=True, this loop runs indefinitely but prompt_versions (loaded once at lines 30-35) is never refreshed. If active prompts are updated in the DB while the loop is running, new snippets will still use the old prompt versions. This may be intentional for consistency within a run, but worth noting if hot-reloading prompts is desired.

src/processing_pipeline/stage_4/executor.py (1)

79-96: Unused session variable (flagged by Ruff F841).

The create_session return value isn't used. Prefix with _ to signal the side-effect-only intent.

♻️ Proposed fix
-            session = await session_service.create_session(
+            await session_service.create_session(
src/processing_pipeline/stage_4/tools.py (1)

14-28: New SupabaseClient and OpenAI client created on every tool invocation.

_get_supabase_client() and _generate_embedding() instantiate new clients per call. During a single pipeline run, the KB tools may be invoked multiple times (search, then upsert, possibly multiple upserts). Each creates fresh HTTP clients.

Consider caching these at module level (lazy singleton) or passing them via tool context to avoid repeated instantiation overhead and connection setup.

♻️ Suggested pattern for lazy client caching
+_supabase_client = None
+_openai_client = None
+
+
 def _get_supabase_client():
-    return SupabaseClient(
-        supabase_url=os.getenv("SUPABASE_URL"),
-        supabase_key=os.getenv("SUPABASE_KEY"),
-    )
+    global _supabase_client
+    if _supabase_client is None:
+        _supabase_client = SupabaseClient(
+            supabase_url=os.getenv("SUPABASE_URL"),
+            supabase_key=os.getenv("SUPABASE_KEY"),
+        )
+    return _supabase_client
+
+
+def _get_openai_client():
+    global _openai_client
+    if _openai_client is None:
+        openai_api_key = os.getenv("OPENAI_API_KEY")
+        if not openai_api_key:
+            raise ValueError("OpenAI API key was not set!")
+        _openai_client = OpenAI(api_key=openai_api_key)
+    return _openai_client

Comment thread requirements.txt
Comment thread src/main.py
GEMINI_KEY = os.getenv("GOOGLE_GEMINI_KEY")

async def test_stage_4():
os.environ["GOOGLE_API_KEY"] = os.environ.get("GOOGLE_GEMINI_PAID_KEY")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

os.environ[...] = None raises TypeError when the env var is missing.

os.environ.get("GOOGLE_GEMINI_PAID_KEY") returns None if the variable isn't set. Assigning None to os.environ raises TypeError: str expected, not NoneType.

🐛 Proposed fix
-    os.environ["GOOGLE_API_KEY"] = os.environ.get("GOOGLE_GEMINI_PAID_KEY")
+    gemini_key = os.environ.get("GOOGLE_GEMINI_PAID_KEY")
+    if not gemini_key:
+        raise RuntimeError("GOOGLE_GEMINI_PAID_KEY environment variable is not set")
+    os.environ["GOOGLE_API_KEY"] = gemini_key
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
os.environ["GOOGLE_API_KEY"] = os.environ.get("GOOGLE_GEMINI_PAID_KEY")
gemini_key = os.environ.get("GOOGLE_GEMINI_PAID_KEY")
if not gemini_key:
raise RuntimeError("GOOGLE_GEMINI_PAID_KEY environment variable is not set")
os.environ["GOOGLE_API_KEY"] = gemini_key
🤖 Prompt for AI Agents
In `@src/main.py` at line 17, The assignment to os.environ["GOOGLE_API_KEY"] uses
os.environ.get("GOOGLE_GEMINI_PAID_KEY") which can be None and will raise
TypeError; change the logic in src/main.py to only set
os.environ["GOOGLE_API_KEY"] when os.environ.get("GOOGLE_GEMINI_PAID_KEY")
returns a non-None string (or use a fallback/default) — locate the assignment to
os.environ["GOOGLE_API_KEY"] and guard it with an existence check (or use
setdefault/populate with a default) so you never assign None into os.environ.

Comment on lines +43 to +46
STAGE_4_KB_RESEARCHER = "stage_4_kb_researcher"
STAGE_4_WEB_RESEARCHER = "stage_4_web_researcher"
STAGE_4_REVIEWER = "stage_4_reviewer"
STAGE_4_KB_UPDATER = "stage_4_kb_updater"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

rg -n '\bSTAGE_4\b' --type=py -g '!constants.py' | head -30

Repository: PublicDataWorks/verdad

Length of output: 48


🏁 Script executed:

head -n 100 src/processing_pipeline/constants.py | tail -n 60

Repository: PublicDataWorks/verdad

Length of output: 2568


🏁 Script executed:

cat src/processing_pipeline/import_prompts_to_db.py

Repository: PublicDataWorks/verdad

Length of output: 145


🏁 Script executed:

fd -t f -e py | xargs grep -l "PromptStage" | head -20

Repository: PublicDataWorks/verdad

Length of output: 315


🏁 Script executed:

fd -t f -e py | xargs grep -l "STAGE_4" | head -20

Repository: PublicDataWorks/verdad

Length of output: 191


🏁 Script executed:

find src -name "*mapping*" -o -name "*import*prompt*" | head -20

Repository: PublicDataWorks/verdad

Length of output: 101


🏁 Script executed:

grep -n "STAGE_4" src/main.py src/processing_pipeline/stage_4/flows.py src/scripts/import_prompts_to_db.py

Repository: PublicDataWorks/verdad

Length of output: 1346


🏁 Script executed:

cat src/scripts/import_prompts_to_db.py | head -150

Repository: PublicDataWorks/verdad

Length of output: 5689


New Stage 4 enum members are properly integrated.

The four new PromptStage members (STAGE_4_KB_RESEARCHER, STAGE_4_WEB_RESEARCHER, STAGE_4_REVIEWER, STAGE_4_KB_UPDATER) follow the existing naming convention and are correctly mapped in src/scripts/import_prompts_to_db.py and used in main.py and stage_4/flows.py.

The original STAGE_4 enum member on line 42 is unused and should be removed. It's not referenced anywhere in the codebase and is no longer part of the pipeline architecture.

🤖 Prompt for AI Agents
In `@src/processing_pipeline/constants.py` around lines 43 - 46, Remove the unused
enum member STAGE_4 from the constants.py definitions: delete the line defining
STAGE_4 so only the new members (STAGE_4_KB_RESEARCHER, STAGE_4_WEB_RESEARCHER,
STAGE_4_REVIEWER, STAGE_4_KB_UPDATER) remain; verify there are no remaining
references to STAGE_4 elsewhere (search for the identifier STAGE_4) and run
tests or linting to ensure no import or usage breakage after removal.

Comment on lines +31 to +41
searxng_toolset = McpToolset(
connection_params=StdioConnectionParams(
server_params=StdioServerParameters(
command="npx",
args=["-y", "mcp-searxng"],
env={"SEARXNG_URL": os.environ.get("SEARXNG_URL", "")},
),
timeout=60,
),
tool_filter=["searxng_web_search", "web_url_read"],
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Empty string fallback for SEARXNG_URL will silently produce a broken web researcher.

If SEARXNG_URL is not set, the toolset is created with an empty URL string. The web researcher agent will be constructed successfully but all search calls will fail at runtime. Consider failing fast here.

🐛 Proposed fix
+    searxng_url = os.environ.get("SEARXNG_URL")
+    if not searxng_url:
+        raise ValueError("SEARXNG_URL environment variable is required for web research")
+
     searxng_toolset = McpToolset(
         connection_params=StdioConnectionParams(
             server_params=StdioServerParameters(
                 command="npx",
                 args=["-y", "mcp-searxng"],
-                env={"SEARXNG_URL": os.environ.get("SEARXNG_URL", "")},
+                env={"SEARXNG_URL": searxng_url},
             ),
             timeout=60,
         ),
         tool_filter=["searxng_web_search", "web_url_read"],
     )
🤖 Prompt for AI Agents
In `@src/processing_pipeline/stage_4/agents.py` around lines 31 - 41, The code
creates searxng_toolset with SEARXNG_URL defaulting to an empty string which
will produce a broken web researcher; modify the logic around searxng_toolset
(the McpToolset/StdioConnectionParams/StdioServerParameters construction) to
first read and validate os.environ.get("SEARXNG_URL") into a variable (e.g.,
searxng_url) and if it is missing or empty raise/throw a clear config error (or
log and exit) instead of passing an empty string into env; only construct
McpToolset when searxng_url is non-empty so failures are fast and explicit.

task_runner=ConcurrentTaskRunner,
)
async def analysis_review(snippet_ids, repeat):
os.environ["GOOGLE_API_KEY"] = os.environ.get("GOOGLE_GEMINI_PAID_KEY")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

os.environ.get() can return None, which will crash when assigned to os.environ.

If GOOGLE_GEMINI_PAID_KEY is not set, os.environ.get(...) returns None, and assigning None to os.environ["GOOGLE_API_KEY"] raises a TypeError because environment variable values must be strings.

🐛 Proposed fix
-    os.environ["GOOGLE_API_KEY"] = os.environ.get("GOOGLE_GEMINI_PAID_KEY")
+    gemini_key = os.environ.get("GOOGLE_GEMINI_PAID_KEY")
+    if not gemini_key:
+        raise ValueError("GOOGLE_GEMINI_PAID_KEY environment variable is not set")
+    os.environ["GOOGLE_API_KEY"] = gemini_key
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
os.environ["GOOGLE_API_KEY"] = os.environ.get("GOOGLE_GEMINI_PAID_KEY")
gemini_key = os.environ.get("GOOGLE_GEMINI_PAID_KEY")
if not gemini_key:
raise ValueError("GOOGLE_GEMINI_PAID_KEY environment variable is not set")
os.environ["GOOGLE_API_KEY"] = gemini_key
🤖 Prompt for AI Agents
In `@src/processing_pipeline/stage_4/flows.py` at line 22, The assignment to
os.environ["GOOGLE_API_KEY"] uses os.environ.get("GOOGLE_GEMINI_PAID_KEY") which
can be None and will raise when assigned; update the code around the
os.environ["GOOGLE_API_KEY"] assignment to first read the value into a variable
(e.g., gemini_key = os.environ.get("GOOGLE_GEMINI_PAID_KEY")) and only set
os.environ["GOOGLE_API_KEY"] if gemini_key is not None (or provide a safe
default/raise a clear error), ensuring the check is done before assigning to
avoid a TypeError.

Comment on lines +48 to +63
audio_file = supabase_client.get_audio_file_by_id(
snippet_json["audio_file"],
select="location_city,location_state,radio_station_code,radio_station_name",
)

print(f"Audio file metadata: {audio_file}")

metadata = {
"recorded_at": recorded_at.strftime("%B %-d, %Y %-I:%M %p"),
"recording_day_of_week": recorded_at.strftime("%A"),
"location_city": audio_file.get("location_city"),
"location_state": audio_file.get("location_state"),
"radio_station_code": audio_file.get("radio_station_code"),
"radio_station_name": audio_file.get("radio_station_name"),
"time_zone": "UTC",
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

get_audio_file_by_id can return None, causing AttributeError on .get() calls.

If the audio file isn't found in the database, audio_file will be None, and lines 58-61 will raise AttributeError: 'NoneType' object has no attribute 'get'.

🐛 Proposed fix
     audio_file = supabase_client.get_audio_file_by_id(
         snippet_json["audio_file"],
         select="location_city,location_state,radio_station_code,radio_station_name",
     )
 
+    if not audio_file:
+        raise ValueError(f"Audio file not found for ID: {snippet_json['audio_file']}")
+
     print(f"Audio file metadata: {audio_file}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
audio_file = supabase_client.get_audio_file_by_id(
snippet_json["audio_file"],
select="location_city,location_state,radio_station_code,radio_station_name",
)
print(f"Audio file metadata: {audio_file}")
metadata = {
"recorded_at": recorded_at.strftime("%B %-d, %Y %-I:%M %p"),
"recording_day_of_week": recorded_at.strftime("%A"),
"location_city": audio_file.get("location_city"),
"location_state": audio_file.get("location_state"),
"radio_station_code": audio_file.get("radio_station_code"),
"radio_station_name": audio_file.get("radio_station_name"),
"time_zone": "UTC",
}
audio_file = supabase_client.get_audio_file_by_id(
snippet_json["audio_file"],
select="location_city,location_state,radio_station_code,radio_station_name",
)
if not audio_file:
raise ValueError(f"Audio file not found for ID: {snippet_json['audio_file']}")
print(f"Audio file metadata: {audio_file}")
metadata = {
"recorded_at": recorded_at.strftime("%B %-d, %Y %-I:%M %p"),
"recording_day_of_week": recorded_at.strftime("%A"),
"location_city": audio_file.get("location_city"),
"location_state": audio_file.get("location_state"),
"radio_station_code": audio_file.get("radio_station_code"),
"radio_station_name": audio_file.get("radio_station_name"),
"time_zone": "UTC",
}
🤖 Prompt for AI Agents
In `@src/processing_pipeline/stage_4/tasks.py` around lines 48 - 63, The code
calls supabase_client.get_audio_file_by_id which can return None, then
immediately uses audio_file.get(...) causing AttributeError; update the logic in
tasks.py around audio_file to check if audio_file is None after
get_audio_file_by_id (or validate it is a dict), and handle the missing case by
logging a clear error (or raising a descriptive exception) and
returning/aborting the task path instead of building metadata; ensure the
metadata construction (the block that sets
"location_city"/"location_state"/"radio_station_code"/"radio_station_name") only
executes when audio_file is present.

Comment on lines +104 to +111
@optional_task(log_prints=True)
async def process_snippet(supabase_client, snippet, prompt_versions):
try:
if snippet["previous_analysis"]:
previous_analysis = snippet["previous_analysis"]
else:
backup_snippet_analysis(supabase_client, snippet)
previous_analysis = snippet
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find and inspect the prepare_snippet_for_review function
rg -n "def prepare_snippet_for_review" --type py -A 20

Repository: PublicDataWorks/verdad

Length of output: 1889


🏁 Script executed:

#!/bin/bash
# Check what fields are accessed from snippet_json in the tasks.py file
rg -n "snippet_json\[|snippet\[" src/processing_pipeline/stage_4/tasks.py -A 2

Repository: PublicDataWorks/verdad

Length of output: 2144


🏁 Script executed:

#!/bin/bash
# Also search for prepare_snippet_for_review calls to see how snippet is used
rg -n "prepare_snippet_for_review" --type py -B 2 -A 5

Repository: PublicDataWorks/verdad

Length of output: 8218


🏁 Script executed:

#!/bin/bash
# Check the backup_snippet_analysis function to understand what it does
rg -n "def backup_snippet_analysis" --type py -A 10

Repository: PublicDataWorks/verdad

Length of output: 868


Redundant backup and unclear data preservation design.

On line 110, when snippet["previous_analysis"] is falsy, the code calls backup_snippet_analysis and then uses the raw snippet dict as previous_analysis. This creates a redundant operation: the snippet is backed up only to be used immediately as previous_analysis on the first run. More importantly, backup_snippet_analysis saves the entire snippet object (including fields like id, status, etc.) as the previous analysis, rather than a minimal previous analysis record. Since prepare_snippet_for_review only requires a subset of snippet fields (translation, title, summary, explanation, disinformation_categories, keywords_detected, language, confidence_scores, political_leaning, recorded_at, audio_file, transcription, context), consider whether storing the full snippet is intentional or if the backup should preserve only analysis-related fields.

🤖 Prompt for AI Agents
In `@src/processing_pipeline/stage_4/tasks.py` around lines 104 - 111, The code in
process_snippet currently calls backup_snippet_analysis(supabase_client,
snippet) then sets previous_analysis = snippet when snippet["previous_analysis"]
is falsy, which both redundantly backs up the full snippet and uses the full
object as previous_analysis; instead, construct a minimal previous_analysis
containing only the analysis-related fields that prepare_snippet_for_review
needs (translation, title, summary, explanation, disinformation_categories,
keywords_detected, language, confidence_scores, political_leaning, recorded_at,
audio_file, transcription, context), set previous_analysis to that minimal dict,
and call backup_snippet_analysis(supabase_client, minimal_previous_analysis) if
you still need to persist a backup; update references to previous_analysis in
process_snippet accordingly so the minimal shape is used rather than the full
snippet.

Comment on lines +144 to +150
except Exception as e:
if isinstance(e, ExceptionGroup):
error_msg = "\n".join(f"- {type(exc).__name__}: {exc}" for exc in e.exceptions)
else:
error_msg = str(e)
print(f"Failed to process snippet {snippet['id']}:\n{error_msg}")
supabase_client.set_snippet_status(snippet["id"], "Error", f"[Stage 4] {error_msg}")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check the Python version requirement in the project
echo "=== pyproject.toml ===" && fd -t f "pyproject.toml" --exec cat {} \; 2>/dev/null || echo "Not found"
echo ""
echo "=== setup.cfg ===" && fd -t f "setup.cfg" --exec cat {} \; 2>/dev/null || echo "Not found"
echo ""
echo "=== setup.py ===" && fd -t f "setup.py" --exec cat {} \; 2>/dev/null || echo "Not found"
echo ""
echo "=== .python-version ===" && fd -t f ".python-version" --exec cat {} \; 2>/dev/null || echo "Not found"
echo ""
echo "=== Dockerfile ===" && fd -t f "Dockerfile" --exec grep -i "python" {} \; 2>/dev/null || echo "Not found"
echo ""
echo "=== runtime.txt ===" && fd -t f "runtime.txt" --exec cat {} \; 2>/dev/null || echo "Not found"

Repository: PublicDataWorks/verdad

Length of output: 1066


🏁 Script executed:

#!/bin/bash
# Inspect the actual file to check imports and ExceptionGroup usage
echo "=== Checking file existence ===" && ls -la src/processing_pipeline/stage_4/tasks.py 2>/dev/null || echo "File not found"
echo ""
echo "=== Full file inspection ===" && wc -l src/processing_pipeline/stage_4/tasks.py
echo ""
echo "=== Lines 144-150 ===" && sed -n '144,150p' src/processing_pipeline/stage_4/tasks.py
echo ""
echo "=== All imports at top of file ===" && head -30 src/processing_pipeline/stage_4/tasks.py
echo ""
echo "=== Search for ExceptionGroup usage ===" && rg "ExceptionGroup" src/processing_pipeline/stage_4/tasks.py -n
echo ""
echo "=== Search for ExceptionGroup imports ===" && rg "import.*ExceptionGroup" src/ -n

Repository: PublicDataWorks/verdad

Length of output: 1854


🏁 Script executed:

#!/bin/bash
# Check for any ExceptionGroup usage/import in the entire project
echo "=== Searching entire src/ for ExceptionGroup ===" && rg "ExceptionGroup" src/ -n
echo ""
echo "=== Checking __init__.py files ===" && find src -name "__init__.py" -exec cat {} \;

Repository: PublicDataWorks/verdad

Length of output: 1544


Add explicit Python version requirement and make ExceptionGroup check version-compatible.

The code uses ExceptionGroup (a Python 3.11+ builtin) without an explicit minimum version constraint in pyproject.toml. While Dockerfiles indicate Python 3.11+ targets, the absence of a version lock in project metadata creates brittleness. If the project ever needs to support older Python versions, this will fail silently at runtime.

Use a defensive approach that works across versions:

Suggested fix
     except Exception as e:
-        if isinstance(e, ExceptionGroup):
+        if hasattr(e, 'exceptions'):  # ExceptionGroup (Python 3.11+)
             error_msg = "\n".join(f"- {type(exc).__name__}: {exc}" for exc in e.exceptions)
         else:
             error_msg = str(e)

Alternatively, add python_requires = ">=3.11" to pyproject.toml if Python 3.11+ is a hard requirement.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except Exception as e:
if isinstance(e, ExceptionGroup):
error_msg = "\n".join(f"- {type(exc).__name__}: {exc}" for exc in e.exceptions)
else:
error_msg = str(e)
print(f"Failed to process snippet {snippet['id']}:\n{error_msg}")
supabase_client.set_snippet_status(snippet["id"], "Error", f"[Stage 4] {error_msg}")
except Exception as e:
if hasattr(e, 'exceptions'): # ExceptionGroup (Python 3.11+)
error_msg = "\n".join(f"- {type(exc).__name__}: {exc}" for exc in e.exceptions)
else:
error_msg = str(e)
print(f"Failed to process snippet {snippet['id']}:\n{error_msg}")
supabase_client.set_snippet_status(snippet["id"], "Error", f"[Stage 4] {error_msg}")
🧰 Tools
🪛 Ruff (0.15.0)

[warning] 144-144: Do not catch blind exception: Exception

(BLE001)


[error] 145-145: Undefined name ExceptionGroup. Consider specifying requires-python = ">= 3.11" or tool.ruff.target-version = "py311" in your pyproject.toml file.

(F821)

🤖 Prompt for AI Agents
In `@src/processing_pipeline/stage_4/tasks.py` around lines 144 - 150, The except
block in src/processing_pipeline/stage_4/tasks.py assumes ExceptionGroup exists;
make the check version-safe by resolving ExceptionGroup at runtime (e.g.,
ExceptionGroupType = getattr(builtins, "ExceptionGroup", None) or similar) and
then using isinstance(e, ExceptionGroupType) only if ExceptionGroupType is not
None, falling back to treating e as a single exception otherwise; update the
error_msg construction logic around that check (the block that currently builds
error_msg and calls supabase_client.set_snippet_status) to use this safe check,
and if you intend to require Python 3.11+, also add python_requires = ">=3.11"
to pyproject.toml to make the requirement explicit.

Comment on lines +501 to +538
def supersede_kb_entry(self, old_entry_id, new_entry_data):
"""Create a new version of a KB entry. Deactivates old, inserts new."""
# Get old entry to determine new version number
old_entry = self.get_kb_entry_by_id(old_entry_id)
if not old_entry:
raise ValueError(f"KB entry not found: {old_entry_id}")

new_entry_data["version"] = old_entry["version"] + 1
new_entry_data["previous_version"] = old_entry_id

# Insert new entry
new_response = self.client.table("kb_entries").insert(new_entry_data).execute()
new_entry = new_response.data[0]

# Copy sources from old entry to new entry
old_sources = self.get_kb_entry_sources(old_entry_id)
for source in old_sources:
self.insert_kb_entry_source(
kb_entry_id=new_entry["id"],
url=source["url"],
source_name=source["source_name"],
source_type=source["source_type"],
title=source.get("title"),
relevant_excerpt=source.get("relevant_excerpt"),
publication_date=source.get("publication_date"),
relevance_to_claim=source.get("relevance_to_claim", "provides_context"),
)

# Update old entry: superseded
self.client.table("kb_entries").update({
"status": "superseded",
"superseded_by": new_entry["id"],
}).eq("id", old_entry_id).execute()

# Delete old embedding
self.delete_kb_entry_embedding(old_entry_id)

return new_entry
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

supersede_kb_entry is a multi-step mutation without transactional guarantees.

This method performs 4 sequential mutations (insert new entry → copy sources → update old entry status → delete old embedding). If any intermediate step fails, the KB is left in an inconsistent state — e.g., a new entry without sources, or both old and new entries marked as active.

Consider wrapping this logic in a Supabase RPC/database function to ensure atomicity, or at minimum add error handling that attempts to roll back partial changes (e.g., delete the newly inserted entry if source copying fails).

🧰 Tools
🪛 Ruff (0.15.0)

[warning] 506-506: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In `@src/processing_pipeline/supabase_utils.py` around lines 501 - 538, The
supersede_kb_entry function performs multiple dependent DB mutations without
transactional guarantees, so failures mid-way can leave inconsistent state;
modify supersede_kb_entry to run the whole flow inside an atomic DB transaction
(via a Supabase RPC/SQL function or a client transaction API) or, if RPC isn't
possible, add try/except rollback logic: after inserting the new row (insert
into "kb_entries"), wrap copying sources (insert_kb_entry_source calls),
updating the old entry, and deleting the old embedding in a single transactional
call or, on any exception, delete the newly inserted entry (using
new_entry["id"]) and undo partial updates (e.g., restore old entry status) and
re-raise the error; use the existing helpers get_kb_entry_by_id,
insert_kb_entry_source, delete_kb_entry_embedding, and the table update call to
locate where to add transaction/rollback behavior.

Comment on lines +23 to +24
WHERE kee.status = 'Processed'
-- Search across ALL statuses for deduplication
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Misleading inline comment contradicts the actual WHERE clause.

Line 23 filters kee.status = 'Processed', but the comment on line 24 says "Search across ALL statuses for deduplication." The function header (lines 2-3) explains the rationale — superseded/deactivated entries have their embeddings deleted, so filtering on embedding status is effectively equivalent. However, the inline comment suggests no status filter is applied, which is confusing.

Suggested comment fix
         WHERE kee.status = 'Processed'
-            -- Search across ALL statuses for deduplication
+            -- Effectively searches all kb_entry statuses: superseded/deactivated
+            -- entries have their embeddings deleted, so only active entries remain.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
WHERE kee.status = 'Processed'
-- Search across ALL statuses for deduplication
WHERE kee.status = 'Processed'
-- Effectively searches all kb_entry statuses: superseded/deactivated
-- entries have their embeddings deleted, so only active entries remain.
🤖 Prompt for AI Agents
In `@supabase/database/sql/find_duplicate_kb_entries.sql` around lines 23 - 24,
The inline comment after the WHERE clause is misleading: the query is filtering
with "WHERE kee.status = 'Processed'" but the comment says "Search across ALL
statuses for deduplication"; update that comment to accurately state that the
query restricts to status = 'Processed' (because superseded/deactivated entries
have embeddings removed so filtering by embedding status is equivalent) and
remove the contradictory "all statuses" phrasing so readers understand the
actual behavior of the filter in the find_duplicate_kb_entries logic (reference
kee.status = 'Processed' and the surrounding WHERE clause).

quancao-ea and others added 2 commits February 13, 2026 16:50
Allow each stage's flow run count to be overridden through
environment variables, enabling flexible pipeline configuration.
Stages can be skipped by setting their flow runs to 0.
Remove hardcoded Stage 4 conditional in favor of unified approach.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@src/processing_pipeline/stage_4/executor.py`:
- Line 79: The call to session_service.create_session(...) returns a value
that's never used and triggers a lint error; rename the local variable from
session to _session (or prefix it with an underscore) where create_session is
awaited so the returned value is intentionally unused and the later get_session
usage remains unchanged; update the assignment at the create_session call site
in executor.py to use _session to silence Ruff F841.
- Around line 136-139: The code currently does json.loads(revised_analysis) on
LLM-produced text (revised_analysis from final_session.state) and will crash
with a JSONDecodeError if the model emits malformed JSON; update the block
around revised_analysis/result to check if revised_analysis is a dict else
attempt json.loads inside a try/except catching json.JSONDecodeError and raise a
clear ValueError that includes the raw revised_analysis text (and original
exception message) to aid debugging; reference the variables revised_analysis,
final_session.state and result when making this change and optionally log the
error before raising.
🧹 Nitpick comments (2)
src/processing_pipeline/stage_4/executor.py (2)

36-48: Missing return type annotation on run_async.

The docstring documents the return as tuple: (result_dict, grounding_metadata_str), but the method signature lacks a return type hint. Adding -> tuple[dict, Optional[str]] would improve clarity and enable static type checking.

Proposed fix
     `@classmethod`
     async def run_async(
         cls,
         snippet_id: str,
         transcription: str,
         disinformation_snippet: str,
         metadata: dict,
         analysis_json: dict,
         recorded_at: str,
         current_time: str,
         prompt_versions: dict[str, dict],
         reviewer_model: GeminiModel,
-    ):
+    ) -> tuple[dict, Optional[str]]:

72-78: Move try to immediately after build_review_pipeline to guarantee toolset cleanup.

If any statement on lines 73–77 were to raise (unlikely today, but fragile for future edits), searxng_toolset.close() would never execute. Wrapping the toolset in the try/finally right after creation is the safer pattern.

Proposed fix
         review_pipeline, searxng_toolset = build_review_pipeline(prompt_versions, reviewer_model)
-        session_service = InMemorySessionService()
-        app_name = "stage4_review"
-        user_id = "pipeline"
-        session_id = f"stage4_review_session_{snippet_id}"
-
         try:
+            session_service = InMemorySessionService()
+            app_name = "stage4_review"
+            user_id = "pipeline"
+            session_id = f"stage4_review_session_{snippet_id}"
+
             session = await session_service.create_session(

Also applies to: 149-152

session_id = f"stage4_review_session_{snippet_id}"

try:
session = await session_service.create_session(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Unused session variable (Ruff F841).

The return value of create_session is never referenced; the session is later retrieved via get_session. Prefix with _ to silence the lint error.

Proposed fix
-            session = await session_service.create_session(
+            _ = await session_service.create_session(
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
session = await session_service.create_session(
_ = await session_service.create_session(
🧰 Tools
🪛 Ruff (0.15.0)

[error] 79-79: Local variable session is assigned to but never used

Remove assignment to unused variable session

(F841)

🤖 Prompt for AI Agents
In `@src/processing_pipeline/stage_4/executor.py` at line 79, The call to
session_service.create_session(...) returns a value that's never used and
triggers a lint error; rename the local variable from session to _session (or
prefix it with an underscore) where create_session is awaited so the returned
value is intentionally unused and the later get_session usage remains unchanged;
update the assignment at the create_session call site in executor.py to use
_session to silence Ruff F841.

Comment on lines +136 to +139
revised_analysis = final_session.state.get("revised_analysis", "")
if not revised_analysis:
raise ValueError("No revised analysis produced by the review pipeline")
result = revised_analysis if isinstance(revised_analysis, dict) else json.loads(revised_analysis)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

json.loads on LLM output without error handling — will crash on malformed JSON.

revised_analysis comes from session state populated by an LLM agent. If the model produces invalid JSON, json.loads raises json.JSONDecodeError with a cryptic traceback. Wrap it to surface a clear error.

Proposed fix
             revised_analysis = final_session.state.get("revised_analysis", "")
             if not revised_analysis:
                 raise ValueError("No revised analysis produced by the review pipeline")
-            result = revised_analysis if isinstance(revised_analysis, dict) else json.loads(revised_analysis)
+            if isinstance(revised_analysis, dict):
+                result = revised_analysis
+            else:
+                try:
+                    result = json.loads(revised_analysis)
+                except json.JSONDecodeError as e:
+                    raise ValueError(
+                        f"Review pipeline produced invalid JSON for revised_analysis: {e}"
+                    ) from e
🧰 Tools
🪛 Ruff (0.15.0)

[warning] 138-138: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In `@src/processing_pipeline/stage_4/executor.py` around lines 136 - 139, The code
currently does json.loads(revised_analysis) on LLM-produced text
(revised_analysis from final_session.state) and will crash with a
JSONDecodeError if the model emits malformed JSON; update the block around
revised_analysis/result to check if revised_analysis is a dict else attempt
json.loads inside a try/except catching json.JSONDecodeError and raise a clear
ValueError that includes the raw revised_analysis text (and original exception
message) to aid debugging; reference the variables revised_analysis,
final_session.state and result when making this change and optionally log the
error before raising.

Moved hardcoded confidence and similarity thresholds to named
constants in constants.py for better maintainability and
configurability across Stage 3 and Stage 4 processing.
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (2)
src/processing_pipeline/constants.py (1)

47-51: STAGE_4 on line 47 is still unused — remove it.

The original STAGE_4 member is not referenced anywhere in the codebase and has been superseded by the four new STAGE_4_* members. This was already flagged in a prior review.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/processing_pipeline/constants.py` around lines 47 - 51, Remove the unused
STAGE_4 constant from the constants module: delete the STAGE_4 = "stage_4" entry
in src/processing_pipeline/constants.py and ensure no other code references the
symbol STAGE_4 (rename usages if any); keep the four specific constants
STAGE_4_KB_RESEARCHER, STAGE_4_WEB_RESEARCHER, STAGE_4_REVIEWER, and
STAGE_4_KB_UPDATER unchanged.
requirements.txt (1)

1-2: urllib3>=2.6.3 correctly mitigates CVE-2026-21441.

The pin addresses the decompression-bomb DoS vulnerability flagged in the prior review.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@requirements.txt` around lines 1 - 2, Update the dependency entry in
requirements.txt to explicitly pin the urllib3 version to the patched release
that mitigates CVE-2026-21441 (change the current "urllib3>=2.6.3" to an
explicit "urllib3==2.6.3" or another exact patched version) and verify
compatibility with the existing requests==2.32.5 entry so the decompression-bomb
DoS fix is enforced and builds remain reproducible.
🧹 Nitpick comments (4)
src/processing_pipeline/stage_4/tools.py (3)

18-32: New SupabaseClient and OpenAI client instantiated on every tool call.

Both _get_supabase_client() and _generate_embedding() create fresh client instances per invocation. During a Stage 4 flow that calls search_knowledge_base and then upsert_knowledge_entry, this means multiple redundant client constructions and potentially new HTTP connections.

Consider caching or lazily initializing these at the module level (e.g., via functools.lru_cache or a module-level singleton).

♻️ Example: cache with lru_cache
+from functools import lru_cache
+
+@lru_cache(maxsize=1)
 def _get_supabase_client():
     return SupabaseClient(
         supabase_url=os.getenv("SUPABASE_URL"),
         supabase_key=os.getenv("SUPABASE_KEY"),
     )
+
+@lru_cache(maxsize=1)
+def _get_openai_client():
+    openai_api_key = os.getenv("OPENAI_API_KEY")
+    if not openai_api_key:
+        raise ValueError("OpenAI API key was not set!")
+    return OpenAI(api_key=openai_api_key)
+
 
 def _generate_embedding(text: str) -> list[float]:
-    openai_api_key = os.getenv("OPENAI_API_KEY")
-    if not openai_api_key:
-        raise ValueError("OpenAI API key was not set!")
-
-    client = OpenAI(api_key=openai_api_key)
+    client = _get_openai_client()
     response = client.embeddings.create(model="text-embedding-3-large", input=text)
     return response.data[0].embedding
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/processing_pipeline/stage_4/tools.py` around lines 18 - 32, The code
instantiates SupabaseClient and OpenAI on every call which is wasteful; change
_get_supabase_client and _generate_embedding to return cached/lazily-initialized
clients instead of creating new ones each invocation (e.g., apply
functools.lru_cache to _get_supabase_client or maintain a module-level singleton
variable for SupabaseClient, and similarly cache the OpenAI client used inside
_generate_embedding), ensure you still read env vars once during initialization
and reuse the same SupabaseClient and OpenAI instances across calls (referencing
SupabaseClient, _get_supabase_client, OpenAI, and _generate_embedding).

31-32: Embedding model name "text-embedding-3-large" is hardcoded in three places.

Lines 31, 201, and 211 all reference "text-embedding-3-large". Extract it to a constant in constants.py to keep the model choice in sync.

Also applies to: 201-201, 211-211

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/processing_pipeline/stage_4/tools.py` around lines 31 - 32, The embedding
model name "text-embedding-3-large" is hardcoded in multiple places; add a
constant (e.g., EMBEDDING_MODEL = "text-embedding-3-large") to constants.py and
replace the literal occurrences in src/processing_pipeline/stage_4/tools.py with
that constant (references where response =
client.embeddings.create(model="text-embedding-3-large", input=text) and the
other two identical calls). Import the constant from constants.py into tools.py
and use it for the model parameter so all three usages share the single source
of truth.

160-160: Hardcoded GeminiModel.GEMINI_2_5_FLASH_PREVIEW_09_2025 as created_by_model.

Both the create and update paths hardcode the model to GEMINI_2_5_FLASH_PREVIEW_09_2025. If the Stage 4 executor model changes (or differs between agents), this provenance metadata will be wrong. Consider passing the model identifier in as a parameter or deriving it from the agent context.

Also applies to: 185-185

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/processing_pipeline/stage_4/tools.py` at line 160, Replace the hardcoded
GeminiModel.GEMINI_2_5_FLASH_PREVIEW_09_2025 value used for the
"created_by_model" metadata in tools.py with a model identifier passed in or
derived from the agent/context; update both the create and update code paths
that currently set "created_by_model" to use a parameter (e.g., model_id or
executor_model) or pull from the agent execution context, and add a sensible
fallback (e.g., the existing GeminiModel constant) only if no model is provided
so provenance stays accurate when the Stage 4 executor model changes.
src/processing_pipeline/stage_3/tasks.py (1)

200-203: Verify the confidence gate direction is intentional: high confidence → review, low → skip.

The condition sends snippets with overall >= 95 to READY_FOR_REVIEW (Stage 4) and marks lower-confidence ones as PROCESSED. This makes sense if Stage 4 is an enrichment step for high-conviction detections, but reads counterintuitively if one expects "review" to mean "uncertain, needs human check."

A brief inline comment clarifying the intent (e.g., "Only invest in Stage 4 KB-augmented review for high-confidence detections") would help future readers.

Also, if confidence_scores or overall is ever missing from the response, this will raise an unhandled KeyError/TypeError. Consider defensive access if the upstream schema isn't strictly guaranteed.

💡 Suggested clarifying comment
+        # Only route high-confidence detections to Stage 4 KB-augmented review;
+        # low-confidence results are finalized immediately.
         needs_review = (
             not skip_review and analyzing_response["response"]["confidence_scores"]["overall"] >= CONFIDENCE_THRESHOLD
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/processing_pipeline/stage_3/tasks.py` around lines 200 - 203, The
confidence-gate currently treats high confidence
(analyzing_response["response"]["confidence_scores"]["overall"] >=
CONFIDENCE_THRESHOLD) as needing review, which is counterintuitive and risks
KeyError/TypeError if fields are missing; update the block around
needs_review/skip_review to (1) add a brief inline comment clarifying the
intended semantics (e.g., "Only send high-confidence detections to Stage 4
KB-augmented review") and (2) compute the overall score defensively by
extracting analyzing_response.get("response", {}).get("confidence_scores",
{}).get("overall") (or equivalent defensive access) and ensuring it's a numeric
value (fallback to -inf or 0) before comparing to CONFIDENCE_THRESHOLD, then set
status to ProcessingStatus.READY_FOR_REVIEW or ProcessingStatus.PROCESSED as
before using the safely computed needs_review.
ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 6dbb26c and ee5afae.

📒 Files selected for processing (4)
  • requirements.txt
  • src/processing_pipeline/constants.py
  • src/processing_pipeline/stage_3/tasks.py
  • src/processing_pipeline/stage_4/tools.py
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@requirements.txt`:
- Line 23: Update requirements.txt to explicitly pin pydantic-core to the
matching version for pydantic 2.12.5 by adding a line for pydantic-core==2.41.5
(so the file contains both pydantic==2.12.5 and pydantic-core==2.41.5); this
ensures deterministic installs across CI/Docker/local and prevents the Pydantic
startup error from an incompatible core version.
- Around line 12-13: Replace the yanked supabase==2.22.0 entry with a non-yanked
constraint (e.g., supabase>=2.23.0 or supabase>=3.0.0) in requirements.txt to
eliminate the broken release; for google-adk==1.24.1, audit where google-adk is
used (search for imports/use-sites referencing google_adk or related ADK APIs)
and either pin only the specific Google Cloud client packages you actually need
(e.g., google-cloud-storage, google-cloud-aiplatform) instead of the umbrella
google-adk, or bump google-adk to a fixed release (google-adk>=1.25.1) if you
truly need the ADK bundle, thereby reducing transitive dependencies and image
size.

In `@src/processing_pipeline/stage_4/tools.py`:
- Around line 150-212: The current multi-step write flow using
supabase_client.supersede_kb_entry / supabase_client.insert_kb_entry →
supabase_client.insert_kb_entry_source →
supabase_client.upsert_kb_entry_embedding can leave orphaned entries on partial
failure; modify upsert_knowledge_entry to perform these operations inside a
single DB transaction if Supabase client supports it, or at minimum wrap the
sequence in a try/except that on any exception calls
supabase_client.delete_kb_entry(entry["id"]) (and/or supersede rollback) to
remove the partially-created entry, ensuring insert_kb_entry_source and
upsert_kb_entry_embedding are only attempted after a successful entry write and
that any failure triggers cleanup of entry id created by insert_kb_entry or
supersede_kb_entry.

---

Duplicate comments:
In `@requirements.txt`:
- Around line 1-2: Update the dependency entry in requirements.txt to explicitly
pin the urllib3 version to the patched release that mitigates CVE-2026-21441
(change the current "urllib3>=2.6.3" to an explicit "urllib3==2.6.3" or another
exact patched version) and verify compatibility with the existing
requests==2.32.5 entry so the decompression-bomb DoS fix is enforced and builds
remain reproducible.

In `@src/processing_pipeline/constants.py`:
- Around line 47-51: Remove the unused STAGE_4 constant from the constants
module: delete the STAGE_4 = "stage_4" entry in
src/processing_pipeline/constants.py and ensure no other code references the
symbol STAGE_4 (rename usages if any); keep the four specific constants
STAGE_4_KB_RESEARCHER, STAGE_4_WEB_RESEARCHER, STAGE_4_REVIEWER, and
STAGE_4_KB_UPDATER unchanged.

---

Nitpick comments:
In `@src/processing_pipeline/stage_3/tasks.py`:
- Around line 200-203: The confidence-gate currently treats high confidence
(analyzing_response["response"]["confidence_scores"]["overall"] >=
CONFIDENCE_THRESHOLD) as needing review, which is counterintuitive and risks
KeyError/TypeError if fields are missing; update the block around
needs_review/skip_review to (1) add a brief inline comment clarifying the
intended semantics (e.g., "Only send high-confidence detections to Stage 4
KB-augmented review") and (2) compute the overall score defensively by
extracting analyzing_response.get("response", {}).get("confidence_scores",
{}).get("overall") (or equivalent defensive access) and ensuring it's a numeric
value (fallback to -inf or 0) before comparing to CONFIDENCE_THRESHOLD, then set
status to ProcessingStatus.READY_FOR_REVIEW or ProcessingStatus.PROCESSED as
before using the safely computed needs_review.

In `@src/processing_pipeline/stage_4/tools.py`:
- Around line 18-32: The code instantiates SupabaseClient and OpenAI on every
call which is wasteful; change _get_supabase_client and _generate_embedding to
return cached/lazily-initialized clients instead of creating new ones each
invocation (e.g., apply functools.lru_cache to _get_supabase_client or maintain
a module-level singleton variable for SupabaseClient, and similarly cache the
OpenAI client used inside _generate_embedding), ensure you still read env vars
once during initialization and reuse the same SupabaseClient and OpenAI
instances across calls (referencing SupabaseClient, _get_supabase_client,
OpenAI, and _generate_embedding).
- Around line 31-32: The embedding model name "text-embedding-3-large" is
hardcoded in multiple places; add a constant (e.g., EMBEDDING_MODEL =
"text-embedding-3-large") to constants.py and replace the literal occurrences in
src/processing_pipeline/stage_4/tools.py with that constant (references where
response = client.embeddings.create(model="text-embedding-3-large", input=text)
and the other two identical calls). Import the constant from constants.py into
tools.py and use it for the model parameter so all three usages share the single
source of truth.
- Line 160: Replace the hardcoded GeminiModel.GEMINI_2_5_FLASH_PREVIEW_09_2025
value used for the "created_by_model" metadata in tools.py with a model
identifier passed in or derived from the agent/context; update both the create
and update code paths that currently set "created_by_model" to use a parameter
(e.g., model_id or executor_model) or pull from the agent execution context, and
add a sensible fallback (e.g., the existing GeminiModel constant) only if no
model is provided so provenance stays accurate when the Stage 4 executor model
changes.

Comment thread requirements.txt
Comment on lines +12 to +13
google-genai==1.62.0
google-adk==1.24.1
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check for dependency conflicts and inspect httpx constraints

# Install pipdeptree and pip-audit for dependency analysis
pip install -q pipdeptree pip-audit 2>/dev/null || true

# Step 1: Try to resolve both packages together and capture any conflicts
echo "=== Attempting to resolve google-adk==1.24.1 + supabase==2.22.0 ==="
pip install --dry-run --quiet google-adk==1.24.1 supabase==2.22.0 2>&1 | head -100

echo -e "\n=== Direct dependency inspection ==="
# Use pip show to get metadata if packages are already installed
pip install -q google-adk==1.24.1 2>&1 | tail -5
python3 << 'EOFPYTHON'
import json
import subprocess
import sys

# Get dependency info using pip
packages = ['google-adk==1.24.1', 'supabase==2.22.0']
for pkg in packages:
    print(f"\n--- {pkg} dependencies ---")
    result = subprocess.run(
        ['pip', 'show', pkg],
        capture_output=True,
        text=True
    )
    if result.returncode == 0:
        for line in result.stdout.split('\n'):
            if 'Requires:' in line:
                print(line)
    else:
        print(f"Not installed yet, attempting install...")
        subprocess.run(['pip', 'install', '-q', pkg], capture_output=True)
        result = subprocess.run(['pip', 'show', pkg], capture_output=True, text=True)
        for line in result.stdout.split('\n'):
            if 'Requires:' in line:
                print(line)

# Check httpx versions available
print("\n--- httpx versions ---")
result = subprocess.run(['pip', 'index', 'versions', 'httpx'], capture_output=True, text=True)
if result.returncode == 0:
    print(result.stdout[:500])
else:
    print("pip index unavailable, checking installed httpx version")
    result = subprocess.run(['pip', 'show', 'httpx'], capture_output=True, text=True)
    print(result.stdout)
EOFPYTHON

Repository: PublicDataWorks/verdad

Length of output: 1826


🏁 Script executed:

#!/bin/bash
# Extract actual dependencies from installed packages

echo "=== google-adk==1.24.1 dependencies ==="
python3 << 'EOF'
import subprocess
import json

# Get metadata using pip
result = subprocess.run(['pip', 'show', 'google-adk'], capture_output=True, text=True)
if 'Requires:' in result.stdout:
    for line in result.stdout.split('\n'):
        if line.startswith('Requires:'):
            deps = line.replace('Requires:', '').strip().split(', ')
            for dep in deps:
                print(f"  {dep}")
EOF

echo -e "\n=== supabase==2.22.0 dependencies ==="
python3 << 'EOF'
import subprocess

result = subprocess.run(['pip', 'show', 'supabase'], capture_output=True, text=True)
if 'Requires:' in result.stdout:
    for line in result.stdout.split('\n'):
        if line.startswith('Requires:'):
            deps = line.replace('Requires:', '').strip().split(', ')
            for dep in deps:
                print(f"  {dep}")
EOF

# Use pipdeptree to show full dependency tree
echo -e "\n=== google-adk dependency tree (top-level) ==="
pipdeptree -p google-adk 2>/dev/null | head -50 || echo "pipdeptree not available"

echo -e "\n=== supabase dependency tree (top-level) ==="
pipdeptree -p supabase 2>/dev/null | head -50 || echo "pipdeptree not available"

# Check httpx specifically
echo -e "\n=== httpx version installed ==="
pip show httpx | grep -E "Name|Version|Requires"

Repository: PublicDataWorks/verdad

Length of output: 1244


🏁 Script executed:

#!/bin/bash
# Check the actual requirements.txt file context and metadata

echo "=== Full requirements.txt content ==="
cat requirements.txt

echo -e "\n=== Checking for pyproject.toml or setup.py with version constraints ==="
if [ -f pyproject.toml ]; then
  echo "--- pyproject.toml ---"
  cat pyproject.toml
fi

if [ -f setup.py ]; then
  echo "--- setup.py ---"
  head -100 setup.py
fi

# Check if there are version constraints in package metadata from PyPI
echo -e "\n=== Checking PyPI metadata for google-adk==1.24.1 ==="
python3 << 'EOF'
import subprocess
import re

# Get the METADATA from the installed package
result = subprocess.run(['pip', 'show', '-f', 'google-adk'], capture_output=True, text=True)
metadata_lines = result.stdout.split('\n')

# Look for Requires-Dist entries which show version constraints
print("Direct dependency version constraints from google-adk==1.24.1:")
for line in metadata_lines:
    if 'httpx' in line.lower():
        print(line)
EOF

Repository: PublicDataWorks/verdad

Length of output: 2152


supabase==2.22.0 is a yanked version on PyPI — use a non-yanked release.

supabase==2.22.0 is yanked with the reason: "Non fixed dependencies may cause breakage with future incompatible changes." This is a critical blocker. Pin to a non-yanked version (e.g., supabase>=2.23.0 or supabase>=3.0.0 if available).

Additionally, google-adk==1.24.1 installs all Google Cloud Platform libraries as transitive dependencies (google-cloud-bigquery, google-cloud-spanner, google-cloud-bigtable, google-cloud-pubsub, google-cloud-speech, google-cloud-storage, google-cloud-discoveryengine, google-cloud-secret-manager, google-cloud-aiplatform, etc.) even if Stage 4 uses only a subset of ADK functionality. This increases image size and attack surface unnecessarily. Verify that all these libraries are actually required.

Consider bumping google-adk to >=1.25.1 (released Feb 18, 2026) to pick up any bug fixes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@requirements.txt` around lines 12 - 13, Replace the yanked supabase==2.22.0
entry with a non-yanked constraint (e.g., supabase>=2.23.0 or supabase>=3.0.0)
in requirements.txt to eliminate the broken release; for google-adk==1.24.1,
audit where google-adk is used (search for imports/use-sites referencing
google_adk or related ADK APIs) and either pin only the specific Google Cloud
client packages you actually need (e.g., google-cloud-storage,
google-cloud-aiplatform) instead of the umbrella google-adk, or bump google-adk
to a fixed release (google-adk>=1.25.1) if you truly need the ADK bundle,
thereby reducing transitive dependencies and image size.

Comment thread requirements.txt
openai==1.57.1
tiktoken==0.8.0
pydantic==2.10.1
pydantic==2.12.5
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for model_validator(mode='after') usages
rg -n --type py "model_validator\s*\(\s*mode\s*=\s*['\"]after['\"]" -A 3

Repository: PublicDataWorks/verdad

Length of output: 48


🏁 Script executed:

#!/bin/bash
# Search for serialize_as_any usages
rg -n --type py "serialize_as_any" -A 2 -B 2

Repository: PublicDataWorks/verdad

Length of output: 48


🏁 Script executed:

#!/bin/bash
# Find all Python files with Pydantic model definitions
rg -n --type py "from pydantic import|from pydantic_core import" | head -20

Repository: PublicDataWorks/verdad

Length of output: 314


🏁 Script executed:

#!/bin/bash
# Check current pydantic and pydantic-core versions in requirements
cat requirements.txt | grep -E "pydantic|pydantic-core"

Repository: PublicDataWorks/verdad

Length of output: 82


🏁 Script executed:

#!/bin/bash
# Read the model files to see what patterns they use
wc -l src/processing_pipeline/stage_3/models.py src/processing_pipeline/stage_4/models.py

Repository: PublicDataWorks/verdad

Length of output: 170


🏁 Script executed:

#!/bin/bash
# Check for pydantic-core specification in all config files
rg -n "pydantic-core" --type py --type toml --type txt --type yml

Repository: PublicDataWorks/verdad

Length of output: 93


🏁 Script executed:

#!/bin/bash
# Look for any validator decorators in the model files
rg -n "validator|field_validator|model_validator" --type py src/processing_pipeline/stage_3/models.py src/processing_pipeline/stage_4/models.py

Repository: PublicDataWorks/verdad

Length of output: 48


🏁 Script executed:

#!/bin/bash
# Read stage_4 models (small file)
cat -n src/processing_pipeline/stage_4/models.py

Repository: PublicDataWorks/verdad

Length of output: 1117


🏁 Script executed:

#!/bin/bash
# Read stage_3 models (larger file, read in sections)
head -100 src/processing_pipeline/stage_3/models.py | cat -n

Repository: PublicDataWorks/verdad

Length of output: 4386


🏁 Script executed:

#!/bin/bash
# Tail of stage_3 models
tail -100 src/processing_pipeline/stage_3/models.py | cat -n

Repository: PublicDataWorks/verdad

Length of output: 5505


🏁 Script executed:

#!/bin/bash
# Search for pydantic-core with corrected file types
rg "pydantic-core" --type py --type toml --type txt

Repository: PublicDataWorks/verdad

Length of output: 48


🏁 Script executed:

#!/bin/bash
# Search for setup.py or pyproject.toml
fd -e "setup.py" -o -e "pyproject.toml" -o -e "setup.cfg"

Repository: PublicDataWorks/verdad

Length of output: 182


🌐 Web query:

Pydantic 2.12 changelog breaking changes model_validator serialize_as_any

💡 Result:

Pydantic 2.12.x: breaking / behavior changes relevant to @model_validator and serialize_as_any

serialize_as_any (notably 2.12.0, then several fixes in 2.12.1+)

  • Behavior change in 2.12.0: model_dump(..., serialize_as_any=True) was changed to match the field-level SerializeAsAny[...] annotation behavior (“unified” behavior). This can expose extra subclass fields (intended) but can also surface new serialization errors where previously it “worked” by accident. Mitigation suggested by Pydantic: prefer SerializeAsAny[...] only on specific fields instead of the global serialize_as_any=True flag. [1]
  • Real-world regression reports: users hit PydanticSerializationError after upgrading to 2.12.0 when using serialize_as_any=True (e.g., with RootModel). [2]
  • Follow-up patch releases (2.12.1–2.12.4): multiple fixes landed for serialize_as_any interactions (e.g., respecting field serializers, RootModel serialization fixes, IP address JSON serialization fix). [3]

@model_validator (after validators / signatures)

  • Behavior change starting in 2.12.0: using @model_validator(mode='after') as a @classmethod began raising an error (and was not raised consistently).
  • Change in 2.12.3: this was relaxed to a deprecation warning instead of an error (“reverting a change to the supported after model validator function signatures”). [3]

Sources: Pydantic v2.12 release post (serialize_as_any unification) [1]; GitHub issue demonstrating 2.12.0 serialization errors [2]; Pydantic GitHub release notes for 2.12.1/2.12.3/2.12.4 validator + serialize_as_any-related changes [3].


🌐 Web query:

Pydantic 2.12 pydantic-core version compatibility requirements

💡 Result:

Pydantic 2.12.x requires an exact (==) matching pydantic-core version (not a range), and starting in 2.12 it will raise an explicit startup error if an incompatible pydantic-core is installed. [1]

pydantic 2.12 ↔ pydantic-core pins (PyPI metadata)

  • pydantic==2.12.0pydantic-core==2.41.1 [2]
  • pydantic==2.12.1 (yanked)pydantic-core==2.41.3 [3]
  • pydantic==2.12.2pydantic-core==2.41.4 [4]
  • pydantic==2.12.3pydantic-core==2.41.4 [5]
  • pydantic==2.12.4pydantic-core==2.41.5 [6]
  • pydantic==2.12.5pydantic-core==2.41.5 [7]

Practical rule: install/upgrade pydantic and pydantic-core together (or just pip install -U pydantic, which will pull the pinned core). [2]

References: [1] [2] [3] [4] [5] [6] [7]


Pin pydantic-core==2.41.5 in requirements.txt to match Pydantic 2.12.5.

The upgrade to pydantic==2.12.5 requires pydantic-core==2.41.5. While Pydantic will auto-resolve the dependency, explicitly pinning the matching core version ensures consistent versions across all environments (CI, Docker, local development). Pydantic 2.12 raises an explicit startup error if an incompatible pydantic-core is present, so this prevents potential deployment surprises.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@requirements.txt` at line 23, Update requirements.txt to explicitly pin
pydantic-core to the matching version for pydantic 2.12.5 by adding a line for
pydantic-core==2.41.5 (so the file contains both pydantic==2.12.5 and
pydantic-core==2.41.5); this ensures deterministic installs across
CI/Docker/local and prevents the Pydantic startup error from an incompatible
core version.

Comment on lines +150 to +212
if duplicates:
# Update existing entry (create new version)
existing_id = duplicates[0]["id"]

new_entry_data = {
"fact": fact,
"confidence_score": confidence_score,
"disinformation_categories": category_list,
"keywords": keyword_list,
"is_time_sensitive": is_time_sensitive,
"created_by_model": GeminiModel.GEMINI_2_5_FLASH_PREVIEW_09_2025.value,
}
if related_claim:
new_entry_data["related_claim"] = related_claim
if valid_from:
new_entry_data["valid_from"] = valid_from
if valid_until:
new_entry_data["valid_until"] = valid_until
if snippet_id:
new_entry_data["created_by_snippet"] = snippet_id

entry = supabase_client.supersede_kb_entry(existing_id, new_entry_data)
action = "updated"
else:
# Create new entry
entry = supabase_client.insert_kb_entry(
fact=fact,
confidence_score=confidence_score,
disinformation_categories=category_list,
keywords=keyword_list,
related_claim=related_claim,
is_time_sensitive=is_time_sensitive,
valid_from=valid_from,
valid_until=valid_until,
created_by_snippet=snippet_id,
created_by_model=GeminiModel.GEMINI_2_5_FLASH_PREVIEW_09_2025.value,
)
action = "created"

# Add source (required for all entries)
supabase_client.insert_kb_entry_source(
kb_entry_id=entry["id"],
url=source_url,
source_name=source_name,
source_type=source_type,
title=source_title,
relevant_excerpt=source_excerpt,
)

# Generate and store embedding
try:
encoding = encoding_for_model("text-embedding-3-large")
token_count = len(encoding.encode(document))
except Exception:
token_count = None

supabase_client.upsert_kb_entry_embedding(
kb_entry_id=entry["id"],
embedded_document=document,
document_token_count=token_count,
embedding=embedding,
model_name="text-embedding-3-large",
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Multi-step upsert is not atomic — partial failures leave inconsistent KB state.

upsert_knowledge_entry performs a sequence of independent DB writes: insert/supersede entry → insert source → upsert embedding. If any intermediate step fails (e.g., insert_kb_entry_source raises), the entry exists without its required source or embedding.

Consider wrapping the Supabase operations in a transaction, or at minimum, adding a compensating cleanup in a try/except to delete the orphaned entry on failure.

🧰 Tools
🪛 Ruff (0.15.1)

[warning] 203-203: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/processing_pipeline/stage_4/tools.py` around lines 150 - 212, The current
multi-step write flow using supabase_client.supersede_kb_entry /
supabase_client.insert_kb_entry → supabase_client.insert_kb_entry_source →
supabase_client.upsert_kb_entry_embedding can leave orphaned entries on partial
failure; modify upsert_knowledge_entry to perform these operations inside a
single DB transaction if Supabase client supports it, or at minimum wrap the
sequence in a try/except that on any exception calls
supabase_client.delete_kb_entry(entry["id"]) (and/or supersede rollback) to
remove the partially-created entry, ensuring insert_kb_entry_source and
upsert_kb_entry_embedding are only attempted after a successful entry write and
that any failure triggers cleanup of entry id created by insert_kb_entry or
supersede_kb_entry.

@quancao-ea quancao-ea merged commit db4c723 into main Feb 23, 2026
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant