ECHO-165 ingest from upload text upload audio implement lock for processing #124

Merged
spashii merged 9 commits into main from
ECHO-165-ingest-from-upload-text-upload-audio-implement-lock-for-processing
May 2, 2025

Conversation

Contributor

@ArindamRoy23 ArindamRoy23 commented Apr 30, 2025

Pull Request: Enhance ETL Pipeline with Redis Lock Support and Improved Audio Processing

Description

This PR introduces Redis lock support for the ETL pipeline while enhancing audio processing capabilities and improving overall logging. The changes focus on preventing concurrent processing conflicts and streamlining the audio processing workflow.

Redis Lock Integration

  • Implemented Redis lock mechanism in run_etl.py to prevent concurrent processing of the same conversation ID
  • Added new LiteLLM configuration parameters:
    • REDIS_LOCK_PREFIX: Prefix for Redis lock keys
    • REDIS_LOCK_EXPIRY: Lock expiration time settings
  • Enhanced error handling and logging for Redis lock operations
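
The locking idea in this list can be sketched roughly as follows. This is not the code in run_etl.py, only an illustration under assumptions: `InMemoryLockClient`, `acquire_locks`, the prefix value and the expiry are hypothetical names and values, and the stub mimics only the `set(name, value, nx=..., ex=...)` call of redis-py so that the example runs without a Redis server.

```python
import time
from typing import Dict, List, Optional, Tuple


class InMemoryLockClient:
    """Hypothetical stand-in for a Redis client; supports only set(..., nx=, ex=)."""

    def __init__(self) -> None:
        self._store: Dict[str, Tuple[str, float]] = {}

    def set(self, name: str, value: str, nx: bool = False, ex: Optional[int] = None) -> Optional[bool]:
        now = time.monotonic()
        current = self._store.get(name)
        if current is not None and current[1] <= now:
            # Expired entries behave as if the key did not exist.
            del self._store[name]
            current = None
        if nx and current is not None:
            return None  # redis-py returns None when NX prevents the write
        expires_at = now + ex if ex is not None else float("inf")
        self._store[name] = (value, expires_at)
        return True


def acquire_locks(client, conv_ids: List[str], prefix: str, expiry_seconds: int) -> List[str]:
    """Return the conversation IDs whose lock was acquired by this call."""
    acquired = []
    for conv_id in conv_ids:
        # SET with NX and EX checks and writes the key in one atomic step.
        if client.set(f"{prefix}{conv_id}", "1", nx=True, ex=expiry_seconds):
            acquired.append(conv_id)
    return acquired
```

With redis-py, `client` could be a `redis.Redis` instance, since `Redis.set` accepts the same `nx` and `ex` keyword arguments. IDs that are missing from the returned list are the ones another run is still holding.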

Audio Processing Improvements

  • Renamed AUDIO_LIGHTRAG_TIME_THRESHOLD_SECONDS to AUDIO_LIGHTRAG_COOL_OFF_TIME_SECONDS for better clarity
  • Refactored AudioETLPipeline to separate audio and non-audio file processing
  • Enhanced ContextualChunkETLPipeline for better handling of non-audio segments
  • Improved transcript management and error handling in audio processing
  • Added placeholder handling for missing audio paths in DirectusETLPipeline

Logging and Code Quality

  • Enhanced logging throughout the ETL pipeline stages for better visibility
  • Streamlined ProcessTracker methods for improved data management
  • Removed unused code in audio ETL pipeline
  • Added comprehensive logging for Redis lock status and processing flow

Testing Focus Areas

  • Redis lock functionality during concurrent ETL operations
  • Audio processing pipeline with various file types
  • Error handling and recovery scenarios
  • Lock expiration and cleanup
  • Processing of conversations with missing audio paths

Related Ticket

ECHO-165

This PR enhances the robustness and reliability of our ETL pipeline while improving code maintainability and observability.

Summary by CodeRabbit

  • New Features
    • Introduced Redis-based locking to prevent duplicate processing of the same conversation within a configurable expiry period.
    • Added configuration options for Redis lock prefix and expiry time.
  • Improvements
    • Enhanced processing to distinctly handle conversations with and without audio files, ensuring non-audio transcripts are properly managed.
    • Renamed and clarified the audio processing cool-off time setting for improved transparency.
  • Documentation
    • Updated configuration documentation to reflect renamed settings and added Redis lock configuration details.
  • Refactor
    • Streamlined internal logic for processing audio and non-audio files, improving maintainability.
  • Chores
    • Removed an unused method from the process tracker utility.

- Updated logging messages in run_etl.py to indicate progress through the ETL pipeline stages.
- Refactored AudioETLPipeline to separate processing of audio and non-audio files, improving clarity and functionality.
- Enhanced ContextualChunkETLPipeline to handle non-audio segments more effectively, including transcript management and error handling.
- Improved DirectusETLPipeline to ensure proper handling of missing audio paths by introducing a placeholder value.
- Streamlined ProcessTracker methods for better data management and clarity.
- Updated LiteLLM configuration documentation to include new settings for Redis lock management, including `REDIS_LOCK_PREFIX` and `REDIS_LOCK_EXPIRY`.
- Refactored `run_etl.py` to implement Redis locks, preventing concurrent processing of the same conversation ID and ensuring smoother ETL pipeline execution.
- Changed `AUDIO_LIGHTRAG_TIME_THRESHOLD_SECONDS` to `AUDIO_LIGHTRAG_COOL_OFF_TIME_SECONDS` for clarity in audio processing settings.
- Improved error handling and logging in the ETL pipeline to track Redis lock status and processing flow.
- Cleaned up unused code in the audio ETL pipeline for better maintainability.

linear bot commented Apr 30, 2025

Contributor

coderabbitai bot commented Apr 30, 2025

Walkthrough

This update introduces Redis-based locking to the ETL pipeline to prevent concurrent or repeated processing of the same conversation IDs within a set expiry window. The configuration for audio processing cool-off time and Redis locks is clarified and documented, with environment variable names updated accordingly. The ETL, audio, and contextual chunk pipelines are refactored to handle audio and non-audio files distinctly, ensuring proper transcript handling and LightRAG insertion. Documentation is updated to reflect these changes, and minor utility methods are cleaned up for clarity.

Changes

| File(s) | Change Summary |
| --- | --- |
| echo/server/dembrane/audio_lightrag/main/run_etl.py | Added Redis locking to prevent duplicate ETL processing for the same conversation IDs; logs TTL for locked IDs, cleans up locks on failure, and updates logging. |
| echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py | Refactored to process audio and non-audio files separately; non-audio files now create a single segment per group, concatenate transcripts, and update Directus accordingly. |
| echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py | Split load logic to handle audio and non-audio segments distinctly; non-audio segments skip audio transcription and directly insert transcripts into LightRAG if not already flagged. |
| echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py | Updated to use new config variable for audio cool-off; missing paths are now labeled as 'NO_AUDIO_FOUND' and handled accordingly. |
| echo/server/dembrane/config.py | Renamed audio cool-off env variable; added Redis lock prefix and expiry configuration with environment variable support and debug logging. |
| echo/server/dembrane/audio_lightrag/utils/process_tracker.py | Removed method for updating status by conversation and chunk ID; retained chunk ID-based update. |
| echo/docs/litellm_config.md | Updated documentation for audio cool-off config and added Redis lock configuration section; clarified feature flags and provided detailed explanations. |

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant ETL Runner
    participant Redis
    participant Directus
    participant LightRAG

    User->>ETL Runner: Trigger ETL with conv_id_list
    ETL Runner->>Redis: Check for locks on conv_id_list
    Redis-->>ETL Runner: Return locked/unlocked IDs
    ETL Runner->>ETL Runner: Filter out locked IDs
    alt IDs available
        ETL Runner->>Redis: Set locks for processing IDs
        ETL Runner->>Directus: Run Directus ETL pipeline
        Directus-->>ETL Runner: Return processed data
        ETL Runner->>LightRAG: Insert transcripts (audio/non-audio logic)
        LightRAG-->>ETL Runner: Ack
        ETL Runner->>Redis: Release locks on error
    else All IDs locked
        ETL Runner-->>User: Log and exit early
    end

Assessment against linked issues

Objective Addressed Explanation
Implement Redis-based locking to prevent duplicate ETL processing (ECHO-165)
Handle ingestion from both Upload Text and Upload Audio sources (ECHO-165)
Separate processing logic for audio and non-audio files (ECHO-165)
Update configuration and documentation for new environment variables (ECHO-165)

Suggested reviewers

  • ussaama
  • spashii

Poem

ETL flows with Redis locks in place,
No more duplicate runs in this data race!
Audio or text, the pipeline knows,
Segments and transcripts, wherever it goes.
Cool-off and configs, all tidy and neat,
This code’s looking sharp—ship it, elite! 🚀

@ArindamRoy23 ArindamRoy23 requested a review from spashii April 30, 2025 21:22
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

🔭 Outside diff range comments (1)
echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (1)

60-78: ⚠️ Potential issue

Potential infinite loop if process_audio_files returns unchanged list

If process_audio_files decides a file is too large or hits an internal guard, the unprocessed_chunk_file_uri_li could come back unchanged ⇒ infinite while-loop. Recommend breaking after N identical iterations or asserting the list shrinks each pass.
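
One way to express the "list must shrink each pass" guard is sketched below. The names are hypothetical: `process_batch` stands in for process_audio_files, whose real signature is not shown in this thread, and is assumed to return the items it could not process.

```python
from typing import Callable, List


def drain_with_progress_guard(
    pending: List[str],
    process_batch: Callable[[List[str]], List[str]],
) -> List[str]:
    """Call process_batch until nothing is pending; stop if a pass makes no progress.

    Returns the items that could not be processed (empty when all succeeded).
    """
    while pending:
        remaining = process_batch(pending)
        if len(remaining) >= len(pending):
            # The list did not shrink, so another pass would loop forever.
            return remaining
        pending = remaining
    return []
```

Returning the leftover items instead of raising lets the caller log them and move on to the next conversation; raising an exception would be an equally reasonable choice.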

♻️ Duplicate comments (1)
echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (1)

114-118: Same shadowing bug hits non-audio path
The non-audio branch repeats the overwrite issue. Apply the same fix to dodge attribute errors.

🧹 Nitpick comments (10)
echo/server/dembrane/config.py (1)

318-326: Expose lock settings per-env

Great to see locking baked in. One minor win: bubble REDIS_LOCK_EXPIRY / REDIS_LOCK_PREFIX through a get_int() / get_str() helper so tests can override them without env-munging. Totally optional, but it keeps CI slick.
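
A sketch of what such helpers might look like, assuming nothing about the existing config.py beyond the two variable names. `get_int`, `get_str`, the optional `env` mapping and the example prefix value are hypothetical.

```python
import os
from typing import Mapping, Optional


def get_str(name: str, default: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Read a string setting; tests can pass their own mapping as env."""
    source = os.environ if env is None else env
    return source.get(name, default)


def get_int(name: str, default: int, env: Optional[Mapping[str, str]] = None) -> int:
    """Read an integer setting, falling back to default when unset or blank."""
    source = os.environ if env is None else env
    raw = source.get(name)
    if raw is None or raw.strip() == "":
        return default
    return int(raw)
```

A test can then call `get_int("REDIS_LOCK_EXPIRY", 3600, env={"REDIS_LOCK_EXPIRY": "5"})` without touching the process environment.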

echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (2)

81-85: Use Path().suffix for safer format parsing

Splitting on "." returns the wrong value for paths that have no extension or that contain a dot in a directory name (for example uploads/v1.2/recording). A pathlib-based read handles both cases:

-from pathlib import Path
-
-conversation_df['format'] = conversation_df.path.apply(lambda x: x.split('.')[-1])
+from pathlib import Path
+
+conversation_df['format'] = conversation_df.path.apply(lambda p: Path(p).suffix.lstrip('.'))
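
For a quick check of how the two approaches behave on a few path shapes, here is a small standalone comparison. The function names and sample paths are made up for illustration; only 'NO_AUDIO_FOUND' comes from this PR.

```python
from pathlib import Path


def format_by_split(path: str) -> str:
    """The original approach: everything after the last dot."""
    return path.split('.')[-1]


def format_by_suffix(path: str) -> str:
    """The suggested approach: extension of the final path component only."""
    return Path(path).suffix.lstrip('.')


samples = ["audio.backup.2024.mp3", "uploads/v1.2/recording", "NO_AUDIO_FOUND"]
results = {p: (format_by_split(p), format_by_suffix(p)) for p in samples}
```

Both functions return 'mp3' for the multi-dot filename. They differ when the last path component has no extension: the split version returns '2/recording' and 'NO_AUDIO_FOUND', while the suffix version returns an empty string in both cases.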

103-106: Vectorise the cool-off filter

apply(lambda …) runs an O(N) Python-level loop. Pandas can do this in C:

-# take diff between current_timestamp and timestamp
-timestamp_diff = conversation_df['timestamp'].apply(lambda x: (run_timestamp - x).total_seconds())
-conversation_df = conversation_df[timestamp_diff > int(AUDIO_LIGHTRAG_COOL_OFF_TIME_SECONDS)]
+# vectorised delta in seconds
+timestamp_diff = (run_timestamp - conversation_df['timestamp']).dt.total_seconds()
+conversation_df = conversation_df[timestamp_diff > AUDIO_LIGHTRAG_COOL_OFF_TIME_SECONDS]

Improves large-batch performance noticeably.

echo/docs/litellm_config.md (2)

45-48: Tiny comma nit

“Files will not be processed if uploaded earlier than cooloff. Currently disabled…”

Add a comma after “Currently” to appease grammar bots.

🧰 Tools
🪛 LanguageTool

[uncategorized] ~47-~47: A comma may be missing after the conjunctive/linking adverb ‘Currently’.
Context: ...essed if uploaded earlier than cooloff. Currently disabled, pass the current tz in run_et...

(SENT_START_CONJUNCTIVE_LINKING_ADVERB_COMMA)


55-57: Good call-out on Redis locks – maybe link to config snippet

Consider adding a one-liner on how the key actually looks:
<REDIS_LOCK_PREFIX><conversation_id> – saves newcomers a dive into the source.

🧰 Tools
🪛 LanguageTool

[uncategorized] ~57-~57: Loose punctuation mark.
Context: ... the ETL pipeline. - REDIS_LOCK_EXPIRY: Time in seconds before a Redis lock exp...

(UNLIKELY_OPENING_PUNCTUATION)

echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (1)

45-58: Derive unique (project, conversation) pairs without Python set(zip())

Pandas can hand you uniques directly and avoid the temporary Python objects:

-zip_unique_audio = list(
-    set(
-        zip(
-            transform_audio_process_tracker_df.project_id,
-            transform_audio_process_tracker_df.conversation_id,
-            strict=True
-        )
-    )
-)
+zip_unique_audio = list(
+    transform_audio_process_tracker_df
+    .loc[:, ['project_id', 'conversation_id']]
+    .drop_duplicates()
+    .itertuples(index=False, name=None)
+)

Cleaner & faster.

echo/server/dembrane/audio_lightrag/main/run_etl.py (2)

42-44: Reuse your Redis connection; save the sockets for real work
You spin up redis.from_url here and again in the failure handler. One client per run is plenty; pass it around instead of reconnecting.


50-53: Negative TTL handling
ttl returns -2/-1 for “no key / no expiry”. Dividing by 60 then rounding yields weird negatives in the logs. Guard the edge case to keep logs sane.
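
A guard along these lines would keep the log message readable. `describe_ttl` and its wording are hypothetical; the only facts relied on are that the Redis TTL command replies -2 for a missing key and -1 for a key without expiry.

```python
def describe_ttl(ttl_seconds: int) -> str:
    """Turn a Redis TTL reply into a log-friendly string."""
    if ttl_seconds == -2:
        return "no lock (key does not exist)"
    if ttl_seconds == -1:
        return "locked with no expiry"
    minutes = round(ttl_seconds / 60, 1)
    return f"locked for about {minutes} more minutes"
```

The -2 case can occur in practice when a lock expires between the failed acquisition attempt and the TTL lookup.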

echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (2)

37-41: Stop recomputing the tracker DF twice per iteration
self.process_tracker() is called twice on the same line – that’s two DataFrame materialisations every loop. Cache it locally once; your CPU will thank you.

-        for conversation_id in self.process_tracker().conversation_id.unique():
-            load_tracker = self.process_tracker()[self.process_tracker()['conversation_id']  == conversation_id]
+        tracker_df = self.process_tracker()
+        for conversation_id in tracker_df.conversation_id.unique():
+            load_tracker = tracker_df[tracker_df['conversation_id'] == conversation_id]

61-68: Missing stream close might leak sockets
get_stream_from_s3(...).read() leaves the stream open. Wrap it in a context manager or explicitly close() afterwards to free up the underlying HTTP connection.
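
A minimal sketch of the context-manager option, assuming the object returned by get_stream_from_s3 exposes `read()` and `close()` (its actual type is not shown in this thread). `read_and_close` is a hypothetical helper, and an in-memory buffer stands in for the S3 stream.

```python
import io
from contextlib import closing
from typing import BinaryIO, Callable


def read_and_close(open_stream: Callable[[], BinaryIO]) -> bytes:
    """Read the whole stream and close it even if read() raises."""
    with closing(open_stream()) as stream:
        return stream.read()


# Stand-in for the stream that get_stream_from_s3 would return.
buffer = io.BytesIO(b"audio-bytes")
payload = read_and_close(lambda: buffer)
```

`contextlib.closing` only requires a `close()` method, so it works for stream objects that do not implement the context-manager protocol themselves.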

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 95b6bcc and 8e7ce93.

📒 Files selected for processing (7)
  • echo/docs/litellm_config.md (1 hunks)
  • echo/server/dembrane/audio_lightrag/main/run_etl.py (6 hunks)
  • echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (3 hunks)
  • echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (4 hunks)
  • echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (3 hunks)
  • echo/server/dembrane/audio_lightrag/utils/process_tracker.py (0 hunks)
  • echo/server/dembrane/config.py (2 hunks)
💤 Files with no reviewable changes (1)
  • echo/server/dembrane/audio_lightrag/utils/process_tracker.py
🧰 Additional context used
🧬 Code Graph Analysis (2)
echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (2)
echo/server/dembrane/audio_lightrag/utils/audio_utils.py (1)
  • create_directus_segment (97-106)
echo/server/dembrane/audio_lightrag/utils/process_tracker.py (1)
  • update_value_for_chunk_id (34-35)
echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (3)
echo/server/dembrane/s3.py (1)
  • get_stream_from_s3 (175-179)
echo/server/dembrane/api/stateless.py (2)
  • insert_item (80-115)
  • InsertRequest (70-73)
echo/server/dembrane/api/dependency_auth.py (1)
  • DirectusSession (13-22)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: ci-check-server
🔇 Additional comments (3)
echo/server/dembrane/config.py (1)

280-287: Constant rename LGTM – confirm callers are updated

The switch to AUDIO_LIGHTRAG_COOL_OFF_TIME_SECONDS is clean and the default of 60 keeps existing behaviour. Just grep the repo once to be 100 % sure no module is still referencing the retired AUDIO_LIGHTRAG_TIME_THRESHOLD_SECONDS, otherwise you’ll trip an ImportError (or a NameError) at runtime.

echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (1)

7-7: Import rename is on point

No issues – matches the config change.

echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (1)

9-10: Import of create_directus_segment LGTM

Balances the new non-audio path nicely.

- Updated LiteLLM configuration documentation to rename `AUTO_SELECT_ENABLED` to `ENABLE_CHAT_AUTO_SELECT` for clarity.
- Improved Redis lock handling in `run_etl.py` to atomically acquire locks and provide informative logging for conversation processing.
- Enhanced `AudioETLPipeline` to streamline chunk processing by fetching transcripts in bulk and improving mapping logic for conversation segments.
- Refactored `ContextualChunkETLPipeline` to improve error handling and response validation during item insertion in Directus.
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
echo/docs/litellm_config.md (1)

47-47: Cooloff param docs could use a grammar boost

The description is solid but needs a comma after "Currently". Also, "tz" is dev shorthand - spell out "timezone" for maximum clarity.

-AUDIO_LIGHTRAG_COOL_OFF_TIME_SECONDS`: Time threshold for audio processing in seconds (default: 60). Files will not be processed if uploaded earlier than cooloff. Currently disabled, pass the current tz in run_etl to enable
+AUDIO_LIGHTRAG_COOL_OFF_TIME_SECONDS`: Time threshold for audio processing in seconds (default: 60). Files will not be processed if uploaded earlier than cooloff. Currently disabled, pass the current timezone in run_etl to enable

echo/server/dembrane/audio_lightrag/main/run_etl.py (1)

71-72: Fix indentation in run call for consistency

The indentation is a bit off. Let's clean it up.

-            process_tracker = directus_pl.run(filtered_conv_ids, 
-                                                run_timestamp=None) # pass timestamp to avoid processing files uploaded earlier than cooloff
+            process_tracker = directus_pl.run(filtered_conv_ids,
+                                             run_timestamp=None)  # pass timestamp to avoid processing files uploaded earlier than cooloff
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 8e7ce93 and 5936b8b.

📒 Files selected for processing (4)
  • echo/docs/litellm_config.md (1 hunks)
  • echo/server/dembrane/audio_lightrag/main/run_etl.py (6 hunks)
  • echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (3 hunks)
  • echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py
  • echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: ci-check-server
🔇 Additional comments (7)
echo/docs/litellm_config.md (2)

53-53: Feature flag naming synced - LGTM

Solid fix syncing the feature flag name with what's in the code. No confusion = happy devs.


55-57: Redis lock documentation is 💯

Clear and comprehensive explanation of the new Redis lock params. Default values are explicitly called out, 10/10 docs.

echo/server/dembrane/audio_lightrag/main/run_etl.py (5)

4-7: Redis imports look sharp

Clean imports for Redis client and config constants. Exactly what's needed, nothing more.


28-28: Docstring update is on point

Concise explanation of the Redis lock behavior matching the 1-hour default from the config.


42-66: Redis lock implementation is fire 🔥

Atomic lock acquisition with SET ... NX is exactly what we needed to squash race conditions. Smart TTL conversion to minutes for human-readable logs, and early bailout if all IDs are locked. 10x engineering.


73-73: Progress indicators in logs are a nice touch

Adding "1/3...", "2/3...", "3/3..." to the logs makes it much easier to track pipeline progress. Simple but effective UX improvement.

Also applies to: 82-82, 91-91


101-108: Lock cleanup on failure is crucial - LGTM

Solid error handling with proper lock cleanup on failure. Reinitializing the Redis client in the except block is a smart defensive move in case the original connection is borked.

Member

@spashii spashii left a comment

added some comments

… handling

- Updated LiteLLM configuration documentation to clarify the `AUDIO_LIGHTRAG_COOL_OFF_TIME_SECONDS` setting and its dependency on the current timezone stamp.
- Renamed `REDIS_LOCK_PREFIX` to `AUDIO_LIGHTRAG_REDIS_LOCK_PREFIX` in the configuration to improve clarity and consistency.
- Adjusted `run_etl.py` to utilize the new `AUDIO_LIGHTRAG_REDIS_LOCK_PREFIX` for Redis lock management, ensuring proper handling of conversation IDs during processing.
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
echo/server/dembrane/audio_lightrag/main/run_etl.py (1)

99-109: 🛠️ Refactor suggestion

Move filtered_conv_ids initialization earlier to prevent UnboundLocalError

The filtered_conv_ids initialization should happen before any code that might throw exceptions to ensure it's always defined in the exception handler.

    try:
        if not conv_id_list:
            logger.error("Empty conversation ID list provided")
            return None

+        filtered_conv_ids = []  # Initialize early to ensure it's defined for cleanup paths
        # Filter conversation IDs that are already being processed (via Redis locks)
        redis_client = redis.from_url(REDIS_URL)
-        filtered_conv_ids = []
🧹 Nitpick comments (2)
echo/server/dembrane/audio_lightrag/main/run_etl.py (1)

99-104: DRY up the Redis client initialization

You're initializing the Redis client twice - once in the main flow and once in the exception handler. Extract this to a single initialization at the beginning of the function to keep things DRY.

    try:
+        # Initialize Redis client once for both normal flow and exception handling
+        redis_client = redis.from_url(REDIS_URL)
+        filtered_conv_ids = []  # Initialize early to ensure it's defined for cleanup paths
+
        if not conv_id_list:
            logger.error("Empty conversation ID list provided")
            return None

        # Filter conversation IDs that are already being processed (via Redis locks)
-        redis_client = redis.from_url(REDIS_URL)
-        filtered_conv_ids = []
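
Taken together, the two suggestions above (one client per run, and the list of locked IDs defined before anything can raise) lead to a structure like the following sketch. It is not the project's run_etl.py: `FakeLockClient`, `run_with_locks`, `work`, and the default prefix are hypothetical, and the fake client implements only the `set` and `delete` calls used here.

```python
from typing import Callable, Dict, List, Optional


class FakeLockClient:
    """Hypothetical in-memory stand-in exposing set(..., nx=, ex=) and delete()."""

    def __init__(self) -> None:
        self.keys: Dict[str, str] = {}

    def set(self, name: str, value: str, nx: bool = False, ex: Optional[int] = None) -> Optional[bool]:
        if nx and name in self.keys:
            return None
        self.keys[name] = value
        return True

    def delete(self, *names: str) -> int:
        return sum(1 for name in names if self.keys.pop(name, None) is not None)


def run_with_locks(
    client: FakeLockClient,
    conv_ids: List[str],
    work: Callable[[List[str]], None],
    prefix: str = "etl_lock_",  # hypothetical prefix value
    expiry_seconds: int = 3600,
) -> Optional[List[str]]:
    # Defined before the try block, so the except branch can always see it.
    locked_ids: List[str] = []
    try:
        for conv_id in conv_ids:
            if client.set(f"{prefix}{conv_id}", "1", nx=True, ex=expiry_seconds):
                locked_ids.append(conv_id)
        if not locked_ids:
            return None  # everything is already being processed elsewhere
        work(locked_ids)
        return locked_ids
    except Exception:
        # Release only the locks this run acquired, reusing the same client.
        if locked_ids:
            client.delete(*(f"{prefix}{cid}" for cid in locked_ids))
        raise
```

On success the locks are left in place and expire on their own, which matches the behaviour described in this PR (locks are cleaned up on failure). Releasing them on success as well would be a different design choice.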
echo/docs/litellm_config.md (1)

47-47: Fix duplicate word in cool-off time documentation

You've got a word duplication - "current current" should just be "current".

- `AUDIO_LIGHTRAG_COOL_OFF_TIME_SECONDS`: Time threshold for audio processing in seconds (default: 60). Files will not be processed if uploaded earlier than cooloff. Currently disabled, pass the current current tz stamp of directus in run_etl to enable
+ `AUDIO_LIGHTRAG_COOL_OFF_TIME_SECONDS`: Time threshold for audio processing in seconds (default: 60). Files will not be processed if uploaded earlier than cooloff. Currently disabled, pass the current tz stamp of directus in run_etl to enable
🧰 Tools
🪛 LanguageTool

[uncategorized] ~47-~47: A comma may be missing after the conjunctive/linking adverb ‘Currently’.
Context: ...essed if uploaded earlier than cooloff. Currently disabled, pass the current current tz s...

(SENT_START_CONJUNCTIVE_LINKING_ADVERB_COMMA)


[duplication] ~47-~47: Possible typo: you repeated a word.
Context: ...n cooloff. Currently disabled, pass the current current tz stamp of directus in run_etl to enab...

(ENGLISH_WORD_REPEAT_RULE)

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 5936b8b and a2a128a.

📒 Files selected for processing (3)
  • echo/docs/litellm_config.md (1 hunks)
  • echo/server/dembrane/audio_lightrag/main/run_etl.py (6 hunks)
  • echo/server/dembrane/config.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • echo/server/dembrane/config.py
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: ci-check-server
🔇 Additional comments (3)
echo/server/dembrane/audio_lightrag/main/run_etl.py (2)

42-67: LGTM! You've crushed the Redis lock implementation! 🚀

Solid implementation using atomic SET with NX and EX flags - avoids race conditions like a boss. The TTL check for informative logging is a nice touch for observability.


71-73: LGTM! Progress indicators in logs = 💯

Adding progress indicators (1/3, 2/3, 3/3) to log messages is a killer observability improvement.

echo/docs/litellm_config.md (1)

55-57: LGTM! Redis lock config docs - critical for engineers to understand the lock mechanism

Solid documentation of the Redis lock configuration parameters. The expiry timing and lock prefix are well explained, making it clear how the ETL process avoids duplicate processing.

@ArindamRoy23
Contributor Author

Made the requested changes. OK to merge, I think. @spashii

@spashii spashii changed the title from "Echo 165 ingest from upload text upload audio implement lock for processing" to "ECHO-165 ingest from upload text upload audio implement lock for processing" May 1, 2025
- Renamed `REDIS_LOCK_EXPIRY` to `AUDIO_LIGHTRAG_REDIS_LOCK_EXPIRY` in the configuration for consistency.
- Adjusted `run_etl.py` to utilize the new `AUDIO_LIGHTRAG_REDIS_LOCK_EXPIRY` variable for Redis lock management, ensuring proper handling of conversation IDs during processing.
- Updated documentation to reflect the changes in Redis lock configuration.
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
echo/docs/litellm_config.md (1)

47-47: Accurate renaming but minor text duplication

The renaming from TIME_THRESHOLD to COOL_OFF_TIME makes the parameter's purpose much clearer. However, there's a word duplication: "current current".

- `AUDIO_LIGHTRAG_COOL_OFF_TIME_SECONDS`: Time threshold for audio processing in seconds (default: 60). Files will not be processed if uploaded earlier than cooloff. Currently disabled, pass the current current tz stamp of directus in run_etl to enable
+ `AUDIO_LIGHTRAG_COOL_OFF_TIME_SECONDS`: Time threshold for audio processing in seconds (default: 60). Files will not be processed if uploaded earlier than cooloff. Currently disabled, pass the current tz stamp of directus in run_etl to enable
🧰 Tools
🪛 LanguageTool

[uncategorized] ~47-~47: A comma may be missing after the conjunctive/linking adverb ‘Currently’.
Context: ...essed if uploaded earlier than cooloff. Currently disabled, pass the current current tz s...

(SENT_START_CONJUNCTIVE_LINKING_ADVERB_COMMA)


[duplication] ~47-~47: Possible typo: you repeated a word.
Context: ...n cooloff. Currently disabled, pass the current current tz stamp of directus in run_etl to enab...

(ENGLISH_WORD_REPEAT_RULE)
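
As a rough illustration of the cool-off setting discussed in this comment, the sketch below reads "uploaded earlier than cooloff" as "uploaded less than the cool-off interval before the run". That reading is an assumption, as are the function name `is_ready` and the idea that the check compares an upload time against a run timestamp. The default of 60 seconds and the "disabled unless a timestamp is passed" behaviour come from the documentation line quoted above.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_ready(
    uploaded_at: datetime,
    run_timestamp: Optional[datetime],
    cool_off_seconds: int = 60,
) -> bool:
    """Decide whether a file is old enough to process (hypothetical helper)."""
    if run_timestamp is None:
        # No timestamp supplied: the cool-off check is disabled and everything passes.
        return True
    # Process only files uploaded at least cool_off_seconds before the run.
    return run_timestamp - uploaded_at >= timedelta(seconds=cool_off_seconds)


now = datetime(2025, 5, 1, 12, 0, 0, tzinfo=timezone.utc)
print(is_ready(now - timedelta(seconds=10), now))   # too recent under this reading
print(is_ready(now - timedelta(seconds=10), None))  # check disabled
```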

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a2a128a and 29c469c.

📒 Files selected for processing (3)
  • echo/docs/litellm_config.md (1 hunks)
  • echo/server/dembrane/audio_lightrag/main/run_etl.py (6 hunks)
  • echo/server/dembrane/config.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • echo/server/dembrane/config.py
🧰 Additional context used
🧬 Code Graph Analysis (1)
echo/server/dembrane/audio_lightrag/main/run_etl.py (3)
echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (2)
  • DirectusETLPipeline (14-129)
  • run (122-129)
echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (1)
  • run (151-154)
echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (1)
  • run (123-126)
🪛 LanguageTool
echo/docs/litellm_config.md

[uncategorized] ~57-~57: Loose punctuation mark.
Context: ...ne. - AUDIO_LIGHTRAG_REDIS_LOCK_EXPIRY: Time in seconds before a Redis lock exp...

(UNLIKELY_OPENING_PUNCTUATION)

⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: ci-check-server
🔇 Additional comments (9)
echo/server/dembrane/audio_lightrag/main/run_etl.py (7)

7-11: Solid Redis config imports! 🚀

Clean separation of Redis config params with explicit naming conventions. This makes it clear these are specific to the audio_lightrag ETL pipeline and avoids namespace collisions.


32-32: Slick docstring update!

Accurate reflection of the new Redis locking capability makes for self-documenting code.


46-65: Atomic lock acquisition FTW! 💯

Excellent implementation of the Redis lock system. You've properly used Redis's atomic SET with NX and expiry options, crushing any race conditions. Great job adding TTL checking for informative logging too - that'll be a huge help during debugging.


66-70: Perfect early return pattern!

Smart check to bail out early if all conversation IDs are locked - no point in spinning up the ETL pipeline machinery if there's nothing to process.
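
A small sketch of the early-return shape, under stated assumptions: `select_unlocked`, `run_if_any_unlocked`, the `try_lock` callable, and the `None` return value are hypothetical names and choices for illustration, not the signatures used in `run_etl.py`.

```python
from typing import Callable, Optional


def select_unlocked(conv_ids: list[str], try_lock: Callable[[str], bool]) -> list[str]:
    """Keep only the IDs for which try_lock succeeded."""
    return [conv_id for conv_id in conv_ids if try_lock(conv_id)]


def run_if_any_unlocked(
    conv_ids: list[str],
    try_lock: Callable[[str], bool],
    run_pipeline: Callable[[list[str]], None],
) -> Optional[bool]:
    """Run the pipeline on unlocked IDs, or return None without starting it."""
    filtered = select_unlocked(conv_ids, try_lock)
    if not filtered:
        # Every ID is already being processed elsewhere, so skip pipeline setup entirely.
        return None
    run_pipeline(filtered)
    return True
```

Passing the lock attempt in as a callable keeps the filtering logic independent of Redis, so it can be exercised with a plain function in tests.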


75-77: Clean pipeline execution update!

Setting run_timestamp=None explicitly makes it clear this is an intentional choice rather than a forgotten parameter. The numbered logging (1/3) provides great visibility into the pipeline progress.


86-86: Consistent progress logging! 👌

Maintaining the X/3 format across all pipeline stages helps engineers tracking logs to understand exactly where something failed.

Also applies to: 95-95
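
One way to keep the X/3 markers consistent is to derive them from the stage list instead of hard-coding them in each message. The sketch below assumes a hypothetical `run_stages` helper and stage names. The actual pipeline may simply use literal strings in its log calls.

```python
import logging

logger = logging.getLogger("etl_progress_sketch")


def run_stages(stages):
    """Run (name, callable) stages in order, logging an i/N marker before each one."""
    total = len(stages)
    completed = []
    for index, (name, stage) in enumerate(stages, start=1):
        # Logged before the stage runs, so the last marker in the log names the failing stage.
        logger.info("%d/%d: running %s", index, total, name)
        stage()
        completed.append(f"{index}/{total} {name}")
    return completed
```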


105-112: Robust failure handling!

Great defensive programming - releasing locks on failure ensures the system can recover gracefully. The try/except around the release operation itself is particularly thorough.

One small improvement I'd consider: initialize `filtered_conv_ids` before the `try` block so that it is always defined, even if Redis initialization fails.

def run_etl_pipeline(conv_id_list: list[str]) -> Optional[bool]:
    """
    ...
    """
+   filtered_conv_ids: list[str] = []
    try:
        if not conv_id_list:
            logger.error("Empty conversation ID list provided")
            return None

        # Filter conversation IDs that are already being processed (via Redis locks)
        redis_client = redis.from_url(REDIS_URL)
-       filtered_conv_ids = []
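
The release-on-failure pattern praised above can be sketched as follows. This is an illustration under assumptions, not the code under review: `process_with_locks` is a hypothetical helper, and re-raising the original error is a choice made for this sketch. The only client call it relies on is `delete(key)`, which redis-py provides.

```python
import logging

logger = logging.getLogger("etl_release_sketch")


def process_with_locks(client, keys, work):
    """Run work(); if it raises, delete the given lock keys and then re-raise."""
    try:
        return work()
    except Exception:
        for key in keys:
            try:
                client.delete(key)
            except Exception as release_error:
                # A failed release must not mask the original error.
                # The key's own expiry still frees the lock eventually.
                logger.error("Could not release %s: %s", key, release_error)
        raise
```

Taking `keys` as a parameter means the list exists before the `try` block is entered, which is the same guarantee the suggested diff gives by initializing `filtered_conv_ids` up front.
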
echo/docs/litellm_config.md (2)

53-54: Flag renaming for consistency

Great, the feature flag is renamed to match the naming pattern of the other flags.


55-57: Redis lock docs looking sharp! 🔐

Clean documentation of the Redis lock config parameters. The explanations are clear and give users enough context to understand what these settings control without diving into the implementation.
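
To make the two settings concrete, here is a hedged sketch of how they might be read. The setting names come from this PR. Everything else is an assumption: that they are read from environment variables, the helper name `load_lock_settings`, and both default values shown here, which are placeholders and not the project's documented defaults.

```python
import os


def load_lock_settings(env=None):
    """Return (prefix, expiry_seconds) for the ETL Redis lock (hypothetical helper)."""
    env = os.environ if env is None else env
    # Placeholder defaults, chosen for this example only.
    prefix = env.get("AUDIO_LIGHTRAG_REDIS_LOCK_PREFIX", "etl_lock_conv_")
    expiry = int(env.get("AUDIO_LIGHTRAG_REDIS_LOCK_EXPIRY", "3600"))
    if expiry <= 0:
        raise ValueError("AUDIO_LIGHTRAG_REDIS_LOCK_EXPIRY must be a positive number of seconds")
    return prefix, expiry
```

The expiry acts as a safety net: if a worker dies without releasing its lock, the conversation becomes processable again once the key expires.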


@spashii spashii added this pull request to the merge queue May 1, 2025
github-merge-queue bot pushed a commit that referenced this pull request May 1, 2025
…essing (#124)

* Enhance ETL pipeline logging and audio processing (ECHO-165)

- Updated logging messages in run_etl.py to indicate progress through the ETL pipeline stages.
- Refactored AudioETLPipeline to separate processing of audio and non-audio files, improving clarity and functionality.
- Enhanced ContextualChunkETLPipeline to handle non-audio segments more effectively, including transcript management and error handling.
- Improved DirectusETLPipeline to ensure proper handling of missing audio paths by introducing a placeholder value.
- Streamlined ProcessTracker methods for better data management and clarity.

* Enhance LiteLLM configuration and ETL pipeline with Redis lock support

- Updated LiteLLM configuration documentation to include new settings for Redis lock management, including `REDIS_LOCK_PREFIX` and `REDIS_LOCK_EXPIRY`.
- Refactored `run_etl.py` to implement Redis locks, preventing concurrent processing of the same conversation ID and ensuring smoother ETL pipeline execution.
- Changed `AUDIO_LIGHTRAG_TIME_THRESHOLD_SECONDS` to `AUDIO_LIGHTRAG_COOL_OFF_TIME_SECONDS` for clarity in audio processing settings.
- Improved error handling and logging in the ETL pipeline to track Redis lock status and processing flow.
- Cleaned up unused code in the audio ETL pipeline for better maintainability.

* Refactor LiteLLM configuration and enhance ETL pipeline functionality

- Updated LiteLLM configuration documentation to rename `AUTO_SELECT_ENABLED` to `ENABLE_CHAT_AUTO_SELECT` for clarity.
- Improved Redis lock handling in `run_etl.py` to atomically acquire locks and provide informative logging for conversation processing.
- Enhanced `AudioETLPipeline` to streamline chunk processing by fetching transcripts in bulk and improving mapping logic for conversation segments.
- Refactored `ContextualChunkETLPipeline` to improve error handling and response validation during item insertion in Directus.

* Refactor LiteLLM configuration and update ETL pipeline for Redis lock handling

- Updated LiteLLM configuration documentation to clarify the `AUDIO_LIGHTRAG_COOL_OFF_TIME_SECONDS` setting and its dependency on the current timezone stamp.
- Renamed `REDIS_LOCK_PREFIX` to `AUDIO_LIGHTRAG_REDIS_LOCK_PREFIX` in the configuration to improve clarity and consistency.
- Adjusted `run_etl.py` to utilize the new `AUDIO_LIGHTRAG_REDIS_LOCK_PREFIX` for Redis lock management, ensuring proper handling of conversation IDs during processing.

* Update LiteLLM configuration for Redis lock handling

- Renamed `REDIS_LOCK_EXPIRY` to `AUDIO_LIGHTRAG_REDIS_LOCK_EXPIRY` in the configuration for consistency.
- Adjusted `run_etl.py` to utilize the new `AUDIO_LIGHTRAG_REDIS_LOCK_EXPIRY` variable for Redis lock management, ensuring proper handling of conversation IDs during processing.
- Updated documentation to reflect the changes in Redis lock configuration.
@spashii spashii removed this pull request from the merge queue due to the queue being cleared May 1, 2025
@spashii spashii merged commit a13d618 into main May 2, 2025
7 checks passed
@coderabbitai coderabbitai bot mentioned this pull request Jul 21, 2025
@spashii spashii deleted the ECHO-165-ingest-from-upload-text-upload-audio-implement-lock-for-processing branch October 30, 2025 12:03
spashii added a commit that referenced this pull request Nov 18, 2025
…essing (#124)

---------

Co-authored-by: Sameer Pashikanti <63326129+spashii@users.noreply.github.com>