From 8ae07f83809c35f5151c504297104234ec53b784 Mon Sep 17 00:00:00 2001 From: Quan Cao Date: Fri, 6 Feb 2026 12:12:25 +0700 Subject: [PATCH 01/13] Refactor Stage 3 prompts: add verification fields and web search tools --- prompts/Stage_3_analysis_prompt.md | 250 +++++++++++++++++++++- prompts/Stage_3_output_schema.json | 106 ++++++++- prompts/Stage_3_system_instruction.md | 25 ++- src/processing_pipeline/stage_3_models.py | 43 ++++ 4 files changed, 411 insertions(+), 13 deletions(-) diff --git a/prompts/Stage_3_analysis_prompt.md b/prompts/Stage_3_analysis_prompt.md index 6c29228..4c71c97 100644 --- a/prompts/Stage_3_analysis_prompt.md +++ b/prompts/Stage_3_analysis_prompt.md @@ -77,11 +77,103 @@ Perform the following steps: - Verify that the transcription matches the audio precisely. - Confirm that the translation accurately reflects the transcription. -- **Ensure Factual Accuracy:** - - **Search**: Use the web search tool to fact-check claims, ensuring search queries match the audio's recording date, time, and location for contextual accuracy. - - **Fetch**: For important claims, use the content fetch tool to read full articles from authoritative sources (news outlets, fact-checkers, official statements). Do not rely solely on search snippets for high-stakes determinations. - - **Verify**: Look for multiple sources to corroborate or contradict claims. Avoid using information from different time periods (e.g., if the audio is from 2025, don't reference data from 2000). - - **Apply**: Incorporate your findings into the analysis that follows. +- **Ensure Factual Accuracy Using Web Search:** + + **Preferred Tools:** + - **`searxng_web_search`** - Use this to search for relevant URLs from reliable sources. + - **`web_url_read`** - Use this to read full article content and extract exact quotes. 
+ + **Two-Step Verification Process:** + + **Step 1: Search for Relevant Sources** + Use `searxng_web_search` (or other available search tools) to find URLs: + - Search for the specific claim (e.g., "Maduro captured January 2026") + - Search for related context (e.g., "US Venezuela military operations 2026") + - Search fact-checker sites (e.g., "site:snopes.com [topic]" or "site:reuters.com [topic]") + - Ensure search queries match the audio's recording date and location for contextual accuracy. + + **Step 2: Read Full Content from URLs** + For promising URLs found, use `web_url_read` to: + - Read the full article content (not just search snippets) + - Extract EXACT QUOTES (not paraphrases) as `relevant_excerpt` + - Note the publication date + - Classify the source tier (tier1_wire_service, tier1_factchecker, tier2_major_news, etc.) + + **CRITICAL:** You must document actual search results in `verification_evidence`. Do not invent or imagine what sources say. + + **Example Workflow:** + ``` + 1. searxng_web_search("Maduro captured US forces 2026") + -> Found: https://reuters.com/article/..., https://apnews.com/... + + 2. web_url_read("https://reuters.com/article/...") + -> Extract: "Reuters reports that as of [date], Venezuelan President Nicolás Maduro remains in power..." + + 3. Document in verification_evidence: + - url: "https://reuters.com/article/..." + - source_name: "Reuters" + - source_type: "tier1_wire_service" + - relevant_excerpt: "[exact quote from article]" + - relevance_to_claim: "contradicts_claim" + ``` + + **Without reliable sources contradicting the claim, maximum confidence score is 40%.** + +##### **C.1 Verification Evidence Documentation (MANDATORY)** + +**CRITICAL: All verification activities must be fully documented in the `verification_evidence` field of your output.** + +**Search and Documentation Protocol:** + +For EVERY factual claim that could be verified or disproven, you MUST: + +1. 
**Execute a Search**: Use the web search tool with a specific, well-formed query +2. **Record the Query**: Document the exact search query used +3. **Document ALL Results**: For each search result, record: + - **URL**: The complete URL of the source (REQUIRED) + - **Source Name**: The publication name (e.g., "Reuters", "Associated Press", "BBC News") + - **Source Type Classification**: + - `tier1_wire_service`: AP, Reuters, AFP, EFE, UPI + - `tier1_factchecker`: Snopes, PolitiFact, FactCheck.org, Full Fact, AFP Fact Check, Chequeado, Cotejo.info + - `tier2_major_news`: CNN, BBC, NPR, NYT, Washington Post, The Guardian, BBC Mundo, DW + - `tier3_regional_news`: Local newspapers, regional TV stations, El Nacional, Efecto Cocuyo + - `official_source`: Government websites (.gov), official institutional sites + - `other`: All other sources + - **Publication Date**: When the article was published (critical for time-sensitive claims) + - **Title**: The headline or title of the source + - **Relevant Excerpt**: A DIRECT QUOTE (50-200 words) from the source that relates to the claim. Do NOT paraphrase - copy the exact text. + - **Relevance Assessment**: How this result relates to the claim: + - `supports_claim`: Evidence that the claim is accurate + - `contradicts_claim`: Evidence that the claim is false or misleading + - `provides_context`: Relevant background but doesn't directly verify/contradict + - `inconclusive`: Cannot determine relationship to claim + +4. **Categorize Search Outcome**: + - `results_found`: Search returned relevant, actionable results + - `no_results`: Search returned no relevant results (document this - absence of evidence is important) + - `results_inconclusive`: Results exist but don't clearly address the claim + +**Source Priority Guidelines:** + +When multiple sources are available, PRIORITIZE in this order: +1. **Wire Services** (Reuters, AP, AFP, EFE) - Most reliable for breaking news and facts +2. 
**Official Fact-Checkers** (Snopes, PolitiFact, FactCheck.org, Chequeado, Cotejo.info) - Best for disputed claims +3. **Major News Outlets** (BBC, NPR, NYT, Washington Post, BBC Mundo, DW) - Good for context and analysis +4. **Official Sources** (government sites, official statements) - Authoritative for policy/data +5. **Regional/Local News** (Efecto Cocuyo, El Pitazo, Tal Cual for Venezuela) - Valuable for local events + +**State-Sponsored Media WARNING:** +The following are state-sponsored propaganda outlets and should NEVER be used as reliable sources: +- **Russian**: RT, Sputnik, Sputnik Mundo, Radio Sputnik, TASS, RIA Novosti +- **Venezuelan State**: TeleSUR, VTV (Venezolana de Television) +- **Other**: Xinhua, CGTN, PressTV, Granma, Prensa Latina + +If the audio clip ORIGINATES from one of these sources, note this in your analysis as context for understanding potential propaganda motives. + +**IMPORTANT: "No results found" does NOT mean the claim is false.** +- No results = uncertainty = low confidence score (0-40) +- Results contradict claim = potential disinformation = higher confidence score (40-100 depending on source quality) +- Results support claim = not disinformation = confidence score 0 ##### **D. Summary and Explanation** @@ -135,7 +227,7 @@ The confidence score represents your degree of certainty that the content contai **Verification Requirement:** -Before assigning confidence scores, verify all factual claims using the available web search tool. For claims that significantly impact the confidence score, fetch and read the full content of authoritative sources rather than relying on search snippets alone. The verification outcome determines the maximum possible score. +Before assigning confidence scores, verify all factual claims using web search. The verification outcome determines the maximum possible score. 
**Scoring Framework Based on Verification:** @@ -173,6 +265,42 @@ Example: "We need stricter immigration policies" No search results does NOT mean the claim is false. Unusual or surprising claims can be true. Recent events may have limited coverage. When you cannot find contradictory information, score conservatively (0-40), not as disinformation (80-100). +##### **H.1 Breaking News and Recent Events Protocol** + +**CRITICAL: Claims about very recent events require special handling.** + +Before assigning any confidence score above 30%, you MUST evaluate whether the claim qualifies as potentially unverifiable breaking news. + +**Step 1: Calculate Event Recency** + +Compare the following two timestamps: +1. **Recording Timestamp**: The `recorded_at` field from the audio metadata +2. **Current Timestamp**: The current date and time provided in the prompt + +Calculate the time difference. If the recording was made within the past **72 hours**, the content may reference breaking news that has not yet been indexed. 
+ +**Step 2: Identify Time-Sensitive Claims** + +A claim is considered "time-sensitive" if it: +- Reports a specific event allegedly occurring within the past 72 hours +- Describes actions by named individuals or organizations as currently happening or just completed +- Claims something has "just happened," is "breaking," or uses similar urgent language +- References events that, if true, would be major news (arrests, deaths, military actions, political developments) + +**Step 3: Apply Breaking News Confidence Caps** + +Based on your verification results, apply the appropriate maximum confidence score: + +| Verification Outcome | Maximum Score | Verification Status | +|---------------------|---------------|---------------------| +| Contradictory evidence found (sources confirm the opposite) | 80-100 | `VERIFIED_FALSE` | +| Partial information found (some details confirmed false) | 40-79 | `PARTIALLY_VERIFIABLE` | +| No relevant results for claims within 24 hours of recording | **MAX 20%** | `UNVERIFIABLE_BREAKING` | +| No relevant results for claims 24-72 hours old | **MAX 30%** | `UNVERIFIABLE_RECENT` | +| No relevant results for claims older than 72 hours | 1-40 | `UNVERIFIABLE_STALE` | + +**THE GOLDEN RULE: For claims less than 72 hours old where no contradictory evidence is found, the MAXIMUM confidence score is 30%, regardless of how extraordinary the claim appears.** + ##### **I. Required Self-Review Process** After completing your initial analysis, perform this structured review: @@ -210,6 +338,21 @@ After completing your initial analysis, perform this structured review: - Scoring based on disagreement rather than falsity - Treating "no search results" as evidence of falsity (it is uncertainty) +5. **Breaking News Verification Checklist** + Before finalizing any score above 30%, answer these questions: + - [ ] Have I calculated the time delta between recording and current time? + - [ ] Is this claim within the 72-hour breaking news window? 
+ - [ ] If within the breaking news window, did I find CONTRADICTORY evidence (not just absence of evidence)? + - [ ] If no contradictory evidence found for a recent claim, is my score capped at 30% or lower? + - [ ] Have I included the required Breaking News Protocol note if applicable? + - [ ] Have I documented ALL my search queries and results in `verification_evidence`? + +6. **Evidence Documentation Check** + - [ ] Did I record URLs for all search results? + - [ ] Did I include direct excerpts (not paraphrases) from sources? + - [ ] Did I classify each source by tier (tier1_wire_service, tier1_factchecker, etc.)? + - [ ] For scores 60+, did I use `web_url_read` to read full article content? + ##### **J. Emotional Tone Analysis** The emotional tone analysis identifies and measures emotions expressed in the content. Like our confidence scoring, this requires evidence-based assessment: @@ -403,6 +546,42 @@ Document your analytical reasoning process in the `thought_summaries` field. Thi - Include specific examples from the content that informed your analysis - Document any score adjustments and why they were made +##### **M. 
Verification Evidence** + +Your output MUST include this critical field for transparency and accountability: **`verification_evidence`** + +Document ALL web searches performed during fact-checking: +```json +{ + "verification_evidence": { + "searches_performed": [ + { + "query": "exact search query used", + "search_intent": "what claim this search verifies", + "result_status": "results_found | no_results | results_inconclusive", + "results": [ + { + "url": "https://example.com/article", + "source_name": "Reuters", + "source_type": "tier1_wire_service", + "publication_date": "2026-01-15", + "title": "Article headline", + "relevant_excerpt": "Excerpt from search result...", + "relevance_to_claim": "contradicts_claim" + } + ] + } + ], + "verification_summary": { + "total_searches": 3, + "claims_contradicted": 1, + "claims_unverifiable": 1, + "key_findings": "Summary of what verification revealed..." + } + } +} +``` + #### **3. Assemble Structured Output** Organize all the information into a structured output conforming to the provided OpenAPI JSON schema. @@ -429,7 +608,8 @@ Ensure your output strictly adheres to this schema. "confidence_scores", "emotional_tone", "political_leaning", - "thought_summaries" + "thought_summaries", + "verification_evidence" ], "properties": { "transcription": { @@ -560,12 +740,17 @@ Ensure your output strictly adheres to this schema. }, "confidence_scores": { "type": "object", - "required": ["overall", "analysis", "categories"], + "required": ["overall", "verification_status", "analysis", "categories"], "properties": { "overall": { "type": "integer", "description": "Overall confidence score of the analysis, ranging from 0 to 100." }, + "verification_status": { + "type": "string", + "enum": ["verified_false", "verified_true", "uncertain", "insufficient_evidence"], + "description": "Overall verification status based on evidence quality." 
+ }, "analysis": { "type": "object", "required": ["claims", "validation_checklist", "score_adjustments"], @@ -760,6 +945,52 @@ Ensure your output strictly adheres to this schema. "thought_summaries": { "type": "string", "description": "A summary of your reasoning process, key observations, and analytical steps taken during the analysis." + }, + "verification_evidence": { + "type": "object", + "required": ["searches_performed", "verification_summary"], + "description": "Complete documentation of all web searches performed during fact-checking.", + "properties": { + "searches_performed": { + "type": "array", + "items": { + "type": "object", + "required": ["query", "search_intent", "result_status", "results"], + "properties": { + "query": { "type": "string" }, + "search_intent": { "type": "string" }, + "result_status": { "type": "string", "enum": ["results_found", "no_results", "results_inconclusive"] }, + "results": { + "type": "array", + "items": { + "type": "object", + "required": ["url", "source_name", "source_type", "relevant_excerpt", "relevance_to_claim"], + "properties": { + "url": { "type": "string" }, + "source_name": { "type": "string" }, + "source_type": { "type": "string", "enum": ["tier1_wire_service", "tier1_factchecker", "tier2_major_news", "tier3_regional_news", "official_source", "other"] }, + "publication_date": { "type": "string" }, + "title": { "type": "string" }, + "relevant_excerpt": { "type": "string" }, + "relevance_to_claim": { "type": "string", "enum": ["supports_claim", "contradicts_claim", "provides_context", "inconclusive"] }, + "content_fetched": { "type": "boolean" } + } + } + } + } + } + }, + "verification_summary": { + "type": "object", + "required": ["total_searches", "claims_contradicted", "claims_unverifiable", "key_findings"], + "properties": { + "total_searches": { "type": "integer" }, + "claims_contradicted": { "type": "integer" }, + "claims_unverifiable": { "type": "integer" }, + "key_findings": { "type": "string" } + } + } + } 
} } } @@ -1563,6 +1794,7 @@ Below is a complete example showing all required fields: }, "confidence_scores": { "overall": 92, + "verification_status": "verified_false", "analysis": { "claims": [ { @@ -1758,7 +1990,7 @@ By following these instructions and listening closely using the detailed heurist Please proceed to analyze the provided audio content following these guidelines: 1. Listen carefully to capture all spoken content. -2. Verify all factual claims using the web search tool before assigning scores (Section H), searching for information relevant to the recording datetime and/or current datetime. For important claims, use the content fetch tool to read full articles. +2. Verify all factual claims using web search before assigning scores (Section H), searching for information relevant to the recording datetime and/or current datetime. Use `web_url_read` to read full articles for important claims. 3. Apply the detailed heuristics for disinformation analysis. 4. Base political orientation assessment solely on observable content elements. 5. Document all findings with specific evidence from the content. diff --git a/prompts/Stage_3_output_schema.json b/prompts/Stage_3_output_schema.json index 619ce6a..1178bb6 100644 --- a/prompts/Stage_3_output_schema.json +++ b/prompts/Stage_3_output_schema.json @@ -13,7 +13,8 @@ "confidence_scores", "emotional_tone", "political_leaning", - "thought_summaries" + "thought_summaries", + "verification_evidence" ], "properties": { "is_convertible": { @@ -148,12 +149,17 @@ }, "confidence_scores": { "type": "object", - "required": ["overall", "analysis", "categories"], + "required": ["overall", "verification_status", "analysis", "categories"], "properties": { "overall": { "type": "integer", "description": "Overall confidence score of the analysis, ranging from 0 to 100." 
}, + "verification_status": { + "type": "string", + "enum": ["verified_false", "verified_true", "uncertain", "insufficient_evidence"], + "description": "Overall verification status based on evidence quality. 'verified_false' requires contradictory evidence with URLs. 'insufficient_evidence' when no search results found." + }, "analysis": { "type": "object", "required": ["claims", "validation_checklist", "score_adjustments"], @@ -348,6 +354,102 @@ "thought_summaries": { "type": "string", "description": "A summary of your reasoning process, key observations, and analytical steps taken during the analysis." + }, + "verification_evidence": { + "type": "object", + "required": ["searches_performed", "verification_summary"], + "description": "Complete documentation of all web searches performed during fact-checking.", + "properties": { + "searches_performed": { + "type": "array", + "description": "Record of all web searches performed during fact-checking.", + "items": { + "type": "object", + "required": ["query", "search_intent", "result_status", "results"], + "properties": { + "query": { + "type": "string", + "description": "The exact search query used." + }, + "search_intent": { + "type": "string", + "description": "What claim or fact this search was attempting to verify." + }, + "result_status": { + "type": "string", + "enum": ["results_found", "no_results", "results_inconclusive"], + "description": "Whether the search returned actionable results." + }, + "results": { + "type": "array", + "description": "Individual search results with full details.", + "items": { + "type": "object", + "required": ["url", "source_name", "source_type", "relevant_excerpt", "relevance_to_claim"], + "properties": { + "url": { + "type": "string", + "description": "Full URL of the source." + }, + "source_name": { + "type": "string", + "description": "Name of the publication or website (e.g., Reuters, AP News, BBC)." 
+ }, + "source_type": { + "type": "string", + "enum": ["tier1_wire_service", "tier1_factchecker", "tier2_major_news", "tier3_regional_news", "official_source", "other"], + "description": "Classification of source reliability tier." + }, + "publication_date": { + "type": "string", + "description": "Publication date in ISO 8601 format (YYYY-MM-DD) or 'unknown' if not available." + }, + "title": { + "type": "string", + "description": "Title or headline of the article/page." + }, + "relevant_excerpt": { + "type": "string", + "description": "Direct quote from the source relevant to the claim (50-200 words). Do NOT paraphrase." + }, + "relevance_to_claim": { + "type": "string", + "enum": ["supports_claim", "contradicts_claim", "provides_context", "inconclusive"], + "description": "How this result relates to the claim being verified." + }, + "content_fetched": { + "type": "boolean", + "description": "Whether the full article content was fetched." + } + } + } + } + } + } + }, + "verification_summary": { + "type": "object", + "required": ["total_searches", "claims_contradicted", "claims_unverifiable", "key_findings"], + "properties": { + "total_searches": { + "type": "integer", + "description": "Total number of web searches performed." + }, + "claims_contradicted": { + "type": "integer", + "description": "Number of claims found to be false or misleading based on search evidence." + }, + "claims_unverifiable": { + "type": "integer", + "description": "Number of claims that could not be verified due to lack of information." + }, + "key_findings": { + "type": "string", + "description": "Narrative summary of the most important verification findings." 
+ } + } + } + } } } } \ No newline at end of file diff --git a/prompts/Stage_3_system_instruction.md b/prompts/Stage_3_system_instruction.md index 7c4b9ff..a01e5fa 100644 --- a/prompts/Stage_3_system_instruction.md +++ b/prompts/Stage_3_system_instruction.md @@ -4,8 +4,29 @@ You are an advanced language model specialized in in-depth disinformation and po **General Guidelines:** - **Accuracy:** Ensure precise transcription and translation, preserving the original meaning and cultural nuances. - **Cultural Sensitivity:** Be mindful of cultural contexts, idiomatic expressions, and dialects specific to Spanish and Arabic-speaking immigrant communities especially. -- **Evidence-Based Analysis:** Verify all factual claims using Grounding with Google Search before scoring. High scores (80-100) require strong contradictory evidence from reputable sources, not absence of confirmation. Base all conclusions on demonstrably false claims that can be definitively proven incorrect with cited sources. +- **Evidence-Based Analysis:** Verify all factual claims using web search before scoring. High scores (80-100) require strong contradictory evidence from reputable sources, not absence of confirmation. Base all conclusions on demonstrably false claims that can be definitively proven incorrect with cited sources. - **Searching Guidelines:** When verifying factual claims, prioritize information that matches or is recent to the recording date of the provided audio file. Do not rely on old or unrelated information that predates the recording. If the recording references recent events or claims about current status, search for the most up-to-date information available. - **Objectivity:** Maintain strict neutrality by focusing solely on verifiable facts rather than assumptions or inferences. Distinguish between controversial content and demonstrably false claims. 
- **Self-Review:** Systematically validate all assessments through structured self-review, adjusting scores when specific evidence cannot be cited. -- **Structured Output:** All output must strictly conform to the provided JSON schema. \ No newline at end of file +- **Structured Output:** All output must strictly conform to the provided JSON schema. + +**Verification Documentation (MANDATORY):** +- **Document All Searches:** ALWAYS record every web search in the `verification_evidence` field, including searches that return no results. Documenting the absence of evidence is critical for proper scoring. +- **Record Search Results:** For each search, document: query used, URLs found, source names, publication dates, and relevant excerpts (direct quotes, not paraphrases). +- **Source Classification:** Classify each source by tier: tier1_wire_service (Reuters, AP, AFP), tier1_factchecker (Snopes, PolitiFact), tier2_major_news (BBC, CNN, NYT), tier3_regional_news, official_source, or other. + +**Preferred Verification Tools:** +When searching for information to verify claims, prefer using these tools: +- **`searxng_web_search`** - For searching multiple sources and finding relevant URLs. +- **`web_url_read`** - For reading full article content and extracting exact quotes from URLs found. + +These tools provide better access to reliable sources. Use them when available, but other search tools may also be used if needed. + +**Conservative Confidence Scoring:** +- **Evidence-Based Scoring:** High confidence scores (60+) REQUIRE tier-1 source URLs and direct excerpts that contradict the claim. Without documented contradictory evidence, maximum score is 40. +- **Absence ≠ Falsity:** No search results means UNCERTAINTY, not disinformation. Score as `insufficient_evidence`, not `verified_false`. +- **Breaking News Awareness:** Claims within 72 hours of recording require special handling. 
If no contradictory evidence is found, maximum score is 30% (20% for claims within 24 hours). +- **Verification Status Required:** Every analysis must include a `verification_status`: `verified_false` (evidence contradicts), `verified_true` (evidence confirms), `uncertain` (mixed/recent events), or `insufficient_evidence` (no relevant results). + +**Reliable Sources Requirement:** +You MUST use reliable, prestigious sources to verify claims. Without reliable sources explicitly contradicting a claim, you CANNOT assign high confidence that it is disinformation. \ No newline at end of file diff --git a/src/processing_pipeline/stage_3_models.py b/src/processing_pipeline/stage_3_models.py index 5790f30..593ceb2 100644 --- a/src/processing_pipeline/stage_3_models.py +++ b/src/processing_pipeline/stage_3_models.py @@ -71,6 +71,9 @@ class CategoryScore(BaseModel): class ConfidenceScores(BaseModel): overall: int = Field(ge=0, le=100, description="Overall confidence score of the analysis, ranging from 0 to 100") + verification_status: Literal["verified_false", "verified_true", "uncertain", "insufficient_evidence"] = Field( + description="Overall verification status based on evidence quality" + ) analysis: Analysis categories: list[CategoryScore] @@ -131,6 +134,43 @@ class PoliticalLeaning(BaseModel): explanation: PoliticalExplanation +class SearchResult(BaseModel): + url: str = Field(description="Full URL of the source") + source_name: str = Field(description="Name of the publication or website (e.g., Reuters, AP News, BBC)") + source_type: Literal[ + "tier1_wire_service", "tier1_factchecker", "tier2_major_news", + "tier3_regional_news", "official_source", "other" + ] = Field(description="Classification of source reliability tier") + publication_date: str | None = Field(default=None, description="Publication date (YYYY-MM-DD) or None if unknown") + title: str | None = Field(default=None, description="Title or headline of the article/page") + relevant_excerpt: str = 
Field(description="Direct quote from the source relevant to the claim (50-200 words)") + relevance_to_claim: Literal["supports_claim", "contradicts_claim", "provides_context", "inconclusive"] = Field( + description="How this result relates to the claim being verified" + ) + content_fetched: bool = Field(default=False, description="Whether the full article content was fetched") + + +class SearchPerformed(BaseModel): + query: str = Field(description="The exact search query used") + search_intent: str = Field(description="What claim or fact this search was attempting to verify") + result_status: Literal["results_found", "no_results", "results_inconclusive"] = Field( + description="Whether the search returned actionable results" + ) + results: list[SearchResult] = Field(default_factory=list, description="Individual search results") + + +class VerificationSummary(BaseModel): + total_searches: int = Field(description="Total number of web searches performed") + claims_contradicted: int = Field(description="Number of claims found to be false based on search evidence") + claims_unverifiable: int = Field(description="Number of claims that could not be verified") + key_findings: str = Field(description="Narrative summary of the most important verification findings") + + +class VerificationEvidence(BaseModel): + searches_performed: list[SearchPerformed] = Field(description="Record of all web searches") + verification_summary: VerificationSummary = Field(description="Summary of verification activities") + + class Stage3Output(BaseModel): """Main model for Stage 3 output.""" @@ -153,3 +193,6 @@ class Stage3Output(BaseModel): thought_summaries: str = Field( description="A summary of your reasoning process, key observations, and analytical steps taken during the analysis" ) + verification_evidence: VerificationEvidence = Field( + description="Complete documentation of all web searches performed during fact-checking" + ) From efe3c08acffd7bf836eeb00d7d30f9dbe5b8cd0a Mon Sep 17 
00:00:00 2001 From: Quan Cao Date: Fri, 6 Feb 2026 15:42:39 +0700 Subject: [PATCH 02/13] Fix MCP connection issue for Gemini CLI --- .gemini/settings.json | 3 ++- src/processing_pipeline/stage_3.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.gemini/settings.json b/.gemini/settings.json index 5d54c5e..63f3450 100644 --- a/.gemini/settings.json +++ b/.gemini/settings.json @@ -15,7 +15,8 @@ "SEARXNG_URL": "$SEARXNG_URL" }, "includeTools": ["searxng_web_search", "web_url_read"], - "trust": true + "trust": true, + "timeout": 60000 } } } diff --git a/src/processing_pipeline/stage_3.py b/src/processing_pipeline/stage_3.py index f15fc32..4d5f6d0 100644 --- a/src/processing_pipeline/stage_3.py +++ b/src/processing_pipeline/stage_3.py @@ -476,6 +476,7 @@ def __analyze_with_custom_search( "HOME": os.environ.get("HOME", ""), "GEMINI_API_KEY": os.environ["GOOGLE_GEMINI_KEY"], "GEMINI_SYSTEM_MD": system_instruction_path, + "SEARXNG_URL": os.environ.get("SEARXNG_URL", ""), } cmd = [ From 687c8dec95d9eed1b913cf287387d7dd93ab3577 Mon Sep 17 00:00:00 2001 From: Quan Cao Date: Sun, 8 Feb 2026 16:11:04 +0700 Subject: [PATCH 03/13] Enhance Stage 3 prompts with post-cutoff event handling Add guidance for recognizing and correctly handling events that occurred after the model's training cutoff date, preventing false positives when web search results conflict with pre-training knowledge. --- prompts/Stage_3_analysis_prompt.md | 44 +++++++++++++++++++++------ prompts/Stage_3_system_instruction.md | 6 +++- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/prompts/Stage_3_analysis_prompt.md b/prompts/Stage_3_analysis_prompt.md index 4c71c97..e7826b2 100644 --- a/prompts/Stage_3_analysis_prompt.md +++ b/prompts/Stage_3_analysis_prompt.md @@ -162,18 +162,41 @@ When multiple sources are available, PRIORITIZE in this order: 4. **Official Sources** (government sites, official statements) - Authoritative for policy/data 5. 
**Regional/Local News** (Efecto Cocuyo, El Pitazo, Tal Cual for Venezuela) - Valuable for local events -**State-Sponsored Media WARNING:** -The following are state-sponsored propaganda outlets and should NEVER be used as reliable sources: -- **Russian**: RT, Sputnik, Sputnik Mundo, Radio Sputnik, TASS, RIA Novosti -- **Venezuelan State**: TeleSUR, VTV (Venezolana de Television) -- **Other**: Xinhua, CGTN, PressTV, Granma, Prensa Latina - -If the audio clip ORIGINATES from one of these sources, note this in your analysis as context for understanding potential propaganda motives. - -**IMPORTANT: "No results found" does NOT mean the claim is false.** +**IMPORTANT: Interpreting search results correctly is essential.** - No results = uncertainty = low confidence score (0-40) - Results contradict claim = potential disinformation = higher confidence score (40-100 depending on source quality) - Results support claim = not disinformation = confidence score 0 +- **Results from tier-1/tier-2 sources that support the claim MUST be trusted.** You MUST NOT dismiss search results from Reuters, AP, BBC, CNN, etc. as "fabricated" or "poisoned." If these sources confirm a claim, the claim is true -- even if it conflicts with your pre-training knowledge. Your training data may be outdated. + +##### **C.2 Knowledge Cutoff and Post-Training Events (CRITICAL)** + +**Your training data has a knowledge cutoff.** Events that occurred after your cutoff will feel unfamiliar, implausible, or "wrong" to you. This does NOT mean they did not happen. You MUST follow this protocol: + +**When web search results conflict with your pre-training knowledge:** + +1. **Trust the web search results over your own knowledge.** Your knowledge is static; the web is current. If Reuters, AP, BBC, CNN, or other tier-1/tier-2 sources report an event, that event happened -- even if you have no memory of it and it feels implausible. + +2. 
**NEVER conclude that search results from independent sources are "fabricated," "poisoned," "planted," or part of a disinformation campaign.** The editorial standards and independence of tier-1 and tier-2 sources make coordinated fabrication across multiple outlets effectively impossible. If you find yourself thinking "these search results must be fake," STOP -- you are likely encountering a post-cutoff event that is real but unfamiliar to you. + +3. **Recognize the pattern of a post-cutoff event:** + - Your instinct says "this never happened" + - But multiple independent tier-1/tier-2 sources confirm it did happen + - The sources include specific details (dates, names, locations) consistent with real reporting + - **Conclusion: The event is real. Your training data is outdated. Adjust your analysis accordingly.** + +4. **Apply this rule BEFORE scoring:** If web search confirms the factual claims in the content, the content is reporting real events. The confidence score for factual accuracy must be 0, even if the source is biased or the framing is propagandistic. + +**Example of correct reasoning:** +- Audio from Sputnik claims "President X was captured by US forces" +- Web search returns: Reuters, AP, BBC, CNN all confirm President X was captured +- Your training data says President X is still in power +- **Correct conclusion:** The capture is a real post-cutoff event. The audio's factual claims are TRUE. Score the factual claims at 0. Analyze SEPARATELY for bias, framing, or propaganda techniques in how the events are presented. + +**Example of INCORRECT reasoning (DO NOT DO THIS):** +- Same scenario as above +- "The search results must be fabricated because this contradicts known reality" +- "This is a sophisticated disinformation campaign that has poisoned search indexes" +- **This is WRONG.** You are substituting your outdated training knowledge for current web evidence. ##### **D. 
Summary and Explanation** @@ -329,6 +352,7 @@ After completing your initial analysis, perform this structured review: - Adjust scores to match available evidence - Document reasoning for any score changes - Ensure final scores reflect only demonstrably false content + - **MANDATORY CHECK:** If you scored above 0 but web search results from tier-1/tier-2 sources SUPPORT the claims, you MUST reduce the score to 0. Do NOT dismiss web search evidence by hypothesizing that results are "fabricated" or "poisoned." If multiple independent sources confirm a claim, the claim is true. 4. **Common Error Check** Review for these frequent mistakes: @@ -337,6 +361,8 @@ After completing your initial analysis, perform this structured review: - Treating bias as equivalent to disinformation - Scoring based on disagreement rather than falsity - Treating "no search results" as evidence of falsity (it is uncertainty) + - **Dismissing web search results that conflict with your pre-training knowledge** (post-cutoff events are real even if unfamiliar to you) + - **Conflating source reputation with factual accuracy** (propaganda outlets can report real events with biased framing -- verify facts independently, do not assume content is fabricated because the source is biased) 5. **Breaking News Verification Checklist** Before finalizing any score above 30%, answer these questions: diff --git a/prompts/Stage_3_system_instruction.md b/prompts/Stage_3_system_instruction.md index a01e5fa..2b4c065 100644 --- a/prompts/Stage_3_system_instruction.md +++ b/prompts/Stage_3_system_instruction.md @@ -4,8 +4,9 @@ You are an advanced language model specialized in in-depth disinformation and po **General Guidelines:** - **Accuracy:** Ensure precise transcription and translation, preserving the original meaning and cultural nuances. - **Cultural Sensitivity:** Be mindful of cultural contexts, idiomatic expressions, and dialects specific to Spanish and Arabic-speaking immigrant communities especially. 
-- **Evidence-Based Analysis:** Verify all factual claims using web search before scoring. High scores (80-100) require strong contradictory evidence from reputable sources, not absence of confirmation. Base all conclusions on demonstrably false claims that can be definitively proven incorrect with cited sources. +- **Evidence-Based Analysis:** Verify all factual claims using web search before scoring. High scores (80-100) require strong contradictory evidence from reputable sources, not absence of confirmation. Base all conclusions on demonstrably false claims that can be definitively proven incorrect with cited sources. When web search results from multiple independent tier-1/tier-2 sources confirm a claim, you MUST accept those results as reflecting reality and assign a confidence score of 0 for that claim, regardless of whether the claim conflicts with your pre-training knowledge. - **Searching Guidelines:** When verifying factual claims, prioritize information that matches or is recent to the recording date of the provided audio file. Do not rely on old or unrelated information that predates the recording. If the recording references recent events or claims about current status, search for the most up-to-date information available. +- **Knowledge Cutoff Awareness:** Your pre-training data has a knowledge cutoff date. Events that occurred after your training cutoff will feel unfamiliar or "wrong" to you, but they may be entirely real. You MUST NOT use your pre-training knowledge to override or dismiss web search results from established, independent sources. When web search results from tier-1 and tier-2 sources consistently confirm a claim, those results reflect reality -- even if the events feel implausible or unfamiliar to you. Your role is to verify claims against current web evidence, not against your training data. - **Objectivity:** Maintain strict neutrality by focusing solely on verifiable facts rather than assumptions or inferences. 
Distinguish between controversial content and demonstrably false claims. - **Self-Review:** Systematically validate all assessments through structured self-review, adjusting scores when specific evidence cannot be cited. - **Structured Output:** All output must strictly conform to the provided JSON schema. @@ -28,5 +29,8 @@ These tools provide better access to reliable sources. Use them when available, - **Breaking News Awareness:** Claims within 72 hours of recording require special handling. If no contradictory evidence is found, maximum score is 30% (20% for claims within 24 hours). - **Verification Status Required:** Every analysis must include a `verification_status`: `verified_false` (evidence contradicts), `verified_true` (evidence confirms), `uncertain` (mixed/recent events), or `insufficient_evidence` (no relevant results). +**Web Search Result Integrity:** +- **NEVER dismiss search results from established independent sources.** When multiple tier-1 sources (Reuters, AP, AFP) and tier-2 sources (BBC, CNN, NYT, Wikipedia) independently report the same event, those reports reflect reality. You MUST NOT conclude that search results from these established sources are "fabricated," "poisoned," "planted," or part of a disinformation campaign. + **Reliable Sources Requirement:** You MUST use reliable, prestigious sources to verify claims. Without reliable sources explicitly contradicting a claim, you CANNOT assign high confidence that it is disinformation. \ No newline at end of file From 141fa6830c6619b9f8b450eddc6cec82c6ee79cc Mon Sep 17 00:00:00 2001 From: Quan Cao Date: Sun, 8 Feb 2026 16:46:10 +0700 Subject: [PATCH 04/13] Refactor Stage 3: simplify grounding metadata handling Remove tool tracking from CLI method and extract verification evidence from validated responses instead of API metadata. Streamline the analysis flow to focus on structured output validation. 
--- src/processing_pipeline/stage_3.py | 68 +++++------------------------- 1 file changed, 10 insertions(+), 58 deletions(-) diff --git a/src/processing_pipeline/stage_3.py b/src/processing_pipeline/stage_3.py index 4d5f6d0..d0f8032 100644 --- a/src/processing_pipeline/stage_3.py +++ b/src/processing_pipeline/stage_3.py @@ -380,12 +380,13 @@ def run( ) # Strategy: Try CLI first, fallback to SDK - analysis_result = None + analysis_text = None + thought_summaries_from_api = None uploaded_audio_file = None try: user_prompt_with_file = user_prompt + f"Here is the audio file attached: @{os.path.basename(audio_file)}" - analysis_result = cls.__analyze_with_custom_search( + analysis_text = cls.__analyze_with_custom_search( model_name=model_name, user_prompt=user_prompt_with_file, system_instruction=prompt_version["system_instruction"], @@ -399,26 +400,23 @@ def run( time.sleep(1) uploaded_audio_file = client.files.get(name=uploaded_audio_file.name) - analysis_result = cls.__analyze_with_google_search_grounding( + sdk_result = cls.__analyze_with_google_search_grounding( client, model_name, user_prompt, uploaded_audio_file, system_instruction=prompt_version["system_instruction"], ) + analysis_text = sdk_result["text"] + thought_summaries_from_api = sdk_result.get("thought_summaries") try: - analysis_text = analysis_result["text"] - grounding_metadata = analysis_result["grounding_metadata"] - # SDK method returns thought_summaries from thinking_config, CLI method doesn't - thought_summaries_from_api = analysis_result.get("thought_summaries") - # Try to validate with Pydantic model first validated_output = cls.__validate_with_pydantic(analysis_text) if validated_output: - # Use thought_summaries from API if available (SDK), otherwise from JSON response (CLI) thought_summaries = thought_summaries_from_api or validated_output.get("thought_summaries") + grounding_metadata = json.dumps(validated_output.get("verification_evidence"), indent=2) return { "response": validated_output, 
"grounding_metadata": grounding_metadata, @@ -428,6 +426,7 @@ def run( # Step 2: Structure with response_schema (if validation failed) structured_output = cls.__structure_with_schema(client, analysis_text, prompt_version["output_schema"]) thought_summaries = thought_summaries_from_api or structured_output.get("thought_summaries") + grounding_metadata = json.dumps(structured_output.get("verification_evidence"), indent=2) return { "response": structured_output, "grounding_metadata": grounding_metadata, @@ -454,7 +453,7 @@ def __analyze_with_custom_search( - System instruction from file Returns: - dict: {"text": str, "grounding_metadata": str|None, "thought_summaries": str|None} + str: Final response text from Gemini CLI Raises: RuntimeError: If CLI execution fails (for fallback to SDK method) @@ -463,7 +462,6 @@ def __analyze_with_custom_search( events: list[dict[str, Any]] = [] final_response = "" - tool_calls: dict[str, dict[str, Any]] = {} # Dict to match tool_use with tool_result by tool_id timeout = 300 # Write system instruction to a temporary file for CLI @@ -510,42 +508,6 @@ def __analyze_with_custom_search( content = event.get("content") if content and isinstance(content, str): final_response += content - - # Capture tool use events - if event.get("type") == GeminiCLIEventType.TOOL_USE: - tool_id = event.get("tool_id") - tool_name = event.get("tool_name") - parameters = event.get("parameters") - - if tool_id in tool_calls: - tool_calls[tool_id]["tool_name"] = tool_name - tool_calls[tool_id]["parameters"] = parameters - else: - tool_calls[tool_id] = { - "tool_id": tool_id, - "tool_name": tool_name, - "parameters": parameters, - "output": None, - "status": None, - } - - # Capture tool result events and pair with tool_use - if event.get("type") == GeminiCLIEventType.TOOL_RESULT: - tool_id = event.get("tool_id") - output = event.get("output") - status = event.get("status") - - if tool_id in tool_calls: - tool_calls[tool_id]["output"] = output - 
tool_calls[tool_id]["status"] = status - else: - tool_calls[tool_id] = { - "tool_id": tool_id, - "tool_name": None, - "parameters": None, - "output": output, - "status": status, - } except json.JSONDecodeError: pass @@ -555,12 +517,7 @@ def __analyze_with_custom_search( if not final_response: raise RuntimeError("Gemini CLI returned no response") - # Convert tool_calls dict to list and serialize as JSON - tool_calls_list = list(tool_calls.values()) if tool_calls else None - return { - "text": final_response, - "grounding_metadata": json.dumps(tool_calls_list) if tool_calls_list else None, - } + return final_response except subprocess.TimeoutExpired as e: raise RuntimeError(f"Gemini CLI timed out after {timeout} seconds") from e @@ -597,10 +554,6 @@ def __analyze_with_google_search_grounding( if part.thought and part.text: thoughts += part.text - grounding_metadata = ( - response.candidates[0].grounding_metadata.model_dump_json(indent=2) if response.candidates else None - ) - if not response.text: finish_reason = response.candidates[0].finish_reason if response.candidates else None @@ -612,7 +565,6 @@ def __analyze_with_google_search_grounding( return { "text": response.text, - "grounding_metadata": grounding_metadata, "thought_summaries": thoughts, } From c783625ec74964390bcc72c3aff94df1375c866c Mon Sep 17 00:00:00 2001 From: Quan Cao Date: Sun, 8 Feb 2026 16:52:10 +0700 Subject: [PATCH 05/13] Reorganize Stage 3 prompts into dedicated subdirectory Move all Stage 3 prompt files from the root prompts directory to a new stage_3 subdirectory for better organization and consistency with the project structure. 
--- .../{Stage_3_analysis_prompt.md => stage_3/analysis_prompt.md} | 0 prompts/{Stage_3_heuristics.md => stage_3/heuristics.md} | 0 .../{Stage_3_output_schema.json => stage_3/output_schema.json} | 0 .../system_instruction.md} | 0 .../{stage_3_models.py => stage_3/models.py} | 0 src/processing_pipeline/{stage_3.py => stage_3/tasks.py} | 2 +- 6 files changed, 1 insertion(+), 1 deletion(-) rename prompts/{Stage_3_analysis_prompt.md => stage_3/analysis_prompt.md} (100%) rename prompts/{Stage_3_heuristics.md => stage_3/heuristics.md} (100%) rename prompts/{Stage_3_output_schema.json => stage_3/output_schema.json} (100%) rename prompts/{Stage_3_system_instruction.md => stage_3/system_instruction.md} (100%) rename src/processing_pipeline/{stage_3_models.py => stage_3/models.py} (100%) rename src/processing_pipeline/{stage_3.py => stage_3/tasks.py} (99%) diff --git a/prompts/Stage_3_analysis_prompt.md b/prompts/stage_3/analysis_prompt.md similarity index 100% rename from prompts/Stage_3_analysis_prompt.md rename to prompts/stage_3/analysis_prompt.md diff --git a/prompts/Stage_3_heuristics.md b/prompts/stage_3/heuristics.md similarity index 100% rename from prompts/Stage_3_heuristics.md rename to prompts/stage_3/heuristics.md diff --git a/prompts/Stage_3_output_schema.json b/prompts/stage_3/output_schema.json similarity index 100% rename from prompts/Stage_3_output_schema.json rename to prompts/stage_3/output_schema.json diff --git a/prompts/Stage_3_system_instruction.md b/prompts/stage_3/system_instruction.md similarity index 100% rename from prompts/Stage_3_system_instruction.md rename to prompts/stage_3/system_instruction.md diff --git a/src/processing_pipeline/stage_3_models.py b/src/processing_pipeline/stage_3/models.py similarity index 100% rename from src/processing_pipeline/stage_3_models.py rename to src/processing_pipeline/stage_3/models.py diff --git a/src/processing_pipeline/stage_3.py b/src/processing_pipeline/stage_3/tasks.py similarity index 99% rename from 
src/processing_pipeline/stage_3.py rename to src/processing_pipeline/stage_3/tasks.py index d0f8032..5abe6d7 100644 --- a/src/processing_pipeline/stage_3.py +++ b/src/processing_pipeline/stage_3/tasks.py @@ -34,7 +34,7 @@ ProcessingStatus, PromptStage, ) -from processing_pipeline.stage_3_models import Stage3Output +from processing_pipeline.stage_3.models import Stage3Output from utils import optional_flow, optional_task From 2981edb2cf001f38506f5732a4d95e0ec2cf7ecf Mon Sep 17 00:00:00 2001 From: Quan Cao Date: Sun, 8 Feb 2026 17:25:13 +0700 Subject: [PATCH 06/13] Refactor Stage 3 into modular components Split Stage 3 processing pipeline into separate modules for better organization and maintainability. Extract executor logic, flow definitions, and task functions into dedicated files while maintaining backward compatibility through __init__.py exports. --- src/processing_pipeline/stage_3/__init__.py | 23 + src/processing_pipeline/stage_3/executors.py | 324 ++++++++++++++ src/processing_pipeline/stage_3/flows.py | 109 +++++ src/processing_pipeline/stage_3/tasks.py | 424 +------------------ tests/processing_pipeline/test_stage_3.py | 6 +- 5 files changed, 464 insertions(+), 422 deletions(-) create mode 100644 src/processing_pipeline/stage_3/__init__.py create mode 100644 src/processing_pipeline/stage_3/executors.py create mode 100644 src/processing_pipeline/stage_3/flows.py diff --git a/src/processing_pipeline/stage_3/__init__.py b/src/processing_pipeline/stage_3/__init__.py new file mode 100644 index 0000000..04be699 --- /dev/null +++ b/src/processing_pipeline/stage_3/__init__.py @@ -0,0 +1,23 @@ +from .executors import Stage3Executor +from .flows import in_depth_analysis +from .tasks import ( + analyze_snippet, + download_audio_file_from_s3, + fetch_a_new_snippet_from_supabase, + fetch_a_specific_snippet_from_supabase, + get_metadata, + process_snippet, + update_snippet_in_supabase, +) + +__all__ = [ + "Stage3Executor", + "analyze_snippet", + 
from datetime import datetime, timezone
import json
import os
import subprocess
import tempfile
import time

from google import genai
from google.genai.types import (
    File,
    FinishReason,
    GenerateContentConfig,
    GoogleSearch,
    ThinkingConfig,
    Tool,
)
from pydantic import ValidationError

from processing_pipeline.constants import (
    GeminiCLIEventType,
    GeminiModel,
)
from processing_pipeline.processing_utils import get_safety_settings
from processing_pipeline.stage_3.models import Stage3Output
from utils import optional_task


class Stage3Executor:
    """Executor for Stage 3 in-depth analysis of audio snippets.

    Orchestrates a two-path analysis (Gemini CLI with custom MCP search
    tools, falling back to the google-genai SDK with Google Search
    grounding) and converts the raw model text into a schema-validated dict.
    """

    @classmethod
    def run(
        cls,
        gemini_key: str,
        model_name: GeminiModel,
        audio_file: str,
        metadata: dict,
        prompt_version: dict,
    ):
        """
        Main execution method for Stage 3 analysis.

        Processing strategy:
        1. Step 1: Try Gemini CLI with custom search; fall back to the Google
           Genai SDK with Google Search grounding if the CLI fails.
        2. Validate: Try to validate the response with the Pydantic model.
        3. Step 2 (conditional): If validation fails, restructure the text
           with response_schema via a second model call.

        Args:
            gemini_key: Google Gemini API key
            model_name: Name of the Gemini model to use
            audio_file: Path to the audio file
            metadata: Metadata dictionary for the audio clip
            prompt_version: The prompt version to use for analysis

        Returns:
            dict: {"response": validated analysis dict,
                   "grounding_metadata": JSON string of the model-reported
                   verification evidence (None when absent),
                   "thought_summaries": str | None}

        Raises:
            ValueError: If the API key is missing or the model returns no
                usable response.
        """
        if not gemini_key:
            raise ValueError("Google Gemini API key was not set!")

        client = genai.Client(api_key=gemini_key)

        # Prepare the user prompt using the prompt version.
        # NOTE(review): "%-d"/"%-I" are glibc-only strftime flags and raise on
        # Windows -- confirm the pipeline only runs on POSIX hosts.
        user_prompt = (
            f"{prompt_version['user_prompt']}\n\n"
            f"Here is the metadata of the attached audio clip:\n{json.dumps(metadata, indent=2)}\n\n"
            f"Here is the current date and time: {datetime.now(timezone.utc).strftime('%B %-d, %Y %-I:%M %p UTC')}\n\n"
        )

        # Strategy: Try CLI first, fallback to SDK
        analysis_text = None
        thought_summaries_from_api = None
        uploaded_audio_file = None

        try:
            user_prompt_with_file = user_prompt + f"Here is the audio file attached: @{os.path.basename(audio_file)}"
            analysis_text = cls.__analyze_with_custom_search(
                model_name=model_name,
                user_prompt=user_prompt_with_file,
                system_instruction=prompt_version["system_instruction"],
            )
        except RuntimeError as e:
            # Report why the CLI path failed before falling back; previously
            # the exception was captured but never logged.
            print(f"Gemini CLI analysis failed: {e}")
            print("Falling back to Google Search grounding with SDK...")

            uploaded_audio_file = client.files.upload(file=audio_file)
            while uploaded_audio_file.state.name == "PROCESSING":
                print("Processing the uploaded audio file...")
                time.sleep(1)
                uploaded_audio_file = client.files.get(name=uploaded_audio_file.name)

            sdk_result = cls.__analyze_with_google_search_grounding(
                client,
                model_name,
                user_prompt,
                uploaded_audio_file,
                system_instruction=prompt_version["system_instruction"],
            )
            analysis_text = sdk_result["text"]
            thought_summaries_from_api = sdk_result.get("thought_summaries")

        try:
            # Fast path: the model already produced schema-conformant JSON.
            validated_output = cls.__validate_with_pydantic(analysis_text)

            if validated_output:
                thought_summaries = thought_summaries_from_api or validated_output.get("thought_summaries")
                return {
                    "response": validated_output,
                    "grounding_metadata": cls.__serialize_verification_evidence(validated_output),
                    "thought_summaries": thought_summaries,
                }

            # Step 2: Structure with response_schema (if validation failed)
            structured_output = cls.__structure_with_schema(client, analysis_text, prompt_version["output_schema"])
            thought_summaries = thought_summaries_from_api or structured_output.get("thought_summaries")
            return {
                "response": structured_output,
                "grounding_metadata": cls.__serialize_verification_evidence(structured_output),
                "thought_summaries": thought_summaries,
            }
        finally:
            # The uploaded file only exists on the SDK fallback path.
            if uploaded_audio_file:
                client.files.delete(name=uploaded_audio_file.name)

    @staticmethod
    def __serialize_verification_evidence(output: dict):
        """Serialize the model-reported verification evidence for storage.

        Returns None when the output carries no evidence, instead of the
        misleading JSON string "null" that json.dumps(None) would produce.
        """
        evidence = output.get("verification_evidence")
        return json.dumps(evidence, indent=2) if evidence is not None else None

    # NOTE(review): @optional_task is applied outside @classmethod here (it
    # receives the classmethod object). If optional_task delegates to
    # prefect.task, the conventional order is @classmethod outermost --
    # confirm against utils.optional_task before changing.
    @optional_task(log_prints=True, retries=3)
    @classmethod
    def __analyze_with_custom_search(
        cls,
        model_name: GeminiModel,
        user_prompt: str,
        system_instruction: str,
    ):
        """
        Analyze using Gemini CLI with custom search tools (MCP-based).

        This method uses the Gemini CLI which provides:
        - Custom search via MCP tools
        - Streaming JSON output
        - System instruction from file

        Returns:
            str: Final response text from Gemini CLI

        Raises:
            RuntimeError: If CLI execution fails (for fallback to SDK method)
        """
        print("Analyzing with Gemini CLI (custom search)...")

        final_response = ""
        timeout = 300  # seconds; long enough for multi-step tool use

        # Write system instruction to a temporary file for CLI
        with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as tmp_file:
            tmp_file.write(system_instruction)
            system_instruction_path = tmp_file.name

        # Minimal, explicit environment: only what the CLI and MCP tools need.
        env = {
            "PATH": os.environ.get("PATH", ""),
            "HOME": os.environ.get("HOME", ""),
            "GEMINI_API_KEY": os.environ["GOOGLE_GEMINI_KEY"],
            "GEMINI_SYSTEM_MD": system_instruction_path,
            "SEARXNG_URL": os.environ.get("SEARXNG_URL", ""),
        }

        cmd = [
            "gemini",
            "--model",
            model_name,
            "--output-format",
            "stream-json",
            user_prompt,
        ]

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                env=env,
                timeout=timeout,
            )

            # Parse the JSONL event stream, concatenating assistant message
            # text into the final response.
            for line in result.stdout.strip().splitlines():
                if not line:
                    continue
                try:
                    event = json.loads(line)
                except json.JSONDecodeError:
                    continue  # tolerate non-JSON noise in the stream

                if event.get("type") == GeminiCLIEventType.MESSAGE and event.get("role") == "assistant":
                    content = event.get("content")
                    if content and isinstance(content, str):
                        final_response += content

            if result.returncode != 0:
                raise RuntimeError(f"Gemini CLI exited with code {result.returncode}: {result.stderr}")

            if not final_response:
                raise RuntimeError("Gemini CLI returned no response")

            return final_response

        except subprocess.TimeoutExpired as e:
            raise RuntimeError(f"Gemini CLI timed out after {timeout} seconds") from e
        finally:
            if os.path.exists(system_instruction_path):
                os.remove(system_instruction_path)

    @optional_task(log_prints=True, retries=3)
    @classmethod
    def __analyze_with_google_search_grounding(
        cls,
        client: genai.Client,
        model_name: GeminiModel,
        user_prompt: str,
        uploaded_audio_file: File,
        system_instruction: str,
    ):
        """Analyze via the google-genai SDK with Google Search grounding.

        Returns:
            dict: {"text": model answer, "thought_summaries": concatenated
            thinking parts (possibly empty string)}

        Raises:
            ValueError: If the model returns no text.
        """
        print("Analyzing audio with web search...")

        response = client.models.generate_content(
            model=model_name,
            contents=[user_prompt, uploaded_audio_file],
            config=GenerateContentConfig(
                system_instruction=system_instruction,
                max_output_tokens=16384,
                tools=[Tool(google_search=GoogleSearch())],
                thinking_config=ThinkingConfig(thinking_budget=4096, include_thoughts=True),
                safety_settings=get_safety_settings(),
            ),
        )

        # Collect thought summaries; guard against an empty candidate list,
        # which previously raised IndexError before the no-text check ran.
        thoughts = ""
        if response.candidates:
            for part in response.candidates[0].content.parts or []:
                if part.thought and part.text:
                    thoughts += part.text

        if not response.text:
            finish_reason = response.candidates[0].finish_reason if response.candidates else None

            if finish_reason == FinishReason.MAX_TOKENS:
                raise ValueError("The response from Gemini was too long and was cut off in step 1.")

            print(f"Response finish reason: {finish_reason}")
            raise ValueError("No response from Gemini in step 1.")

        return {
            "text": response.text,
            "thought_summaries": thoughts,
        }

    @classmethod
    def __validate_with_pydantic(cls, response_text: str):
        """Extract the outermost JSON object from free text and validate it.

        Returns:
            dict | None: The validated output, or None when no JSON object is
            found or validation fails (caller falls back to restructuring).
        """
        try:
            print("Attempting to validate response with Pydantic model...")
            start_idx = response_text.find("{")
            end_idx = response_text.rfind("}")

            if start_idx == -1 or end_idx == -1:
                print("No JSON object found in the response.")
                return None

            parsed = Stage3Output.model_validate_json(response_text[start_idx : end_idx + 1])
            print("Validation successful - returning structured output")
            return parsed.model_dump()
        except ValidationError as e:
            print(f"Validation failed: {e}")
            return None

    @classmethod
    def __structure_with_schema(
        cls,
        client: genai.Client,
        analysis_text: str,
        output_schema: dict,
    ):
        """Restructure free-form analysis text into the required JSON schema.

        Uses a second, cheap model call with response_schema enforcement.

        Raises:
            ValueError: If the model returns nothing or the text is not
                convertible to the schema.
        """
        print("Restructuring response with schema validation...")

        system_instruction = """You are a helpful assistant whose task is to convert provided text into a valid JSON object following a given schema. Your responsibilities are:

1. **Validation**: Check if the provided text can be converted into a valid JSON object that adheres to the specified schema.
2. **Conversion**:
   - If the text is convertible, convert it into a valid JSON object according to the schema.
   - Set field `"is_convertible": true` in the JSON object.
3. **Error Handling**:
   - If the text is not convertible (e.g., missing fields, incorrect data types), return a JSON object with the field `"is_convertible": false`."""

        user_prompt = f"Please structure the following analysis text into the required JSON format:\n\n{analysis_text}"

        response = client.models.generate_content(
            model=GeminiModel.GEMINI_FLASH_LATEST,
            contents=[user_prompt],
            config=GenerateContentConfig(
                response_mime_type="application/json",
                response_schema=output_schema,
                system_instruction=system_instruction,
                max_output_tokens=8192,
                thinking_config=ThinkingConfig(thinking_budget=0),
                safety_settings=get_safety_settings(),
            ),
        )

        parsed_response = response.parsed

        if not parsed_response:
            finish_reason = response.candidates[0].finish_reason if response.candidates else None

            if finish_reason == FinishReason.MAX_TOKENS:
                raise ValueError("The response from Gemini was too long and was cut off in step 2.")

            raise ValueError(f"No response from Gemini in step 2. Response finished with reason: {finish_reason}")

        if not parsed_response.get("is_convertible"):
            raise ValueError("[Stage 3] The response from Gemini could not be converted to the required schema.")

        return parsed_response
aws_secret_access_key=os.getenv("R2_SECRET_ACCESS_KEY"), + ) + + # Setup Gemini Key + GEMINI_KEY = os.getenv("GOOGLE_GEMINI_KEY") + + # Setup Supabase client + supabase_client = SupabaseClient(supabase_url=os.getenv("SUPABASE_URL"), supabase_key=os.getenv("SUPABASE_KEY")) + + # Load prompt version + prompt_version = supabase_client.get_active_prompt(PromptStage.STAGE_3) + + if snippet_ids: + for id in snippet_ids: + snippet = fetch_a_specific_snippet_from_supabase(supabase_client, id) + if snippet: + supabase_client.set_snippet_status(snippet["id"], ProcessingStatus.PROCESSING) + print(f"Found the snippet: {snippet['id']}") + local_file = download_audio_file_from_s3(s3_client, R2_BUCKET_NAME, snippet["file_path"]) + + # Process the snippet + process_snippet( + supabase_client, + snippet, + local_file, + GEMINI_KEY, + skip_review=skip_review, + prompt_version=prompt_version, + ) + + print(f"Delete the downloaded snippet clip: {local_file}") + os.remove(local_file) + else: + while True: + snippet = fetch_a_new_snippet_from_supabase(supabase_client) # TODO: Retry failed snippets (status: Error) + + if snippet: + local_file = download_audio_file_from_s3(s3_client, R2_BUCKET_NAME, snippet["file_path"]) + + # Process the snippet + process_snippet( + supabase_client, + snippet, + local_file, + GEMINI_KEY, + skip_review=skip_review, + prompt_version=prompt_version, + ) + + print(f"Delete the downloaded snippet clip: {local_file}") + os.remove(local_file) + + # Stop the flow if we're not meant to repeat the process + if not repeat: + break + + if snippet: + sleep_time = 2 + else: + sleep_time = 60 + + print(f"Sleep for {sleep_time} seconds before the next iteration") + time.sleep(sleep_time) diff --git a/src/processing_pipeline/stage_3/tasks.py b/src/processing_pipeline/stage_3/tasks.py index 5abe6d7..dc661b7 100644 --- a/src/processing_pipeline/stage_3/tasks.py +++ b/src/processing_pipeline/stage_3/tasks.py @@ -1,41 +1,17 @@ -from datetime import datetime, timezone +from 
datetime import datetime from http import HTTPStatus import json import os -import subprocess -import tempfile -import time -from typing import Any -import boto3 -from google import genai from google.genai import errors -from prefect.flows import Flow -from prefect.client.schemas import FlowRun, State -from pydantic import ValidationError - -from prefect.task_runners import ConcurrentTaskRunner -from google.genai.types import ( - File, - FinishReason, - GenerateContentConfig, - GoogleSearch, - ThinkingConfig, - Tool, -) -from processing_pipeline.supabase_utils import SupabaseClient -from processing_pipeline.processing_utils import ( - get_safety_settings, - postprocess_snippet, -) + from processing_pipeline.constants import ( - GeminiCLIEventType, GeminiModel, ProcessingStatus, - PromptStage, ) -from processing_pipeline.stage_3.models import Stage3Output -from utils import optional_flow, optional_task +from processing_pipeline.processing_utils import postprocess_snippet +from processing_pipeline.stage_3.executors import Stage3Executor +from utils import optional_task @optional_task(log_prints=True, retries=3) @@ -243,393 +219,3 @@ def process_snippet(supabase_client, snippet, local_file, gemini_key, skip_revie except Exception as e: print(f"Failed to process {local_file}: {e}") supabase_client.set_snippet_status(snippet["id"], ProcessingStatus.ERROR, str(e)) - - -def reset_snippet_status_hook(flow: Flow, flow_run: FlowRun, state: State): - snippet_ids = flow_run.parameters.get("snippet_ids", None) - - if not snippet_ids: - return - - supabase_client = SupabaseClient(supabase_url=os.getenv("SUPABASE_URL"), supabase_key=os.getenv("SUPABASE_KEY")) - for snippet_id in snippet_ids: - snippet = fetch_a_specific_snippet_from_supabase(supabase_client, snippet_id) - if snippet and snippet["status"] == ProcessingStatus.PROCESSING: - supabase_client.set_snippet_status(snippet_id, ProcessingStatus.NEW) - - -@optional_flow( - name="Stage 3: In-depth Analysis", - 
log_prints=True, - task_runner=ConcurrentTaskRunner, - on_crashed=[reset_snippet_status_hook], - on_cancellation=[reset_snippet_status_hook], -) -def in_depth_analysis(snippet_ids, skip_review, repeat): - # Setup S3 Client - R2_BUCKET_NAME = os.getenv("R2_BUCKET_NAME") - s3_client = boto3.client( - "s3", - endpoint_url=os.getenv("R2_ENDPOINT_URL"), - aws_access_key_id=os.getenv("R2_ACCESS_KEY_ID"), - aws_secret_access_key=os.getenv("R2_SECRET_ACCESS_KEY"), - ) - - # Setup Gemini Key - GEMINI_KEY = os.getenv("GOOGLE_GEMINI_KEY") - - # Setup Supabase client - supabase_client = SupabaseClient(supabase_url=os.getenv("SUPABASE_URL"), supabase_key=os.getenv("SUPABASE_KEY")) - - # Load prompt version - prompt_version = supabase_client.get_active_prompt(PromptStage.STAGE_3) - - if snippet_ids: - for id in snippet_ids: - snippet = fetch_a_specific_snippet_from_supabase(supabase_client, id) - if snippet: - supabase_client.set_snippet_status(snippet["id"], ProcessingStatus.PROCESSING) - print(f"Found the snippet: {snippet['id']}") - local_file = download_audio_file_from_s3(s3_client, R2_BUCKET_NAME, snippet["file_path"]) - - # Process the snippet - process_snippet( - supabase_client, - snippet, - local_file, - GEMINI_KEY, - skip_review=skip_review, - prompt_version=prompt_version, - ) - - print(f"Delete the downloaded snippet clip: {local_file}") - os.remove(local_file) - else: - while True: - snippet = fetch_a_new_snippet_from_supabase(supabase_client) # TODO: Retry failed snippets (status: Error) - - if snippet: - local_file = download_audio_file_from_s3(s3_client, R2_BUCKET_NAME, snippet["file_path"]) - - # Process the snippet - process_snippet( - supabase_client, - snippet, - local_file, - GEMINI_KEY, - skip_review=skip_review, - prompt_version=prompt_version, - ) - - print(f"Delete the downloaded snippet clip: {local_file}") - os.remove(local_file) - - # Stop the flow if we're not meant to repeat the process - if not repeat: - break - - if snippet: - sleep_time = 2 - 
else: - sleep_time = 60 - - print(f"Sleep for {sleep_time} seconds before the next iteration") - time.sleep(sleep_time) - - -class Stage3Executor: - """Executor for Stage 3 in-depth analysis.""" - - @classmethod - def run( - cls, - gemini_key: str, - model_name: GeminiModel, - audio_file: str, - metadata: dict, - prompt_version: dict, - ): - """ - Main execution method for Stage 3 analysis. - - Processing strategy: - 1. Step 1: Try Gemini CLI with custom search, fallback to Google Genai SDK with Google Search grounding if CLI fails - 2. Validate: Try to validate response with Pydantic model - 3. Step 2 (conditional): If validation fails, restructure with response_schema - - Args: - gemini_key: Google Gemini API key - model_name: Name of the Gemini model to use - audio_file: Path to the audio file - metadata: Metadata dictionary for the audio clip - prompt_version: The prompt version to use for analysis - - Returns: - dict: Structured and validated analysis output - """ - if not gemini_key: - raise ValueError("Google Gemini API key was not set!") - - client = genai.Client(api_key=gemini_key) - - # Prepare the user prompt using the prompt version - user_prompt = ( - f"{prompt_version['user_prompt']}\n\n" - f"Here is the metadata of the attached audio clip:\n{json.dumps(metadata, indent=2)}\n\n" - f"Here is the current date and time: {datetime.now(timezone.utc).strftime('%B %-d, %Y %-I:%M %p UTC')}\n\n" - ) - - # Strategy: Try CLI first, fallback to SDK - analysis_text = None - thought_summaries_from_api = None - uploaded_audio_file = None - - try: - user_prompt_with_file = user_prompt + f"Here is the audio file attached: @{os.path.basename(audio_file)}" - analysis_text = cls.__analyze_with_custom_search( - model_name=model_name, - user_prompt=user_prompt_with_file, - system_instruction=prompt_version["system_instruction"], - ) - except RuntimeError as e: - print("Falling back to Google Search grounding with SDK...") - - uploaded_audio_file = 
client.files.upload(file=audio_file) - while uploaded_audio_file.state.name == "PROCESSING": - print("Processing the uploaded audio file...") - time.sleep(1) - uploaded_audio_file = client.files.get(name=uploaded_audio_file.name) - - sdk_result = cls.__analyze_with_google_search_grounding( - client, - model_name, - user_prompt, - uploaded_audio_file, - system_instruction=prompt_version["system_instruction"], - ) - analysis_text = sdk_result["text"] - thought_summaries_from_api = sdk_result.get("thought_summaries") - - try: - # Try to validate with Pydantic model first - validated_output = cls.__validate_with_pydantic(analysis_text) - - if validated_output: - thought_summaries = thought_summaries_from_api or validated_output.get("thought_summaries") - grounding_metadata = json.dumps(validated_output.get("verification_evidence"), indent=2) - return { - "response": validated_output, - "grounding_metadata": grounding_metadata, - "thought_summaries": thought_summaries, - } - - # Step 2: Structure with response_schema (if validation failed) - structured_output = cls.__structure_with_schema(client, analysis_text, prompt_version["output_schema"]) - thought_summaries = thought_summaries_from_api or structured_output.get("thought_summaries") - grounding_metadata = json.dumps(structured_output.get("verification_evidence"), indent=2) - return { - "response": structured_output, - "grounding_metadata": grounding_metadata, - "thought_summaries": thought_summaries, - } - finally: - if uploaded_audio_file: - client.files.delete(name=uploaded_audio_file.name) - - @optional_task(log_prints=True, retries=3) - @classmethod - def __analyze_with_custom_search( - cls, - model_name: GeminiModel, - user_prompt: str, - system_instruction: str, - ): - """ - Analyze using Gemini CLI with custom search tools (MCP-based). 
- - This method uses the Gemini CLI which provides: - - Custom search via MCP tools - - Streaming JSON output - - System instruction from file - - Returns: - str: Final response text from Gemini CLI - - Raises: - RuntimeError: If CLI execution fails (for fallback to SDK method) - """ - print("Analyzing with Gemini CLI (custom search)...") - - events: list[dict[str, Any]] = [] - final_response = "" - timeout = 300 - - # Write system instruction to a temporary file for CLI - with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as tmp_file: - tmp_file.write(system_instruction) - system_instruction_path = tmp_file.name - - env = { - "PATH": os.environ.get("PATH", ""), - "HOME": os.environ.get("HOME", ""), - "GEMINI_API_KEY": os.environ["GOOGLE_GEMINI_KEY"], - "GEMINI_SYSTEM_MD": system_instruction_path, - "SEARXNG_URL": os.environ.get("SEARXNG_URL", ""), - } - - cmd = [ - "gemini", - "--model", - model_name, - "--output-format", - "stream-json", - user_prompt, - ] - - try: - result = subprocess.run( - cmd, - capture_output=True, - text=True, - env=env, - timeout=timeout, - ) - - # Parse JSONL output - for line in result.stdout.strip().split("\n"): - if not line: - continue - try: - event = json.loads(line) - events.append(event) - - # Concatenate assistant message content - if event.get("type") == GeminiCLIEventType.MESSAGE and event.get("role") == "assistant": - content = event.get("content") - if content and isinstance(content, str): - final_response += content - except json.JSONDecodeError: - pass - - if result.returncode != 0: - raise RuntimeError(f"Gemini CLI exited with code {result.returncode}: {result.stderr}") - - if not final_response: - raise RuntimeError("Gemini CLI returned no response") - - return final_response - - except subprocess.TimeoutExpired as e: - raise RuntimeError(f"Gemini CLI timed out after {timeout} seconds") from e - finally: - if os.path.exists(system_instruction_path): - os.remove(system_instruction_path) - - 
@optional_task(log_prints=True, retries=3) - @classmethod - def __analyze_with_google_search_grounding( - cls, - client: genai.Client, - model_name: GeminiModel, - user_prompt: str, - uploaded_audio_file: File, - system_instruction: str, - ): - print("Analyzing audio with web search...") - - response = client.models.generate_content( - model=model_name, - contents=[user_prompt, uploaded_audio_file], - config=GenerateContentConfig( - system_instruction=system_instruction, - max_output_tokens=16384, - tools=[Tool(google_search=GoogleSearch())], - thinking_config=ThinkingConfig(thinking_budget=4096, include_thoughts=True), - safety_settings=get_safety_settings(), - ), - ) - - thoughts = "" - for part in response.candidates[0].content.parts: - if part.thought and part.text: - thoughts += part.text - - if not response.text: - finish_reason = response.candidates[0].finish_reason if response.candidates else None - - if finish_reason == FinishReason.MAX_TOKENS: - raise ValueError("The response from Gemini was too long and was cut off in step 1.") - - print(f"Response finish reason: {finish_reason}") - raise ValueError("No response from Gemini in step 1.") - - return { - "text": response.text, - "thought_summaries": thoughts, - } - - @classmethod - def __validate_with_pydantic(cls, response_text: str): - try: - print("Attempting to validate response with Pydantic model...") - start_idx = response_text.find("{") - end_idx = response_text.rfind("}") - - if start_idx == -1 or end_idx == -1: - print("No JSON object found in the response.") - return None - - parsed = Stage3Output.model_validate_json(response_text[start_idx : end_idx + 1]) - print("Validation successful - returning structured output") - return parsed.model_dump() - except ValidationError as e: - print(f"Validation failed: {e}") - return None - - @classmethod - def __structure_with_schema( - cls, - client: genai.Client, - analysis_text: str, - output_schema: dict, - ): - print("Restructuring response with schema 
validation...") - - system_instruction = """You are a helpful assistant whose task is to convert provided text into a valid JSON object following a given schema. Your responsibilities are: - -1. **Validation**: Check if the provided text can be converted into a valid JSON object that adheres to the specified schema. -2. **Conversion**: - - If the text is convertible, convert it into a valid JSON object according to the schema. - - Set field `"is_convertible": true` in the JSON object. -3. **Error Handling**: - - If the text is not convertible (e.g., missing fields, incorrect data types), return a JSON object with the field `"is_convertible": false`.""" - - user_prompt = f"Please structure the following analysis text into the required JSON format:\n\n{analysis_text}" - - response = client.models.generate_content( - model=GeminiModel.GEMINI_FLASH_LATEST, - contents=[user_prompt], - config=GenerateContentConfig( - response_mime_type="application/json", - response_schema=output_schema, - system_instruction=system_instruction, - max_output_tokens=8192, - thinking_config=ThinkingConfig(thinking_budget=0), - safety_settings=get_safety_settings(), - ), - ) - - parsed_response = response.parsed - - if not parsed_response: - finish_reason = response.candidates[0].finish_reason if response.candidates else None - - if finish_reason == FinishReason.MAX_TOKENS: - raise ValueError("The response from Gemini was too long and was cut off in step 2.") - - raise ValueError(f"No response from Gemini in step 2. 
Response finished with reason: {finish_reason}") - - if not parsed_response.get("is_convertible"): - raise ValueError("[Stage 3] The response from Gemini could not be converted to the required schema.") - - return parsed_response diff --git a/tests/processing_pipeline/test_stage_3.py b/tests/processing_pipeline/test_stage_3.py index 0c79a7b..2598b48 100644 --- a/tests/processing_pipeline/test_stage_3.py +++ b/tests/processing_pipeline/test_stage_3.py @@ -18,7 +18,7 @@ class TestStage3: @pytest.fixture def mock_supabase_client(self): """Create a mock Supabase client""" - with patch("processing_pipeline.stage_3.SupabaseClient") as MockSupabaseClient: + with patch("processing_pipeline.stage_3.flows.SupabaseClient") as MockSupabaseClient: mock_client = Mock() mock_client.get_snippet_by_id.return_value = None mock_client.get_a_new_snippet_and_reserve_it.return_value = None @@ -179,7 +179,7 @@ def test_process_snippet_skip_review_false( error_message=None, ) - @patch("processing_pipeline.stage_3.postprocess_snippet") + @patch("processing_pipeline.stage_3.tasks.postprocess_snippet") @patch("processing_pipeline.stage_3.Stage3Executor.run") def test_process_snippet_skip_review_true( self, mock_run, mock_postprocess, mock_supabase_client, sample_snippet, mock_gemini_response @@ -315,7 +315,7 @@ def test_in_depth_analysis_with_repeat(self, mock_supabase_client, mock_s3_clien ] with patch("os.remove"), patch("time.sleep") as mock_sleep, patch( - "processing_pipeline.stage_3.process_snippet" + "processing_pipeline.stage_3.flows.process_snippet" ) as mock_process: try: From 9db4a22c620821aac4c0f3310e242bea0df284eb Mon Sep 17 00:00:00 2001 From: Quan Cao Date: Mon, 9 Feb 2026 09:55:59 +0700 Subject: [PATCH 07/13] Reorganize prompts directory --- ...ranscription_generation_output_schema.json | 23 ------------- ...stamped_transcription_generation_prompt.md | 34 ------------------- .../output_schema.json} | 0 .../review_prompt.md} | 0 .../system_instruction.md} | 0 5 files changed, 
57 deletions(-) delete mode 100644 prompts/Timestamped_transcription_generation_output_schema.json delete mode 100644 prompts/Timestamped_transcription_generation_prompt.md rename prompts/{Stage_4_output_schema.json => stage_4/output_schema.json} (100%) rename prompts/{Stage_4_review_prompt.md => stage_4/review_prompt.md} (100%) rename prompts/{Stage_4_system_instruction.md => stage_4/system_instruction.md} (100%) diff --git a/prompts/Timestamped_transcription_generation_output_schema.json b/prompts/Timestamped_transcription_generation_output_schema.json deleted file mode 100644 index 9bb04eb..0000000 --- a/prompts/Timestamped_transcription_generation_output_schema.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "type": "object", - "required": ["segments"], - "properties": { - "segments": { - "type": "array", - "items": { - "type": "object", - "required": ["segment_number", "transcript"], - "properties": { - "segment_number": { - "type": "integer", - "description": "The audio segment number" - }, - "transcript": { - "type": "string", - "description": "The transcript of the audio segment" - } - } - } - } - } -} diff --git a/prompts/Timestamped_transcription_generation_prompt.md b/prompts/Timestamped_transcription_generation_prompt.md deleted file mode 100644 index ac3b0f8..0000000 --- a/prompts/Timestamped_transcription_generation_prompt.md +++ /dev/null @@ -1,34 +0,0 @@ -You will be provided with a list of audio segments from a larger audio file. Please transcribe each segment carefully and return a JSON object following the specified schema. -It is critical that you transcribe each segment independently, without considering the content of other segments. Each transcript should be solely based on the audio within its corresponding segment. - -Here is the JSON schema for the output: - -```json -{ - "segments": [ - { - "segment": 1, - "transcript": "..." - }, - { - "segment": 2, - "transcript": "..." 
- } - // ...transcripts of additional segments - ] -} -``` - -Additional Notes: - -- Thoroughly review each segment’s transcript for completeness and accuracy before finalizing it. -- Check for grammatical errors and misheard words to ensure that the transcripts are accurate. -- If certain parts of the audio are unclear or inaudible, indicate this in the transcript using placeholders such as [inaudible], [unclear], [noise], [music], etc. -- It is possible that the audio contains multiple languages mixed together. - -Important Notes: - -- Transcribe each segment independently, without considering the content of other segments. -- Ensure that the segment_number for each transcript in the JSON output matches the corresponding audio segment you are transcribing. Double-check that the segment numbers are accurate and sequential. - -Now proceed to transcribe the provided audio segments. diff --git a/prompts/Stage_4_output_schema.json b/prompts/stage_4/output_schema.json similarity index 100% rename from prompts/Stage_4_output_schema.json rename to prompts/stage_4/output_schema.json diff --git a/prompts/Stage_4_review_prompt.md b/prompts/stage_4/review_prompt.md similarity index 100% rename from prompts/Stage_4_review_prompt.md rename to prompts/stage_4/review_prompt.md diff --git a/prompts/Stage_4_system_instruction.md b/prompts/stage_4/system_instruction.md similarity index 100% rename from prompts/Stage_4_system_instruction.md rename to prompts/stage_4/system_instruction.md From 8991dccc8645b604068ef4cbfd2d92a1a9ced082 Mon Sep 17 00:00:00 2001 From: Quan Cao Date: Mon, 9 Feb 2026 15:30:56 +0700 Subject: [PATCH 08/13] Fix verification_status values in breaking news table to match schema enum The H.1 Breaking News Protocol table used SCREAMING_CASE values (VERIFIED_FALSE, PARTIALLY_VERIFIABLE, UNVERIFIABLE_*) that don't exist in the verification_status enum, which would cause validation failures. 
--- prompts/stage_3/analysis_prompt.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/prompts/stage_3/analysis_prompt.md b/prompts/stage_3/analysis_prompt.md index e7826b2..9a8c8e1 100644 --- a/prompts/stage_3/analysis_prompt.md +++ b/prompts/stage_3/analysis_prompt.md @@ -316,11 +316,11 @@ Based on your verification results, apply the appropriate maximum confidence sco | Verification Outcome | Maximum Score | Verification Status | |---------------------|---------------|---------------------| -| Contradictory evidence found (sources confirm the opposite) | 80-100 | `VERIFIED_FALSE` | -| Partial information found (some details confirmed false) | 40-79 | `PARTIALLY_VERIFIABLE` | -| No relevant results for claims within 24 hours of recording | **MAX 20%** | `UNVERIFIABLE_BREAKING` | -| No relevant results for claims 24-72 hours old | **MAX 30%** | `UNVERIFIABLE_RECENT` | -| No relevant results for claims older than 72 hours | 1-40 | `UNVERIFIABLE_STALE` | +| Contradictory evidence found (sources confirm the opposite) | 80-100 | `verified_false` | +| Partial information found (some details confirmed false) | 40-79 | `uncertain` | +| No relevant results for claims within 24 hours of recording | **MAX 20%** | `insufficient_evidence` | +| No relevant results for claims 24-72 hours old | **MAX 30%** | `insufficient_evidence` | +| No relevant results for claims older than 72 hours | 1-40 | `insufficient_evidence` | **THE GOLDEN RULE: For claims less than 72 hours old where no contradictory evidence is found, the MAXIMUM confidence score is 30%, regardless of how extraordinary the claim appears.** From 12d3f9b56de6aa2c5e37b02477d213faa839c70b Mon Sep 17 00:00:00 2001 From: Quan Cao Date: Mon, 9 Feb 2026 15:44:38 +0700 Subject: [PATCH 09/13] Make publication_date and title required in search result schema The prose documentation mandates these fields but the JSON schemas and Pydantic model had them as optional. 
publication_date allows null for cases where the date is unavailable. Also documents content_fetched as explicitly optional in the prose. --- prompts/stage_3/analysis_prompt.md | 7 ++++--- prompts/stage_3/output_schema.json | 6 +++--- src/processing_pipeline/stage_3/models.py | 4 ++-- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/prompts/stage_3/analysis_prompt.md b/prompts/stage_3/analysis_prompt.md index 9a8c8e1..1720bf6 100644 --- a/prompts/stage_3/analysis_prompt.md +++ b/prompts/stage_3/analysis_prompt.md @@ -139,7 +139,7 @@ For EVERY factual claim that could be verified or disproven, you MUST: - `tier3_regional_news`: Local newspapers, regional TV stations, El Nacional, Efecto Cocuyo - `official_source`: Government websites (.gov), official institutional sites - `other`: All other sources - - **Publication Date**: When the article was published (critical for time-sensitive claims) + - **Publication Date**: When the article was published in YYYY-MM-DD format, or null if not available (critical for time-sensitive claims) - **Title**: The headline or title of the source - **Relevant Excerpt**: A DIRECT QUOTE (50-200 words) from the source that relates to the claim. Do NOT paraphrase - copy the exact text. - **Relevance Assessment**: How this result relates to the claim: @@ -147,6 +147,7 @@ For EVERY factual claim that could be verified or disproven, you MUST: - `contradicts_claim`: Evidence that the claim is false or misleading - `provides_context`: Relevant background but doesn't directly verify/contradict - `inconclusive`: Cannot determine relationship to claim + - **Content Fetched** (optional): Whether the full article content was fetched via `web_url_read` (boolean, defaults to false) 4. **Categorize Search Outcome**: - `results_found`: Search returned relevant, actionable results @@ -990,12 +991,12 @@ Ensure your output strictly adheres to this schema. 
"type": "array", "items": { "type": "object", - "required": ["url", "source_name", "source_type", "relevant_excerpt", "relevance_to_claim"], + "required": ["url", "source_name", "source_type", "publication_date", "title", "relevant_excerpt", "relevance_to_claim"], "properties": { "url": { "type": "string" }, "source_name": { "type": "string" }, "source_type": { "type": "string", "enum": ["tier1_wire_service", "tier1_factchecker", "tier2_major_news", "tier3_regional_news", "official_source", "other"] }, - "publication_date": { "type": "string" }, + "publication_date": { "type": ["string", "null"] }, "title": { "type": "string" }, "relevant_excerpt": { "type": "string" }, "relevance_to_claim": { "type": "string", "enum": ["supports_claim", "contradicts_claim", "provides_context", "inconclusive"] }, diff --git a/prompts/stage_3/output_schema.json b/prompts/stage_3/output_schema.json index 1178bb6..cdb2929 100644 --- a/prompts/stage_3/output_schema.json +++ b/prompts/stage_3/output_schema.json @@ -385,7 +385,7 @@ "description": "Individual search results with full details.", "items": { "type": "object", - "required": ["url", "source_name", "source_type", "relevant_excerpt", "relevance_to_claim"], + "required": ["url", "source_name", "source_type", "publication_date", "title", "relevant_excerpt", "relevance_to_claim"], "properties": { "url": { "type": "string", @@ -401,8 +401,8 @@ "description": "Classification of source reliability tier." }, "publication_date": { - "type": "string", - "description": "Publication date in ISO 8601 format (YYYY-MM-DD) or 'unknown' if not available." + "type": ["string", "null"], + "description": "Publication date in ISO 8601 format (YYYY-MM-DD), or null if not available." 
}, "title": { "type": "string", diff --git a/src/processing_pipeline/stage_3/models.py b/src/processing_pipeline/stage_3/models.py index 593ceb2..36c2b7c 100644 --- a/src/processing_pipeline/stage_3/models.py +++ b/src/processing_pipeline/stage_3/models.py @@ -141,8 +141,8 @@ class SearchResult(BaseModel): "tier1_wire_service", "tier1_factchecker", "tier2_major_news", "tier3_regional_news", "official_source", "other" ] = Field(description="Classification of source reliability tier") - publication_date: str | None = Field(default=None, description="Publication date (YYYY-MM-DD) or None if unknown") - title: str | None = Field(default=None, description="Title or headline of the article/page") + publication_date: str | None = Field(description="Publication date in ISO 8601 format (YYYY-MM-DD), or null if not available") + title: str = Field(description="Title or headline of the article/page") relevant_excerpt: str = Field(description="Direct quote from the source relevant to the claim (50-200 words)") relevance_to_claim: Literal["supports_claim", "contradicts_claim", "provides_context", "inconclusive"] = Field( description="How this result relates to the claim being verified" From 03e05c15dddc6a5eaae4959a8ebfeb09e6f32a43 Mon Sep 17 00:00:00 2001 From: Quan Cao Date: Mon, 9 Feb 2026 15:57:06 +0700 Subject: [PATCH 10/13] Replace ambiguous percentage notation with integer-out-of-100 for confidence scores confidence_scores.overall is an integer 0-100, not a percentage. Updated all score references in analysis_prompt.md and system_instruction.md to use explicit integer values (e.g., "30 (out of 100)") instead of "30%". 
--- prompts/stage_3/analysis_prompt.md | 14 +++++++------- prompts/stage_3/system_instruction.md | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/prompts/stage_3/analysis_prompt.md b/prompts/stage_3/analysis_prompt.md index 1720bf6..7961efb 100644 --- a/prompts/stage_3/analysis_prompt.md +++ b/prompts/stage_3/analysis_prompt.md @@ -117,7 +117,7 @@ Perform the following steps: - relevance_to_claim: "contradicts_claim" ``` - **Without reliable sources contradicting the claim, maximum confidence score is 40%.** + **Without reliable sources contradicting the claim, maximum confidence score is 40 (out of 100).** ##### **C.1 Verification Evidence Documentation (MANDATORY)** @@ -293,7 +293,7 @@ No search results does NOT mean the claim is false. Unusual or surprising claims **CRITICAL: Claims about very recent events require special handling.** -Before assigning any confidence score above 30%, you MUST evaluate whether the claim qualifies as potentially unverifiable breaking news. +Before assigning any confidence score above 30 (out of 100), you MUST evaluate whether the claim qualifies as potentially unverifiable breaking news. 
**Step 1: Calculate Event Recency** @@ -319,11 +319,11 @@ Based on your verification results, apply the appropriate maximum confidence sco |---------------------|---------------|---------------------| | Contradictory evidence found (sources confirm the opposite) | 80-100 | `verified_false` | | Partial information found (some details confirmed false) | 40-79 | `uncertain` | -| No relevant results for claims within 24 hours of recording | **MAX 20%** | `insufficient_evidence` | -| No relevant results for claims 24-72 hours old | **MAX 30%** | `insufficient_evidence` | +| No relevant results for claims within 24 hours of recording | **MAX 20** | `insufficient_evidence` | +| No relevant results for claims 24-72 hours old | **MAX 30** | `insufficient_evidence` | | No relevant results for claims older than 72 hours | 1-40 | `insufficient_evidence` | -**THE GOLDEN RULE: For claims less than 72 hours old where no contradictory evidence is found, the MAXIMUM confidence score is 30%, regardless of how extraordinary the claim appears.** +**THE GOLDEN RULE: For claims less than 72 hours old where no contradictory evidence is found, the MAXIMUM confidence score is 30 (out of 100), regardless of how extraordinary the claim appears.** ##### **I. Required Self-Review Process** @@ -366,11 +366,11 @@ After completing your initial analysis, perform this structured review: - **Conflating source reputation with factual accuracy** (propaganda outlets can report real events with biased framing -- verify facts independently, do not assume content is fabricated because the source is biased) 5. **Breaking News Verification Checklist** - Before finalizing any score above 30%, answer these questions: + Before finalizing any score above 30 (out of 100), answer these questions: - [ ] Have I calculated the time delta between recording and current time? - [ ] Is this claim within the 72-hour breaking news window? 
- [ ] If within the breaking news window, did I find CONTRADICTORY evidence (not just absence of evidence)? - - [ ] If no contradictory evidence found for a recent claim, is my score capped at 30% or lower? + - [ ] If no contradictory evidence found for a recent claim, is my score capped at 30 or lower? - [ ] Have I included the required Breaking News Protocol note if applicable? - [ ] Have I documented ALL my search queries and results in `verification_evidence`? diff --git a/prompts/stage_3/system_instruction.md b/prompts/stage_3/system_instruction.md index 2b4c065..78fc9e9 100644 --- a/prompts/stage_3/system_instruction.md +++ b/prompts/stage_3/system_instruction.md @@ -26,7 +26,7 @@ These tools provide better access to reliable sources. Use them when available, **Conservative Confidence Scoring:** - **Evidence-Based Scoring:** High confidence scores (60+) REQUIRE tier-1 source URLs and direct excerpts that contradict the claim. Without documented contradictory evidence, maximum score is 40. - **Absence ≠ Falsity:** No search results means UNCERTAINTY, not disinformation. Score as `insufficient_evidence`, not `verified_false`. -- **Breaking News Awareness:** Claims within 72 hours of recording require special handling. If no contradictory evidence is found, maximum score is 30% (20% for claims within 24 hours). +- **Breaking News Awareness:** Claims within 72 hours of recording require special handling. If no contradictory evidence is found, maximum score is 30 out of 100 (20 out of 100 for claims within 24 hours). - **Verification Status Required:** Every analysis must include a `verification_status`: `verified_false` (evidence contradicts), `verified_true` (evidence confirms), `uncertain` (mixed/recent events), or `insufficient_evidence` (no relevant results). 
**Web Search Result Integrity:** From 5d45977718958830483574d4cbb92d08d71551ee Mon Sep 17 00:00:00 2001 From: Quan Cao Date: Mon, 9 Feb 2026 18:01:11 +0700 Subject: [PATCH 11/13] Add more instruction on content_fetched field --- prompts/stage_3/analysis_prompt.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/prompts/stage_3/analysis_prompt.md b/prompts/stage_3/analysis_prompt.md index 7961efb..a52b69d 100644 --- a/prompts/stage_3/analysis_prompt.md +++ b/prompts/stage_3/analysis_prompt.md @@ -115,6 +115,7 @@ Perform the following steps: - source_type: "tier1_wire_service" - relevant_excerpt: "[exact quote from article]" - relevance_to_claim: "contradicts_claim" + - content_fetched: true (because we used web_url_read in step 2) ``` **Without reliable sources contradicting the claim, maximum confidence score is 40 (out of 100).** @@ -147,7 +148,7 @@ For EVERY factual claim that could be verified or disproven, you MUST: - `contradicts_claim`: Evidence that the claim is false or misleading - `provides_context`: Relevant background but doesn't directly verify/contradict - `inconclusive`: Cannot determine relationship to claim - - **Content Fetched** (optional): Whether the full article content was fetched via `web_url_read` (boolean, defaults to false) + - **Content Fetched**: Set to `true` when you used `web_url_read` to retrieve the full page content (meaning `relevant_excerpt` is a direct quote from the article), or `false` when only the search snippet/metadata from `searxng_web_search` was available (meaning `relevant_excerpt` comes from the search result summary, not the full article) 4. 
**Categorize Search Outcome**: - `results_found`: Search returned relevant, actionable results From 6d4a6e14a9d56decaea772aca7dfa4d63910a3f2 Mon Sep 17 00:00:00 2001 From: Quan Cao Date: Mon, 9 Feb 2026 18:03:47 +0700 Subject: [PATCH 12/13] Fix stage 3 prompt paths to match new directory structure Update import script to reference stage 3 prompts from their new location in the prompts/stage_3/ subdirectory, aligning with the reorganized prompt directory structure. --- src/scripts/import_prompts_to_db.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/scripts/import_prompts_to_db.py b/src/scripts/import_prompts_to_db.py index e940b99..5321a71 100644 --- a/src/scripts/import_prompts_to_db.py +++ b/src/scripts/import_prompts_to_db.py @@ -40,9 +40,9 @@ "output_schema": "prompts/stage_1/preprocess/initial_detection_output_schema.json", }, PromptStage.STAGE_3: { - "system_instruction": "prompts/Stage_3_system_instruction.md", - "user_prompt": "prompts/Stage_3_analysis_prompt.md", - "output_schema": "prompts/Stage_3_output_schema.json", + "system_instruction": "prompts/stage_3/system_instruction.md", + "user_prompt": "prompts/stage_3/analysis_prompt.md", + "output_schema": "prompts/stage_3/output_schema.json", }, PromptStage.STAGE_4: { "system_instruction": "prompts/Stage_4_system_instruction.md", From 2639987df8d3c3b4af7fd85a7f3ddcc3d97918f6 Mon Sep 17 00:00:00 2001 From: Quan Cao Date: Mon, 9 Feb 2026 18:11:07 +0700 Subject: [PATCH 13/13] Remove unused custom transcription generator --- src/processing_pipeline/constants.py | 23 +-- src/processing_pipeline/stage_1/flows.py | 42 ++--- src/processing_pipeline/stage_1/tasks.py | 9 - .../timestamped_transcription_generator.py | 170 ------------------ tests/processing_pipeline/test_stage_1.py | 36 ---- 5 files changed, 19 insertions(+), 261 deletions(-) delete mode 100644 src/processing_pipeline/timestamped_transcription_generator.py diff --git a/src/processing_pipeline/constants.py 
b/src/processing_pipeline/constants.py index 52742c7..120ecab 100644 --- a/src/processing_pipeline/constants.py +++ b/src/processing_pipeline/constants.py @@ -41,36 +41,25 @@ class PromptStage(StrEnum): def get_user_prompt_for_stage_3(): - return open("prompts/Stage_3_analysis_prompt.md", "r").read() + return open("prompts/stage_3/analysis_prompt.md", "r").read() def get_system_instruction_for_stage_3(): - return open("prompts/Stage_3_system_instruction.md", "r").read() - + return open("prompts/stage_3/system_instruction.md", "r").read() def get_output_schema_for_stage_3(): - return json.load(open("prompts/Stage_3_output_schema.json", "r")) - - -def get_timestamped_transcription_generation_prompt(): - return open("prompts/Timestamped_transcription_generation_prompt.md", "r").read() - - -def get_timestamped_transcription_generation_output_schema(): - return json.load(open("prompts/Timestamped_transcription_generation_output_schema.json", "r")) + return json.load(open("prompts/stage_3/output_schema.json", "r")) def get_system_instruction_for_stage_4(): - return open("prompts/Stage_4_system_instruction.md", "r").read() - + return open("prompts/stage_4/system_instruction.md", "r").read() def get_user_prompt_for_stage_4(): - return open("prompts/Stage_4_review_prompt.md", "r").read() + return open("prompts/stage_4/review_prompt.md", "r").read() def get_output_schema_for_stage_4(): - return json.load(open("prompts/Stage_4_output_schema.json", "r")) - + return json.load(open("prompts/stage_4/output_schema.json", "r")) def get_gemini_timestamped_transcription_generation_prompt(): return open("prompts/Gemini_timestamped_transcription_generation_prompt.md", "r").read() diff --git a/src/processing_pipeline/stage_1/flows.py b/src/processing_pipeline/stage_1/flows.py index 974c58c..3afd752 100644 --- a/src/processing_pipeline/stage_1/flows.py +++ b/src/processing_pipeline/stage_1/flows.py @@ -9,7 +9,7 @@ from prefect.client.schemas import FlowRun, State from 
prefect.task_runners import ConcurrentTaskRunner -from processing_pipeline.constants import ProcessingStatus, PromptStage +from processing_pipeline.constants import GeminiModel, ProcessingStatus, PromptStage from processing_pipeline.stage_1.tasks import ( delete_stage_1_llm_responses, @@ -18,12 +18,12 @@ fetch_a_new_audio_file_from_supabase, fetch_audio_file_by_id, fetch_stage_1_llm_response_by_id, + get_audio_file_metadata, process_audio_file, reset_status_of_audio_files, reset_status_of_stage_1_llm_response, set_audio_file_status, set_status_of_stage_1_llm_response, - transcribe_audio_file_with_custom_timestamped_transcription_generator, transcribe_audio_file_with_timestamp_with_gemini, update_stage_1_llm_response_detection_result, update_stage_1_llm_response_timestamped_transcription, @@ -219,18 +219,9 @@ def regenerate_timestamped_transcript(stage_1_llm_response_ids): if stage_1_llm_response: print(f"Found stage 1 llm response {id}") - # Get metadata of the transcription audio_file = stage_1_llm_response["audio_file"] local_file = download_audio_file_from_s3(s3_client, audio_file["file_path"]) - recorded_at = datetime.strptime(audio_file["recorded_at"], "%Y-%m-%dT%H:%M:%S+00:00") - metadata = { - "radio_station_name": audio_file["radio_station_name"], - "radio_station_code": audio_file["radio_station_code"], - "location": {"state": audio_file["location_state"], "city": audio_file["location_city"]}, - "recorded_at": recorded_at.strftime("%B %-d, %Y %-I:%M %p"), - "recording_day_of_week": recorded_at.strftime("%A"), - "time_zone": "UTC", - } + metadata = get_audio_file_metadata(audio_file) initial_detection_result = stage_1_llm_response["initial_detection_result"] or {} flagged_snippets = initial_detection_result.get("flagged_snippets", []) @@ -238,32 +229,25 @@ def regenerate_timestamped_transcript(stage_1_llm_response_ids): if len(flagged_snippets) == 0: print("No flagged snippets found during the initial detection phase.") else: - try: - transcriptor = 
"gemini-1206" - timestamped_transcription = transcribe_audio_file_with_timestamp_with_gemini( - gemini_client=gemini_client, - audio_file=local_file, - prompt_version=transcription_prompt_version, - ) - except ValueError as e: - print( - f"Failed to transcribe the audio file with Gemini 2.5 Pro: {e}\n" - "Falling back to the custom timestamped-transcript generator" - ) - transcriptor = "custom" - timestamped_transcription = transcribe_audio_file_with_custom_timestamped_transcription_generator( - local_file - ) + # Timestamped transcription + transcriptor = GeminiModel.GEMINI_FLASH_LATEST + timestamped_transcription = transcribe_audio_file_with_timestamp_with_gemini( + gemini_client=gemini_client, + audio_file=local_file, + prompt_version=transcription_prompt_version, + model_name=transcriptor, + ) update_stage_1_llm_response_timestamped_transcription( supabase_client, id, timestamped_transcription, transcriptor ) - print("Processing the timestamped transcription with Gemini 2.5 Pro") + # Main detection detection_result = disinformation_detection_with_gemini( gemini_client=gemini_client, timestamped_transcription=timestamped_transcription["timestamped_transcription"], metadata=metadata, prompt_version=detection_prompt_version, + model_name=GeminiModel.GEMINI_FLASH_LATEST, ) print(f"Detection result:\n{json.dumps(detection_result, indent=2)}\n") update_stage_1_llm_response_detection_result(supabase_client, id, detection_result) diff --git a/src/processing_pipeline/stage_1/tasks.py b/src/processing_pipeline/stage_1/tasks.py index ea45d5a..6174624 100644 --- a/src/processing_pipeline/stage_1/tasks.py +++ b/src/processing_pipeline/stage_1/tasks.py @@ -14,7 +14,6 @@ Stage1PreprocessTranscriptionExecutor, ) from processing_pipeline.supabase_utils import SupabaseClient -from processing_pipeline.timestamped_transcription_generator import TimestampedTranscriptionGenerator from utils import optional_task @@ -186,14 +185,6 @@ def 
transcribe_audio_file_with_timestamp_with_gemini( return {"timestamped_transcription": timestamped_transcription} -@optional_task(log_prints=True) -def transcribe_audio_file_with_custom_timestamped_transcription_generator(audio_file): - print(f"Transcribing the audio file {audio_file} with the custom timestamped-transcription-generator") - gemini_key = os.getenv("GOOGLE_GEMINI_KEY") - timestamped_transcription = TimestampedTranscriptionGenerator.run(audio_file, gemini_key, 10) - return {"timestamped_transcription": timestamped_transcription} - - @optional_task(log_prints=True, retries=3) def disinformation_detection_with_gemini( gemini_client: genai.Client | None, diff --git a/src/processing_pipeline/timestamped_transcription_generator.py b/src/processing_pipeline/timestamped_transcription_generator.py deleted file mode 100644 index 866665a..0000000 --- a/src/processing_pipeline/timestamped_transcription_generator.py +++ /dev/null @@ -1,170 +0,0 @@ -import json -import os -import pathlib -from pydub import AudioSegment -from google import genai -from google.genai.types import ( - SafetySetting, - HarmCategory, - HarmBlockThreshold, - Part, - ThinkingConfig, - GenerateContentConfig, -) -from processing_pipeline.constants import ( - GeminiModel, - get_timestamped_transcription_generation_output_schema, - get_timestamped_transcription_generation_prompt, -) - - -class TimestampedTranscriptionGenerator: - - SYSTEM_INSTRUCTION = ( - "You are a specialized language model designed to transcribe audio content in multiple languages." 
- ) - USER_PROMPT = get_timestamped_transcription_generation_prompt() - OUTPUT_SCHEMA = get_timestamped_transcription_generation_output_schema() - - @classmethod - def run(cls, audio_file, gemini_key, segment_length): - print("Splitting the file into 2 equal parts...") - first_part, second_part = cls.split_file_into_two_parts(audio_file, segment_length) - - try: - print("Splitting the first part into segments...") - first_part_segments = cls.split_file_into_segments(first_part, segment_length * 1000) - try: - print("Transcribing the first part...") - first = cls.transcribe_segments(first_part_segments, gemini_key) - finally: - print("Removing the first part segments...") - for s in first_part_segments: - os.remove(s) - - print("Splitting the second part into segments...") - second_part_segments = cls.split_file_into_segments(second_part, segment_length * 1000) - try: - print("Transcribing the second part...") - second = cls.transcribe_segments(second_part_segments, gemini_key) - finally: - print("Removing the second part segments...") - for s in second_part_segments: - os.remove(s) - finally: - print("Removing the two audio parts...") - os.remove(first_part) - os.remove(second_part) - - print("Combining segments from both parts...") - segments = first + second - print("Extracting the transcriptions from the segments...") - segment_transcripts = [segment["transcript"] for segment in segments] - - print("Formatting the transcriptions into a timestamped transcription...") - return cls.build_timestamped_transcription(segment_transcripts, segment_length) - - @classmethod - def transcribe_segments(cls, audio_segments, gemini_key): - if not gemini_key: - raise ValueError("Google Gemini API key was not set!") - - if not audio_segments: - raise ValueError("No audio segments provided!") - - client = genai.Client(api_key=gemini_key) - - segments = [] - for index, segment_path in enumerate(audio_segments): - segments.extend( - [ - f"\n\n", - 
Part.from_bytes(data=pathlib.Path(segment_path).read_bytes(), mime_type="audio/mp3"), - f"\n\n\n", - ] - ) - - result = client.models.generate_content( - model=GeminiModel.GEMINI_FLASH_LATEST, - contents=[cls.USER_PROMPT] + segments, - config=GenerateContentConfig( - response_mime_type="application/json", - response_schema=cls.OUTPUT_SCHEMA, - system_instruction=cls.SYSTEM_INSTRUCTION, - max_output_tokens=16384, - thinking_config=ThinkingConfig(thinking_budget=1024), - safety_settings=[ - SafetySetting( - category=HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, - threshold=HarmBlockThreshold.BLOCK_NONE, - ), - SafetySetting( - category=HarmCategory.HARM_CATEGORY_HATE_SPEECH, - threshold=HarmBlockThreshold.BLOCK_NONE, - ), - SafetySetting( - category=HarmCategory.HARM_CATEGORY_HARASSMENT, - threshold=HarmBlockThreshold.BLOCK_NONE, - ), - SafetySetting( - category=HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, - threshold=HarmBlockThreshold.BLOCK_NONE, - ), - ], - ), - ) - return json.loads(result.text)["segments"] - - @classmethod - def build_timestamped_transcription(cls, segment_transcriptions, segment_length): - result = "" - for i, transcription in enumerate(segment_transcriptions): - # Convert the segment duration to minutes and seconds - minutes = segment_length * i // 60 - seconds = segment_length * i % 60 - - result += f"[{minutes:02}:{seconds:02}] {transcription}\n" - - return result - - @classmethod - def split_file_into_segments(cls, audio_file, segment_length_ms): - audio = AudioSegment.from_mp3(audio_file) - segments = [] - - for i in range(0, len(audio), segment_length_ms): - # Slice the audio segment - subclip = audio[i : i + segment_length_ms] - - # Export the subclip - output_file = f"{audio_file}_segment_{(i // segment_length_ms) + 1}.mp3" - subclip.export(output_file, format="mp3") - - segments.append(output_file) - - del audio - return segments - - @classmethod - def split_file_into_two_parts(cls, file, segment_length): - audio = 
AudioSegment.from_mp3(file) - half_length = len(audio) // 2 - segment_length_ms = segment_length * 1000 - - # Round down the half_length to the nearest multiple of segment_length_ms - split_point = int((half_length // segment_length_ms) * segment_length_ms) - print(f"Split point is at {split_point} milliseconds") - - first_part = audio[:split_point] - second_part = audio[split_point:] - - # Export the parts - first_part.export(f"{file}_part_1.mp3", format="mp3") - second_part.export(f"{file}_part_2.mp3", format="mp3") - - # Release the memory - del audio - del first_part - del second_part - - return f"{file}_part_1.mp3", f"{file}_part_2.mp3" diff --git a/tests/processing_pipeline/test_stage_1.py b/tests/processing_pipeline/test_stage_1.py index 99873df..06d9817 100644 --- a/tests/processing_pipeline/test_stage_1.py +++ b/tests/processing_pipeline/test_stage_1.py @@ -11,7 +11,6 @@ fetch_stage_1_llm_response_by_id, download_audio_file_from_s3, transcribe_audio_file_with_timestamp_with_gemini, - transcribe_audio_file_with_custom_timestamped_transcription_generator, disinformation_detection_with_gemini, insert_stage_1_llm_response, process_audio_file, @@ -157,17 +156,6 @@ def test_transcribe_with_timestamp_with_gemini_success(self, mock_environment): model_name=GeminiModel.GEMINI_FLASH_LATEST ) - def test_transcribe_with_custom_generator_success(self, mock_environment): - """Test successful transcription with custom generator""" - with patch("processing_pipeline.stage_1.TimestampedTranscriptionGenerator") as mock_generator: - mock_generator.run.return_value = "Test timestamped transcription" - - result = transcribe_audio_file_with_custom_timestamped_transcription_generator("test.mp3") - - assert result["timestamped_transcription"] == "Test timestamped transcription" - mock_generator.run.assert_called_once() - - class TestDetectionFunctions: def test_disinformation_detection_success(self, mock_environment): @@ -685,15 +673,6 @@ def 
test_undo_disinformation_detection_no_responses(self, mock_supabase_client): mock_supabase_client.reset_audio_file_status.assert_called_once_with([1, 2]) mock_supabase_client.delete_stage_1_llm_responses.assert_called_once_with([1, 2]) - @patch("processing_pipeline.stage_1.TimestampedTranscriptionGenerator") - def test_transcribe_audio_file_with_custom_generator_error(self, mock_generator): - """Test custom generator with error""" - mock_generator.run.side_effect = Exception("Generation failed") - - with pytest.raises(Exception, match="Generation failed"): - transcribe_audio_file_with_custom_timestamped_transcription_generator("test.mp3") - - def test_initial_disinformation_detection_specific_file(self, mock_supabase_client, mock_s3_client): """Test initial disinformation detection with specific file""" @@ -715,21 +694,6 @@ def test_initial_disinformation_detection_specific_file(self, mock_supabase_clie mock_supabase_client.get_audio_file_by_id.assert_called_once_with(1) mock_s3_client.download_file.assert_called_once() - def test_transcribe_audio_file_with_custom_generator_multiple_segments(self, mock_supabase_client): - """Test transcription with custom generator handling multiple segments""" - with patch("processing_pipeline.stage_1.TimestampedTranscriptionGenerator") as mock_generator: - # Mock generator to return transcription with multiple segments - mock_generator.run.return_value = ( - "[00:00] First segment.\n" "[00:10] Second segment.\n" "[00:20] Third segment.\n" - ) - - result = transcribe_audio_file_with_custom_timestamped_transcription_generator("test.mp3") - - assert isinstance(result, dict) - assert "timestamped_transcription" in result - assert len(result["timestamped_transcription"].split("\n")) == 4 # 3 segments + empty line - mock_generator.run.assert_called_once_with("test.mp3", os.getenv("GOOGLE_GEMINI_KEY"), 10) - def test_disinformation_detection_with_unicode_handling(self, mock_supabase_client, mock_genai): """Test disinformation detection 
with Unicode characters""" timestamped_transcription = "Test transcription with Unicode: áéíóú ñ"