Skip to content

VER-297: Reactivate the preprocess step: Initial detection in Stage 1#58

Merged
quancao-ea merged 4 commits intomainfrom
fix/reenable-stage-1-preprocess
Feb 2, 2026
Merged

VER-297: Reactivate the preprocess step: Initial detection in Stage 1#58
quancao-ea merged 4 commits intomainfrom
fix/reenable-stage-1-preprocess

Conversation

@quancao-ea
Copy link
Copy Markdown
Collaborator

@quancao-ea quancao-ea commented Feb 2, 2026

Important

Reactivates Stage 1 preprocess step with initial transcription and detection, adding new prompts and updating the processing pipeline.

  • Behavior:
    • Reactivates preprocess step in Stage 1 with initial transcription and detection.
    • Adds initial transcription and detection prompts in prompts/stage_1/preprocess/.
    • Updates initial_disinformation_detection() in flows.py to include new preprocess steps.
  • Prompts:
    • Adds initial_transcription_user_prompt.md and initial_detection_user_prompt.md for transcription and detection.
    • Adds initial_transcription_output_schema.json and initial_detection_output_schema.json for output schemas.
    • Updates import_prompts_to_db.py to include new prompt stages.
  • Pipeline:
    • Refactors stage_1.py into stage_1/ directory with executors.py, flows.py, and tasks.py.
    • Updates constants.py to include new prompt stages STAGE_1_INITIAL_TRANSCRIPTION and STAGE_1_INITIAL_DETECTION.
    • Modifies tasks.py to handle new transcription and detection processes.
  • Misc:
    • Removes old Stage 1 files and updates import paths accordingly.

This description was created by Ellipsis for 8163a06. You can customize this summary. It will automatically update as commits are pushed.

Summary by CodeRabbit

Release Notes

  • New Features

    • Added granular processing stages for initial transcription and initial disinformation detection.
    • Enhanced multilingual audio transcription support with improved cultural sensitivity and accuracy.
  • Refactor

    • Reorganized Stage 1 processing pipeline into a modular structure for improved maintainability.
    • Restructured prompt configurations into a clearer hierarchical directory layout.

✏️ Tip: You can customize this high-level summary in your review settings.

- Split main.py into executors.py, tasks.py, and flows.py
- Re-enable Stage 1 Preprocess
- Refactor executor classes to accept gemini_client instead of gemini_key
- Create Gemini client once in flows and pass through tasks to executors
@linear
Copy link
Copy Markdown

linear Bot commented Feb 2, 2026

Copy link
Copy Markdown
Contributor

@ellipsis-dev ellipsis-dev Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important

Looks good to me! 👍

Reviewed everything up to 8163a06 in 11 seconds. Click for details.
  • Reviewed 2711 lines of code in 21 files
  • Skipped 0 files when reviewing.
  • Skipped posting 0 draft comments. View those below.
  • Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.

Workflow ID: wflow_lUUywnteahdyQZ7M

You can customize Ellipsis by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Feb 2, 2026

Walkthrough

This PR restructures the Stage 1 audio processing pipeline from a monolithic module into a modular architecture with separate executors, flows, and tasks. It introduces new prompt versions for initial transcription and detection preprocessing, reorganizes prompt files into a stage-specific directory structure, and updates constants and import scripts to reflect the new enum-based organization.

Changes

Cohort / File(s) Summary
Prompt Reorganization
prompts/Stage_1_Preprocess_system_instruction.md, prompts/Stage_1_Preprocess_transcription_prompt.md
Removed old root-level prompt files for stage 1 preprocessing.
New Stage 1 Preprocess Prompts
prompts/stage_1/preprocess/initial_detection_system_instruction.md, prompts/stage_1/preprocess/initial_detection_user_prompt.md, prompts/stage_1/preprocess/initial_detection_output_schema.json, prompts/stage_1/preprocess/initial_transcription_user_prompt.md, prompts/stage_1/preprocess/initial_transcription_output_schema.json
Added new, granular prompt files for initial transcription and detection stages with refined instructions, self-review processes, and JSON schemas.
Pipeline Constants
src/processing_pipeline/constants.py
Added new PromptStage enum members (STAGE_1_INITIAL_TRANSCRIPTION, STAGE_1_INITIAL_DETECTION, STAGE_4); removed old stage-1 helper functions.
Stage 1 Modularization
src/processing_pipeline/stage_1/__init__.py, src/processing_pipeline/stage_1/executors.py, src/processing_pipeline/stage_1/flows.py, src/processing_pipeline/stage_1/tasks.py
Refactored monolithic src/processing_pipeline/stage_1.py into modular architecture: executors handle Gemini operations (Stage1PreprocessTranscriptionExecutor, Stage1PreprocessDetectionExecutor, Stage1Executor, GeminiTimestampTranscriptionGenerator), flows orchestrate main workflows (initial_disinformation_detection, undo_disinformation_detection, redo_main_detection, regenerate_timestamped_transcript), and tasks provide granular operations (audio fetching, transcription, detection, status management).
Import Script Updates
src/scripts/import_prompts_to_db.py
Updated PROMPT_MAPPING to use PromptStage enum members instead of string keys; added entries for new initial transcription and detection stages; updated file paths to reflect new directory structure.

