Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
aiohttp==3.13.3
requests==2.32.5
urllib3>=2.6.3
python-ffmpeg==2.0.12
Expand All @@ -11,6 +12,7 @@ selenium==4.24.0
webdriver-manager==4.0.2
google-genai==1.62.0
google-adk==1.24.1
html2text==2025.4.15
psutil==6.0.0
sentry-sdk==2.14.0
pytest==8.3.3
Expand Down
4 changes: 4 additions & 0 deletions src/processing_pipeline/stage_3/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from processing_pipeline.constants import GeminiModel

# Primary Gemini model for Stage 3 in-depth analysis.
MAIN_MODEL = GeminiModel.GEMINI_2_5_PRO
# Cheaper/faster model — presumably used when MAIN_MODEL fails or is
# rate-limited; confirm against the Stage 3 pipeline caller.
FALLBACK_MODEL = GeminiModel.GEMINI_2_5_FLASH
230 changes: 57 additions & 173 deletions src/processing_pipeline/stage_3/executors.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,30 @@
import asyncio
from datetime import datetime, timezone
import json
import os
import subprocess
import tempfile
import time
from typing import Any

from google import genai
from google.genai.types import (
AutomaticFunctionCallingConfig,
File,
FinishReason,
GenerateContentConfig,
GoogleSearch,
ThinkingConfig,
Tool,
)
from pydantic import ValidationError

from processing_pipeline.constants import (
GeminiCLIEventType,
GeminiModel,
)
from processing_pipeline.constants import GeminiModel
from processing_pipeline.processing_utils import get_safety_settings
from processing_pipeline.stage_3.models import Stage3Output
from utils import optional_task
from processing_pipeline.stage_3.web_tools import searxng_web_search, web_url_read


class Stage3Executor:
"""Executor for Stage 3 in-depth analysis."""

