Merged
4 changes: 4 additions & 0 deletions prompts/Stage_3_output_schema.json
@@ -15,6 +15,10 @@
"political_leaning"
],
"properties": {
"is_convertible": {
"type": "boolean",
"description": "Indicates whether the provided text can be converted into a valid JSON object according to the given schema. Only use when asked to convert provided text into a valid JSON object."
},
Comment on lines +18 to +21
🛠️ Refactor suggestion | 🟠 Major

Make is_convertible required to match Stage 3 code; add numeric bounds for consistency.

Stage 3 does parsed_response["is_convertible"] (direct indexing). If the field isn’t emitted (it’s optional in the schema), that will raise a KeyError. Add it to the root required list. Also, enforce numeric ranges declared in descriptions to align with Pydantic validations.

Apply:

@@
-    "required": [
+    "required": [
         "transcription",
         "translation",
         "title",
         "summary",
         "explanation",
         "disinformation_categories",
         "keywords_detected",
         "language",
         "context",
         "confidence_scores",
         "emotional_tone",
-        "political_leaning"
+        "political_leaning",
+        "is_convertible"
     ],
@@
         "confidence_scores": {
             "type": "object",
             "required": ["overall", "analysis", "categories"],
             "properties": {
                 "overall": {
-                    "type": "integer",
+                    "type": "integer",
+                    "minimum": 0,
+                    "maximum": 100,
                     "description": "Overall confidence score of the analysis, ranging from 0 to 100."
                 },
@@
                             "score": {
-                                "type": "integer",
+                                "type": "integer",
+                                "minimum": 0,
+                                "maximum": 100,
                                 "description": "Confidence score for this category, ranging from 0 to 100."
                             }
@@
                     "intensity": {
-                        "type": "integer",
+                        "type": "integer",
+                        "minimum": 0,
+                        "maximum": 100,
                         "description": "Intensity of the emotion, ranging from 0 to 100."
                     },
@@
                 "score": {
-                    "type": "number",
+                    "type": "number",
+                    "minimum": -1.0,
+                    "maximum": 1.0,
                     "description": "Political leaning score, ranging from -1.0 to 1.0."
                 },
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"is_convertible": {
"type": "boolean",
"description": "Indicates whether the provided text can be converted into a valid JSON object according to the given schema. Only use when asked to convert provided text into a valid JSON object."
},
"required": [
"transcription",
"translation",
"title",
"summary",
"explanation",
"disinformation_categories",
"keywords_detected",
"language",
"context",
"confidence_scores",
"emotional_tone",
"political_leaning",
"is_convertible"
],
"properties": {
"confidence_scores": {
"type": "object",
"required": ["overall", "analysis", "categories"],
"properties": {
"overall": {
"type": "integer",
"minimum": 0,
"maximum": 100,
"description": "Overall confidence score of the analysis, ranging from 0 to 100."
},
"analysis": {
/* existing schema */
},
"categories": {
"type": "array",
"items": {
"type": "object",
"properties": {
"score": {
"type": "integer",
"minimum": 0,
"maximum": 100,
"description": "Confidence score for this category, ranging from 0 to 100."
}
}
}
}
}
},
"emotional_tone": {
"type": "object",
"properties": {
"intensity": {
"type": "integer",
"minimum": 0,
"maximum": 100,
"description": "Intensity of the emotion, ranging from 0 to 100."
}
/* existing properties */
}
},
"political_leaning": {
"type": "number",
"minimum": -1.0,
"maximum": 1.0,
"description": "Political leaning score, ranging from -1.0 to 1.0."
},
"is_convertible": {
"type": "boolean",
"description": "Indicates whether the provided text can be converted into a valid JSON object according to the given schema. Only use when asked to convert provided text into a valid JSON object."
}
/* other properties */
}
🤖 Prompt for AI Agents
In prompts/Stage_3_output_schema.json around lines 18-21, the is_convertible
boolean is currently optional but Stage 3 code indexes
parsed_response["is_convertible"], which will raise a KeyError if missing; add
"is_convertible" to the root "required" array so it is always emitted. Also, for
any numeric fields in this schema whose descriptions mention ranges, add the
appropriate "minimum" and/or "maximum" properties to their schemas to enforce
those bounds (so Pydantic validation matches the documented ranges).
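The KeyError the comment warns about is easy to reproduce outside the pipeline; a minimal sketch (the dict contents are hypothetical):

```python
# A model response that omitted the optional field (hypothetical payload).
parsed_response = {"transcription": "..."}

# Direct indexing, as the Stage 3 code does, raises KeyError when the
# field is absent from the emitted JSON.
try:
    parsed_response["is_convertible"]
    raised = False
except KeyError:
    raised = True

# Making the field required in the schema (or reading it with a default)
# removes this failure mode.
assert raised
assert parsed_response.get("is_convertible", False) is False
```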

"transcription": {
"type": "string",
"description": "Transcription of the entire audio clip in the original language."
1 change: 0 additions & 1 deletion requirements.txt
@@ -8,7 +8,6 @@ prefect==3.0.1
boto3==1.35.15
selenium==4.24.0
webdriver-manager==4.0.2
-google-generativeai==0.8.3
google-genai==1.11.0
psutil==6.0.0
sentry-sdk==2.14.0
259 changes: 202 additions & 57 deletions src/processing_pipeline/stage_3.py
@@ -4,15 +4,19 @@
from google import genai
import json
import boto3
+from pydantic import ValidationError

from prefect.task_runners import ConcurrentTaskRunner
from google.genai.types import (
File,
FinishReason,
GenerateContentConfig,
GoogleSearch,
HarmBlockThreshold,
HarmCategory,
SafetySetting,
ThinkingConfig,
Tool,
)
from processing_pipeline.supabase_utils import SupabaseClient
from processing_pipeline.constants import (
@@ -21,6 +25,7 @@
get_output_schema_for_stage_3,
get_user_prompt_for_stage_3,
)
+from processing_pipeline.stage_3_models import Stage3Output
from utils import optional_flow, optional_task


@@ -79,6 +84,7 @@ def update_snippet_in_supabase(
emotional_tone,
context,
political_leaning,
+grounding_metadata,
status,
error_message,
):
@@ -96,6 +102,7 @@
emotional_tone=emotional_tone,
context=context,
political_leaning=political_leaning,
+grounding_metadata=grounding_metadata,
status=status,
error_message=error_message,
)
@@ -150,7 +157,7 @@ def process_snippet(supabase_client, snippet, local_file, gemini_key):
metadata = get_metadata(snippet)
print(f"Metadata:\n{json.dumps(metadata, indent=2)}")

-pro_response = Stage3Executor.run(
+response, grounding_metadata = Stage3Executor.run(
gemini_key=gemini_key,
model_name=GeminiModel.GEMINI_FLASH_LATEST,
audio_file=local_file,
@@ -160,18 +167,19 @@
update_snippet_in_supabase(
supabase_client=supabase_client,
snippet_id=snippet["id"],
-transcription=pro_response["transcription"],
-translation=pro_response["translation"],
-title=pro_response["title"],
-summary=pro_response["summary"],
-explanation=pro_response["explanation"],
-disinformation_categories=pro_response["disinformation_categories"],
-keywords_detected=pro_response["keywords_detected"],
-language=pro_response["language"],
-confidence_scores=pro_response["confidence_scores"],
-emotional_tone=pro_response["emotional_tone"],
-context=pro_response["context"],
-political_leaning=pro_response["political_leaning"],
+transcription=response["transcription"],
+translation=response["translation"],
+title=response["title"],
+summary=response["summary"],
+explanation=response["explanation"],
+disinformation_categories=response["disinformation_categories"],
+keywords_detected=response["keywords_detected"],
+language=response["language"],
+confidence_scores=response["confidence_scores"],
+emotional_tone=response["emotional_tone"],
+context=response["context"],
+political_leaning=response["political_leaning"],
+grounding_metadata=grounding_metadata,
status="Ready for review",
error_message=None,
)
@@ -246,66 +254,203 @@ class Stage3Executor:
OUTPUT_SCHEMA = get_output_schema_for_stage_3()

@classmethod
-def run(cls, gemini_key, model_name, audio_file, metadata):
+def run(
+    cls,
+    gemini_key: str,
+    model_name: GeminiModel,
+    audio_file: str,
+    metadata: dict,
+):
"""
Main execution method for Stage 3 analysis.

Performs two-stage processing with validation optimization:
1. Step 1: Analyze audio with Google Search enabled
2. Validate: Try to validate response with Pydantic model
3. Step 2 (conditional): If validation fails, restructure with response_schema

Args:
gemini_key: Google Gemini API key
model_name: Name of the Gemini model to use
audio_file: Path to the audio file
metadata: Metadata dictionary for the audio clip

Returns:
dict: Structured and validated analysis output
"""
if not gemini_key:
raise ValueError("Google Gemini API key was not set!")

client = genai.Client(api_key=gemini_key)

# Upload the audio file and wait for it to finish processing
-audio_file = client.files.upload(file=audio_file)
-while audio_file.state.name == "PROCESSING":
+uploaded_audio_file = client.files.upload(file=audio_file)
+while uploaded_audio_file.state.name == "PROCESSING":
print("Processing the uploaded audio file...")
time.sleep(1)
-audio_file = client.files.get(name=audio_file.name)
+uploaded_audio_file = client.files.get(name=uploaded_audio_file.name)

# Prepare the user prompt
user_prompt = (
f"{cls.USER_PROMPT}\n\nHere is the metadata of the attached audio clip:\n{json.dumps(metadata, indent=2)}"
)

try:
result = client.models.generate_content(
model=model_name,
contents=[audio_file, user_prompt],
config=GenerateContentConfig(
response_mime_type="application/json",
response_schema=cls.OUTPUT_SCHEMA,
system_instruction=cls.SYSTEM_INSTRUCTION,
max_output_tokens=16384,
thinking_config=ThinkingConfig(thinking_budget=4096),
safety_settings=[
SafetySetting(
category=HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
threshold=HarmBlockThreshold.BLOCK_NONE,
),
SafetySetting(
category=HarmCategory.HARM_CATEGORY_HATE_SPEECH,
threshold=HarmBlockThreshold.BLOCK_NONE,
),
SafetySetting(
category=HarmCategory.HARM_CATEGORY_HARASSMENT,
threshold=HarmBlockThreshold.BLOCK_NONE,
),
SafetySetting(
category=HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
threshold=HarmBlockThreshold.BLOCK_NONE,
),
SafetySetting(
category=HarmCategory.HARM_CATEGORY_CIVIC_INTEGRITY,
threshold=HarmBlockThreshold.BLOCK_NONE,
),
],
),
# Step 1: Analyze with Google Search
analysis_text, grounding_metadata = cls.__analyze_with_search(
client,
model_name,
user_prompt,
uploaded_audio_file,
)

if not result.parsed:
finish_reason = result.candidates[0].finish_reason
if finish_reason == FinishReason.MAX_TOKENS:
raise ValueError("The response from Gemini was too long and was cut off.")
print(f"Response finish reason: {finish_reason}")
raise ValueError("No response from Gemini.")
# Try to validate with Pydantic model first
validated_output = cls.__validate_with_pydantic(analysis_text)

return result.parsed
if validated_output:
return validated_output, grounding_metadata

# Step 2: Structure with response_schema (if validation failed)
return cls.__structure_with_schema(client, analysis_text), grounding_metadata
finally:
client.files.delete(name=audio_file.name)
client.files.delete(name=uploaded_audio_file.name)

@classmethod
def __analyze_with_search(
cls,
client: genai.Client,
model_name: GeminiModel,
user_prompt: str,
audio_file: File,
):
"""
Step 1: Analyze audio with Google Search tool enabled.

Returns:
str: The response text from Gemini
"""
print("Analyzing audio with web search...")

response = client.models.generate_content(
model=model_name,
contents=[user_prompt, audio_file],
config=GenerateContentConfig(
system_instruction=cls.SYSTEM_INSTRUCTION,
max_output_tokens=16384,
tools=[Tool(google_search=GoogleSearch())],
thinking_config=ThinkingConfig(thinking_budget=4096),
safety_settings=cls.__get_safety_settings(),
),
)

grounding_metadata = str(response.candidates[0].grounding_metadata) if response.candidates else None

if not response.text:
finish_reason = response.candidates[0].finish_reason
if finish_reason == FinishReason.MAX_TOKENS:
raise ValueError("The response from Gemini was too long and was cut off in step 1.")
print(f"Response finish reason: {finish_reason}")
raise ValueError("No response from Gemini in step 1.")
Comment on lines +346 to +353
⚠️ Potential issue | 🔴 Critical

Guard access to candidates when response.text is empty.

Indexing response.candidates[0] can raise when candidates is empty. Guard it as you do elsewhere.

-        if not response.text:
-            finish_reason = response.candidates[0].finish_reason
+        if not response.text:
+            finish_reason = response.candidates[0].finish_reason if response.candidates else None
             if finish_reason == FinishReason.MAX_TOKENS:
                 raise ValueError("The response from Gemini was too long and was cut off in step 1.")
             print(f"Response finish reason: {finish_reason}")
             raise ValueError("No response from Gemini in step 1.")
📝 Committable suggestion
Suggested change
grounding_metadata = str(response.candidates[0].grounding_metadata) if response.candidates else None
if not response.text:
finish_reason = response.candidates[0].finish_reason
if finish_reason == FinishReason.MAX_TOKENS:
raise ValueError("The response from Gemini was too long and was cut off in step 1.")
print(f"Response finish reason: {finish_reason}")
raise ValueError("No response from Gemini in step 1.")
grounding_metadata = str(response.candidates[0].grounding_metadata) if response.candidates else None
if not response.text:
finish_reason = response.candidates[0].finish_reason if response.candidates else None
if finish_reason == FinishReason.MAX_TOKENS:
raise ValueError("The response from Gemini was too long and was cut off in step 1.")
print(f"Response finish reason: {finish_reason}")
raise ValueError("No response from Gemini in step 1.")
🧰 Tools
🪛 Ruff (0.13.3)

351-351: Avoid specifying long messages outside the exception class (TRY003)

353-353: Avoid specifying long messages outside the exception class (TRY003)

🤖 Prompt for AI Agents
In src/processing_pipeline/stage_3.py around lines 346 to 353, the code accesses
response.candidates[0] when response.text is empty which will raise if
candidates is empty; guard candidate access by checking if response.candidates
and len(response.candidates) > 0 before reading grounding_metadata or
finish_reason, use a safe fallback (e.g., grounding_metadata = None and
finish_reason = None or a descriptive default) and adjust the error handling to
reference the safe finish_reason when present, then raise the existing
ValueErrors accordingly.
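The guard pattern the comment suggests can be checked in isolation; a minimal sketch with stand-in response types (the dataclasses below are hypothetical stand-ins, not the SDK's actual classes):

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Candidate:
    finish_reason: Optional[str] = None

@dataclass
class Response:
    text: str = ""
    candidates: list = field(default_factory=list)

def first_finish_reason(response: Response) -> Optional[str]:
    # next(iter(...), None) never raises IndexError on an empty list,
    # unlike response.candidates[0].
    candidate = next(iter(response.candidates), None)
    return candidate.finish_reason if candidate else None

empty = first_finish_reason(Response(candidates=[]))
truncated = first_finish_reason(Response(candidates=[Candidate("MAX_TOKENS")]))
```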


return response.text, grounding_metadata

@classmethod
def __validate_with_pydantic(cls, response_text: str):
"""
Attempts to validate the response text with the Pydantic model.

Returns:
dict: Validated and structured output if successful
None: If validation fails
"""
try:
print("Attempting to validate response with Pydantic model...")
start_idx = response_text.find("{")
JSON extraction in __validate_with_pydantic uses simple substring search (find and rfind). Consider a more robust extraction (e.g., regex or stricter parsing) in case the response contains extra text.

Suggested change
start_idx = response_text.find("{")
import re

match = re.search(r'{.*}', response_text, re.DOTALL)
start_idx = match.start() if match else -1

end_idx = response_text.rfind("}")
Comment on lines +368 to +369
medium

Extracting the JSON object by finding the first { and the last } is a bit brittle. This approach can fail if the model's response includes { or } characters in explanatory text outside the main JSON object, or within string values inside the JSON itself.

A more robust method would be to use a regular expression to first look for a JSON object enclosed in markdown code fences (e.g., ```json ... ```), which is a common pattern for language models. If that pattern isn't found, you can then fall back to the current find/rfind logic. This would make the parsing more resilient to variations in the model's output format.

Here is an example of how you could implement this, which would require adding import re at the top of the file:

import re

# ... inside __validate_with_pydantic
json_str = None
# Try to find JSON in a markdown block
match = re.search(r"```json\s*(\{.*?\})\s*```", response_text, re.DOTALL)
if match:
    json_str = match.group(1)
else:
    # Fallback to finding the first and last brace
    start_idx = response_text.find("{")
    end_idx = response_text.rfind("}")
    if start_idx != -1 and end_idx != -1:
        json_str = response_text[start_idx : end_idx + 1]

if not json_str:
    print("No JSON object found in the response.")
    return None

try:
    parsed = Stage3Output.model_validate_json(json_str)
    # ...
except ValidationError as e:
    # ...


if start_idx == -1 or end_idx == -1:
print("No JSON object found in the response.")
return None

parsed = Stage3Output.model_validate_json(response_text[start_idx : end_idx + 1])
print("Validation successful - returning structured output")
return parsed.model_dump()
Comment on lines +375 to +377
⚠️ Potential issue | 🟠 Major

Dump Pydantic with aliases to match schema (language.register).

Without by_alias=True, you’ll emit register_ instead of register, diverging from the schema and DB payload.

-            return parsed.model_dump()
+            return parsed.model_dump(by_alias=True)
📝 Committable suggestion
Suggested change
parsed = Stage3Output.model_validate_json(response_text[start_idx : end_idx + 1])
print("Validation successful - returning structured output")
return parsed.model_dump()
parsed = Stage3Output.model_validate_json(response_text[start_idx : end_idx + 1])
print("Validation successful - returning structured output")
return parsed.model_dump(by_alias=True)
🤖 Prompt for AI Agents
In src/processing_pipeline/stage_3.py around lines 375 to 377, the code
currently calls parsed.model_dump() which emits field names using Python
attribute names (e.g., register_) instead of the schema aliases (e.g.,
register); update the dump call to include aliases by using
parsed.model_dump(by_alias=True) so the serialized output matches the expected
schema/DB payload.
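The alias behaviour is straightforward to verify with Pydantic v2 (the model and field names below are illustrative, mirroring the `register` alias mentioned above):

```python
from pydantic import BaseModel, Field

class Language(BaseModel):
    # The Python attribute carries a trailing underscore; the schema
    # name is supplied via the alias.
    register_: str = Field(alias="register")

# Validation accepts the aliased key from the model's JSON output.
lang = Language.model_validate_json('{"register": "formal"}')

without_alias = lang.model_dump()            # emits the attribute name
with_alias = lang.model_dump(by_alias=True)  # emits the schema name
```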

except ValidationError as e:
print(f"Validation failed: {e}")
return None

@classmethod
def __structure_with_schema(
cls,
client: genai.Client,
analysis_text: str,
):
"""
Step 2: Structure the analysis results using response_schema.

Returns:
dict: Structured and validated output
"""
print("Restructuring response with schema validation...")

system_instruction = """You are a helpful assistant whose task is to convert provided text into a valid JSON object following a given schema. Your responsibilities are:

1. **Validation**: Check if the provided text can be converted into a valid JSON object that adheres to the specified schema.
2. **Conversion**:
- If the text is convertible, convert it into a valid JSON object according to the schema.
- Set field `"is_convertible": true` in the JSON object.
3. **Error Handling**:
- If the text is not convertible (e.g., missing fields, incorrect data types), return a JSON object with the field `"is_convertible": false`."""

user_prompt = f"Please structure the following analysis text into the required JSON format:\n\n{analysis_text}"

response = client.models.generate_content(
model=GeminiModel.GEMINI_FLASH_LATEST,
contents=[user_prompt],
config=GenerateContentConfig(
response_mime_type="application/json",
response_schema=cls.OUTPUT_SCHEMA,
system_instruction=system_instruction,
max_output_tokens=8192,
thinking_config=ThinkingConfig(thinking_budget=0),
safety_settings=cls.__get_safety_settings(),
),
)

parsed_response = response.parsed

if not parsed_response:
finish_reason = response.candidates[0].finish_reason if response.candidates else None
if finish_reason == FinishReason.MAX_TOKENS:
raise ValueError("The response from Gemini was too long and was cut off in step 2.")
raise ValueError(f"No response from Gemini in step 2. Response finished with reason: {finish_reason}")
Comment on lines +422 to +426
⚠️ Potential issue | 🔴 Critical

Guard candidate access when parsed_response is None.

At line 423, accessing response.candidates[0].finish_reason will raise an IndexError if candidates is empty:

         if not parsed_response:
-            finish_reason = response.candidates[0].finish_reason if response.candidates else None
+            finish_reason = (response.candidates[0].finish_reason 
+                           if (response.candidates and len(response.candidates) > 0) 
+                           else None)
             if finish_reason == FinishReason.MAX_TOKENS:
                 raise ValueError("The response from Gemini was too long and was cut off in step 2.")
             raise ValueError(f"No response from Gemini in step 2. Response finished with reason: {finish_reason}")
📝 Committable suggestion
Suggested change
if not parsed_response:
finish_reason = response.candidates[0].finish_reason if response.candidates else None
if finish_reason == FinishReason.MAX_TOKENS:
raise ValueError("The response from Gemini was too long and was cut off in step 2.")
raise ValueError(f"No response from Gemini in step 2. Response finished with reason: {finish_reason}")
if not parsed_response:
finish_reason = (response.candidates[0].finish_reason
if (response.candidates and len(response.candidates) > 0)
else None)
if finish_reason == FinishReason.MAX_TOKENS:
raise ValueError("The response from Gemini was too long and was cut off in step 2.")
raise ValueError(f"No response from Gemini in step 2. Response finished with reason: {finish_reason}")
🧰 Tools
🪛 Ruff (0.13.3)

425-425: Avoid specifying long messages outside the exception class (TRY003)

426-426: Avoid specifying long messages outside the exception class (TRY003)

🤖 Prompt for AI Agents
In src/processing_pipeline/stage_3.py around lines 422–426, guard access to
response.candidates before indexing: compute finish_reason by first checking
that response.candidates is truthy and non-empty (e.g., use a length check or
next(iter(response.candidates), None)) and only then read finish_reason from
that candidate; if no candidate exists set finish_reason = None and retain the
existing error raising logic so you never do response.candidates[0] without
verifying a candidate is present.
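Ruff's TRY003 findings above can be addressed by moving the long message into a dedicated exception class instead of the raise site; a sketch (the exception name is hypothetical):

```python
class GeminiEmptyResponseError(ValueError):
    """Raised when Gemini returns no usable response for a pipeline step."""

    def __init__(self, step: int, finish_reason=None):
        # The message template lives in the exception class, not at the
        # raise site, which is what TRY003 asks for.
        super().__init__(
            f"No response from Gemini in step {step} "
            f"(finish reason: {finish_reason})."
        )
        self.finish_reason = finish_reason

try:
    raise GeminiEmptyResponseError(2, "SAFETY")
except GeminiEmptyResponseError as exc:
    message = str(exc)
```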


if not parsed_response.get("is_convertible"):
raise ValueError("[Stage 3] The response from Gemini could not be converted to the required schema.")

return parsed_response

@classmethod
def __get_safety_settings(cls):
return [
SafetySetting(
category=HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
threshold=HarmBlockThreshold.BLOCK_NONE,
),
SafetySetting(
category=HarmCategory.HARM_CATEGORY_HATE_SPEECH,
threshold=HarmBlockThreshold.BLOCK_NONE,
),
SafetySetting(
category=HarmCategory.HARM_CATEGORY_HARASSMENT,
threshold=HarmBlockThreshold.BLOCK_NONE,
),
SafetySetting(
category=HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
threshold=HarmBlockThreshold.BLOCK_NONE,
),
SafetySetting(
category=HarmCategory.HARM_CATEGORY_CIVIC_INTEGRITY,
threshold=HarmBlockThreshold.BLOCK_NONE,
),
]