Sequence Diagram(s)

sequenceDiagram
    participant Flow as initial_disinformation_detection
    participant Tasks as tasks module
    participant Supabase as Supabase
    participant S3 as S3/R2 Storage
    participant Gemini as Gemini API
    participant Executors as Executor Classes

    Flow->>Supabase: fetch_a_new_audio_file_from_supabase()
    Supabase-->>Flow: audio_file record
    Flow->>S3: download_audio_file_from_s3()
    S3-->>Flow: local audio file
    Flow->>Tasks: process_audio_file()
    Tasks->>Executors: Stage1PreprocessTranscriptionExecutor.run()
    Executors->>Gemini: upload audio + generate_content(user_prompt)
    Gemini-->>Executors: initial_transcription
    Executors-->>Tasks: initial_transcription result
    Tasks->>Executors: Stage1PreprocessDetectionExecutor.run()
    Executors->>Gemini: generate_content(transcription + metadata)
    Gemini-->>Executors: initial_detection_result
    Executors-->>Tasks: initial_detection result
    alt flagged_snippets exist
        Tasks->>Executors: GeminiTimestampTranscriptionGenerator.run()
        Executors->>Executors: split_audio_into_segments()
        Executors->>Executors: transcribe_batch() for each segment
        Executors->>Gemini: generate_content(segments)
        Gemini-->>Executors: timestamped_transcription
        Executors-->>Tasks: timestamped_transcription result
        Tasks->>Executors: Stage1Executor.run()
        Executors->>Gemini: generate_content(timestamped_transcription)
        Gemini-->>Executors: main_detection_result
        Executors-->>Tasks: main_detection result
    end
    Tasks->>Supabase: insert_stage_1_llm_response()
    Supabase-->>Tasks: response inserted
    Tasks->>Supabase: set_audio_file_status()
    Supabase-->>Tasks: status updated
    Tasks-->>Flow: process completed
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • nhphong

Poem

🐰 Hops through prompts with careful paws,
Modular flows without a pause,
Executors clean, tasks refined,
Detection and transcription aligned,
Stage 1 springs forth, reborn anew! 🌟

🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title clearly summarizes the main change: reactivating the Stage 1 preprocess step for initial detection, which aligns with the primary objective.
Linked Issues check ✅ Passed The PR successfully reactivates the Stage 1 preprocess step for initial detection as required by VER-297, implementing the necessary code refactoring, prompt updates, and client architecture changes.
Out of Scope Changes check ✅ Passed All changes are in scope: prompt reorganization, executor/task/flow refactoring, constants updates, and script modifications directly support reactivating the Stage 1 preprocess step.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/reenable-stage-1-preprocess

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 Pylint (4.0.4)
src/processing_pipeline/stage_1/__init__.py

************* Module .pylintrc
.pylintrc:1:0: F0011: error while parsing the configuration: File contains no section headers.
file: '.pylintrc', line: 1
'disable=C0116\n' (config-parse-error)
[
{
"type": "convention",
"module": "src.processing_pipeline.stage_1",
"obj": "",
"line": 1,
"column": 0,
"endLine": null,
"endColumn": null,
"path": "src/processing_pipeline/stage_1/init.py",
"symbol": "missing-module-docstring",
"message": "Missing module docstring",
"message-id": "C0114"
}
]

src/processing_pipeline/constants.py