@classmethod
def run(
async def run_async(
cls,
gemini_key: str,
gemini_client: genai.Client,
model_name: GeminiModel,
audio_file: str,
metadata: dict,
Expand All @@ -41,13 +33,11 @@ def run(
"""
Main execution method for Stage 3 analysis.

Processing strategy:
1. Step 1: Try Gemini CLI with custom search, fallback to Google Genai SDK with Google Search grounding if CLI fails
2. Validate: Try to validate response with Pydantic model
3. Step 2 (conditional): If validation fails, restructure with response_schema
Uses the Google GenAI SDK with web search tools (searxng_web_search,
web_url_read) for fact checking via automatic function calling.

Args:
gemini_key: Google Gemini API key
gemini_client: Google GenAI client instance
model_name: Name of the Gemini model to use
audio_file: Path to the audio file
metadata: Metadata dictionary for the audio clip
Expand All @@ -56,206 +46,100 @@ def run(
Returns:
dict: Structured and validated analysis output
"""
if not gemini_key:
raise ValueError("Google Gemini API key was not set!")

client = genai.Client(api_key=gemini_key)

# Prepare the user prompt using the prompt version
# Prepare the user prompt
user_prompt = (
f"{prompt_version['user_prompt']}\n\n"
f"Here is the metadata of the attached audio clip:\n{json.dumps(metadata, indent=2)}\n\n"
f"Here is the current date and time: {datetime.now(timezone.utc).strftime('%B %-d, %Y %-I:%M %p UTC')}\n\n"
)

# Strategy: Try CLI first, fallback to SDK
analysis_text = None
thought_summaries_from_api = None
uploaded_audio_file = None
# Upload audio file
uploaded_audio_file = gemini_client.files.upload(file=audio_file)

try:
user_prompt_with_file = user_prompt + f"Here is the audio file attached: @{os.path.basename(audio_file)}"
analysis_text = cls.__analyze_with_custom_search(
model_name=model_name,
user_prompt=user_prompt_with_file,
system_instruction=prompt_version["system_instruction"],
)
except RuntimeError as e:
print("Falling back to Google Search grounding with SDK...")

uploaded_audio_file = client.files.upload(file=audio_file)
while uploaded_audio_file.state.name == "PROCESSING":
print("Processing the uploaded audio file...")
time.sleep(1)
uploaded_audio_file = client.files.get(name=uploaded_audio_file.name)

sdk_result = cls.__analyze_with_google_search_grounding(
client,
model_name,
user_prompt,
uploaded_audio_file,
await asyncio.sleep(1)
uploaded_audio_file = gemini_client.files.get(name=uploaded_audio_file.name)

# Analyze with web search tools
analysis_text, thought_summaries = await cls.__analyze_with_web_search(
gemini_client=gemini_client,
model_name=model_name,
uploaded_audio_file=uploaded_audio_file,
user_prompt=user_prompt,
system_instruction=prompt_version["system_instruction"],
)
analysis_text = sdk_result["text"]
thought_summaries_from_api = sdk_result.get("thought_summaries")

try:
# Try to validate with Pydantic model first
validated_output = cls.__validate_with_pydantic(analysis_text)

if validated_output:
thought_summaries = thought_summaries_from_api or validated_output.get("thought_summaries")
grounding_metadata = json.dumps(validated_output.get("verification_evidence"), indent=2)
return {
"response": validated_output,
"grounding_metadata": grounding_metadata,
"thought_summaries": thought_summaries,
}

# Step 2: Structure with response_schema (if validation failed)
structured_output = cls.__structure_with_schema(client, analysis_text, prompt_version["output_schema"])
thought_summaries = thought_summaries_from_api or structured_output.get("thought_summaries")
grounding_metadata = json.dumps(structured_output.get("verification_evidence"), indent=2)
# Validate with Pydantic, fall back to schema restructuring
output = cls.__validate_with_pydantic(analysis_text)

if not output:
output = await cls.__structure_with_schema(
gemini_client, analysis_text, prompt_version["output_schema"]
)

return {
"response": structured_output,
"grounding_metadata": grounding_metadata,
"thought_summaries": thought_summaries,
"response": output,
"grounding_metadata": json.dumps(output.get("verification_evidence"), indent=2),
"thought_summaries": thought_summaries or output.get("thought_summaries"),
}
finally:
if uploaded_audio_file:
client.files.delete(name=uploaded_audio_file.name)
gemini_client.files.delete(name=uploaded_audio_file.name)

@optional_task(log_prints=True, retries=3)
@classmethod
def __analyze_with_custom_search(
async def __analyze_with_web_search(
cls,
gemini_client: genai.Client,
model_name: GeminiModel,
uploaded_audio_file: File,
user_prompt: str,
system_instruction: str,
):
"""
Analyze using Gemini CLI with custom search tools (MCP-based).
Analyze using the GenAI SDK with web search tools.

This method uses the Gemini CLI which provides:
- Custom search via MCP tools
- Streaming JSON output
- System instruction from file
Uses searxng_web_search and web_url_read as plain Python function tools
with the SDK's automatic function calling.

Returns:
str: Final response text from Gemini CLI

Raises:
RuntimeError: If CLI execution fails (for fallback to SDK method)
tuple: (analysis_text, thought_summaries)
"""
print("Analyzing with Gemini CLI (custom search)...")

events: list[dict[str, Any]] = []
final_response = ""
timeout = 300

# Write system instruction to a temporary file for CLI
with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as tmp_file:
tmp_file.write(system_instruction)
system_instruction_path = tmp_file.name

env = {
"PATH": os.environ.get("PATH", ""),
"HOME": os.environ.get("HOME", ""),
"GEMINI_API_KEY": os.environ["GOOGLE_GEMINI_KEY"],
"GEMINI_SYSTEM_MD": system_instruction_path,
"SEARXNG_URL": os.environ.get("SEARXNG_URL", ""),
}

cmd = [
"gemini",
"--model",
model_name,
"--output-format",
"stream-json",
user_prompt,
]

try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
env=env,
timeout=timeout,
)

# Parse JSONL output
for line in result.stdout.strip().split("\n"):
if not line:
continue
try:
event = json.loads(line)
events.append(event)

# Concatenate assistant message content
if event.get("type") == GeminiCLIEventType.MESSAGE and event.get("role") == "assistant":
content = event.get("content")
if content and isinstance(content, str):
final_response += content
except json.JSONDecodeError:
pass

if result.returncode != 0:
raise RuntimeError(f"Gemini CLI exited with code {result.returncode}: {result.stderr}")

if not final_response:
raise RuntimeError("Gemini CLI returned no response")

return final_response

except subprocess.TimeoutExpired as e:
raise RuntimeError(f"Gemini CLI timed out after {timeout} seconds") from e
finally:
if os.path.exists(system_instruction_path):
os.remove(system_instruction_path)

@optional_task(log_prints=True, retries=3)
@classmethod
def __analyze_with_google_search_grounding(
cls,
client: genai.Client,
model_name: GeminiModel,
user_prompt: str,
uploaded_audio_file: File,
system_instruction: str,
):
print("Analyzing audio with web search...")
print("Analyzing with SDK + web search tools...")

response = client.models.generate_content(
response = await gemini_client.aio.models.generate_content(
model=model_name,
contents=[user_prompt, uploaded_audio_file],
config=GenerateContentConfig(
system_instruction=system_instruction,
max_output_tokens=16384,
tools=[Tool(google_search=GoogleSearch())],
max_output_tokens=32768,
tools=[searxng_web_search, web_url_read],
automatic_function_calling=AutomaticFunctionCallingConfig(
maximum_remote_calls=20,
),
thinking_config=ThinkingConfig(thinking_budget=4096, include_thoughts=True),
safety_settings=get_safety_settings(),
),
)

thoughts = ""
for part in response.candidates[0].content.parts:
if part.thought and part.text:
thoughts += part.text
if response.candidates and response.candidates[0].content:
for part in response.candidates[0].content.parts:
if part.thought and part.text:
thoughts += part.text

if not response.text:
finish_reason = response.candidates[0].finish_reason if response.candidates else None

if finish_reason == FinishReason.MAX_TOKENS:
raise ValueError("The response from Gemini was too long and was cut off in step 1.")
raise ValueError("The response from Gemini was too long and was cut off.")

print(f"Response finish reason: {finish_reason}")
raise ValueError("No response from Gemini in step 1.")
raise ValueError("No response from Gemini.")

return {
"text": response.text,
"thought_summaries": thoughts,
}
return response.text, thoughts

@classmethod
def __validate_with_pydantic(cls, response_text: str):
Expand All @@ -276,9 +160,9 @@ def __validate_with_pydantic(cls, response_text: str):
return None

@classmethod
def __structure_with_schema(
async def __structure_with_schema(
cls,
client: genai.Client,
gemini_client: genai.Client,
analysis_text: str,
output_schema: dict,
):
Expand All @@ -295,7 +179,7 @@ def __structure_with_schema(

user_prompt = f"Please structure the following analysis text into the required JSON format:\n\n{analysis_text}"

response = client.models.generate_content(
response = await gemini_client.aio.models.generate_content(
model=GeminiModel.GEMINI_2_5_FLASH,
contents=[user_prompt],
config=GenerateContentConfig(
Expand Down
Loading