From 3f6dec56996077965e06a48abe7db083923cd8a6 Mon Sep 17 00:00:00 2001 From: Quan Cao Date: Tue, 11 Nov 2025 14:53:01 +0700 Subject: [PATCH] Add analyzed_by field --- src/processing_pipeline/stage_3.py | 47 +++++++++++++++++------ src/processing_pipeline/supabase_utils.py | 2 + 2 files changed, 37 insertions(+), 12 deletions(-) diff --git a/src/processing_pipeline/stage_3.py b/src/processing_pipeline/stage_3.py index 904528d..4cb270a 100644 --- a/src/processing_pipeline/stage_3.py +++ b/src/processing_pipeline/stage_3.py @@ -80,6 +80,7 @@ def update_snippet_in_supabase( snippet_id, gemini_response, grounding_metadata, + analyzed_by, status, error_message, ): @@ -98,6 +99,7 @@ def update_snippet_in_supabase( context=gemini_response["context"], political_leaning=gemini_response["political_leaning"], grounding_metadata=grounding_metadata, + analyzed_by=analyzed_by, status=status, error_message=error_message, ) @@ -151,21 +153,29 @@ def analyze_snippet(gemini_key, audio_file, metadata): try: print(f"Attempting analysis with {main_model}") - return Stage3Executor.run( + analyzing_response = Stage3Executor.run( gemini_key=gemini_key, model_name=main_model, audio_file=audio_file, metadata=metadata, ) + return { + **analyzing_response, + "analyzed_by": main_model, + } except errors.ServerError as e: print(f"Server error with {main_model} (code {e.code}): {e.message}") print(f"Falling back to {fallback_model}") - return Stage3Executor.run( + analyzing_response = Stage3Executor.run( gemini_key=gemini_key, model_name=fallback_model, audio_file=audio_file, metadata=metadata, ) + return { + **analyzing_response, + "analyzed_by": fallback_model, + } except errors.ClientError as e: if e.code in [HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN]: print(f"Auth error with {main_model} (code {e.code}): {e.message}") @@ -173,12 +183,16 @@ def analyze_snippet(gemini_key, audio_file, metadata): else: print(f"Client error with {main_model} (code {e.code}): {e.message}") print(f"Falling back to {fallback_model}") - return Stage3Executor.run( + analyzing_response = Stage3Executor.run( gemini_key=gemini_key, model_name=fallback_model, audio_file=audio_file, metadata=metadata, ) + return { + **analyzing_response, + "analyzed_by": fallback_model, + } @optional_task(log_prints=True) @@ -189,30 +203,33 @@ def process_snippet(supabase_client, snippet, local_file, gemini_key, skip_revie metadata = get_metadata(snippet) print(f"Metadata:\n{json.dumps(metadata, indent=2, ensure_ascii=False)}") - response, grounding_metadata = analyze_snippet( + analyzing_response = analyze_snippet( gemini_key=gemini_key, audio_file=local_file, metadata=metadata, ) - status = "Processed" if skip_review else "Ready for review" + status = ProcessingStatus.PROCESSED if skip_review else ProcessingStatus.READY_FOR_REVIEW update_snippet_in_supabase( supabase_client=supabase_client, snippet_id=snippet["id"], - gemini_response=response, - grounding_metadata=grounding_metadata, + gemini_response=analyzing_response["response"], + grounding_metadata=analyzing_response["grounding_metadata"], + analyzed_by=analyzing_response["analyzed_by"], status=status, error_message=None, ) if skip_review: - postprocess_snippet(supabase_client, snippet["id"], response["disinformation_categories"]) + postprocess_snippet( + supabase_client, snippet["id"], analyzing_response["response"]["disinformation_categories"] + ) print(f"Processing completed for audio file {local_file} - snippet ID: {snippet['id']}") except Exception as e: print(f"Failed to process {local_file}: {e}") - supabase_client.set_snippet_status(snippet["id"], "Error", str(e)) + supabase_client.set_snippet_status(snippet["id"], ProcessingStatus.ERROR, str(e)) def reset_snippet_status_hook(flow: Flow, flow_run: FlowRun, state: State): @@ -255,7 +272,7 @@ def in_depth_analysis(snippet_ids, skip_review, repeat): for id in snippet_ids: snippet = fetch_a_specific_snippet_from_supabase(supabase_client, id) if snippet: - supabase_client.set_snippet_status(snippet["id"], "Processing") + supabase_client.set_snippet_status(snippet["id"], ProcessingStatus.PROCESSING) print(f"Found the snippet: {snippet['id']}") local_file = download_audio_file_from_s3(s3_client, R2_BUCKET_NAME, snippet["file_path"]) @@ -353,10 +370,16 @@ def run( validated_output = cls.__validate_with_pydantic(analysis_text) if validated_output: - return validated_output, grounding_metadata + return { + "response": validated_output, + "grounding_metadata": grounding_metadata, + } # Step 2: Structure with response_schema (if validation failed) - return cls.__structure_with_schema(client, analysis_text), grounding_metadata + return { + "response": cls.__structure_with_schema(client, analysis_text), + "grounding_metadata": grounding_metadata, + } finally: client.files.delete(name=uploaded_audio_file.name) diff --git a/src/processing_pipeline/supabase_utils.py b/src/processing_pipeline/supabase_utils.py index b907430..199e183 100644 --- a/src/processing_pipeline/supabase_utils.py +++ b/src/processing_pipeline/supabase_utils.py @@ -198,6 +198,7 @@ def update_snippet( context, political_leaning, grounding_metadata, + analyzed_by, status, error_message ): @@ -218,6 +219,7 @@ def update_snippet( "context": context, "political_leaning": political_leaning, "grounding_metadata": grounding_metadata, + "analyzed_by": analyzed_by, "previous_analysis": None, "status": status, "error_message": error_message,