************* Module .pylintrc
.pylintrc:1:0: F0011: error while parsing the configuration: File contains no section headers.
file: '.pylintrc', line: 1
'disable=C0116\n' (config-parse-error)
[
{
"type": "convention",
"module": "src.processing_pipeline.constants",
"obj": "",
"line": 105,
"column": 0,
"endLine": null,
"endColumn": null,
"path": "src/processing_pipeline/constants.py",
"symbol": "line-too-long",
"message": "Line too long (101/100)",
"message-id": "C0301"
},
{
"type": "convention",
"module": "src.processing_pipeline.constants",
"obj": "",
"line": 109,
"column": 0,
"endLine": null,
"endColumn": null,
"path": "src/processing_pipeline/constants.py",
"symbol": "line-too-long",
"message": "Line too long (115/100)",
"message-id": "C0301"
},
{
"type": "convention",
"modul

... [truncated 9025 characters] ...

ini_timestamped_transcription_generation_prompt",
"line": 75,
"column": 0,
"endLine": 75,
"endColumn": 58,
"path": "src/processing_pipeline/constants.py",
"symbol": "missing-function-docstring",
"message": "Missing function or method docstring",
"message-id": "C0116"
},
{
"type": "warning",
"module": "src.processing_pipeline.constants",
"obj": "get_gemini_timestamped_transcription_generation_prompt",
"line": 76,
"column": 11,
"endLine": 76,
"endColumn": 85,
"path": "src/processing_pipeline/constants.py",
"symbol": "unspecified-encoding",
"message": "Using open without explicitly specifying an encoding",
"message-id": "W1514"
}
]

src/processing_pipeline/stage_1/executors.py

************* Module .pylintrc
.pylintrc:1:0: F0011: error while parsing the configuration: File contains no section headers.
file: '.pylintrc', line: 1
'disable=C0116\n' (config-parse-error)
[
{
"type": "convention",
"module": "src.processing_pipeline.stage_1.executors",
"obj": "",
"line": 170,
"column": 0,
"endLine": null,
"endColumn": null,
"path": "src/processing_pipeline/stage_1/executors.py",
"symbol": "line-too-long",
"message": "Line too long (102/100)",
"message-id": "C0301"
},
{
"type": "convention",
"module": "src.processing_pipeline.stage_1.executors",
"obj": "",
"line": 193,
"column": 0,
"endLine": null,
"endColumn": null,
"path": "src/processing_pipeline/stage_1/executors.py",
"symbol": "line-too-long",
"message": "Line too long (103/100)",
"message-id": "C0301"
},
{
"typ

... [truncated 12854 characters] ...

inal_transcription",
"line": 253,
"column": 4,
"endLine": 253,
"endColumn": 34,
"path": "src/processing_pipeline/stage_1/executors.py",
"symbol": "missing-function-docstring",
"message": "Missing function or method docstring",
"message-id": "C0116"
},
{
"type": "convention",
"module": "src.processing_pipeline.stage_1.executors",
"obj": "GeminiTimestampTranscriptionGenerator.split_audio_into_segments",
"line": 268,
"column": 4,
"endLine": 268,
"endColumn": 33,
"path": "src/processing_pipeline/stage_1/executors.py",
"symbol": "missing-function-docstring",
"message": "Missing function or method docstring",
"message-id": "C0116"
}
]

  • 3 others

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello @quancao-ea, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request reactivates and enhances the preprocessing capabilities within Stage 1 of the disinformation detection system. It introduces a two-stage detection process: an initial, lightweight transcription and detection, followed by a more detailed timestamped transcription and main detection only if potential disinformation is found in the initial pass. This change is accompanied by a comprehensive refactoring of the Stage 1 codebase into a modular structure, improving clarity and maintainability, and a reorganization of prompt files to support the new workflow.

Highlights

  • Reactivation of Preprocessing Step: Reintroduces an initial transcription and detection phase within Stage 1 of the disinformation detection pipeline to optimize resource usage.
  • Modular Code Refactoring: The Stage 1 processing logic has been significantly refactored into distinct 'executors', 'flows', and 'tasks' modules for improved organization, maintainability, and clarity.
  • Prompt File Restructuring: Prompt files have been reorganized into 'main' and 'preprocess' subdirectories, and new prompt definitions have been added for the initial transcription and detection steps.
  • Conditional Processing Logic: The pipeline now conditionally skips the more resource-intensive timestamped transcription and main detection steps if no disinformation is identified during the initial detection phase.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request significantly refactors the Stage 1 processing pipeline, modularizing the code by splitting stage_1.py into separate files for flows, tasks, and executors, and introduces a two-step preprocess for initial transcription and detection. While this improves structure and maintainability, a critical security vulnerability exists: the direct concatenation of untrusted data (transcriptions and metadata) into LLM prompts makes the system susceptible to prompt injection attacks, which could bypass disinformation detection logic. This requires immediate attention, ideally through the implementation of delimiters and improved prompt engineering. Furthermore, the review suggests enhancing maintainability by removing magic numbers, reducing code duplication, and ensuring consistent client handling across tasks.

