diff --git a/.gemini/settings.json b/.gemini/settings.json index 5d54c5e..63f3450 100644 --- a/.gemini/settings.json +++ b/.gemini/settings.json @@ -15,7 +15,8 @@ "SEARXNG_URL": "$SEARXNG_URL" }, "includeTools": ["searxng_web_search", "web_url_read"], - "trust": true + "trust": true, + "timeout": 60000 } } } diff --git a/prompts/Stage_3_system_instruction.md b/prompts/Stage_3_system_instruction.md deleted file mode 100644 index 7c4b9ff..0000000 --- a/prompts/Stage_3_system_instruction.md +++ /dev/null @@ -1,11 +0,0 @@ -**Role Definition:** -You are an advanced language model specialized in in-depth disinformation and political content analysis, capable of processing audio inputs in multiple languages, with a focus on Spanish and Arabic as spoken by immigrant communities in the USA. Your expertise includes transcription, translation, and comprehensive content analysis, capturing nuances such as tone, emotion, political orientation, and cultural context, including rigorous evaluation of demonstrably false claims and careful assessment of political orientation based solely on observable content. - -**General Guidelines:** -- **Accuracy:** Ensure precise transcription and translation, preserving the original meaning and cultural nuances. -- **Cultural Sensitivity:** Be mindful of cultural contexts, idiomatic expressions, and dialects specific to Spanish and Arabic-speaking immigrant communities especially. -- **Evidence-Based Analysis:** Verify all factual claims using Grounding with Google Search before scoring. High scores (80-100) require strong contradictory evidence from reputable sources, not absence of confirmation. Base all conclusions on demonstrably false claims that can be definitively proven incorrect with cited sources. -- **Searching Guidelines:** When verifying factual claims, prioritize information that matches or is recent to the recording date of the provided audio file. Do not rely on old or unrelated information that predates the recording. If the recording references recent events or claims about current status, search for the most up-to-date information available. -- **Objectivity:** Maintain strict neutrality by focusing solely on verifiable facts rather than assumptions or inferences. Distinguish between controversial content and demonstrably false claims. -- **Self-Review:** Systematically validate all assessments through structured self-review, adjusting scores when specific evidence cannot be cited. -- **Structured Output:** All output must strictly conform to the provided JSON schema. 
\ No newline at end of file diff --git a/prompts/Timestamped_transcription_generation_output_schema.json b/prompts/Timestamped_transcription_generation_output_schema.json deleted file mode 100644 index 9bb04eb..0000000 --- a/prompts/Timestamped_transcription_generation_output_schema.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "type": "object", - "required": ["segments"], - "properties": { - "segments": { - "type": "array", - "items": { - "type": "object", - "required": ["segment_number", "transcript"], - "properties": { - "segment_number": { - "type": "integer", - "description": "The audio segment number" - }, - "transcript": { - "type": "string", - "description": "The transcript of the audio segment" - } - } - } - } - } -} diff --git a/prompts/Timestamped_transcription_generation_prompt.md b/prompts/Timestamped_transcription_generation_prompt.md deleted file mode 100644 index ac3b0f8..0000000 --- a/prompts/Timestamped_transcription_generation_prompt.md +++ /dev/null @@ -1,34 +0,0 @@ -You will be provided with a list of audio segments from a larger audio file. Please transcribe each segment carefully and return a JSON object following the specified schema. -It is critical that you transcribe each segment independently, without considering the content of other segments. Each transcript should be solely based on the audio within its corresponding segment. - -Here is the JSON schema for the output: - -```json -{ - "segments": [ - { - "segment": 1, - "transcript": "..." - }, - { - "segment": 2, - "transcript": "..." - } - // ...transcripts of additional segments - ] -} -``` - -Additional Notes: - -- Thoroughly review each segment’s transcript for completeness and accuracy before finalizing it. -- Check for grammatical errors and misheard words to ensure that the transcripts are accurate. -- If certain parts of the audio are unclear or inaudible, indicate this in the transcript using placeholders such as [inaudible], [unclear], [noise], [music], etc. -- It is possible that the audio contains multiple languages mixed together. - -Important Notes: - -- Transcribe each segment independently, without considering the content of other segments. -- Ensure that the segment_number for each transcript in the JSON output matches the corresponding audio segment you are transcribing. Double-check that the segment numbers are accurate and sequential. - -Now proceed to transcribe the provided audio segments. diff --git a/prompts/Stage_3_analysis_prompt.md b/prompts/stage_3/analysis_prompt.md similarity index 82% rename from prompts/Stage_3_analysis_prompt.md rename to prompts/stage_3/analysis_prompt.md index 6c29228..a52b69d 100644 --- a/prompts/Stage_3_analysis_prompt.md +++ b/prompts/stage_3/analysis_prompt.md @@ -77,11 +77,128 @@ Perform the following steps: - Verify that the transcription matches the audio precisely. - Confirm that the translation accurately reflects the transcription. -- **Ensure Factual Accuracy:** - - **Search**: Use the web search tool to fact-check claims, ensuring search queries match the audio's recording date, time, and location for contextual accuracy. - - **Fetch**: For important claims, use the content fetch tool to read full articles from authoritative sources (news outlets, fact-checkers, official statements). Do not rely solely on search snippets for high-stakes determinations. - - **Verify**: Look for multiple sources to corroborate or contradict claims. Avoid using information from different time periods (e.g., if the audio is from 2025, don't reference data from 2000). 
- - **Apply**: Incorporate your findings into the analysis that follows. +- **Ensure Factual Accuracy Using Web Search:** + + **Preferred Tools:** + - **`searxng_web_search`** - Use this to search for relevant URLs from reliable sources. + - **`web_url_read`** - Use this to read full article content and extract exact quotes. + + **Two-Step Verification Process:** + + **Step 1: Search for Relevant Sources** + Use `searxng_web_search` (or other available search tools) to find URLs: + - Search for the specific claim (e.g., "Maduro captured January 2026") + - Search for related context (e.g., "US Venezuela military operations 2026") + - Search fact-checker sites (e.g., "site:snopes.com [topic]" or "site:reuters.com [topic]") + - Ensure search queries match the audio's recording date and location for contextual accuracy. + + **Step 2: Read Full Content from URLs** + For promising URLs found, use `web_url_read` to: + - Read the full article content (not just search snippets) + - Extract EXACT QUOTES (not paraphrases) as `relevant_excerpt` + - Note the publication date + - Classify the source tier (tier1_wire_service, tier1_factchecker, tier2_major_news, etc.) + + **CRITICAL:** You must document actual search results in `verification_evidence`. Do not invent or imagine what sources say. + + **Example Workflow:** + ``` + 1. searxng_web_search("Maduro captured US forces 2026") + -> Found: https://reuters.com/article/..., https://apnews.com/... + + 2. web_url_read("https://reuters.com/article/...") + -> Extract: "Reuters reports that as of [date], Venezuelan President Nicolás Maduro remains in power..." + + 3. Document in verification_evidence: + - url: "https://reuters.com/article/..." + - source_name: "Reuters" + - source_type: "tier1_wire_service" + - relevant_excerpt: "[exact quote from article]" + - relevance_to_claim: "contradicts_claim" + - content_fetched: true (because we used web_url_read in step 2) + ``` + + **Without reliable sources contradicting the claim, maximum confidence score is 40 (out of 100).** + +##### **C.1 Verification Evidence Documentation (MANDATORY)** + +**CRITICAL: All verification activities must be fully documented in the `verification_evidence` field of your output.** + +**Search and Documentation Protocol:** + +For EVERY factual claim that could be verified or disproven, you MUST: + +1. **Execute a Search**: Use the web search tool with a specific, well-formed query +2. **Record the Query**: Document the exact search query used +3. **Document ALL Results**: For each search result, record: + - **URL**: The complete URL of the source (REQUIRED) + - **Source Name**: The publication name (e.g., "Reuters", "Associated Press", "BBC News") + - **Source Type Classification**: + - `tier1_wire_service`: AP, Reuters, AFP, EFE, UPI + - `tier1_factchecker`: Snopes, PolitiFact, FactCheck.org, Full Fact, AFP Fact Check, Chequeado, Cotejo.info + - `tier2_major_news`: CNN, BBC, NPR, NYT, Washington Post, The Guardian, BBC Mundo, DW + - `tier3_regional_news`: Local newspapers, regional TV stations, El Nacional, Efecto Cocuyo + - `official_source`: Government websites (.gov), official institutional sites + - `other`: All other sources + - **Publication Date**: When the article was published in YYYY-MM-DD format, or null if not available (critical for time-sensitive claims) + - **Title**: The headline or title of the source + - **Relevant Excerpt**: A DIRECT QUOTE (50-200 words) from the source that relates to the claim. Do NOT paraphrase - copy the exact text. 
+ - **Relevance Assessment**: How this result relates to the claim: + - `supports_claim`: Evidence that the claim is accurate + - `contradicts_claim`: Evidence that the claim is false or misleading + - `provides_context`: Relevant background but doesn't directly verify/contradict + - `inconclusive`: Cannot determine relationship to claim + - **Content Fetched**: Set to `true` when you used `web_url_read` to retrieve the full page content (meaning `relevant_excerpt` is a direct quote from the article), or `false` when only the search snippet/metadata from `searxng_web_search` was available (meaning `relevant_excerpt` comes from the search result summary, not the full article) + +4. **Categorize Search Outcome**: + - `results_found`: Search returned relevant, actionable results + - `no_results`: Search returned no relevant results (document this - absence of evidence is important) + - `results_inconclusive`: Results exist but don't clearly address the claim + +**Source Priority Guidelines:** + +When multiple sources are available, PRIORITIZE in this order: +1. **Wire Services** (Reuters, AP, AFP, EFE) - Most reliable for breaking news and facts +2. **Official Fact-Checkers** (Snopes, PolitiFact, FactCheck.org, Chequeado, Cotejo.info) - Best for disputed claims +3. **Major News Outlets** (BBC, NPR, NYT, Washington Post, BBC Mundo, DW) - Good for context and analysis +4. **Official Sources** (government sites, official statements) - Authoritative for policy/data +5. **Regional/Local News** (Efecto Cocuyo, El Pitazo, Tal Cual for Venezuela) - Valuable for local events + +**IMPORTANT: Interpreting search results correctly is essential.** +- No results = uncertainty = low confidence score (0-40) +- Results contradict claim = potential disinformation = higher confidence score (40-100 depending on source quality) +- Results support claim = not disinformation = confidence score 0 +- **Results from tier-1/tier-2 sources that support the claim MUST be trusted.** You MUST NOT dismiss search results from Reuters, AP, BBC, CNN, etc. as "fabricated" or "poisoned." If these sources confirm a claim, the claim is true -- even if it conflicts with your pre-training knowledge. Your training data may be outdated. + +##### **C.2 Knowledge Cutoff and Post-Training Events (CRITICAL)** + +**Your training data has a knowledge cutoff.** Events that occurred after your cutoff will feel unfamiliar, implausible, or "wrong" to you. This does NOT mean they did not happen. You MUST follow this protocol: + +**When web search results conflict with your pre-training knowledge:** + +1. **Trust the web search results over your own knowledge.** Your knowledge is static; the web is current. If Reuters, AP, BBC, CNN, or other tier-1/tier-2 sources report an event, that event happened -- even if you have no memory of it and it feels implausible. + +2. **NEVER conclude that search results from independent sources are "fabricated," "poisoned," "planted," or part of a disinformation campaign.** The editorial standards and independence of tier-1 and tier-2 sources make coordinated fabrication across multiple outlets effectively impossible. If you find yourself thinking "these search results must be fake," STOP -- you are likely encountering a post-cutoff event that is real but unfamiliar to you. + +3. 
**Recognize the pattern of a post-cutoff event:** + - Your instinct says "this never happened" + - But multiple independent tier-1/tier-2 sources confirm it did happen + - The sources include specific details (dates, names, locations) consistent with real reporting + - **Conclusion: The event is real. Your training data is outdated. Adjust your analysis accordingly.** + +4. **Apply this rule BEFORE scoring:** If web search confirms the factual claims in the content, the content is reporting real events. The confidence score for factual accuracy must be 0, even if the source is biased or the framing is propagandistic. + +**Example of correct reasoning:** +- Audio from Sputnik claims "President X was captured by US forces" +- Web search returns: Reuters, AP, BBC, CNN all confirm President X was captured +- Your training data says President X is still in power +- **Correct conclusion:** The capture is a real post-cutoff event. The audio's factual claims are TRUE. Score the factual claims at 0. Analyze SEPARATELY for bias, framing, or propaganda techniques in how the events are presented. + +**Example of INCORRECT reasoning (DO NOT DO THIS):** +- Same scenario as above +- "The search results must be fabricated because this contradicts known reality" +- "This is a sophisticated disinformation campaign that has poisoned search indexes" +- **This is WRONG.** You are substituting your outdated training knowledge for current web evidence. ##### **D. Summary and Explanation** @@ -135,7 +252,7 @@ The confidence score represents your degree of certainty that the content contai **Verification Requirement:** -Before assigning confidence scores, verify all factual claims using the available web search tool. For claims that significantly impact the confidence score, fetch and read the full content of authoritative sources rather than relying on search snippets alone. The verification outcome determines the maximum possible score. +Before assigning confidence scores, verify all factual claims using web search. The verification outcome determines the maximum possible score. **Scoring Framework Based on Verification:** @@ -173,6 +290,42 @@ Example: "We need stricter immigration policies" No search results does NOT mean the claim is false. Unusual or surprising claims can be true. Recent events may have limited coverage. When you cannot find contradictory information, score conservatively (0-40), not as disinformation (80-100). +##### **H.1 Breaking News and Recent Events Protocol** + +**CRITICAL: Claims about very recent events require special handling.** + +Before assigning any confidence score above 30 (out of 100), you MUST evaluate whether the claim qualifies as potentially unverifiable breaking news. + +**Step 1: Calculate Event Recency** + +Compare the following two timestamps: +1. **Recording Timestamp**: The `recorded_at` field from the audio metadata +2. **Current Timestamp**: The current date and time provided in the prompt + +Calculate the time difference. If the recording was made within the past **72 hours**, the content may reference breaking news that has not yet been indexed. 
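+
+ The recency check is equivalent to the following sketch (illustrative only -- the helper name `breaking_news_cap` is hypothetical, the ISO-8601 `recorded_at` format is assumed from the audio metadata, and the caps mirror the Step 3 table below):
+
+ ```python
+ from datetime import datetime, timezone
+
+ def breaking_news_cap(recorded_at: str, now: datetime) -> int:
+     """Maximum confidence score allowed when NO contradictory evidence is found,
+     based on how recently the audio was recorded (see the Step 3 caps)."""
+     recorded = datetime.fromisoformat(recorded_at)  # e.g. "2026-01-15T10:00:00+00:00"
+     age_hours = (now - recorded).total_seconds() / 3600
+     if age_hours <= 24:
+         return 20  # breaking news window, under 24 hours: MAX 20
+     if age_hours <= 72:
+         return 30  # breaking news window, 24-72 hours: MAX 30
+     return 40      # older claims: absence of evidence still caps the score at 40
+
+ # A recording from 30 hours ago with no contradictory evidence is capped at 30
+ assert breaking_news_cap("2026-01-15T10:00:00+00:00",
+                          datetime(2026, 1, 16, 16, 0, tzinfo=timezone.utc)) == 30
+ ```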
+
+ **Step 2: Identify Time-Sensitive Claims**
+
+ A claim is considered "time-sensitive" if it:
+ - Reports a specific event allegedly occurring within the past 72 hours
+ - Describes actions by named individuals or organizations as currently happening or just completed
+ - Claims something has "just happened," is "breaking," or uses similar urgent language
+ - References events that, if true, would be major news (arrests, deaths, military actions, political developments)
+
+ **Step 3: Apply Breaking News Confidence Caps**
+
+ Based on your verification results, keep the confidence score within the allowed range:
+
+ | Verification Outcome | Allowed Score Range | Verification Status |
+ |---------------------|---------------------|---------------------|
+ | Contradictory evidence found (sources confirm the opposite) | 80-100 | `verified_false` |
+ | Partial information found (some details confirmed false) | 40-79 | `uncertain` |
+ | No relevant results for claims within 24 hours of recording | **MAX 20** | `insufficient_evidence` |
+ | No relevant results for claims 24-72 hours old | **MAX 30** | `insufficient_evidence` |
+ | No relevant results for claims older than 72 hours | 1-40 | `insufficient_evidence` |
+
+ **THE GOLDEN RULE: For claims less than 72 hours old where no contradictory evidence is found, the MAXIMUM confidence score is 30 (out of 100), regardless of how extraordinary the claim appears.**
+
 ##### **I. Required Self-Review Process**

 After completing your initial analysis, perform this structured review:
@@ -201,6 +354,7 @@ After completing your initial analysis, perform this structured review:
    - Adjust scores to match available evidence
    - Document reasoning for any score changes
    - Ensure final scores reflect only demonstrably false content
+   - **MANDATORY CHECK:** If you scored above 0 but web search results from tier-1/tier-2 sources SUPPORT the claims, you MUST reduce the score to 0. Do NOT dismiss web search evidence by hypothesizing that results are "fabricated" or "poisoned." If multiple independent sources confirm a claim, the claim is true.

 4. **Common Error Check**
    Review for these frequent mistakes:
@@ -209,6 +363,23 @@ After completing your initial analysis, perform this structured review:
    - Treating bias as equivalent to disinformation
    - Scoring based on disagreement rather than falsity
    - Treating "no search results" as evidence of falsity (it is uncertainty)
+   - **Dismissing web search results that conflict with your pre-training knowledge** (post-cutoff events are real even if unfamiliar to you)
+   - **Conflating source reputation with factual accuracy** (propaganda outlets can report real events with biased framing -- verify facts independently, do not assume content is fabricated because the source is biased)
+
+5. **Breaking News Verification Checklist**
+   Before finalizing any score above 30 (out of 100), answer these questions:
+   - [ ] Have I calculated the time delta between recording and current time?
+   - [ ] Is this claim within the 72-hour breaking news window?
+   - [ ] If within the breaking news window, did I find CONTRADICTORY evidence (not just absence of evidence)?
+   - [ ] If no contradictory evidence found for a recent claim, is my score capped at 30 or lower?
+   - [ ] Have I included the required Breaking News Protocol note if applicable?
+   - [ ] Have I documented ALL my search queries and results in `verification_evidence`?
+
+6. **Evidence Documentation Check**
+   - [ ] Did I record URLs for all search results?
+ - [ ] Did I include direct excerpts (not paraphrases) from sources? + - [ ] Did I classify each source by tier (tier1_wire_service, tier1_factchecker, etc.)? + - [ ] For scores 60+, did I use `web_url_read` to read full article content? ##### **J. Emotional Tone Analysis** The emotional tone analysis identifies and measures emotions expressed in the content. Like our confidence scoring, this requires evidence-based assessment: @@ -403,6 +574,42 @@ Document your analytical reasoning process in the `thought_summaries` field. Thi - Include specific examples from the content that informed your analysis - Document any score adjustments and why they were made +##### **M. Verification Evidence** + +Your output MUST include this critical field for transparency and accountability: **`verification_evidence`** + +Document ALL web searches performed during fact-checking: +```json +{ + "verification_evidence": { + "searches_performed": [ + { + "query": "exact search query used", + "search_intent": "what claim this search verifies", + "result_status": "results_found | no_results | results_inconclusive", + "results": [ + { + "url": "https://example.com/article", + "source_name": "Reuters", + "source_type": "tier1_wire_service", + "publication_date": "2026-01-15", + "title": "Article headline", + "relevant_excerpt": "Excerpt from search result...", + "relevance_to_claim": "contradicts_claim" + } + ] + } + ], + "verification_summary": { + "total_searches": 3, + "claims_contradicted": 1, + "claims_unverifiable": 1, + "key_findings": "Summary of what verification revealed..." + } + } +} +``` + #### **3. Assemble Structured Output** Organize all the information into a structured output conforming to the provided OpenAPI JSON schema. @@ -429,7 +636,8 @@ Ensure your output strictly adheres to this schema. "confidence_scores", "emotional_tone", "political_leaning", - "thought_summaries" + "thought_summaries", + "verification_evidence" ], "properties": { "transcription": { @@ -560,12 +768,17 @@ Ensure your output strictly adheres to this schema. }, "confidence_scores": { "type": "object", - "required": ["overall", "analysis", "categories"], + "required": ["overall", "verification_status", "analysis", "categories"], "properties": { "overall": { "type": "integer", "description": "Overall confidence score of the analysis, ranging from 0 to 100." }, + "verification_status": { + "type": "string", + "enum": ["verified_false", "verified_true", "uncertain", "insufficient_evidence"], + "description": "Overall verification status based on evidence quality." + }, "analysis": { "type": "object", "required": ["claims", "validation_checklist", "score_adjustments"], @@ -760,6 +973,52 @@ Ensure your output strictly adheres to this schema. "thought_summaries": { "type": "string", "description": "A summary of your reasoning process, key observations, and analytical steps taken during the analysis." 
+ }, + "verification_evidence": { + "type": "object", + "required": ["searches_performed", "verification_summary"], + "description": "Complete documentation of all web searches performed during fact-checking.", + "properties": { + "searches_performed": { + "type": "array", + "items": { + "type": "object", + "required": ["query", "search_intent", "result_status", "results"], + "properties": { + "query": { "type": "string" }, + "search_intent": { "type": "string" }, + "result_status": { "type": "string", "enum": ["results_found", "no_results", "results_inconclusive"] }, + "results": { + "type": "array", + "items": { + "type": "object", + "required": ["url", "source_name", "source_type", "publication_date", "title", "relevant_excerpt", "relevance_to_claim"], + "properties": { + "url": { "type": "string" }, + "source_name": { "type": "string" }, + "source_type": { "type": "string", "enum": ["tier1_wire_service", "tier1_factchecker", "tier2_major_news", "tier3_regional_news", "official_source", "other"] }, + "publication_date": { "type": ["string", "null"] }, + "title": { "type": "string" }, + "relevant_excerpt": { "type": "string" }, + "relevance_to_claim": { "type": "string", "enum": ["supports_claim", "contradicts_claim", "provides_context", "inconclusive"] }, + "content_fetched": { "type": "boolean" } + } + } + } + } + } + }, + "verification_summary": { + "type": "object", + "required": ["total_searches", "claims_contradicted", "claims_unverifiable", "key_findings"], + "properties": { + "total_searches": { "type": "integer" }, + "claims_contradicted": { "type": "integer" }, + "claims_unverifiable": { "type": "integer" }, + "key_findings": { "type": "string" } + } + } + } } } } @@ -1563,6 +1822,7 @@ Below is a complete example showing all required fields: }, "confidence_scores": { "overall": 92, + "verification_status": "verified_false", "analysis": { "claims": [ { @@ -1758,7 +2018,7 @@ By following these instructions and listening closely using the detailed heurist Please proceed to analyze the provided audio content following these guidelines: 1. Listen carefully to capture all spoken content. -2. Verify all factual claims using the web search tool before assigning scores (Section H), searching for information relevant to the recording datetime and/or current datetime. For important claims, use the content fetch tool to read full articles. +2. Verify all factual claims using web search before assigning scores (Section H), searching for information relevant to the recording datetime and/or current datetime. Use `web_url_read` to read full articles for important claims. 3. Apply the detailed heuristics for disinformation analysis. 4. Base political orientation assessment solely on observable content elements. 5. Document all findings with specific evidence from the content. 
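For reference while reviewing the schema additions above: the Stage 3 executor validates model output with Pydantic (`Stage3Output.model_validate_json`, visible in the `stage_3.py` diff below), so the new `verification_evidence` object presumably gains matching models. A minimal sketch, assuming Pydantic v2 -- these class names are hypothetical, not the actual definitions in `processing_pipeline/stage_3_models.py`:

```python
from typing import Literal, Optional
from pydantic import BaseModel

SourceType = Literal["tier1_wire_service", "tier1_factchecker", "tier2_major_news",
                     "tier3_regional_news", "official_source", "other"]

class SearchResult(BaseModel):
    url: str
    source_name: str
    source_type: SourceType
    publication_date: Optional[str]  # required but nullable: ISO 8601 date or null
    title: str
    relevant_excerpt: str            # direct quote from the source, not a paraphrase
    relevance_to_claim: Literal["supports_claim", "contradicts_claim",
                                "provides_context", "inconclusive"]
    content_fetched: bool = False    # optional: not in the schema's required list

class SearchPerformed(BaseModel):
    query: str
    search_intent: str
    result_status: Literal["results_found", "no_results", "results_inconclusive"]
    results: list[SearchResult]

class VerificationSummary(BaseModel):
    total_searches: int
    claims_contradicted: int
    claims_unverifiable: int
    key_findings: str

class VerificationEvidence(BaseModel):
    searches_performed: list[SearchPerformed]
    verification_summary: VerificationSummary
```

Note that `content_fetched` defaults to `False` because it is the one result property the schema does not list as required, while `publication_date` is required but nullable.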
diff --git a/prompts/Stage_3_heuristics.md b/prompts/stage_3/heuristics.md similarity index 100% rename from prompts/Stage_3_heuristics.md rename to prompts/stage_3/heuristics.md diff --git a/prompts/Stage_3_output_schema.json b/prompts/stage_3/output_schema.json similarity index 70% rename from prompts/Stage_3_output_schema.json rename to prompts/stage_3/output_schema.json index 619ce6a..cdb2929 100644 --- a/prompts/Stage_3_output_schema.json +++ b/prompts/stage_3/output_schema.json @@ -13,7 +13,8 @@ "confidence_scores", "emotional_tone", "political_leaning", - "thought_summaries" + "thought_summaries", + "verification_evidence" ], "properties": { "is_convertible": { @@ -148,12 +149,17 @@ }, "confidence_scores": { "type": "object", - "required": ["overall", "analysis", "categories"], + "required": ["overall", "verification_status", "analysis", "categories"], "properties": { "overall": { "type": "integer", "description": "Overall confidence score of the analysis, ranging from 0 to 100." }, + "verification_status": { + "type": "string", + "enum": ["verified_false", "verified_true", "uncertain", "insufficient_evidence"], + "description": "Overall verification status based on evidence quality. 'verified_false' requires contradictory evidence with URLs. 'insufficient_evidence' when no search results found." + }, "analysis": { "type": "object", "required": ["claims", "validation_checklist", "score_adjustments"], @@ -348,6 +354,102 @@ "thought_summaries": { "type": "string", "description": "A summary of your reasoning process, key observations, and analytical steps taken during the analysis." + }, + "verification_evidence": { + "type": "object", + "required": ["searches_performed", "verification_summary"], + "description": "Complete documentation of all web searches performed during fact-checking.", + "properties": { + "searches_performed": { + "type": "array", + "description": "Record of all web searches performed during fact-checking.", + "items": { + "type": "object", + "required": ["query", "search_intent", "result_status", "results"], + "properties": { + "query": { + "type": "string", + "description": "The exact search query used." + }, + "search_intent": { + "type": "string", + "description": "What claim or fact this search was attempting to verify." + }, + "result_status": { + "type": "string", + "enum": ["results_found", "no_results", "results_inconclusive"], + "description": "Whether the search returned actionable results." + }, + "results": { + "type": "array", + "description": "Individual search results with full details.", + "items": { + "type": "object", + "required": ["url", "source_name", "source_type", "publication_date", "title", "relevant_excerpt", "relevance_to_claim"], + "properties": { + "url": { + "type": "string", + "description": "Full URL of the source." + }, + "source_name": { + "type": "string", + "description": "Name of the publication or website (e.g., Reuters, AP News, BBC)." + }, + "source_type": { + "type": "string", + "enum": ["tier1_wire_service", "tier1_factchecker", "tier2_major_news", "tier3_regional_news", "official_source", "other"], + "description": "Classification of source reliability tier." + }, + "publication_date": { + "type": ["string", "null"], + "description": "Publication date in ISO 8601 format (YYYY-MM-DD), or null if not available." + }, + "title": { + "type": "string", + "description": "Title or headline of the article/page." 
+ }, + "relevant_excerpt": { + "type": "string", + "description": "Direct quote from the source relevant to the claim (50-200 words). Do NOT paraphrase." + }, + "relevance_to_claim": { + "type": "string", + "enum": ["supports_claim", "contradicts_claim", "provides_context", "inconclusive"], + "description": "How this result relates to the claim being verified." + }, + "content_fetched": { + "type": "boolean", + "description": "Whether the full article content was fetched." + } + } + } + } + } + } + }, + "verification_summary": { + "type": "object", + "required": ["total_searches", "claims_contradicted", "claims_unverifiable", "key_findings"], + "properties": { + "total_searches": { + "type": "integer", + "description": "Total number of web searches performed." + }, + "claims_contradicted": { + "type": "integer", + "description": "Number of claims found to be false or misleading based on search evidence." + }, + "claims_unverifiable": { + "type": "integer", + "description": "Number of claims that could not be verified due to lack of information." + }, + "key_findings": { + "type": "string", + "description": "Narrative summary of the most important verification findings." + } + } + } + } } } } \ No newline at end of file diff --git a/prompts/stage_3/system_instruction.md b/prompts/stage_3/system_instruction.md new file mode 100644 index 0000000..78fc9e9 --- /dev/null +++ b/prompts/stage_3/system_instruction.md @@ -0,0 +1,36 @@ +**Role Definition:** +You are an advanced language model specialized in in-depth disinformation and political content analysis, capable of processing audio inputs in multiple languages, with a focus on Spanish and Arabic as spoken by immigrant communities in the USA. Your expertise includes transcription, translation, and comprehensive content analysis, capturing nuances such as tone, emotion, political orientation, and cultural context, including rigorous evaluation of demonstrably false claims and careful assessment of political orientation based solely on observable content. + +**General Guidelines:** +- **Accuracy:** Ensure precise transcription and translation, preserving the original meaning and cultural nuances. +- **Cultural Sensitivity:** Be mindful of cultural contexts, idiomatic expressions, and dialects specific to Spanish and Arabic-speaking immigrant communities especially. +- **Evidence-Based Analysis:** Verify all factual claims using web search before scoring. High scores (80-100) require strong contradictory evidence from reputable sources, not absence of confirmation. Base all conclusions on demonstrably false claims that can be definitively proven incorrect with cited sources. When web search results from multiple independent tier-1/tier-2 sources confirm a claim, you MUST accept those results as reflecting reality and assign a confidence score of 0 for that claim, regardless of whether the claim conflicts with your pre-training knowledge. +- **Searching Guidelines:** When verifying factual claims, prioritize information that matches or is recent to the recording date of the provided audio file. Do not rely on old or unrelated information that predates the recording. If the recording references recent events or claims about current status, search for the most up-to-date information available. +- **Knowledge Cutoff Awareness:** Your pre-training data has a knowledge cutoff date. Events that occurred after your training cutoff will feel unfamiliar or "wrong" to you, but they may be entirely real. 
You MUST NOT use your pre-training knowledge to override or dismiss web search results from established, independent sources. When web search results from tier-1 and tier-2 sources consistently confirm a claim, those results reflect reality -- even if the events feel implausible or unfamiliar to you. Your role is to verify claims against current web evidence, not against your training data. +- **Objectivity:** Maintain strict neutrality by focusing solely on verifiable facts rather than assumptions or inferences. Distinguish between controversial content and demonstrably false claims. +- **Self-Review:** Systematically validate all assessments through structured self-review, adjusting scores when specific evidence cannot be cited. +- **Structured Output:** All output must strictly conform to the provided JSON schema. + +**Verification Documentation (MANDATORY):** +- **Document All Searches:** ALWAYS record every web search in the `verification_evidence` field, including searches that return no results. Documenting the absence of evidence is critical for proper scoring. +- **Record Search Results:** For each search, document: query used, URLs found, source names, publication dates, and relevant excerpts (direct quotes, not paraphrases). +- **Source Classification:** Classify each source by tier: tier1_wire_service (Reuters, AP, AFP), tier1_factchecker (Snopes, PolitiFact), tier2_major_news (BBC, CNN, NYT), tier3_regional_news, official_source, or other. + +**Preferred Verification Tools:** +When searching for information to verify claims, prefer using these tools: +- **`searxng_web_search`** - For searching multiple sources and finding relevant URLs. +- **`web_url_read`** - For reading full article content and extracting exact quotes from URLs found. + +These tools provide better access to reliable sources. Use them when available, but other search tools may also be used if needed. + +**Conservative Confidence Scoring:** +- **Evidence-Based Scoring:** High confidence scores (60+) REQUIRE tier-1 source URLs and direct excerpts that contradict the claim. Without documented contradictory evidence, maximum score is 40. +- **Absence ≠ Falsity:** No search results means UNCERTAINTY, not disinformation. Score as `insufficient_evidence`, not `verified_false`. +- **Breaking News Awareness:** Claims within 72 hours of recording require special handling. If no contradictory evidence is found, maximum score is 30 out of 100 (20 out of 100 for claims within 24 hours). +- **Verification Status Required:** Every analysis must include a `verification_status`: `verified_false` (evidence contradicts), `verified_true` (evidence confirms), `uncertain` (mixed/recent events), or `insufficient_evidence` (no relevant results). + +**Web Search Result Integrity:** +- **NEVER dismiss search results from established independent sources.** When multiple tier-1 sources (Reuters, AP, AFP) and tier-2 sources (BBC, CNN, NYT, Wikipedia) independently report the same event, those reports reflect reality. You MUST NOT conclude that search results from these established sources are "fabricated," "poisoned," "planted," or part of a disinformation campaign. + +**Reliable Sources Requirement:** +You MUST use reliable, prestigious sources to verify claims. Without reliable sources explicitly contradicting a claim, you CANNOT assign high confidence that it is disinformation. 
\ No newline at end of file diff --git a/prompts/Stage_4_output_schema.json b/prompts/stage_4/output_schema.json similarity index 100% rename from prompts/Stage_4_output_schema.json rename to prompts/stage_4/output_schema.json diff --git a/prompts/Stage_4_review_prompt.md b/prompts/stage_4/review_prompt.md similarity index 100% rename from prompts/Stage_4_review_prompt.md rename to prompts/stage_4/review_prompt.md diff --git a/prompts/Stage_4_system_instruction.md b/prompts/stage_4/system_instruction.md similarity index 100% rename from prompts/Stage_4_system_instruction.md rename to prompts/stage_4/system_instruction.md diff --git a/src/processing_pipeline/constants.py b/src/processing_pipeline/constants.py index 52742c7..120ecab 100644 --- a/src/processing_pipeline/constants.py +++ b/src/processing_pipeline/constants.py @@ -41,36 +41,25 @@ class PromptStage(StrEnum): def get_user_prompt_for_stage_3(): - return open("prompts/Stage_3_analysis_prompt.md", "r").read() + return open("prompts/stage_3/analysis_prompt.md", "r").read() def get_system_instruction_for_stage_3(): - return open("prompts/Stage_3_system_instruction.md", "r").read() - + return open("prompts/stage_3/system_instruction.md", "r").read() def get_output_schema_for_stage_3(): - return json.load(open("prompts/Stage_3_output_schema.json", "r")) - - -def get_timestamped_transcription_generation_prompt(): - return open("prompts/Timestamped_transcription_generation_prompt.md", "r").read() - - -def get_timestamped_transcription_generation_output_schema(): - return json.load(open("prompts/Timestamped_transcription_generation_output_schema.json", "r")) + return json.load(open("prompts/stage_3/output_schema.json", "r")) def get_system_instruction_for_stage_4(): - return open("prompts/Stage_4_system_instruction.md", "r").read() - + return open("prompts/stage_4/system_instruction.md", "r").read() def get_user_prompt_for_stage_4(): - return open("prompts/Stage_4_review_prompt.md", "r").read() + return open("prompts/stage_4/review_prompt.md", "r").read() def get_output_schema_for_stage_4(): - return json.load(open("prompts/Stage_4_output_schema.json", "r")) - + return json.load(open("prompts/stage_4/output_schema.json", "r")) def get_gemini_timestamped_transcription_generation_prompt(): return open("prompts/Gemini_timestamped_transcription_generation_prompt.md", "r").read() diff --git a/src/processing_pipeline/stage_1/flows.py b/src/processing_pipeline/stage_1/flows.py index 974c58c..3afd752 100644 --- a/src/processing_pipeline/stage_1/flows.py +++ b/src/processing_pipeline/stage_1/flows.py @@ -9,7 +9,7 @@ from prefect.client.schemas import FlowRun, State from prefect.task_runners import ConcurrentTaskRunner -from processing_pipeline.constants import ProcessingStatus, PromptStage +from processing_pipeline.constants import GeminiModel, ProcessingStatus, PromptStage from processing_pipeline.stage_1.tasks import ( delete_stage_1_llm_responses, @@ -18,12 +18,12 @@ fetch_a_new_audio_file_from_supabase, fetch_audio_file_by_id, fetch_stage_1_llm_response_by_id, + get_audio_file_metadata, process_audio_file, reset_status_of_audio_files, reset_status_of_stage_1_llm_response, set_audio_file_status, set_status_of_stage_1_llm_response, - transcribe_audio_file_with_custom_timestamped_transcription_generator, transcribe_audio_file_with_timestamp_with_gemini, update_stage_1_llm_response_detection_result, update_stage_1_llm_response_timestamped_transcription, @@ -219,18 +219,9 @@ def regenerate_timestamped_transcript(stage_1_llm_response_ids): if 
stage_1_llm_response: print(f"Found stage 1 llm response {id}") - # Get metadata of the transcription audio_file = stage_1_llm_response["audio_file"] local_file = download_audio_file_from_s3(s3_client, audio_file["file_path"]) - recorded_at = datetime.strptime(audio_file["recorded_at"], "%Y-%m-%dT%H:%M:%S+00:00") - metadata = { - "radio_station_name": audio_file["radio_station_name"], - "radio_station_code": audio_file["radio_station_code"], - "location": {"state": audio_file["location_state"], "city": audio_file["location_city"]}, - "recorded_at": recorded_at.strftime("%B %-d, %Y %-I:%M %p"), - "recording_day_of_week": recorded_at.strftime("%A"), - "time_zone": "UTC", - } + metadata = get_audio_file_metadata(audio_file) initial_detection_result = stage_1_llm_response["initial_detection_result"] or {} flagged_snippets = initial_detection_result.get("flagged_snippets", []) @@ -238,32 +229,25 @@ def regenerate_timestamped_transcript(stage_1_llm_response_ids): if len(flagged_snippets) == 0: print("No flagged snippets found during the initial detection phase.") else: - try: - transcriptor = "gemini-1206" - timestamped_transcription = transcribe_audio_file_with_timestamp_with_gemini( - gemini_client=gemini_client, - audio_file=local_file, - prompt_version=transcription_prompt_version, - ) - except ValueError as e: - print( - f"Failed to transcribe the audio file with Gemini 2.5 Pro: {e}\n" - "Falling back to the custom timestamped-transcript generator" - ) - transcriptor = "custom" - timestamped_transcription = transcribe_audio_file_with_custom_timestamped_transcription_generator( - local_file - ) + # Timestamped transcription + transcriptor = GeminiModel.GEMINI_FLASH_LATEST + timestamped_transcription = transcribe_audio_file_with_timestamp_with_gemini( + gemini_client=gemini_client, + audio_file=local_file, + prompt_version=transcription_prompt_version, + model_name=transcriptor, + ) update_stage_1_llm_response_timestamped_transcription( supabase_client, id, timestamped_transcription, transcriptor ) - print("Processing the timestamped transcription with Gemini 2.5 Pro") + # Main detection detection_result = disinformation_detection_with_gemini( gemini_client=gemini_client, timestamped_transcription=timestamped_transcription["timestamped_transcription"], metadata=metadata, prompt_version=detection_prompt_version, + model_name=GeminiModel.GEMINI_FLASH_LATEST, ) print(f"Detection result:\n{json.dumps(detection_result, indent=2)}\n") update_stage_1_llm_response_detection_result(supabase_client, id, detection_result) diff --git a/src/processing_pipeline/stage_1/tasks.py b/src/processing_pipeline/stage_1/tasks.py index ea45d5a..6174624 100644 --- a/src/processing_pipeline/stage_1/tasks.py +++ b/src/processing_pipeline/stage_1/tasks.py @@ -14,7 +14,6 @@ Stage1PreprocessTranscriptionExecutor, ) from processing_pipeline.supabase_utils import SupabaseClient -from processing_pipeline.timestamped_transcription_generator import TimestampedTranscriptionGenerator from utils import optional_task @@ -186,14 +185,6 @@ def transcribe_audio_file_with_timestamp_with_gemini( return {"timestamped_transcription": timestamped_transcription} -@optional_task(log_prints=True) -def transcribe_audio_file_with_custom_timestamped_transcription_generator(audio_file): - print(f"Transcribing the audio file {audio_file} with the custom timestamped-transcription-generator") - gemini_key = os.getenv("GOOGLE_GEMINI_KEY") - timestamped_transcription = TimestampedTranscriptionGenerator.run(audio_file, gemini_key, 10) - return 
{"timestamped_transcription": timestamped_transcription} - - @optional_task(log_prints=True, retries=3) def disinformation_detection_with_gemini( gemini_client: genai.Client | None, diff --git a/src/processing_pipeline/stage_3.py b/src/processing_pipeline/stage_3.py deleted file mode 100644 index f15fc32..0000000 --- a/src/processing_pipeline/stage_3.py +++ /dev/null @@ -1,682 +0,0 @@ -from datetime import datetime, timezone -from http import HTTPStatus -import json -import os -import subprocess -import tempfile -import time -from typing import Any - -import boto3 -from google import genai -from google.genai import errors -from prefect.flows import Flow -from prefect.client.schemas import FlowRun, State -from pydantic import ValidationError - -from prefect.task_runners import ConcurrentTaskRunner -from google.genai.types import ( - File, - FinishReason, - GenerateContentConfig, - GoogleSearch, - ThinkingConfig, - Tool, -) -from processing_pipeline.supabase_utils import SupabaseClient -from processing_pipeline.processing_utils import ( - get_safety_settings, - postprocess_snippet, -) -from processing_pipeline.constants import ( - GeminiCLIEventType, - GeminiModel, - ProcessingStatus, - PromptStage, -) -from processing_pipeline.stage_3_models import Stage3Output -from utils import optional_flow, optional_task - - -@optional_task(log_prints=True, retries=3) -def fetch_a_specific_snippet_from_supabase(supabase_client, snippet_id): - response = supabase_client.get_snippet_by_id( - id=snippet_id, - select='*, audio_file(radio_station_name, radio_station_code, location_state, location_city, recorded_at, recording_day_of_week), stage_1_llm_response("detection_result")', - ) - if response: - return response - else: - print(f"Snippet with id {snippet_id} not found") - return None - - -@optional_task(log_prints=True, retries=3) -def fetch_a_new_snippet_from_supabase(supabase_client): - return __fetch_a_new_snippet_from_supabase(supabase_client) - - -def __fetch_a_new_snippet_from_supabase(supabase_client): - response = supabase_client.get_a_new_snippet_and_reserve_it() - if response: - print(f"Found a new snippet: {response['id']}") - return response - else: - print("No new snippets found") - return None - - -@optional_task(log_prints=True, retries=3) -def download_audio_file_from_s3(s3_client, r2_bucket_name, file_path): - return __download_audio_file_from_s3(s3_client, r2_bucket_name, file_path) - - -def __download_audio_file_from_s3(s3_client, r2_bucket_name, file_path): - file_name = os.path.basename(file_path) - s3_client.download_file(r2_bucket_name, file_path, file_name) - return file_name - - -@optional_task(log_prints=True, retries=3) -def update_snippet_in_supabase( - supabase_client, - snippet_id, - gemini_response, - grounding_metadata, - thought_summaries, - analyzed_by, - status, - error_message, - stage_3_prompt_version_id=None, -): - supabase_client.update_snippet( - id=snippet_id, - transcription=gemini_response["transcription"], - translation=gemini_response["translation"], - title=gemini_response["title"], - summary=gemini_response["summary"], - explanation=gemini_response["explanation"], - disinformation_categories=gemini_response["disinformation_categories"], - keywords_detected=gemini_response["keywords_detected"], - language=gemini_response["language"], - confidence_scores=gemini_response["confidence_scores"], - emotional_tone=gemini_response["emotional_tone"], - context=gemini_response["context"], - political_leaning=gemini_response["political_leaning"], - 
grounding_metadata=grounding_metadata, - thought_summaries=thought_summaries, - analyzed_by=analyzed_by, - status=status, - error_message=error_message, - stage_3_prompt_version_id=stage_3_prompt_version_id, - ) - - -@optional_task(log_prints=True) -def get_metadata(snippet): - return __get_metadata(snippet) - - -def __get_metadata(snippet): - snippet_uuid = snippet["id"] - flagged_snippets = snippet["stage_1_llm_response"]["detection_result"]["flagged_snippets"] - metadata = {} - for flagged_snippet in flagged_snippets: - if flagged_snippet["uuid"] == snippet_uuid: - metadata = flagged_snippet - try: - # Handle escaped unicode characters in the transcription - metadata["transcription"] = flagged_snippet["transcription"].encode("latin-1").decode("unicode-escape") - except (UnicodeError, AttributeError) as e: - # Fallback to original transcription if decoding fails - print(f"Warning: Failed to decode transcription: {e}") - metadata["transcription"] = flagged_snippet["transcription"] - - audio_file = snippet["audio_file"] - recorded_at = datetime.strptime(snippet["recorded_at"], "%Y-%m-%dT%H:%M:%S+00:00") - audio_file["recorded_at"] = recorded_at.strftime("%B %-d, %Y %-I:%M %p") - audio_file["recording_day_of_week"] = recorded_at.strftime("%A") - audio_file["time_zone"] = "UTC" - metadata["additional_info"] = audio_file - - del metadata["start_time"] - del metadata["end_time"] - - # TODO: Add these fields back once we've fixed the pipeline - del metadata["explanation"] - del metadata["keywords_detected"] - - metadata["start_time"] = snippet["start_time"].split(":", 1)[1] - metadata["end_time"] = snippet["end_time"].split(":", 1)[1] - metadata["duration"] = snippet["duration"].split(":", 1)[1] - - return metadata - - -@optional_task(log_prints=True) -def analyze_snippet(gemini_key, audio_file, metadata, prompt_version: dict): - main_model = GeminiModel.GEMINI_2_5_PRO - fallback_model = GeminiModel.GEMINI_FLASH_LATEST - - try: - print(f"Attempting analysis with {main_model}") - analyzing_response = Stage3Executor.run( - gemini_key=gemini_key, - model_name=main_model, - audio_file=audio_file, - metadata=metadata, - prompt_version=prompt_version, - ) - return { - **analyzing_response, - "analyzed_by": main_model, - } - except errors.ServerError as e: - print(f"Server error with {main_model} (code {e.code}): {e.message}") - print(f"Falling back to {fallback_model}") - analyzing_response = Stage3Executor.run( - gemini_key=gemini_key, - model_name=fallback_model, - audio_file=audio_file, - metadata=metadata, - prompt_version=prompt_version, - ) - return { - **analyzing_response, - "analyzed_by": fallback_model, - } - except errors.ClientError as e: - if e.code in [HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN]: - print(f"Auth error with {main_model} (code {e.code}): {e.message}") - raise - else: - print(f"Client error with {main_model} (code {e.code}): {e.message}") - print(f"Falling back to {fallback_model}") - analyzing_response = Stage3Executor.run( - gemini_key=gemini_key, - model_name=fallback_model, - audio_file=audio_file, - metadata=metadata, - prompt_version=prompt_version, - ) - return { - **analyzing_response, - "analyzed_by": fallback_model, - } - - -@optional_task(log_prints=True) -def process_snippet(supabase_client, snippet, local_file, gemini_key, skip_review: bool, prompt_version: dict): - print(f"Processing snippet: {local_file}") - - try: - metadata = get_metadata(snippet) - print(f"Metadata:\n{json.dumps(metadata, indent=2, ensure_ascii=False)}") - - analyzing_response = 
analyze_snippet( - gemini_key=gemini_key, - audio_file=local_file, - metadata=metadata, - prompt_version=prompt_version, - ) - - status = ProcessingStatus.PROCESSED if skip_review else ProcessingStatus.READY_FOR_REVIEW - update_snippet_in_supabase( - supabase_client=supabase_client, - snippet_id=snippet["id"], - gemini_response=analyzing_response["response"], - grounding_metadata=analyzing_response["grounding_metadata"], - thought_summaries=analyzing_response["thought_summaries"], - analyzed_by=analyzing_response["analyzed_by"], - status=status, - error_message=None, - stage_3_prompt_version_id=prompt_version["id"], - ) - - if skip_review: - postprocess_snippet( - supabase_client, snippet["id"], analyzing_response["response"]["disinformation_categories"] - ) - - print(f"Processing completed for audio file {local_file} - snippet ID: {snippet['id']}") - - except Exception as e: - print(f"Failed to process {local_file}: {e}") - supabase_client.set_snippet_status(snippet["id"], ProcessingStatus.ERROR, str(e)) - - -def reset_snippet_status_hook(flow: Flow, flow_run: FlowRun, state: State): - snippet_ids = flow_run.parameters.get("snippet_ids", None) - - if not snippet_ids: - return - - supabase_client = SupabaseClient(supabase_url=os.getenv("SUPABASE_URL"), supabase_key=os.getenv("SUPABASE_KEY")) - for snippet_id in snippet_ids: - snippet = fetch_a_specific_snippet_from_supabase(supabase_client, snippet_id) - if snippet and snippet["status"] == ProcessingStatus.PROCESSING: - supabase_client.set_snippet_status(snippet_id, ProcessingStatus.NEW) - - -@optional_flow( - name="Stage 3: In-depth Analysis", - log_prints=True, - task_runner=ConcurrentTaskRunner, - on_crashed=[reset_snippet_status_hook], - on_cancellation=[reset_snippet_status_hook], -) -def in_depth_analysis(snippet_ids, skip_review, repeat): - # Setup S3 Client - R2_BUCKET_NAME = os.getenv("R2_BUCKET_NAME") - s3_client = boto3.client( - "s3", - endpoint_url=os.getenv("R2_ENDPOINT_URL"), - aws_access_key_id=os.getenv("R2_ACCESS_KEY_ID"), - aws_secret_access_key=os.getenv("R2_SECRET_ACCESS_KEY"), - ) - - # Setup Gemini Key - GEMINI_KEY = os.getenv("GOOGLE_GEMINI_KEY") - - # Setup Supabase client - supabase_client = SupabaseClient(supabase_url=os.getenv("SUPABASE_URL"), supabase_key=os.getenv("SUPABASE_KEY")) - - # Load prompt version - prompt_version = supabase_client.get_active_prompt(PromptStage.STAGE_3) - - if snippet_ids: - for id in snippet_ids: - snippet = fetch_a_specific_snippet_from_supabase(supabase_client, id) - if snippet: - supabase_client.set_snippet_status(snippet["id"], ProcessingStatus.PROCESSING) - print(f"Found the snippet: {snippet['id']}") - local_file = download_audio_file_from_s3(s3_client, R2_BUCKET_NAME, snippet["file_path"]) - - # Process the snippet - process_snippet( - supabase_client, - snippet, - local_file, - GEMINI_KEY, - skip_review=skip_review, - prompt_version=prompt_version, - ) - - print(f"Delete the downloaded snippet clip: {local_file}") - os.remove(local_file) - else: - while True: - snippet = fetch_a_new_snippet_from_supabase(supabase_client) # TODO: Retry failed snippets (status: Error) - - if snippet: - local_file = download_audio_file_from_s3(s3_client, R2_BUCKET_NAME, snippet["file_path"]) - - # Process the snippet - process_snippet( - supabase_client, - snippet, - local_file, - GEMINI_KEY, - skip_review=skip_review, - prompt_version=prompt_version, - ) - - print(f"Delete the downloaded snippet clip: {local_file}") - os.remove(local_file) - - # Stop the flow if we're not meant to repeat the 
process - if not repeat: - break - - if snippet: - sleep_time = 2 - else: - sleep_time = 60 - - print(f"Sleep for {sleep_time} seconds before the next iteration") - time.sleep(sleep_time) - - -class Stage3Executor: - """Executor for Stage 3 in-depth analysis.""" - - @classmethod - def run( - cls, - gemini_key: str, - model_name: GeminiModel, - audio_file: str, - metadata: dict, - prompt_version: dict, - ): - """ - Main execution method for Stage 3 analysis. - - Processing strategy: - 1. Step 1: Try Gemini CLI with custom search, fallback to Google Genai SDK with Google Search grounding if CLI fails - 2. Validate: Try to validate response with Pydantic model - 3. Step 2 (conditional): If validation fails, restructure with response_schema - - Args: - gemini_key: Google Gemini API key - model_name: Name of the Gemini model to use - audio_file: Path to the audio file - metadata: Metadata dictionary for the audio clip - prompt_version: The prompt version to use for analysis - - Returns: - dict: Structured and validated analysis output - """ - if not gemini_key: - raise ValueError("Google Gemini API key was not set!") - - client = genai.Client(api_key=gemini_key) - - # Prepare the user prompt using the prompt version - user_prompt = ( - f"{prompt_version['user_prompt']}\n\n" - f"Here is the metadata of the attached audio clip:\n{json.dumps(metadata, indent=2)}\n\n" - f"Here is the current date and time: {datetime.now(timezone.utc).strftime('%B %-d, %Y %-I:%M %p UTC')}\n\n" - ) - - # Strategy: Try CLI first, fallback to SDK - analysis_result = None - uploaded_audio_file = None - - try: - user_prompt_with_file = user_prompt + f"Here is the audio file attached: @{os.path.basename(audio_file)}" - analysis_result = cls.__analyze_with_custom_search( - model_name=model_name, - user_prompt=user_prompt_with_file, - system_instruction=prompt_version["system_instruction"], - ) - except RuntimeError as e: - print("Falling back to Google Search grounding with SDK...") - - uploaded_audio_file = client.files.upload(file=audio_file) - while uploaded_audio_file.state.name == "PROCESSING": - print("Processing the uploaded audio file...") - time.sleep(1) - uploaded_audio_file = client.files.get(name=uploaded_audio_file.name) - - analysis_result = cls.__analyze_with_google_search_grounding( - client, - model_name, - user_prompt, - uploaded_audio_file, - system_instruction=prompt_version["system_instruction"], - ) - - try: - analysis_text = analysis_result["text"] - grounding_metadata = analysis_result["grounding_metadata"] - # SDK method returns thought_summaries from thinking_config, CLI method doesn't - thought_summaries_from_api = analysis_result.get("thought_summaries") - - # Try to validate with Pydantic model first - validated_output = cls.__validate_with_pydantic(analysis_text) - - if validated_output: - # Use thought_summaries from API if available (SDK), otherwise from JSON response (CLI) - thought_summaries = thought_summaries_from_api or validated_output.get("thought_summaries") - return { - "response": validated_output, - "grounding_metadata": grounding_metadata, - "thought_summaries": thought_summaries, - } - - # Step 2: Structure with response_schema (if validation failed) - structured_output = cls.__structure_with_schema(client, analysis_text, prompt_version["output_schema"]) - thought_summaries = thought_summaries_from_api or structured_output.get("thought_summaries") - return { - "response": structured_output, - "grounding_metadata": grounding_metadata, - "thought_summaries": thought_summaries, - } 
- finally: - if uploaded_audio_file: - client.files.delete(name=uploaded_audio_file.name) - - @optional_task(log_prints=True, retries=3) - @classmethod - def __analyze_with_custom_search( - cls, - model_name: GeminiModel, - user_prompt: str, - system_instruction: str, - ): - """ - Analyze using Gemini CLI with custom search tools (MCP-based). - - This method uses the Gemini CLI which provides: - - Custom search via MCP tools - - Streaming JSON output - - System instruction from file - - Returns: - dict: {"text": str, "grounding_metadata": str|None, "thought_summaries": str|None} - - Raises: - RuntimeError: If CLI execution fails (for fallback to SDK method) - """ - print("Analyzing with Gemini CLI (custom search)...") - - events: list[dict[str, Any]] = [] - final_response = "" - tool_calls: dict[str, dict[str, Any]] = {} # Dict to match tool_use with tool_result by tool_id - timeout = 300 - - # Write system instruction to a temporary file for CLI - with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as tmp_file: - tmp_file.write(system_instruction) - system_instruction_path = tmp_file.name - - env = { - "PATH": os.environ.get("PATH", ""), - "HOME": os.environ.get("HOME", ""), - "GEMINI_API_KEY": os.environ["GOOGLE_GEMINI_KEY"], - "GEMINI_SYSTEM_MD": system_instruction_path, - } - - cmd = [ - "gemini", - "--model", - model_name, - "--output-format", - "stream-json", - user_prompt, - ] - - try: - result = subprocess.run( - cmd, - capture_output=True, - text=True, - env=env, - timeout=timeout, - ) - - # Parse JSONL output - for line in result.stdout.strip().split("\n"): - if not line: - continue - try: - event = json.loads(line) - events.append(event) - - # Concatenate assistant message content - if event.get("type") == GeminiCLIEventType.MESSAGE and event.get("role") == "assistant": - content = event.get("content") - if content and isinstance(content, str): - final_response += content - - # Capture tool use events - if event.get("type") == GeminiCLIEventType.TOOL_USE: - tool_id = event.get("tool_id") - tool_name = event.get("tool_name") - parameters = event.get("parameters") - - if tool_id in tool_calls: - tool_calls[tool_id]["tool_name"] = tool_name - tool_calls[tool_id]["parameters"] = parameters - else: - tool_calls[tool_id] = { - "tool_id": tool_id, - "tool_name": tool_name, - "parameters": parameters, - "output": None, - "status": None, - } - - # Capture tool result events and pair with tool_use - if event.get("type") == GeminiCLIEventType.TOOL_RESULT: - tool_id = event.get("tool_id") - output = event.get("output") - status = event.get("status") - - if tool_id in tool_calls: - tool_calls[tool_id]["output"] = output - tool_calls[tool_id]["status"] = status - else: - tool_calls[tool_id] = { - "tool_id": tool_id, - "tool_name": None, - "parameters": None, - "output": output, - "status": status, - } - except json.JSONDecodeError: - pass - - if result.returncode != 0: - raise RuntimeError(f"Gemini CLI exited with code {result.returncode}: {result.stderr}") - - if not final_response: - raise RuntimeError("Gemini CLI returned no response") - - # Convert tool_calls dict to list and serialize as JSON - tool_calls_list = list(tool_calls.values()) if tool_calls else None - return { - "text": final_response, - "grounding_metadata": json.dumps(tool_calls_list) if tool_calls_list else None, - } - - except subprocess.TimeoutExpired as e: - raise RuntimeError(f"Gemini CLI timed out after {timeout} seconds") from e - finally: - if os.path.exists(system_instruction_path): - 
os.remove(system_instruction_path) - - @optional_task(log_prints=True, retries=3) - @classmethod - def __analyze_with_google_search_grounding( - cls, - client: genai.Client, - model_name: GeminiModel, - user_prompt: str, - uploaded_audio_file: File, - system_instruction: str, - ): - print("Analyzing audio with web search...") - - response = client.models.generate_content( - model=model_name, - contents=[user_prompt, uploaded_audio_file], - config=GenerateContentConfig( - system_instruction=system_instruction, - max_output_tokens=16384, - tools=[Tool(google_search=GoogleSearch())], - thinking_config=ThinkingConfig(thinking_budget=4096, include_thoughts=True), - safety_settings=get_safety_settings(), - ), - ) - - thoughts = "" - for part in response.candidates[0].content.parts: - if part.thought and part.text: - thoughts += part.text - - grounding_metadata = ( - response.candidates[0].grounding_metadata.model_dump_json(indent=2) if response.candidates else None - ) - - if not response.text: - finish_reason = response.candidates[0].finish_reason if response.candidates else None - - if finish_reason == FinishReason.MAX_TOKENS: - raise ValueError("The response from Gemini was too long and was cut off in step 1.") - - print(f"Response finish reason: {finish_reason}") - raise ValueError("No response from Gemini in step 1.") - - return { - "text": response.text, - "grounding_metadata": grounding_metadata, - "thought_summaries": thoughts, - } - - @classmethod - def __validate_with_pydantic(cls, response_text: str): - try: - print("Attempting to validate response with Pydantic model...") - start_idx = response_text.find("{") - end_idx = response_text.rfind("}") - - if start_idx == -1 or end_idx == -1: - print("No JSON object found in the response.") - return None - - parsed = Stage3Output.model_validate_json(response_text[start_idx : end_idx + 1]) - print("Validation successful - returning structured output") - return parsed.model_dump() - except ValidationError as e: - print(f"Validation failed: {e}") - return None - - @classmethod - def __structure_with_schema( - cls, - client: genai.Client, - analysis_text: str, - output_schema: dict, - ): - print("Restructuring response with schema validation...") - - system_instruction = """You are a helpful assistant whose task is to convert provided text into a valid JSON object following a given schema. Your responsibilities are: - -1. **Validation**: Check if the provided text can be converted into a valid JSON object that adheres to the specified schema. -2. **Conversion**: - - If the text is convertible, convert it into a valid JSON object according to the schema. - - Set field `"is_convertible": true` in the JSON object. -3. 
**Error Handling**: - - If the text is not convertible (e.g., missing fields, incorrect data types), return a JSON object with the field `"is_convertible": false`.""" - - user_prompt = f"Please structure the following analysis text into the required JSON format:\n\n{analysis_text}" - - response = client.models.generate_content( - model=GeminiModel.GEMINI_FLASH_LATEST, - contents=[user_prompt], - config=GenerateContentConfig( - response_mime_type="application/json", - response_schema=output_schema, - system_instruction=system_instruction, - max_output_tokens=8192, - thinking_config=ThinkingConfig(thinking_budget=0), - safety_settings=get_safety_settings(), - ), - ) - - parsed_response = response.parsed - - if not parsed_response: - finish_reason = response.candidates[0].finish_reason if response.candidates else None - - if finish_reason == FinishReason.MAX_TOKENS: - raise ValueError("The response from Gemini was too long and was cut off in step 2.") - - raise ValueError(f"No response from Gemini in step 2. Response finished with reason: {finish_reason}") - - if not parsed_response.get("is_convertible"): - raise ValueError("[Stage 3] The response from Gemini could not be converted to the required schema.") - - return parsed_response diff --git a/src/processing_pipeline/stage_3/__init__.py b/src/processing_pipeline/stage_3/__init__.py new file mode 100644 index 0000000..04be699 --- /dev/null +++ b/src/processing_pipeline/stage_3/__init__.py @@ -0,0 +1,23 @@ +from .executors import Stage3Executor +from .flows import in_depth_analysis +from .tasks import ( + analyze_snippet, + download_audio_file_from_s3, + fetch_a_new_snippet_from_supabase, + fetch_a_specific_snippet_from_supabase, + get_metadata, + process_snippet, + update_snippet_in_supabase, +) + +__all__ = [ + "Stage3Executor", + "analyze_snippet", + "download_audio_file_from_s3", + "fetch_a_new_snippet_from_supabase", + "fetch_a_specific_snippet_from_supabase", + "get_metadata", + "in_depth_analysis", + "process_snippet", + "update_snippet_in_supabase", +] diff --git a/src/processing_pipeline/stage_3/executors.py b/src/processing_pipeline/stage_3/executors.py new file mode 100644 index 0000000..c19b935 --- /dev/null +++ b/src/processing_pipeline/stage_3/executors.py @@ -0,0 +1,324 @@ +from datetime import datetime, timezone +import json +import os +import subprocess +import tempfile +import time +from typing import Any + +from google import genai +from google.genai.types import ( + File, + FinishReason, + GenerateContentConfig, + GoogleSearch, + ThinkingConfig, + Tool, +) +from pydantic import ValidationError + +from processing_pipeline.constants import ( + GeminiCLIEventType, + GeminiModel, +) +from processing_pipeline.processing_utils import get_safety_settings +from processing_pipeline.stage_3.models import Stage3Output +from utils import optional_task + + +class Stage3Executor: + """Executor for Stage 3 in-depth analysis.""" + + @classmethod + def run( + cls, + gemini_key: str, + model_name: GeminiModel, + audio_file: str, + metadata: dict, + prompt_version: dict, + ): + """ + Main execution method for Stage 3 analysis. + + Processing strategy: + 1. Step 1: Try Gemini CLI with custom search, fallback to Google Genai SDK with Google Search grounding if CLI fails + 2. Validate: Try to validate response with Pydantic model + 3. 
Step 2 (conditional): If validation fails, restructure with response_schema
+
+        Args:
+            gemini_key: Google Gemini API key
+            model_name: Name of the Gemini model to use
+            audio_file: Path to the audio file
+            metadata: Metadata dictionary for the audio clip
+            prompt_version: The prompt version to use for analysis
+
+        Returns:
+            dict: Structured and validated analysis output
+        """
+        if not gemini_key:
+            raise ValueError("Google Gemini API key was not set!")
+
+        client = genai.Client(api_key=gemini_key)
+
+        # Prepare the user prompt using the prompt version
+        user_prompt = (
+            f"{prompt_version['user_prompt']}\n\n"
+            f"Here is the metadata of the attached audio clip:\n{json.dumps(metadata, indent=2)}\n\n"
+            f"Here is the current date and time: {datetime.now(timezone.utc).strftime('%B %-d, %Y %-I:%M %p UTC')}\n\n"
+        )
+
+        # Strategy: Try CLI first, fallback to SDK
+        analysis_text = None
+        thought_summaries_from_api = None
+        uploaded_audio_file = None
+
+        try:
+            user_prompt_with_file = user_prompt + f"Here is the audio file attached: @{os.path.basename(audio_file)}"
+            analysis_text = cls.__analyze_with_custom_search(
+                model_name=model_name,
+                user_prompt=user_prompt_with_file,
+                system_instruction=prompt_version["system_instruction"],
+            )
+        except RuntimeError as e:
+            print(f"Gemini CLI analysis failed: {e}")
+            print("Falling back to Google Search grounding with SDK...")
+
+            uploaded_audio_file = client.files.upload(file=audio_file)
+            while uploaded_audio_file.state.name == "PROCESSING":
+                print("Processing the uploaded audio file...")
+                time.sleep(1)
+                uploaded_audio_file = client.files.get(name=uploaded_audio_file.name)
+
+            sdk_result = cls.__analyze_with_google_search_grounding(
+                client,
+                model_name,
+                user_prompt,
+                uploaded_audio_file,
+                system_instruction=prompt_version["system_instruction"],
+            )
+            analysis_text = sdk_result["text"]
+            thought_summaries_from_api = sdk_result.get("thought_summaries")
+
+        try:
+            # Try to validate with Pydantic model first
+            validated_output = cls.__validate_with_pydantic(analysis_text)
+
+            if validated_output:
+                thought_summaries = thought_summaries_from_api or validated_output.get("thought_summaries")
+                verification_evidence = validated_output.get("verification_evidence")
+                grounding_metadata = json.dumps(verification_evidence, indent=2) if verification_evidence else None
+                return {
+                    "response": validated_output,
+                    "grounding_metadata": grounding_metadata,
+                    "thought_summaries": thought_summaries,
+                }
+
+            # Step 2: Structure with response_schema (if validation failed)
+            structured_output = cls.__structure_with_schema(client, analysis_text, prompt_version["output_schema"])
+            thought_summaries = thought_summaries_from_api or structured_output.get("thought_summaries")
+            verification_evidence = structured_output.get("verification_evidence")
+            grounding_metadata = json.dumps(verification_evidence, indent=2) if verification_evidence else None
+            return {
+                "response": structured_output,
+                "grounding_metadata": grounding_metadata,
+                "thought_summaries": thought_summaries,
+            }
+        finally:
+            if uploaded_audio_file:
+                client.files.delete(name=uploaded_audio_file.name)
+
+    @optional_task(log_prints=True, retries=3)
+    @classmethod
+    def __analyze_with_custom_search(
+        cls,
+        model_name: GeminiModel,
+        user_prompt: str,
+        system_instruction: str,
+    ):
+        """
+        Analyze using Gemini CLI with custom search tools (MCP-based).
+
+        This method uses the Gemini CLI which provides:
+        - Custom search via MCP tools
+        - Streaming JSON output
+        - System instruction from file
+
+        Returns:
+            str: Final response text from Gemini CLI
+
+        Raises:
+            RuntimeError: If CLI execution fails (for fallback to SDK method)
+        """
+        print("Analyzing with Gemini CLI (custom search)...")
+
+        events: list[dict[str, Any]] = []
+        final_response = ""
+        timeout = 300
+
+        # Write system instruction to a temporary file for CLI
+        with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as tmp_file:
+            tmp_file.write(system_instruction)
+            system_instruction_path = tmp_file.name
+
+        env = {
+            "PATH": os.environ.get("PATH", ""),
+            "HOME": os.environ.get("HOME", ""),
+            "GEMINI_API_KEY": os.environ["GOOGLE_GEMINI_KEY"],
+            "GEMINI_SYSTEM_MD": system_instruction_path,
+            "SEARXNG_URL": os.environ.get("SEARXNG_URL", ""),
+        }
+
+        cmd = [
+            "gemini",
+            "--model",
+            model_name,
+            "--output-format",
+            "stream-json",
+            user_prompt,
+        ]
+
+        try:
+            result = subprocess.run(
+                cmd,
+                capture_output=True,
+                text=True,
+                env=env,
+                timeout=timeout,
+            )
+
+            # Parse JSONL output
+            for line in result.stdout.strip().split("\n"):
+                if not line:
+                    continue
+                try:
+                    event = json.loads(line)
+                    events.append(event)
+
+                    # Concatenate assistant message content
+                    if event.get("type") == GeminiCLIEventType.MESSAGE and event.get("role") == "assistant":
+                        content = event.get("content")
+                        if content and isinstance(content, str):
+                            final_response += content
+                except json.JSONDecodeError:
+                    pass
+
+            if result.returncode != 0:
+                raise RuntimeError(f"Gemini CLI exited with code {result.returncode}: {result.stderr}")
+
+            if not final_response:
+                raise RuntimeError("Gemini CLI returned no response")
+
+            return final_response
+
+        except subprocess.TimeoutExpired as e:
+            raise RuntimeError(f"Gemini CLI timed out after {timeout} seconds") from e
+        finally:
+            if os.path.exists(system_instruction_path):
+                os.remove(system_instruction_path)
+
+    @optional_task(log_prints=True, retries=3)
+    @classmethod
+    def __analyze_with_google_search_grounding(
+        cls,
+        client: genai.Client,
+        model_name: GeminiModel,
+        user_prompt: str,
+        uploaded_audio_file: File,
+        system_instruction: str,
+    ):
+        print("Analyzing audio with web search...")
+
+        response = client.models.generate_content(
+            model=model_name,
+            contents=[user_prompt, uploaded_audio_file],
+            config=GenerateContentConfig(
+                system_instruction=system_instruction,
+                max_output_tokens=16384,
+                tools=[Tool(google_search=GoogleSearch())],
+                thinking_config=ThinkingConfig(thinking_budget=4096, include_thoughts=True),
+                safety_settings=get_safety_settings(),
+            ),
+        )
+
+        # Guard against responses with no candidates before indexing into them
+        thoughts = ""
+        if response.candidates and response.candidates[0].content and response.candidates[0].content.parts:
+            for part in response.candidates[0].content.parts:
+                if part.thought and part.text:
+                    thoughts += part.text
+
+        if not response.text:
+            finish_reason = response.candidates[0].finish_reason if response.candidates else None
+
+            if finish_reason == FinishReason.MAX_TOKENS:
+                raise ValueError("The response from Gemini was too long and was cut off in step 1.")
+
+            print(f"Response finish reason: {finish_reason}")
+            raise ValueError("No response from Gemini in step 1.")
+
+        return {
+            "text": response.text,
+            "thought_summaries": thoughts,
+        }
+
+    @classmethod
+    def __validate_with_pydantic(cls, response_text: str):
+        try:
+            print("Attempting to validate response with Pydantic model...")
+            start_idx = response_text.find("{")
+            end_idx = response_text.rfind("}")
+
+            if start_idx == -1 or end_idx == -1:
+                print("No JSON object found in the response.")
+                return None
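The JSONL parsing loop above can be exercised without invoking the CLI. A minimal, self-contained sketch, assuming the event shape used in this diff (`type`/`role`/`content` keys, with `GeminiCLIEventType.MESSAGE` standing for the literal string `"message"` — an assumption here):

```python
import json

# Hypothetical sample of `gemini --output-format stream-json` output (JSONL)
SAMPLE_STREAM = """\
{"type": "message", "role": "assistant", "content": "Hello, "}
{"type": "tool_use", "tool_id": "t1", "tool_name": "searxng_web_search"}
{"type": "message", "role": "assistant", "content": "world."}
not-json-noise
"""


def collect_assistant_text(stream: str) -> str:
    """Concatenate assistant message chunks, skipping malformed lines."""
    final_response = ""
    for line in stream.strip().split("\n"):
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # tolerate non-JSON noise, as the executor does
        if event.get("type") == "message" and event.get("role") == "assistant":
            content = event.get("content")
            if isinstance(content, str):
                final_response += content
    return final_response


print(collect_assistant_text(SAMPLE_STREAM))  # -> Hello, world.
```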
+ + parsed = Stage3Output.model_validate_json(response_text[start_idx : end_idx + 1]) + print("Validation successful - returning structured output") + return parsed.model_dump() + except ValidationError as e: + print(f"Validation failed: {e}") + return None + + @classmethod + def __structure_with_schema( + cls, + client: genai.Client, + analysis_text: str, + output_schema: dict, + ): + print("Restructuring response with schema validation...") + + system_instruction = """You are a helpful assistant whose task is to convert provided text into a valid JSON object following a given schema. Your responsibilities are: + +1. **Validation**: Check if the provided text can be converted into a valid JSON object that adheres to the specified schema. +2. **Conversion**: + - If the text is convertible, convert it into a valid JSON object according to the schema. + - Set field `"is_convertible": true` in the JSON object. +3. **Error Handling**: + - If the text is not convertible (e.g., missing fields, incorrect data types), return a JSON object with the field `"is_convertible": false`.""" + + user_prompt = f"Please structure the following analysis text into the required JSON format:\n\n{analysis_text}" + + response = client.models.generate_content( + model=GeminiModel.GEMINI_FLASH_LATEST, + contents=[user_prompt], + config=GenerateContentConfig( + response_mime_type="application/json", + response_schema=output_schema, + system_instruction=system_instruction, + max_output_tokens=8192, + thinking_config=ThinkingConfig(thinking_budget=0), + safety_settings=get_safety_settings(), + ), + ) + + parsed_response = response.parsed + + if not parsed_response: + finish_reason = response.candidates[0].finish_reason if response.candidates else None + + if finish_reason == FinishReason.MAX_TOKENS: + raise ValueError("The response from Gemini was too long and was cut off in step 2.") + + raise ValueError(f"No response from Gemini in step 2. 
Response finished with reason: {finish_reason}") + + if not parsed_response.get("is_convertible"): + raise ValueError("[Stage 3] The response from Gemini could not be converted to the required schema.") + + return parsed_response diff --git a/src/processing_pipeline/stage_3/flows.py b/src/processing_pipeline/stage_3/flows.py new file mode 100644 index 0000000..2ae6f74 --- /dev/null +++ b/src/processing_pipeline/stage_3/flows.py @@ -0,0 +1,109 @@ +import os +import time + +import boto3 +from prefect.flows import Flow +from prefect.client.schemas import FlowRun, State +from prefect.task_runners import ConcurrentTaskRunner + +from processing_pipeline.constants import ProcessingStatus, PromptStage +from processing_pipeline.stage_3.tasks import ( + download_audio_file_from_s3, + fetch_a_new_snippet_from_supabase, + fetch_a_specific_snippet_from_supabase, + process_snippet, +) +from processing_pipeline.supabase_utils import SupabaseClient +from utils import optional_flow + + +def reset_snippet_status_hook(flow: Flow, flow_run: FlowRun, state: State): + snippet_ids = flow_run.parameters.get("snippet_ids", None) + + if not snippet_ids: + return + + supabase_client = SupabaseClient(supabase_url=os.getenv("SUPABASE_URL"), supabase_key=os.getenv("SUPABASE_KEY")) + for snippet_id in snippet_ids: + snippet = fetch_a_specific_snippet_from_supabase(supabase_client, snippet_id) + if snippet and snippet["status"] == ProcessingStatus.PROCESSING: + supabase_client.set_snippet_status(snippet_id, ProcessingStatus.NEW) + + +@optional_flow( + name="Stage 3: In-depth Analysis", + log_prints=True, + task_runner=ConcurrentTaskRunner, + on_crashed=[reset_snippet_status_hook], + on_cancellation=[reset_snippet_status_hook], +) +def in_depth_analysis(snippet_ids, skip_review, repeat): + # Setup S3 Client + R2_BUCKET_NAME = os.getenv("R2_BUCKET_NAME") + s3_client = boto3.client( + "s3", + endpoint_url=os.getenv("R2_ENDPOINT_URL"), + aws_access_key_id=os.getenv("R2_ACCESS_KEY_ID"), + aws_secret_access_key=os.getenv("R2_SECRET_ACCESS_KEY"), + ) + + # Setup Gemini Key + GEMINI_KEY = os.getenv("GOOGLE_GEMINI_KEY") + + # Setup Supabase client + supabase_client = SupabaseClient(supabase_url=os.getenv("SUPABASE_URL"), supabase_key=os.getenv("SUPABASE_KEY")) + + # Load prompt version + prompt_version = supabase_client.get_active_prompt(PromptStage.STAGE_3) + + if snippet_ids: + for id in snippet_ids: + snippet = fetch_a_specific_snippet_from_supabase(supabase_client, id) + if snippet: + supabase_client.set_snippet_status(snippet["id"], ProcessingStatus.PROCESSING) + print(f"Found the snippet: {snippet['id']}") + local_file = download_audio_file_from_s3(s3_client, R2_BUCKET_NAME, snippet["file_path"]) + + # Process the snippet + process_snippet( + supabase_client, + snippet, + local_file, + GEMINI_KEY, + skip_review=skip_review, + prompt_version=prompt_version, + ) + + print(f"Delete the downloaded snippet clip: {local_file}") + os.remove(local_file) + else: + while True: + snippet = fetch_a_new_snippet_from_supabase(supabase_client) # TODO: Retry failed snippets (status: Error) + + if snippet: + local_file = download_audio_file_from_s3(s3_client, R2_BUCKET_NAME, snippet["file_path"]) + + # Process the snippet + process_snippet( + supabase_client, + snippet, + local_file, + GEMINI_KEY, + skip_review=skip_review, + prompt_version=prompt_version, + ) + + print(f"Delete the downloaded snippet clip: {local_file}") + os.remove(local_file) + + # Stop the flow if we're not meant to repeat the process + if not repeat: + break + + if 
snippet: + sleep_time = 2 + else: + sleep_time = 60 + + print(f"Sleep for {sleep_time} seconds before the next iteration") + time.sleep(sleep_time) diff --git a/src/processing_pipeline/stage_3_models.py b/src/processing_pipeline/stage_3/models.py similarity index 69% rename from src/processing_pipeline/stage_3_models.py rename to src/processing_pipeline/stage_3/models.py index 5790f30..36c2b7c 100644 --- a/src/processing_pipeline/stage_3_models.py +++ b/src/processing_pipeline/stage_3/models.py @@ -71,6 +71,9 @@ class CategoryScore(BaseModel): class ConfidenceScores(BaseModel): overall: int = Field(ge=0, le=100, description="Overall confidence score of the analysis, ranging from 0 to 100") + verification_status: Literal["verified_false", "verified_true", "uncertain", "insufficient_evidence"] = Field( + description="Overall verification status based on evidence quality" + ) analysis: Analysis categories: list[CategoryScore] @@ -131,6 +134,43 @@ class PoliticalLeaning(BaseModel): explanation: PoliticalExplanation +class SearchResult(BaseModel): + url: str = Field(description="Full URL of the source") + source_name: str = Field(description="Name of the publication or website (e.g., Reuters, AP News, BBC)") + source_type: Literal[ + "tier1_wire_service", "tier1_factchecker", "tier2_major_news", + "tier3_regional_news", "official_source", "other" + ] = Field(description="Classification of source reliability tier") + publication_date: str | None = Field(description="Publication date in ISO 8601 format (YYYY-MM-DD), or null if not available") + title: str = Field(description="Title or headline of the article/page") + relevant_excerpt: str = Field(description="Direct quote from the source relevant to the claim (50-200 words)") + relevance_to_claim: Literal["supports_claim", "contradicts_claim", "provides_context", "inconclusive"] = Field( + description="How this result relates to the claim being verified" + ) + content_fetched: bool = Field(default=False, description="Whether the full article content was fetched") + + +class SearchPerformed(BaseModel): + query: str = Field(description="The exact search query used") + search_intent: str = Field(description="What claim or fact this search was attempting to verify") + result_status: Literal["results_found", "no_results", "results_inconclusive"] = Field( + description="Whether the search returned actionable results" + ) + results: list[SearchResult] = Field(default_factory=list, description="Individual search results") + + +class VerificationSummary(BaseModel): + total_searches: int = Field(description="Total number of web searches performed") + claims_contradicted: int = Field(description="Number of claims found to be false based on search evidence") + claims_unverifiable: int = Field(description="Number of claims that could not be verified") + key_findings: str = Field(description="Narrative summary of the most important verification findings") + + +class VerificationEvidence(BaseModel): + searches_performed: list[SearchPerformed] = Field(description="Record of all web searches") + verification_summary: VerificationSummary = Field(description="Summary of verification activities") + + class Stage3Output(BaseModel): """Main model for Stage 3 output.""" @@ -153,3 +193,6 @@ class Stage3Output(BaseModel): thought_summaries: str = Field( description="A summary of your reasoning process, key observations, and analytical steps taken during the analysis" ) + verification_evidence: VerificationEvidence = Field( + description="Complete documentation of all web 
searches performed during fact-checking" + ) diff --git a/src/processing_pipeline/stage_3/tasks.py b/src/processing_pipeline/stage_3/tasks.py new file mode 100644 index 0000000..dc661b7 --- /dev/null +++ b/src/processing_pipeline/stage_3/tasks.py @@ -0,0 +1,221 @@ +from datetime import datetime +from http import HTTPStatus +import json +import os + +from google.genai import errors + +from processing_pipeline.constants import ( + GeminiModel, + ProcessingStatus, +) +from processing_pipeline.processing_utils import postprocess_snippet +from processing_pipeline.stage_3.executors import Stage3Executor +from utils import optional_task + + +@optional_task(log_prints=True, retries=3) +def fetch_a_specific_snippet_from_supabase(supabase_client, snippet_id): + response = supabase_client.get_snippet_by_id( + id=snippet_id, + select='*, audio_file(radio_station_name, radio_station_code, location_state, location_city, recorded_at, recording_day_of_week), stage_1_llm_response("detection_result")', + ) + if response: + return response + else: + print(f"Snippet with id {snippet_id} not found") + return None + + +@optional_task(log_prints=True, retries=3) +def fetch_a_new_snippet_from_supabase(supabase_client): + return __fetch_a_new_snippet_from_supabase(supabase_client) + + +def __fetch_a_new_snippet_from_supabase(supabase_client): + response = supabase_client.get_a_new_snippet_and_reserve_it() + if response: + print(f"Found a new snippet: {response['id']}") + return response + else: + print("No new snippets found") + return None + + +@optional_task(log_prints=True, retries=3) +def download_audio_file_from_s3(s3_client, r2_bucket_name, file_path): + return __download_audio_file_from_s3(s3_client, r2_bucket_name, file_path) + + +def __download_audio_file_from_s3(s3_client, r2_bucket_name, file_path): + file_name = os.path.basename(file_path) + s3_client.download_file(r2_bucket_name, file_path, file_name) + return file_name + + +@optional_task(log_prints=True, retries=3) +def update_snippet_in_supabase( + supabase_client, + snippet_id, + gemini_response, + grounding_metadata, + thought_summaries, + analyzed_by, + status, + error_message, + stage_3_prompt_version_id=None, +): + supabase_client.update_snippet( + id=snippet_id, + transcription=gemini_response["transcription"], + translation=gemini_response["translation"], + title=gemini_response["title"], + summary=gemini_response["summary"], + explanation=gemini_response["explanation"], + disinformation_categories=gemini_response["disinformation_categories"], + keywords_detected=gemini_response["keywords_detected"], + language=gemini_response["language"], + confidence_scores=gemini_response["confidence_scores"], + emotional_tone=gemini_response["emotional_tone"], + context=gemini_response["context"], + political_leaning=gemini_response["political_leaning"], + grounding_metadata=grounding_metadata, + thought_summaries=thought_summaries, + analyzed_by=analyzed_by, + status=status, + error_message=error_message, + stage_3_prompt_version_id=stage_3_prompt_version_id, + ) + + +@optional_task(log_prints=True) +def get_metadata(snippet): + return __get_metadata(snippet) + + +def __get_metadata(snippet): + snippet_uuid = snippet["id"] + flagged_snippets = snippet["stage_1_llm_response"]["detection_result"]["flagged_snippets"] + metadata = {} + for flagged_snippet in flagged_snippets: + if flagged_snippet["uuid"] == snippet_uuid: + metadata = flagged_snippet + try: + # Handle escaped unicode characters in the transcription + metadata["transcription"] = 
flagged_snippet["transcription"].encode("latin-1").decode("unicode-escape") + except (UnicodeError, AttributeError) as e: + # Fallback to original transcription if decoding fails + print(f"Warning: Failed to decode transcription: {e}") + metadata["transcription"] = flagged_snippet["transcription"] + + audio_file = snippet["audio_file"] + recorded_at = datetime.strptime(snippet["recorded_at"], "%Y-%m-%dT%H:%M:%S+00:00") + audio_file["recorded_at"] = recorded_at.strftime("%B %-d, %Y %-I:%M %p") + audio_file["recording_day_of_week"] = recorded_at.strftime("%A") + audio_file["time_zone"] = "UTC" + metadata["additional_info"] = audio_file + + del metadata["start_time"] + del metadata["end_time"] + + # TODO: Add these fields back once we've fixed the pipeline + del metadata["explanation"] + del metadata["keywords_detected"] + + metadata["start_time"] = snippet["start_time"].split(":", 1)[1] + metadata["end_time"] = snippet["end_time"].split(":", 1)[1] + metadata["duration"] = snippet["duration"].split(":", 1)[1] + + return metadata + + +@optional_task(log_prints=True) +def analyze_snippet(gemini_key, audio_file, metadata, prompt_version: dict): + main_model = GeminiModel.GEMINI_2_5_PRO + fallback_model = GeminiModel.GEMINI_FLASH_LATEST + + try: + print(f"Attempting analysis with {main_model}") + analyzing_response = Stage3Executor.run( + gemini_key=gemini_key, + model_name=main_model, + audio_file=audio_file, + metadata=metadata, + prompt_version=prompt_version, + ) + return { + **analyzing_response, + "analyzed_by": main_model, + } + except errors.ServerError as e: + print(f"Server error with {main_model} (code {e.code}): {e.message}") + print(f"Falling back to {fallback_model}") + analyzing_response = Stage3Executor.run( + gemini_key=gemini_key, + model_name=fallback_model, + audio_file=audio_file, + metadata=metadata, + prompt_version=prompt_version, + ) + return { + **analyzing_response, + "analyzed_by": fallback_model, + } + except errors.ClientError as e: + if e.code in [HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN]: + print(f"Auth error with {main_model} (code {e.code}): {e.message}") + raise + else: + print(f"Client error with {main_model} (code {e.code}): {e.message}") + print(f"Falling back to {fallback_model}") + analyzing_response = Stage3Executor.run( + gemini_key=gemini_key, + model_name=fallback_model, + audio_file=audio_file, + metadata=metadata, + prompt_version=prompt_version, + ) + return { + **analyzing_response, + "analyzed_by": fallback_model, + } + + +@optional_task(log_prints=True) +def process_snippet(supabase_client, snippet, local_file, gemini_key, skip_review: bool, prompt_version: dict): + print(f"Processing snippet: {local_file}") + + try: + metadata = get_metadata(snippet) + print(f"Metadata:\n{json.dumps(metadata, indent=2, ensure_ascii=False)}") + + analyzing_response = analyze_snippet( + gemini_key=gemini_key, + audio_file=local_file, + metadata=metadata, + prompt_version=prompt_version, + ) + + status = ProcessingStatus.PROCESSED if skip_review else ProcessingStatus.READY_FOR_REVIEW + update_snippet_in_supabase( + supabase_client=supabase_client, + snippet_id=snippet["id"], + gemini_response=analyzing_response["response"], + grounding_metadata=analyzing_response["grounding_metadata"], + thought_summaries=analyzing_response["thought_summaries"], + analyzed_by=analyzing_response["analyzed_by"], + status=status, + error_message=None, + stage_3_prompt_version_id=prompt_version["id"], + ) + + if skip_review: + postprocess_snippet( + supabase_client, snippet["id"], 
analyzing_response["response"]["disinformation_categories"] + ) + + print(f"Processing completed for audio file {local_file} - snippet ID: {snippet['id']}") + + except Exception as e: + print(f"Failed to process {local_file}: {e}") + supabase_client.set_snippet_status(snippet["id"], ProcessingStatus.ERROR, str(e)) diff --git a/src/processing_pipeline/timestamped_transcription_generator.py b/src/processing_pipeline/timestamped_transcription_generator.py deleted file mode 100644 index 866665a..0000000 --- a/src/processing_pipeline/timestamped_transcription_generator.py +++ /dev/null @@ -1,170 +0,0 @@ -import json -import os -import pathlib -from pydub import AudioSegment -from google import genai -from google.genai.types import ( - SafetySetting, - HarmCategory, - HarmBlockThreshold, - Part, - ThinkingConfig, - GenerateContentConfig, -) -from processing_pipeline.constants import ( - GeminiModel, - get_timestamped_transcription_generation_output_schema, - get_timestamped_transcription_generation_prompt, -) - - -class TimestampedTranscriptionGenerator: - - SYSTEM_INSTRUCTION = ( - "You are a specialized language model designed to transcribe audio content in multiple languages." - ) - USER_PROMPT = get_timestamped_transcription_generation_prompt() - OUTPUT_SCHEMA = get_timestamped_transcription_generation_output_schema() - - @classmethod - def run(cls, audio_file, gemini_key, segment_length): - print("Splitting the file into 2 equal parts...") - first_part, second_part = cls.split_file_into_two_parts(audio_file, segment_length) - - try: - print("Splitting the first part into segments...") - first_part_segments = cls.split_file_into_segments(first_part, segment_length * 1000) - try: - print("Transcribing the first part...") - first = cls.transcribe_segments(first_part_segments, gemini_key) - finally: - print("Removing the first part segments...") - for s in first_part_segments: - os.remove(s) - - print("Splitting the second part into segments...") - second_part_segments = cls.split_file_into_segments(second_part, segment_length * 1000) - try: - print("Transcribing the second part...") - second = cls.transcribe_segments(second_part_segments, gemini_key) - finally: - print("Removing the second part segments...") - for s in second_part_segments: - os.remove(s) - finally: - print("Removing the two audio parts...") - os.remove(first_part) - os.remove(second_part) - - print("Combining segments from both parts...") - segments = first + second - print("Extracting the transcriptions from the segments...") - segment_transcripts = [segment["transcript"] for segment in segments] - - print("Formatting the transcriptions into a timestamped transcription...") - return cls.build_timestamped_transcription(segment_transcripts, segment_length) - - @classmethod - def transcribe_segments(cls, audio_segments, gemini_key): - if not gemini_key: - raise ValueError("Google Gemini API key was not set!") - - if not audio_segments: - raise ValueError("No audio segments provided!") - - client = genai.Client(api_key=gemini_key) - - segments = [] - for index, segment_path in enumerate(audio_segments): - segments.extend( - [ - f"\n\n", - Part.from_bytes(data=pathlib.Path(segment_path).read_bytes(), mime_type="audio/mp3"), - f"\n\n\n", - ] - ) - - result = client.models.generate_content( - model=GeminiModel.GEMINI_FLASH_LATEST, - contents=[cls.USER_PROMPT] + segments, - config=GenerateContentConfig( - response_mime_type="application/json", - response_schema=cls.OUTPUT_SCHEMA, - system_instruction=cls.SYSTEM_INSTRUCTION, - 
max_output_tokens=16384, - thinking_config=ThinkingConfig(thinking_budget=1024), - safety_settings=[ - SafetySetting( - category=HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, - threshold=HarmBlockThreshold.BLOCK_NONE, - ), - SafetySetting( - category=HarmCategory.HARM_CATEGORY_HATE_SPEECH, - threshold=HarmBlockThreshold.BLOCK_NONE, - ), - SafetySetting( - category=HarmCategory.HARM_CATEGORY_HARASSMENT, - threshold=HarmBlockThreshold.BLOCK_NONE, - ), - SafetySetting( - category=HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, - threshold=HarmBlockThreshold.BLOCK_NONE, - ), - ], - ), - ) - return json.loads(result.text)["segments"] - - @classmethod - def build_timestamped_transcription(cls, segment_transcriptions, segment_length): - result = "" - for i, transcription in enumerate(segment_transcriptions): - # Convert the segment duration to minutes and seconds - minutes = segment_length * i // 60 - seconds = segment_length * i % 60 - - result += f"[{minutes:02}:{seconds:02}] {transcription}\n" - - return result - - @classmethod - def split_file_into_segments(cls, audio_file, segment_length_ms): - audio = AudioSegment.from_mp3(audio_file) - segments = [] - - for i in range(0, len(audio), segment_length_ms): - # Slice the audio segment - subclip = audio[i : i + segment_length_ms] - - # Export the subclip - output_file = f"{audio_file}_segment_{(i // segment_length_ms) + 1}.mp3" - subclip.export(output_file, format="mp3") - - segments.append(output_file) - - del audio - return segments - - @classmethod - def split_file_into_two_parts(cls, file, segment_length): - audio = AudioSegment.from_mp3(file) - half_length = len(audio) // 2 - segment_length_ms = segment_length * 1000 - - # Round down the half_length to the nearest multiple of segment_length_ms - split_point = int((half_length // segment_length_ms) * segment_length_ms) - print(f"Split point is at {split_point} milliseconds") - - first_part = audio[:split_point] - second_part = audio[split_point:] - - # Export the parts - first_part.export(f"{file}_part_1.mp3", format="mp3") - second_part.export(f"{file}_part_2.mp3", format="mp3") - - # Release the memory - del audio - del first_part - del second_part - - return f"{file}_part_1.mp3", f"{file}_part_2.mp3" diff --git a/src/scripts/import_prompts_to_db.py b/src/scripts/import_prompts_to_db.py index e940b99..5321a71 100644 --- a/src/scripts/import_prompts_to_db.py +++ b/src/scripts/import_prompts_to_db.py @@ -40,9 +40,9 @@ "output_schema": "prompts/stage_1/preprocess/initial_detection_output_schema.json", }, PromptStage.STAGE_3: { - "system_instruction": "prompts/Stage_3_system_instruction.md", - "user_prompt": "prompts/Stage_3_analysis_prompt.md", - "output_schema": "prompts/Stage_3_output_schema.json", + "system_instruction": "prompts/stage_3/system_instruction.md", + "user_prompt": "prompts/stage_3/analysis_prompt.md", + "output_schema": "prompts/stage_3/output_schema.json", }, PromptStage.STAGE_4: { "system_instruction": "prompts/Stage_4_system_instruction.md", diff --git a/tests/processing_pipeline/test_stage_1.py b/tests/processing_pipeline/test_stage_1.py index 99873df..06d9817 100644 --- a/tests/processing_pipeline/test_stage_1.py +++ b/tests/processing_pipeline/test_stage_1.py @@ -11,7 +11,6 @@ fetch_stage_1_llm_response_by_id, download_audio_file_from_s3, transcribe_audio_file_with_timestamp_with_gemini, - transcribe_audio_file_with_custom_timestamped_transcription_generator, disinformation_detection_with_gemini, insert_stage_1_llm_response, process_audio_file, @@ -157,17 +156,6 @@ def 
test_transcribe_with_timestamp_with_gemini_success(self, mock_environment): model_name=GeminiModel.GEMINI_FLASH_LATEST ) - def test_transcribe_with_custom_generator_success(self, mock_environment): - """Test successful transcription with custom generator""" - with patch("processing_pipeline.stage_1.TimestampedTranscriptionGenerator") as mock_generator: - mock_generator.run.return_value = "Test timestamped transcription" - - result = transcribe_audio_file_with_custom_timestamped_transcription_generator("test.mp3") - - assert result["timestamped_transcription"] == "Test timestamped transcription" - mock_generator.run.assert_called_once() - - class TestDetectionFunctions: def test_disinformation_detection_success(self, mock_environment): @@ -685,15 +673,6 @@ def test_undo_disinformation_detection_no_responses(self, mock_supabase_client): mock_supabase_client.reset_audio_file_status.assert_called_once_with([1, 2]) mock_supabase_client.delete_stage_1_llm_responses.assert_called_once_with([1, 2]) - @patch("processing_pipeline.stage_1.TimestampedTranscriptionGenerator") - def test_transcribe_audio_file_with_custom_generator_error(self, mock_generator): - """Test custom generator with error""" - mock_generator.run.side_effect = Exception("Generation failed") - - with pytest.raises(Exception, match="Generation failed"): - transcribe_audio_file_with_custom_timestamped_transcription_generator("test.mp3") - - def test_initial_disinformation_detection_specific_file(self, mock_supabase_client, mock_s3_client): """Test initial disinformation detection with specific file""" @@ -715,21 +694,6 @@ def test_initial_disinformation_detection_specific_file(self, mock_supabase_clie mock_supabase_client.get_audio_file_by_id.assert_called_once_with(1) mock_s3_client.download_file.assert_called_once() - def test_transcribe_audio_file_with_custom_generator_multiple_segments(self, mock_supabase_client): - """Test transcription with custom generator handling multiple segments""" - with patch("processing_pipeline.stage_1.TimestampedTranscriptionGenerator") as mock_generator: - # Mock generator to return transcription with multiple segments - mock_generator.run.return_value = ( - "[00:00] First segment.\n" "[00:10] Second segment.\n" "[00:20] Third segment.\n" - ) - - result = transcribe_audio_file_with_custom_timestamped_transcription_generator("test.mp3") - - assert isinstance(result, dict) - assert "timestamped_transcription" in result - assert len(result["timestamped_transcription"].split("\n")) == 4 # 3 segments + empty line - mock_generator.run.assert_called_once_with("test.mp3", os.getenv("GOOGLE_GEMINI_KEY"), 10) - def test_disinformation_detection_with_unicode_handling(self, mock_supabase_client, mock_genai): """Test disinformation detection with Unicode characters""" timestamped_transcription = "Test transcription with Unicode: áéíóú ñ" diff --git a/tests/processing_pipeline/test_stage_3.py b/tests/processing_pipeline/test_stage_3.py index 0c79a7b..2598b48 100644 --- a/tests/processing_pipeline/test_stage_3.py +++ b/tests/processing_pipeline/test_stage_3.py @@ -18,7 +18,7 @@ class TestStage3: @pytest.fixture def mock_supabase_client(self): """Create a mock Supabase client""" - with patch("processing_pipeline.stage_3.SupabaseClient") as MockSupabaseClient: + with patch("processing_pipeline.stage_3.flows.SupabaseClient") as MockSupabaseClient: mock_client = Mock() mock_client.get_snippet_by_id.return_value = None mock_client.get_a_new_snippet_and_reserve_it.return_value = None @@ -179,7 +179,7 @@ def 
test_process_snippet_skip_review_false( error_message=None, ) - @patch("processing_pipeline.stage_3.postprocess_snippet") + @patch("processing_pipeline.stage_3.tasks.postprocess_snippet") @patch("processing_pipeline.stage_3.Stage3Executor.run") def test_process_snippet_skip_review_true( self, mock_run, mock_postprocess, mock_supabase_client, sample_snippet, mock_gemini_response @@ -315,7 +315,7 @@ def test_in_depth_analysis_with_repeat(self, mock_supabase_client, mock_s3_clien ] with patch("os.remove"), patch("time.sleep") as mock_sleep, patch( - "processing_pipeline.stage_3.process_snippet" + "processing_pipeline.stage_3.flows.process_snippet" ) as mock_process: try:
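The updated patch targets above follow a general `unittest.mock` rule: after `stage_3` was split into a package, names must be patched in the module that looks them up (`processing_pipeline.stage_3.flows` / `.tasks`), not where they are defined. A self-contained sketch of the pitfall, using only the standard library (the `render` helper is hypothetical):

```python
from json import dumps  # local binding, like `from x import y`
from unittest.mock import patch


def render() -> str:
    return dumps({"a": 1})  # looked up via this module's global `dumps`


# Patching the defining module does not affect the already-bound name:
with patch("json.dumps", return_value="patched"):
    assert render() == '{"a": 1}'

# Patching the name where it is looked up does:
with patch(f"{__name__}.dumps", return_value="patched"):
    assert render() == "patched"
```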