Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 35 additions & 12 deletions src/processing_pipeline/stage_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def update_snippet_in_supabase(
snippet_id,
gemini_response,
grounding_metadata,
analyzed_by,
status,
error_message,
):
Expand All @@ -98,6 +99,7 @@ def update_snippet_in_supabase(
context=gemini_response["context"],
political_leaning=gemini_response["political_leaning"],
grounding_metadata=grounding_metadata,
analyzed_by=analyzed_by,
status=status,
error_message=error_message,
)
Expand Down Expand Up @@ -151,34 +153,46 @@ def analyze_snippet(gemini_key, audio_file, metadata):

try:
print(f"Attempting analysis with {main_model}")
return Stage3Executor.run(
analyzing_response = Stage3Executor.run(
gemini_key=gemini_key,
model_name=main_model,
audio_file=audio_file,
metadata=metadata,
)
return {
**analyzing_response,
"analyzed_by": main_model,
}
except errors.ServerError as e:
print(f"Server error with {main_model} (code {e.code}): {e.message}")
print(f"Falling back to {fallback_model}")
return Stage3Executor.run(
analyzing_response = Stage3Executor.run(
gemini_key=gemini_key,
model_name=fallback_model,
audio_file=audio_file,
metadata=metadata,
)
return {
**analyzing_response,
"analyzed_by": fallback_model,
}
except errors.ClientError as e:
if e.code in [HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN]:
print(f"Auth error with {main_model} (code {e.code}): {e.message}")
raise
else:
print(f"Client error with {main_model} (code {e.code}): {e.message}")
print(f"Falling back to {fallback_model}")
return Stage3Executor.run(
analyzing_response = Stage3Executor.run(
gemini_key=gemini_key,
model_name=fallback_model,
audio_file=audio_file,
metadata=metadata,
)
return {
**analyzing_response,
"analyzed_by": fallback_model,
}
Comment on lines 154 to +195
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There's some code duplication in the except blocks for ServerError and ClientError, as they both contain the same fallback logic. You can combine these two except blocks to reduce code repetition and improve maintainability.

    try:
        print(f"Attempting analysis with {main_model}")
        analyzing_response = Stage3Executor.run(
            gemini_key=gemini_key,
            model_name=main_model,
            audio_file=audio_file,
            metadata=metadata,
        )
        return {
            **analyzing_response,
            "analyzed_by": main_model,
        }
    except (errors.ServerError, errors.ClientError) as e:
        if isinstance(e, errors.ClientError) and e.code in [
            HTTPStatus.UNAUTHORIZED,
            HTTPStatus.FORBIDDEN,
        ]:
            print(f"Auth error with {main_model} (code {e.code}): {e.message}")
            raise

        error_type = type(e).__name__
        print(f"{error_type} with {main_model} (code {e.code}): {e.message}")
        print(f"Falling back to {fallback_model}")
        analyzing_response = Stage3Executor.run(
            gemini_key=gemini_key,
            model_name=fallback_model,
            audio_file=audio_file,
            metadata=metadata,
        )
        return {
            **analyzing_response,
            "analyzed_by": fallback_model,
        }



@optional_task(log_prints=True)
Expand All @@ -189,30 +203,33 @@ def process_snippet(supabase_client, snippet, local_file, gemini_key, skip_revie
metadata = get_metadata(snippet)
print(f"Metadata:\n{json.dumps(metadata, indent=2, ensure_ascii=False)}")

response, grounding_metadata = analyze_snippet(
analyzing_response = analyze_snippet(
gemini_key=gemini_key,
audio_file=local_file,
metadata=metadata,
)

status = "Processed" if skip_review else "Ready for review"
status = ProcessingStatus.PROCESSED if skip_review else ProcessingStatus.READY_FOR_REVIEW
update_snippet_in_supabase(
supabase_client=supabase_client,
snippet_id=snippet["id"],
gemini_response=response,
grounding_metadata=grounding_metadata,
gemini_response=analyzing_response["response"],
grounding_metadata=analyzing_response["grounding_metadata"],
analyzed_by=analyzing_response["analyzed_by"],
status=status,
error_message=None,
)

if skip_review:
postprocess_snippet(supabase_client, snippet["id"], response["disinformation_categories"])
postprocess_snippet(
supabase_client, snippet["id"], analyzing_response["response"]["disinformation_categories"]
)

print(f"Processing completed for audio file {local_file} - snippet ID: {snippet['id']}")

except Exception as e:
print(f"Failed to process {local_file}: {e}")
supabase_client.set_snippet_status(snippet["id"], "Error", str(e))
supabase_client.set_snippet_status(snippet["id"], ProcessingStatus.ERROR, str(e))


def reset_snippet_status_hook(flow: Flow, flow_run: FlowRun, state: State):
Expand Down Expand Up @@ -255,7 +272,7 @@ def in_depth_analysis(snippet_ids, skip_review, repeat):
for id in snippet_ids:
snippet = fetch_a_specific_snippet_from_supabase(supabase_client, id)
if snippet:
supabase_client.set_snippet_status(snippet["id"], "Processing")
supabase_client.set_snippet_status(snippet["id"], ProcessingStatus.PROCESSING)
print(f"Found the snippet: {snippet['id']}")
local_file = download_audio_file_from_s3(s3_client, R2_BUCKET_NAME, snippet["file_path"])

Expand Down Expand Up @@ -353,10 +370,16 @@ def run(
validated_output = cls.__validate_with_pydantic(analysis_text)

if validated_output:
return validated_output, grounding_metadata
return {
"response": validated_output,
"grounding_metadata": grounding_metadata,
}

# Step 2: Structure with response_schema (if validation failed)
return cls.__structure_with_schema(client, analysis_text), grounding_metadata
return {
"response": cls.__structure_with_schema(client, analysis_text),
"grounding_metadata": grounding_metadata,
}
Comment on lines +373 to +382
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Validate downstream callers for the new dict return.

Stage3Executor.run previously yielded positional values; after this change it returns a mapping. Any remaining callers still unpacking the old tuple will now break.


🏁 Script executed:

#!/bin/bash
# Locate every usage of Stage3Executor.run to ensure callers expect a dict.
rg -n "Stage3Executor\.run" --type=py

Length of output: 932


I need to examine how callers actually use the return value from Stage3Executor.run to verify if the review concern is valid.


---

🏁 Script executed:

```shell
#!/bin/bash
# Extract context around production calls to Stage3Executor.run in stage_3.py
echo "=== Line 156 context ===" 
sed -n '150,165p' src/processing_pipeline/stage_3.py

echo -e "\n=== Line 169 context ===" 
sed -n '163,178p' src/processing_pipeline/stage_3.py

echo -e "\n=== Line 186 context ===" 
sed -n '180,195p' src/processing_pipeline/stage_3.py

Length of output: 1802


🏁 Script executed:

#!/bin/bash
# Extract context around test calls to Stage3Executor.run
echo "=== Test line 265 context ===" 
sed -n '260,275p' tests/processing_pipeline/test_stage_3.py

echo -e "\n=== Test line 282 context ===" 
sed -n '277,290p' tests/processing_pipeline/test_stage_3.py

Length of output: 1386


🏁 Script executed:

#!/bin/bash
# Search for any tuple unpacking patterns with Stage3Executor.run or analyzing_response
rg -n -A2 "analyzing_response\s*=" src/processing_pipeline/stage_3.py | head -40

Length of output: 549


🏁 Script executed:

#!/bin/bash
# Check for any tuple unpacking patterns in tests
rg -n -A2 "result\s*=" tests/processing_pipeline/test_stage_3.py | grep -A2 "Stage3Executor"

Length of output: 139


🏁 Script executed:

#!/bin/bash
# Look for tuple unpacking (a, b = ...) patterns with analyzing_response or Stage3Executor
rg -n "\w+\s*,\s*\w+\s*=.*Stage3Executor" src/ tests/

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Confirm the exact tuple unpacking in test at line 265-276
sed -n '265,280p' tests/processing_pipeline/test_stage_3.py

Length of output: 593



---

🏁 Script executed:

```shell
#!/bin/bash
# Identify the test function containing the tuple unpacking at line 265-276
sed -n '250,276p' tests/processing_pipeline/test_stage_3.py | grep -n "def test"

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Get full context of the test method
sed -n '245,280p' tests/processing_pipeline/test_stage_3.py

Length of output: 1472



---

🏁 Script executed:

```shell
#!/bin/bash
# Find the test function name by looking backwards from line 265
sed -n '230,265p' tests/processing_pipeline/test_stage_3.py | tail -40

Length of output: 1527


🏁 Script executed:

#!/bin/bash
# Search for def test functions near line 265
rg -n "def test_" tests/processing_pipeline/test_stage_3.py | grep -A1 -B1 "153\|183\|265\|338"

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Get broader context to find function name
awk 'NR>=230 && NR<=280' tests/processing_pipeline/test_stage_3.py

Length of output: 2084


Update test assertions to expect dict instead of tuple.

The test test_stage_3_executor in tests/processing_pipeline/test_stage_3.py at lines 273-275 explicitly expects Stage3Executor.run to return a tuple and attempts to unpack it. With the change to return a dict, these assertions and the unpacking will break:

assert isinstance(result, tuple)
assert len(result) == 2
response, grounding_metadata = result

Update the test to access dict keys instead: result["response"] and result["grounding_metadata"].

🤖 Prompt for AI Agents
In src/processing_pipeline/stage_3.py around lines 373 to 382,
Stage3Executor.run now returns a dict with keys "response" and
"grounding_metadata" but the tests still expect and unpack a tuple; update the
test assertions in tests/processing_pipeline/test_stage_3.py (around lines
273-275) to stop asserting tuple semantics and instead access result["response"]
and result["grounding_metadata"], removing the tuple type/length checks and the
unpacking.

finally:
client.files.delete(name=uploaded_audio_file.name)

Expand Down
2 changes: 2 additions & 0 deletions src/processing_pipeline/supabase_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ def update_snippet(
context,
political_leaning,
grounding_metadata,
analyzed_by,
status,
error_message
):
Expand All @@ -218,6 +219,7 @@ def update_snippet(
"context": context,
"political_leaning": political_leaning,
"grounding_metadata": grounding_metadata,
"analyzed_by": analyzed_by,
"previous_analysis": None,
"status": status,
"error_message": error_message,
Expand Down