Comment on lines +76 to +80
user_prompt = (
f"{prompt_version['user_prompt']}\n\n"
f"Here is the metadata of the transcription:\n\n{json.dumps(metadata, indent=2)}\n\n"
f"Here is the transcription:\n\n{transcription}"
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

The user prompt is constructed by directly concatenating the transcription and metadata into the prompt string. Since the transcription is derived from external audio content, it can be used to perform a prompt injection attack. An attacker could include spoken instructions in the audio that, when transcribed, manipulate the LLM's behavior in the detection stage (e.g., to ignore disinformation or output specific malicious content). Delimiters and clear instructions should be used to separate untrusted content from the prompt's instructions.

Comment on lines +117 to +121
user_prompt = (
f"{prompt_version['user_prompt']}\n\n"
f"Here is the metadata of the transcription:\n\n{json.dumps(metadata, indent=2)}\n\n"
f"Here is the timestamped transcription:\n\n{timestamped_transcription}"
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

The user prompt is constructed by directly concatenating the timestamped_transcription and metadata into the prompt string. This allows for prompt injection attacks where malicious content in the transcription or metadata can override the LLM's intended instructions. Using delimiters (e.g., XML-like tags) and updating the system instructions to treat content within those delimiters as data only can help mitigate this risk.

all_transcripts[absolute_segment_num] = segment["transcript"]

print(f"Batch complete: transcribed {actual_count} segments")
time.sleep(2)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The use of time.sleep(2) introduces a magic number. To improve code clarity and maintainability, consider defining this value as a named constant at the top of the class or module (e.g., _BATCH_PROCESSING_DELAY_SECONDS = 2). This makes the purpose of the delay explicit (e.g., to respect API rate limits) and simplifies future adjustments.

Comment on lines +190 to +194
def transcribe_audio_file_with_custom_timestamped_transcription_generator(audio_file):
print(f"Transcribing the audio file {audio_file} with the custom timestamped-transcription-generator")
gemini_key = os.getenv("GOOGLE_GEMINI_KEY")
timestamped_transcription = TimestampedTranscriptionGenerator.run(audio_file, gemini_key, 10)
return {"timestamped_transcription": timestamped_transcription}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This task creates its own Gemini API key from environment variables, which is inconsistent with other tasks like transcribe_audio_file_with_timestamp_with_gemini that receive a gemini_client instance. To improve consistency and centralize client management, consider refactoring this task and the underlying TimestampedTranscriptionGenerator to accept a gemini_client instance instead of creating one from an API key.

Comment on lines +289 to +352
if len(flagged_snippets) == 0:
print("No flagged snippets found during initial detection. Skipping timestamped transcription.")
insert_stage_1_llm_response(
supabase_client=supabase_client,
audio_file_id=audio_file["id"],
initial_transcription=initial_transcription,
initial_detection_result=initial_detection_result,
transcriptor=None,
timestamped_transcription=None,
detection_result=None,
status="Processed",
detection_prompt_version_id=None,
transcription_prompt_version_id=None,
)
else:
# Timestamped transcription
transcriptor = GeminiModel.GEMINI_FLASH_LATEST
timestamped_transcription = transcribe_audio_file_with_timestamp_with_gemini(
gemini_client=gemini_client,
audio_file=local_file,
prompt_version=transcription_prompt_version,
model_name=transcriptor,
)

# Main detection
detection_result = disinformation_detection_with_gemini(
gemini_client=gemini_client,
timestamped_transcription=timestamped_transcription["timestamped_transcription"],
metadata=metadata,
prompt_version=detection_prompt_version,
model_name=GeminiModel.GEMINI_FLASH_LATEST,
)
print(f"Main detection result:\n{json.dumps(detection_result, indent=2, ensure_ascii=False)}\n")

main_flagged_snippets = detection_result["flagged_snippets"]

if len(main_flagged_snippets) == 0:
print("No flagged snippets found during main detection. Setting status to 'Processed'.")
insert_stage_1_llm_response(
supabase_client=supabase_client,
audio_file_id=audio_file["id"],
initial_transcription=initial_transcription,
initial_detection_result=initial_detection_result,
transcriptor=transcriptor,
timestamped_transcription=timestamped_transcription,
detection_result=detection_result,
status="Processed",
detection_prompt_version_id=detection_prompt_version["id"],
transcription_prompt_version_id=transcription_prompt_version["id"],
)
else:
print(f"Found {len(main_flagged_snippets)} flagged snippets during main detection. Setting status to 'New'.")
insert_stage_1_llm_response(
supabase_client=supabase_client,
audio_file_id=audio_file["id"],
initial_transcription=initial_transcription,
initial_detection_result=initial_detection_result,
transcriptor=transcriptor,
timestamped_transcription=timestamped_transcription,
detection_result=detection_result,
status="New",
detection_prompt_version_id=detection_prompt_version["id"],
transcription_prompt_version_id=transcription_prompt_version["id"],
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This section has a nested if/else structure that leads to duplicated calls to insert_stage_1_llm_response. You can simplify this logic by preparing the parameters for the insertion and then making a single call at the end. This will make the code more readable and easier to maintain.

        transcriptor = None
        timestamped_transcription = None
        detection_result = None
        status = "Processed"
        detection_prompt_version_id = None
        transcription_prompt_version_id = None

        if len(flagged_snippets) > 0:
            # Timestamped transcription
            transcriptor = GeminiModel.GEMINI_FLASH_LATEST
            timestamped_transcription = transcribe_audio_file_with_timestamp_with_gemini(
                gemini_client=gemini_client,
                audio_file=local_file,
                prompt_version=transcription_prompt_version,
                model_name=transcriptor,
            )

            # Main detection
            detection_result = disinformation_detection_with_gemini(
                gemini_client=gemini_client,
                timestamped_transcription=timestamped_transcription["timestamped_transcription"],
                metadata=metadata,
                prompt_version=detection_prompt_version,
                model_name=GeminiModel.GEMINI_FLASH_LATEST,
            )
            print(f"Main detection result:\n{json.dumps(detection_result, indent=2, ensure_ascii=False)}\n")

            detection_prompt_version_id = detection_prompt_version["id"]
            transcription_prompt_version_id = transcription_prompt_version["id"]

            if len(detection_result["flagged_snippets"]) > 0:
                print(f"Found {len(detection_result['flagged_snippets'])} flagged snippets during main detection. Setting status to 'New'.")
                status = "New"
            else:
                print("No flagged snippets found during main detection. Setting status to 'Processed'.")
        else:
            print("No flagged snippets found during initial detection. Skipping timestamped transcription.")

        insert_stage_1_llm_response(
            supabase_client=supabase_client,
            audio_file_id=audio_file["id"],
            initial_transcription=initial_transcription,
            initial_detection_result=initial_detection_result,
            transcriptor=transcriptor,
            timestamped_transcription=timestamped_transcription,
            detection_result=detection_result,
            status=status,
            detection_prompt_version_id=detection_prompt_version_id,
            transcription_prompt_version_id=transcription_prompt_version_id,
        )

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@src/processing_pipeline/stage_1/executors.py`:
- Around line 34-37: The polling loop that checks uploaded_file.state using
gemini_client.files.get can hang indefinitely; add a timeout mechanism: record a
start time (e.g., start = time.time()), introduce a configurable
max_wait_seconds (or DEFAULT_MAX_WAIT) and on each iteration check if
time.time() - start > max_wait_seconds, then break/raise an exception or set
uploaded_file.state to a failure status and log an error. Ensure the code
references the same uploaded_file and gemini_client.files.get call and keeps the
existing sleep interval while enforcing the timeout to avoid infinite looping.

In `@src/processing_pipeline/stage_1/tasks.py`:
- Line 153: The print call uses an unnecessary f-string prefix without any
placeholders; locate the print statement that prints "Processing initial
transcription with Gemini for disinformation detection" in
src/processing_pipeline/stage_1/tasks.py (the print(...) line) and remove the
leading "f" so it becomes a normal string literal (print("Processing initial
transcription with Gemini for disinformation detection")).
- Around line 382-385: The log message in reset_status_of_audio_files contains a
typo ("Reseting"); update the print statement in the reset_status_of_audio_files
function to use the correct spelling "Resetting the status of the audio files"
while keeping the rest of the message and behavior intact (the call to
supabase_client.reset_audio_file_status(audio_file_ids) should remain
unchanged).
- Around line 102-107: The file handle for audio_file is opened directly in the
call to client.audio.transcriptions.create, which can leak descriptors on
exceptions; change this to open the file with a context manager (use with
open(audio_file, "rb") as f:) and pass that handle (f) into
client.audio.transcriptions.create so the file is always closed after the call
(refer to response and client.audio.transcriptions.create in tasks.py).
🧹 Nitpick comments (10)
src/processing_pipeline/constants.py (2)

43-52: Consider using context managers for file operations.

The open().read() pattern without closing the file handle can lead to resource leaks, especially if these functions are called frequently.

♻️ Suggested refactor using context managers
 def get_user_prompt_for_stage_3():
-    return open("prompts/Stage_3_analysis_prompt.md", "r").read()
+    with open("prompts/Stage_3_analysis_prompt.md", "r") as f:
+        return f.read()


 def get_system_instruction_for_stage_3():
-    return open("prompts/Stage_3_system_instruction.md", "r").read()
+    with open("prompts/Stage_3_system_instruction.md", "r") as f:
+        return f.read()


 def get_output_schema_for_stage_3():
-    return json.load(open("prompts/Stage_3_output_schema.json", "r"))
+    with open("prompts/Stage_3_output_schema.json", "r") as f:
+        return json.load(f)

Apply the same pattern to other file-reading functions in this module.


79-126: Large block of commented-out code.

This __main__ block contains extensive commented-out code. If it's no longer needed, consider removing it to reduce clutter. If it's useful for debugging, consider moving it to a separate script or documenting its purpose.

src/processing_pipeline/stage_1/flows.py (4)

84-102: Consider handling cleanup failure after processing error.

If process_audio_file raises an exception, the local file may not be cleaned up since os.remove is outside any try/finally block. This could cause disk space issues over time.

♻️ Proposed fix to ensure file cleanup
         if audio_file:
             local_file = download_audio_file_from_s3(s3_client, audio_file["file_path"])

-            # Process the audio file
-            process_audio_file(
-                supabase_client=supabase_client,
-                gemini_client=gemini_client,
-                audio_file=audio_file,
-                local_file=local_file,
-                initial_transcription_prompt_version=initial_transcription_prompt_version,
-                initial_detection_prompt_version=initial_detection_prompt_version,
-                transcription_prompt_version=transcription_prompt_version,
-                detection_prompt_version=detection_prompt_version,
-            )
-            processed_audio_files += 1
-            print(f"Processed {processed_audio_files}/{limit} audio files")
-
-            print(f"Delete the downloaded audio file: {local_file}")
-            os.remove(local_file)
+            try:
+                # Process the audio file
+                process_audio_file(
+                    supabase_client=supabase_client,
+                    gemini_client=gemini_client,
+                    audio_file=audio_file,
+                    local_file=local_file,
+                    initial_transcription_prompt_version=initial_transcription_prompt_version,
+                    initial_detection_prompt_version=initial_detection_prompt_version,
+                    transcription_prompt_version=transcription_prompt_version,
+                    detection_prompt_version=detection_prompt_version,
+                )
+                processed_audio_files += 1
+                print(f"Processed {processed_audio_files}/{limit} audio files")
+            finally:
+                print(f"Delete the downloaded audio file: {local_file}")
+                if os.path.exists(local_file):
+                    os.remove(local_file)

158-158: Datetime parsing assumes fixed UTC offset format.

The format string %Y-%m-%dT%H:%M:%S+00:00 will fail if the database returns a different timezone offset (e.g., +05:30). Consider using a more robust parser.

♻️ Proposed fix for flexible datetime parsing
+from datetime import datetime, timezone
+from dateutil import parser as dateutil_parser
 ...
-            recorded_at = datetime.strptime(audio_file["recorded_at"], "%Y-%m-%dT%H:%M:%S+00:00")
+            recorded_at = dateutil_parser.isoparse(audio_file["recorded_at"])

Alternatively, if you want to avoid adding a dependency:

recorded_at = datetime.fromisoformat(audio_file["recorded_at"].replace("+00:00", "+00:00"))

Note: datetime.fromisoformat() in Python 3.11+ handles ISO 8601 formats including timezone offsets.


241-256: Inconsistent transcriptor identifier.

Line 242 uses a hardcoded string "gemini-1206" while other parts of the codebase use GeminiModel enum values (e.g., GeminiModel.GEMINI_FLASH_LATEST). Consider using the enum for consistency, or define this as a constant.

♻️ Proposed fix
             try:
-                transcriptor = "gemini-1206"
+                transcriptor = str(GeminiModel.GEMINI_FLASH_LATEST)
                 timestamped_transcription = transcribe_audio_file_with_timestamp_with_gemini(

286-288: File cleanup should be in a finally block.

Similar to initial_disinformation_detection, if an exception occurs during processing, the downloaded file won't be cleaned up.

♻️ Wrap processing in try/finally for cleanup
         if stage_1_llm_response:
             print(f"Found stage 1 llm response {id}")

             # Get metadata of the transcription
             audio_file = stage_1_llm_response["audio_file"]
             local_file = download_audio_file_from_s3(s3_client, audio_file["file_path"])
-            recorded_at = datetime.strptime(audio_file["recorded_at"], "%Y-%m-%dT%H:%M:%S+00:00")
-            ...
-            print(f"Processing completed for stage 1 llm response {id}")
-            print(f"Delete the downloaded audio file: {local_file}")
-            os.remove(local_file)
+            try:
+                recorded_at = datetime.strptime(audio_file["recorded_at"], "%Y-%m-%dT%H:%M:%S+00:00")
+                ...
+                print(f"Processing completed for stage 1 llm response {id}")
+            finally:
+                print(f"Delete the downloaded audio file: {local_file}")
+                if os.path.exists(local_file):
+                    os.remove(local_file)
src/processing_pipeline/stage_1/executors.py (2)

229-242: Consider using list unpacking for cleaner code.

Per static analysis suggestion, use unpacking instead of concatenation for better readability.

♻️ Proposed fix
         result = gemini_client.models.generate_content(
             model=model_name,
-            contents=[prompt_version["user_prompt"]] + segments,
+            contents=[prompt_version["user_prompt"], *segments],
             config=GenerateContentConfig(

267-285: Audio format is hardcoded to MP3.

AudioSegment.from_mp3() assumes the input is always MP3. If other audio formats are provided, this will fail with an unclear error. Consider using AudioSegment.from_file() with format detection or explicit format parameter.

♻️ Proposed fix for flexible format handling
     `@classmethod`
     def split_audio_into_segments(cls, audio_file: str, segment_length_ms: int) -> list:
-        audio = AudioSegment.from_mp3(audio_file)
+        # Detect format from extension, default to mp3
+        ext = pathlib.Path(audio_file).suffix.lower().lstrip('.')
+        audio = AudioSegment.from_file(audio_file, format=ext or "mp3")
         segments = []
src/processing_pipeline/stage_1/tasks.py (2)

42-55: Same datetime format issue as in flows.py.

Line 44 uses the same hardcoded format string. Consider extracting datetime parsing to a shared utility function.


357-359: Consider logging the full traceback for debugging.

While catching bare Exception is reasonable here to ensure status updates, the current code only prints the exception message. Consider logging the full traceback for easier debugging.

♻️ Proposed fix
+import traceback
 ...
     except Exception as e:
-        print(f"Failed to process audio file {local_file}: {e}")
+        print(f"Failed to process audio file {local_file}: {e}\n{traceback.format_exc()}")
         set_audio_file_status(supabase_client, audio_file["id"], ProcessingStatus.ERROR, str(e))

Comment on lines +34 to +37
while uploaded_file.state.name == "PROCESSING":
print("Processing the uploaded audio file...")
time.sleep(1)
uploaded_file = gemini_client.files.get(name=uploaded_file.name)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Polling loop lacks timeout - potential infinite loop.

If the uploaded file never transitions out of PROCESSING state (due to API issues), this loop will run indefinitely. Consider adding a timeout.

🛡️ Proposed fix to add timeout
+        MAX_WAIT_SECONDS = 300  # 5 minutes
+        waited_seconds = 0
         while uploaded_file.state.name == "PROCESSING":
             print("Processing the uploaded audio file...")
             time.sleep(1)
+            waited_seconds += 1
+            if waited_seconds >= MAX_WAIT_SECONDS:
+                gemini_client.files.delete(name=uploaded_file.name)
+                raise TimeoutError(f"File processing timed out after {MAX_WAIT_SECONDS} seconds")
             uploaded_file = gemini_client.files.get(name=uploaded_file.name)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
while uploaded_file.state.name == "PROCESSING":
print("Processing the uploaded audio file...")
time.sleep(1)
uploaded_file = gemini_client.files.get(name=uploaded_file.name)
MAX_WAIT_SECONDS = 300 # 5 minutes
waited_seconds = 0
while uploaded_file.state.name == "PROCESSING":
print("Processing the uploaded audio file...")
time.sleep(1)
waited_seconds += 1
if waited_seconds >= MAX_WAIT_SECONDS:
gemini_client.files.delete(name=uploaded_file.name)
raise TimeoutError(f"File processing timed out after {MAX_WAIT_SECONDS} seconds")
uploaded_file = gemini_client.files.get(name=uploaded_file.name)
🤖 Prompt for AI Agents
In `@src/processing_pipeline/stage_1/executors.py` around lines 34 - 37, The
polling loop that checks uploaded_file.state using gemini_client.files.get can
hang indefinitely; add a timeout mechanism: record a start time (e.g., start =
time.time()), introduce a configurable max_wait_seconds (or DEFAULT_MAX_WAIT)
and on each iteration check if time.time() - start > max_wait_seconds, then
break/raise an exception or set uploaded_file.state to a failure status and log
an error. Ensure the code references the same uploaded_file and
gemini_client.files.get call and keeps the existing sleep interval while
enforcing the timeout to avoid infinite looping.

Comment on lines +102 to +107
response = client.audio.transcriptions.create(
model="whisper-1",
file=open(audio_file, "rb"),
response_format="verbose_json",
timestamp_granularities=["segment"],
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

File handle not properly closed - potential resource leak.

The file is opened without a context manager, which could leak file descriptors if an exception occurs before the API call completes.

🐛 Proposed fix
     # Transcribe the audio file
-    response = client.audio.transcriptions.create(
-        model="whisper-1",
-        file=open(audio_file, "rb"),
-        response_format="verbose_json",
-        timestamp_granularities=["segment"],
-    )
+    with open(audio_file, "rb") as f:
+        response = client.audio.transcriptions.create(
+            model="whisper-1",
+            file=f,
+            response_format="verbose_json",
+            timestamp_granularities=["segment"],
+        )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
response = client.audio.transcriptions.create(
model="whisper-1",
file=open(audio_file, "rb"),
response_format="verbose_json",
timestamp_granularities=["segment"],
)
with open(audio_file, "rb") as f:
response = client.audio.transcriptions.create(
model="whisper-1",
file=f,
response_format="verbose_json",
timestamp_granularities=["segment"],
)
🤖 Prompt for AI Agents
In `@src/processing_pipeline/stage_1/tasks.py` around lines 102 - 107, The file
handle for audio_file is opened directly in the call to
client.audio.transcriptions.create, which can leak descriptors on exceptions;
change this to open the file with a context manager (use with open(audio_file,
"rb") as f:) and pass that handle (f) into client.audio.transcriptions.create so
the file is always closed after the call (refer to response and
client.audio.transcriptions.create in tasks.py).

metadata: dict,
prompt_version: dict,
):
print(f"Processing initial transcription with Gemini for disinformation detection")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Remove extraneous f-string prefix.

Static analysis correctly identifies that this f-string has no placeholders.

🐛 Proposed fix
-    print(f"Processing initial transcription with Gemini for disinformation detection")
+    print("Processing initial transcription with Gemini for disinformation detection")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
print(f"Processing initial transcription with Gemini for disinformation detection")
print("Processing initial transcription with Gemini for disinformation detection")
🧰 Tools
🪛 Ruff (0.14.14)

[error] 153-153: f-string without any placeholders

Remove extraneous f prefix

(F541)

🤖 Prompt for AI Agents
In `@src/processing_pipeline/stage_1/tasks.py` at line 153, The print call uses an
unnecessary f-string prefix without any placeholders; locate the print statement
that prints "Processing initial transcription with Gemini for disinformation
detection" in src/processing_pipeline/stage_1/tasks.py (the print(...) line) and
remove the leading "f" so it becomes a normal string literal (print("Processing
initial transcription with Gemini for disinformation detection")).

Comment on lines +382 to +385
@optional_task(log_prints=True, retries=3)
def reset_status_of_audio_files(supabase_client, audio_file_ids):
print(f"Reseting the status of the audio files: {audio_file_ids}")
supabase_client.reset_audio_file_status(audio_file_ids)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Minor typo in log message.

"Reseting" should be "Resetting".

📝 Proposed fix
 `@optional_task`(log_prints=True, retries=3)
 def reset_status_of_audio_files(supabase_client, audio_file_ids):
-    print(f"Reseting the status of the audio files: {audio_file_ids}")
+    print(f"Resetting the status of the audio files: {audio_file_ids}")
     supabase_client.reset_audio_file_status(audio_file_ids)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@optional_task(log_prints=True, retries=3)
def reset_status_of_audio_files(supabase_client, audio_file_ids):
print(f"Reseting the status of the audio files: {audio_file_ids}")
supabase_client.reset_audio_file_status(audio_file_ids)
`@optional_task`(log_prints=True, retries=3)
def reset_status_of_audio_files(supabase_client, audio_file_ids):
print(f"Resetting the status of the audio files: {audio_file_ids}")
supabase_client.reset_audio_file_status(audio_file_ids)
🤖 Prompt for AI Agents
In `@src/processing_pipeline/stage_1/tasks.py` around lines 382 - 385, The log
message in reset_status_of_audio_files contains a typo ("Reseting"); update the
print statement in the reset_status_of_audio_files function to use the correct
spelling "Resetting the status of the audio files" while keeping the rest of the
message and behavior intact (the call to
supabase_client.reset_audio_file_status(audio_file_ids) should remain
unchanged).

@quancao-ea quancao-ea merged commit fe59bd5 into main Feb 2, 2026
2 checks passed
@quancao-ea quancao-ea deleted the fix/reenable-stage-1-preprocess branch March 17, 2026 02:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant