From 37bcb4cfe695f34c1628a6ee47e4a1ad8fdb35b3 Mon Sep 17 00:00:00 2001 From: Quan Cao Date: Fri, 31 Oct 2025 11:30:17 +0700 Subject: [PATCH] Implement Gemini 2.5 Pro with Flash fallback in Stage 3 Add resilient error handling to Stage 3 analysis by implementing automatic fallback from Gemini 2.5 Pro to Flash on recoverable API errors - Extract analysis logic into analyze_snippet() task function - Fallback to Flash on server errors (5xx) and client errors except auth failures (401, 403) - Use HTTPStatus enum constants for clearer error code handling This improves processing reliability while maximizing use of the more capable Pro model when available --- src/processing_pipeline/stage_3.py | 46 +++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/src/processing_pipeline/stage_3.py b/src/processing_pipeline/stage_3.py index 3222984..21af168 100644 --- a/src/processing_pipeline/stage_3.py +++ b/src/processing_pipeline/stage_3.py @@ -1,7 +1,9 @@ from datetime import datetime +from http import HTTPStatus import os import time from google import genai +from google.genai import errors import json import boto3 from pydantic import ValidationError @@ -140,16 +142,52 @@ def __get_metadata(snippet): @optional_task(log_prints=True) -def process_snippet(supabase_client, snippet, local_file, gemini_key, skip_review: bool): +def analyze_snippet(gemini_key, audio_file, metadata): + main_model = GeminiModel.GEMINI_2_5_PRO + fallback_model = GeminiModel.GEMINI_FLASH_LATEST + try: - print(f"Processing snippet: {local_file} with Gemini 2.5 Flash") + print(f"Attempting analysis with {main_model}") + return Stage3Executor.run( + gemini_key=gemini_key, + model_name=main_model, + audio_file=audio_file, + metadata=metadata, + ) + except errors.ServerError as e: + print(f"Server error with {main_model} (code {e.code}): {e.message}") + print(f"Falling back to {fallback_model}") + return Stage3Executor.run( + gemini_key=gemini_key, + model_name=fallback_model, + audio_file=audio_file, + metadata=metadata, + ) + except errors.ClientError as e: + if e.code in [HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN]: + print(f"Auth error with {main_model} (code {e.code}): {e.message}") + raise + else: + print(f"Client error with {main_model} (code {e.code}): {e.message}") + print(f"Falling back to {fallback_model}") + return Stage3Executor.run( + gemini_key=gemini_key, + model_name=fallback_model, + audio_file=audio_file, + metadata=metadata, + ) + +@optional_task(log_prints=True) +def process_snippet(supabase_client, snippet, local_file, gemini_key, skip_review: bool): + print(f"Processing snippet: {local_file}") + + try: metadata = get_metadata(snippet) print(f"Metadata:\n{json.dumps(metadata, indent=2, ensure_ascii=False)}") - response, grounding_metadata = Stage3Executor.run( + response, grounding_metadata = analyze_snippet( gemini_key=gemini_key, - model_name=GeminiModel.GEMINI_2_5_PRO, audio_file=local_file, metadata=metadata, )