RAG ETL Performance Improvements: Parallel LLM, Batch Processing & Bug Fixes #322
Conversation
improvements: error handling, validation, and 3-stage pipeline
- Add async/event loop isolation with new async_utils.py wrapper
- Add audio file validation and multi-format fallback decoding
- Implement 3-stage ETL pipeline with proper priorities and timeouts
- Add comprehensive Directus error handling with salvage mode
- Add ProcessTracker serialization for data passing between stages
- Fix pandas FutureWarning with StringIO in ProcessTracker

Bug Fixes:
- Fix ETL auto-trigger by using .send() instead of group().run()
- Add graceful fallback for RAG query SQL errors

Testing & Monitoring:
- Add comprehensive test suite (test_etl_stages.py)
- Add ETL workflow monitoring scripts
- Add RAG query testing utilities

Note: Currently using direct function calls for task chaining due to a dramatiq actor discovery issue. All error handling and validation improvements are active and tested.
…caching for S3 audio streams
…d caching option for audio decoding
…and improve error handling; update Directus ETL to gracefully manage empty chunks; optimize imports in utility modules.
Hi @dtrn2048! Thank you for contributing to Dembrane ECHO! Before we consider your Pull Request, we ask that you sign our Contributor License Agreement (CLA). This is only required for your first Pull Request. Please review the CLA, and sign it by adding your GitHub username to the contributors.yml file. Thanks!
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.
Walkthrough: Removes legacy audio LightRAG ETL pipelines and tests, pivots tasks to a text-only RAG flow, adds async helpers and audio validation utilities, introduces a ConversationContextualizer service, adds diagnostic RAG scripts, tightens RAG API error handling (HTTP 503), and adjusts scheduler and logging.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
📜 Recent review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
📒 Files selected for processing (1)
🧰 Additional context used
🧬 Code graph analysis (1)
echo/server/dembrane/api/chat.py (1)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
🔇 Additional comments (4)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (1)
42-255: Flush batched Directus writes even when the load flow errors.
BatchDirectusWriter is instantiated at Line 43 but only flushed at Line 254. If anything raises in between (Directus I/O, LLM call, LightRAG insert, etc.), we bail out before hitting that flush, permanently dropping every queued transcript + flag update we already staged. That's a nasty regression versus the old immediate writes. Wrap the writer in a context manager or a try/finally so flush() runs on every exit path.

- batch_writer = BatchDirectusWriter(auto_flush_size=20)
-
- for conversation_id in self.process_tracker().conversation_id.unique():
+ with BatchDirectusWriter(auto_flush_size=20) as batch_writer:
+     for conversation_id in self.process_tracker().conversation_id.unique():
@@
- # Flush all batched writes at the end
- logger.info("Flushing batched Directus writes...")
- batch_writer.flush()
- logger.info("All batched writes completed")
+ logger.info("All batched writes completed")

echo/server/dembrane/audio_lightrag/utils/audio_utils.py (1)
226-305: Return the right leftovers. The new validation short-circuits invalid chunks, but we still slice unprocessed_chunk_file_uri_li by a raw count. If the first entry is invalid and the next one processes, we return a list that still contains the processed chunk — hello duplicate segment spam and potential infinite loops. Track invalid/processed IDs explicitly and rebuild the remaining URI list instead of relying on positional slicing.

- chunk_id_2_size = {}
+ chunk_id_2_size = {}
+ invalid_chunk_ids = set()
@@
- if not is_valid:
-     logger.warning(f"Skipping invalid audio file {chunk_id} ({uri}): {error_msg}")
+ if not is_valid:
+     logger.warning(f"Skipping invalid audio file {chunk_id} ({uri}): {error_msg}")
+     invalid_chunk_ids.add(chunk_id)
  continue
@@
- except Exception as e:
-     logger.error(f"Error calculating size for {chunk_id} ({uri}): {e}")
+ except Exception as e:
+     logger.error(f"Error calculating size for {chunk_id} ({uri}): {e}")
+     invalid_chunk_ids.add(chunk_id)
  continue
+
+ def _remaining_chunk_uris(processed_ids: set[str]) -> list[str]:
+     ordered_ids = list(chunk_id_2_uri.keys())
+     return [
+         chunk_id_2_uri[chunk_id]
+         for chunk_id in ordered_ids
+         if chunk_id not in processed_ids and chunk_id not in invalid_chunk_ids
+     ]
@@
- return ([], [], counter)
+ return ([], [], counter)
@@
- return unprocessed_chunk_file_uri_li[1:], chunk_id_2_segment, counter
+ processed_ids = {chunk_id}
+ return _remaining_chunk_uris(processed_ids), chunk_id_2_segment, counter
@@
- return unprocessed_chunk_file_uri_li[len(processed_chunk_li) :], chunk_id_2_segment, counter
+ processed_ids = set(processed_chunk_li)
+ return _remaining_chunk_uris(processed_ids), chunk_id_2_segment, counter
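The ID-based bookkeeping the comment asks for can be sketched in isolation. This is a hedged illustration, not the real pipeline code: `remaining_uris`, `processed_ids`, and `invalid_ids` are hypothetical names mirroring the review's suggestion.

```python
def remaining_uris(chunk_id_2_uri: dict, processed_ids: set, invalid_ids: set) -> list:
    """Rebuild the leftover URI list by chunk ID instead of slicing by position."""
    return [
        uri
        for chunk_id, uri in chunk_id_2_uri.items()
        if chunk_id not in processed_ids and chunk_id not in invalid_ids
    ]

chunk_id_2_uri = {
    "a": "s3://bucket/a.wav",  # failed validation
    "b": "s3://bucket/b.wav",  # processed this pass
    "c": "s3://bucket/c.wav",  # still pending
}
left = remaining_uris(chunk_id_2_uri, processed_ids={"b"}, invalid_ids={"a"})
print(left)  # ['s3://bucket/c.wav']
```

Because membership is checked per ID, a skipped-invalid entry can never shift a processed chunk back into the "remaining" list, which is exactly the bug positional slicing allows.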
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (19)
.gitignore (1 hunks)
echo/server/dembrane/api/stateless.py (1 hunks)
echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (3 hunks)
echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (10 hunks)
echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (4 hunks)
echo/server/dembrane/audio_lightrag/utils/async_utils.py (1 hunks)
echo/server/dembrane/audio_lightrag/utils/audio_utils.py (3 hunks)
echo/server/dembrane/audio_lightrag/utils/batch_directus.py (1 hunks)
echo/server/dembrane/audio_lightrag/utils/parallel_llm.py (1 hunks)
echo/server/dembrane/audio_lightrag/utils/process_tracker.py (2 hunks)
echo/server/dembrane/audio_lightrag/utils/s3_cache.py (1 hunks)
echo/server/dembrane/scheduler.py (1 hunks)
echo/server/dembrane/tasks.py (4 hunks)
echo/server/scripts/monitor_etl_workflow.py (1 hunks)
echo/server/scripts/rag_etl_observer.py (1 hunks)
echo/server/scripts/simple_rag_observer.py (1 hunks)
echo/server/scripts/test_etl_stages.py (1 hunks)
echo/server/scripts/test_rag_query.py (1 hunks)
echo/server/scripts/test_trigger_directus_etl.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
echo/server/dembrane/audio_lightrag/utils/s3_cache.py (1)
echo/server/dembrane/s3.py (1)
get_stream_from_s3(179-183)
echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (6)
echo/server/dembrane/audio_lightrag/utils/async_utils.py (1)
run_async_in_new_loop (16-77)
echo/server/dembrane/audio_lightrag/utils/audio_utils.py (2)
wav_to_str (179-182)
safe_audio_decode (68-136)
echo/server/dembrane/audio_lightrag/utils/parallel_llm.py (1)
parallel_llm_calls (116-167)
echo/server/dembrane/audio_lightrag/utils/litellm_utils.py (1)
get_json_dict_from_audio (34-93)
echo/server/dembrane/audio_lightrag/utils/batch_directus.py (3)
BatchDirectusWriter (17-136)
queue_update (43-52)
flush (120-128)
echo/server/dembrane/audio_lightrag/utils/echo_utils.py (1)
renew_redis_lock (37-67)
echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (4)
echo/server/dembrane/audio_lightrag/utils/batch_directus.py (2)
BatchDirectusWriter (17-136)
queue_create (54-63)
echo/server/dembrane/audio_lightrag/utils/echo_utils.py (1)
renew_redis_lock (37-67)
echo/server/dembrane/audio_lightrag/utils/audio_utils.py (2)
process_audio_files (185-305)
create_directus_segment (313-334)
echo/server/dembrane/audio_lightrag/utils/process_tracker.py (1)
update_value_for_chunk_id (38-39)
echo/server/dembrane/audio_lightrag/utils/audio_utils.py (2)
echo/server/dembrane/s3.py (2)
save_audio_to_s3 (205-222)
get_stream_from_s3 (179-183)
echo/server/dembrane/audio_lightrag/utils/s3_cache.py (1)
get_cached_s3_stream (135-147)
echo/server/scripts/simple_rag_observer.py (1)
echo/server/scripts/rag_etl_observer.py (2)
main (542-728)
close (215-219)
echo/server/scripts/test_etl_stages.py (5)
echo/server/dembrane/audio_lightrag/utils/process_tracker.py (4)
ProcessTracker (8-66)
to_dict (41-51)
from_dict (54-66)
get_project_df (32-33)
echo/server/dembrane/tasks.py (4)
task_run_directus_etl (227-269)
task_run_audio_etl (279-316)
task_run_contextual_etl (325-366)
task_run_etl_pipeline (375-468)
echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (1)
DirectusETLPipeline (19-311)
echo/server/dembrane/audio_lightrag/utils/async_utils.py (1)
run_async_in_new_loop (16-77)
echo/server/dembrane/audio_lightrag/utils/audio_utils.py (1)
validate_audio_file (22-65)
echo/server/scripts/test_rag_query.py (2)
echo/server/dembrane/rag_manager.py (4)
RAGManager (14-93)
is_initialized (49-57)
initialize (60-93)
get_instance (32-46)
echo/server/dembrane/audio_lightrag/utils/async_utils.py (1)
run_async_in_new_loop (16-77)
echo/server/scripts/test_trigger_directus_etl.py (1)
echo/server/dembrane/tasks.py (1)
task_run_directus_etl(227-269)
echo/server/scripts/rag_etl_observer.py (2)
echo/server/scripts/simple_rag_observer.py (1)
main (141-181)
echo/server/dembrane/service/conversation.py (1)
update (123-160)
echo/server/dembrane/tasks.py (6)
echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (3)
DirectusException (15-16)
DirectusETLPipeline (19-311)
run (294-301)
echo/server/dembrane/processing_status_utils.py (1)
ProcessingStatusContext (80-188)
echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (2)
run (257-263)
ContextualChunkETLPipeline (25-263)
echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (2)
run (164-167)
AudioETLPipeline (20-167)
echo/server/dembrane/audio_lightrag/utils/process_tracker.py (3)
to_dict (41-51)
ProcessTracker (8-66)
from_dict (54-66)
echo/server/dembrane/audio_lightrag/utils/echo_utils.py (2)
finish_conversation (24-34)
release_redis_lock (70-82)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: ci-check-server
🔇 Additional comments (1)
echo/server/dembrane/scheduler.py (1)
20-26: LGTM. Ship it. Dropping the interval to 1 minute is chef's kiss 👌 given your ETL pipeline now rips through conversations in ~10s instead of 33 minutes. More frequent discovery = lower latency, and with those batch + parallel LLM gains you're not gonna hammer the system. Classic win-win optimization.
|
Another todo - still need to fix the issue where the pipeline only runs every 1m when the scheduler calls it.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (4 hunks)
echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (1)
echo/frontend/src/lib/directus.ts (1)
directus(6-14)
echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (4)
echo/server/dembrane/audio_lightrag/utils/batch_directus.py (2)
BatchDirectusWriter (17-136)
queue_create (54-63)
echo/server/dembrane/audio_lightrag/utils/echo_utils.py (1)
renew_redis_lock (37-67)
echo/server/dembrane/audio_lightrag/utils/audio_utils.py (2)
process_audio_files (185-305)
create_directus_segment (313-334)
echo/server/dembrane/audio_lightrag/utils/process_tracker.py (1)
update_value_for_chunk_id (38-39)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Cursor Bugbot
- GitHub Check: ci-check-server
echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py
echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py
…ture of the RAG system
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
README.md(1 hunks)
🧰 Additional context used
🪛 markdownlint-cli2 (0.18.1)
README.md
103-103: Trailing spaces
Expected: 0 or 2; Actual: 1
(MD009, no-trailing-spaces)
103-103: Files should end with a single newline character
(MD047, single-trailing-newline)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Cursor Bugbot
- GitHub Check: ci-check-server
- Removed the Directus ETL pipeline implementation and related classes.
- Introduced a new service for contextualizing conversation transcripts using Claude.
- Updated task_run_etl_pipeline to process finished conversations using existing transcripts without audio processing.
- Enhanced async utility functions to manage persistent event loops per thread.
- Adjusted worker configurations for improved performance.
- Cleaned up unused code and files, including the ProcessTracker utility.
Actionable comments posted: 7
♻️ Duplicate comments (1)
echo/server/dembrane/audio_lightrag/utils/audio_utils.py (1)
22-66: Stop nuking every S3 chunk.
requests.head() against s3://... URIs will raise "No connection adapters" and torpedo your entire audio queue. Every S3 file fails validation → zero audio processed. This is the same critical issue flagged in the previous review. Detect S3 URIs upfront and validate via S3 helpers:

+ from urllib.parse import urlparse
+
  def validate_audio_file(chunk_uri: str, min_size_bytes: int = 1000) -> tuple[bool, str]:
      """
      Validate audio file before processing to prevent ffmpeg failures.
@@ -40,6 +42,24 @@
  try:
+     parsed = urlparse(chunk_uri)
+     if parsed.scheme == "s3":
+         try:
+             stream = get_cached_s3_stream(chunk_uri) or get_stream_from_s3(chunk_uri)
+         except Exception as exc:
+             return (False, f"Validation error: {exc}")
+         if stream is None:
+             return (False, "File not found in S3")
+         # Check size from stream
+         size = getattr(stream, "content_length", None)
+         if size is None and isinstance(stream, BytesIO):
+             size = stream.getbuffer().nbytes
+         if size is not None and size < min_size_bytes:
+             return (False, f"File too small: {size} bytes (minimum {min_size_bytes})")
+         return (True, "")
+
      # Check if file exists and get metadata
      response = requests.head(chunk_uri, timeout=5)
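The scheme dispatch the fix relies on can be shown standalone. A minimal sketch: `validation_route` is a hypothetical helper name, and the route strings merely label which validation strategy the real code would pick.

```python
from urllib.parse import urlparse

def validation_route(chunk_uri: str) -> str:
    """Choose a validation strategy by URI scheme; HTTP HEAD only works for http(s)."""
    scheme = urlparse(chunk_uri).scheme
    if scheme == "s3":
        return "s3"      # validate via S3 helpers (cached stream / content length)
    if scheme in ("http", "https"):
        return "http"    # requests.head() is safe here
    return "local"       # plain path: fall back to filesystem checks

print(validation_route("s3://bucket/audio/chunk.wav"))    # s3
print(validation_route("https://cdn.example/chunk.wav"))  # http
print(validation_route("/tmp/chunk.wav"))                 # local
```

The point is simply that `urlparse` exposes the scheme, so the `s3://` branch is taken before any HTTP client is involved.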
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (12)
echo/server/dembrane/audio_lightrag/main/run_etl.py (0 hunks)
echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (0 hunks)
echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (0 hunks)
echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (0 hunks)
echo/server/dembrane/audio_lightrag/services/__init__.py (1 hunks)
echo/server/dembrane/audio_lightrag/services/contextualizer.py (1 hunks)
echo/server/dembrane/audio_lightrag/utils/async_utils.py (1 hunks)
echo/server/dembrane/audio_lightrag/utils/audio_utils.py (3 hunks)
echo/server/dembrane/audio_lightrag/utils/process_tracker.py (0 hunks)
echo/server/dembrane/tasks.py (2 hunks)
echo/server/prod-worker-cpu.sh (1 hunks)
echo/server/run-worker-cpu.sh (1 hunks)
💤 Files with no reviewable changes (5)
- echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py
- echo/server/dembrane/audio_lightrag/main/run_etl.py
- echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py
- echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py
- echo/server/dembrane/audio_lightrag/utils/process_tracker.py
🧰 Additional context used
🧬 Code graph analysis (3)
echo/server/dembrane/tasks.py (4)
echo/server/dembrane/processing_status_utils.py (1)
ProcessingStatusContext (80-188)
echo/server/dembrane/api/stateless.py (2)
InsertRequest (77-80)
insert_item (89-127)
echo/server/dembrane/audio_lightrag/utils/async_utils.py (1)
run_async_in_new_loop (59-95)
echo/server/dembrane/audio_lightrag/services/contextualizer.py (2)
get_contextualizer (107-112)
contextualize (16-65)
echo/server/dembrane/audio_lightrag/services/contextualizer.py (1)
echo/server/dembrane/audio_lightrag/utils/litellm_utils.py (1)
llm_model_func(96-117)
echo/server/dembrane/audio_lightrag/utils/audio_utils.py (2)
echo/server/dembrane/s3.py (2)
save_audio_to_s3 (205-222)
get_stream_from_s3 (179-183)
echo/server/dembrane/audio_lightrag/utils/s3_cache.py (1)
get_cached_s3_stream (135-147)
get_cached_s3_stream(135-147)
🪛 Shellcheck (0.11.0)
echo/server/run-worker-cpu.sh
[error] 1-1: Tips depend on target shell and yours is unknown. Add a shebang or a 'shell' directive.
(SC2148)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Cursor Bugbot
- GitHub Check: ci-check-server
🔇 Additional comments (16)
echo/server/prod-worker-cpu.sh (1)
2-2: Throughput boost looks dialed. Spinning up 8 procs with single-threaded workers is the right call for CPU-bound Dramatiq actors — no objections here.
echo/server/dembrane/audio_lightrag/services/__init__.py (1)
1-1: Ship it. Clean package marker. LGTM.
echo/server/dembrane/audio_lightrag/services/contextualizer.py (5)
1-6: Imports dialed in. Standard imports, logger setup clean. LGTM.
16-37: Early return pattern ships. Empty transcript guard is solid defensive programming. The type hint says str but you're checking not transcript, which handles None – this is fine as a guard rail, but if None is expected, consider Optional[str].
54-65: Fallback pattern ships clean. Solid error handling with full traceback logging and graceful degradation. The fallback preserves the transcript with basic context – exactly what you want when Claude flakes.
67-100: Prompt engineering on point. The prompt structure is clean and explicit about enrichment vs summarization. Safe dict access with .get() defaults, clear instructions to the model. This will ship well.
103-112: Singleton pattern ships for multi-process. The singleton pattern is simple and works for Dramatiq's multi-process model (each worker gets its own instance). If you ever move to multi-threaded workers, this isn't thread-safe and you'd want a lock. For now, this ships clean.
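The lock the review suggests for a multi-threaded future is the classic double-checked pattern. A minimal sketch, with `Contextualizer` as a hypothetical stand-in class (only the singleton plumbing matters here, not the real service):

```python
import threading

class Contextualizer:
    """Hypothetical stand-in; only the singleton plumbing matters here."""

_instance = None
_instance_lock = threading.Lock()

def get_contextualizer() -> Contextualizer:
    global _instance
    if _instance is None:              # fast path avoids the lock after init
        with _instance_lock:
            if _instance is None:      # re-check: another thread may have won the race
                _instance = Contextualizer()
    return _instance

a = get_contextualizer()
b = get_contextualizer()
print(a is b)  # True
```

The outer check keeps the hot path lock-free; the inner check under the lock prevents two threads from constructing two instances.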
echo/server/dembrane/audio_lightrag/utils/async_utils.py (2)
38-56: LGTM on the persistent loop pattern. Double-checked locking is solid, and the one-loop-per-thread model correctly matches how Dramatiq workers process tasks sequentially. This avoids the "Future attached to different loop" errors while keeping resource pooling (DB connections, HTTP clients) alive across tasks.
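The one-loop-per-thread idea can be sketched in a few lines. This is a simplified illustration, not the repo's actual run_async_in_new_loop: it uses thread-local storage (each thread owns its slot, so the sketch skips the locking the real helper may need), and the names `get_persistent_loop`/`run_async` are hypothetical.

```python
import asyncio
import threading

_local = threading.local()

def get_persistent_loop() -> asyncio.AbstractEventLoop:
    """Lazily create one long-lived event loop per thread and reuse it."""
    loop = getattr(_local, "loop", None)
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        _local.loop = loop
    return loop

def run_async(coro):
    """Drive a coroutine to completion on this thread's persistent loop."""
    return get_persistent_loop().run_until_complete(coro)

async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b

print(run_async(add(2, 3)))                            # 5
# The same loop object is reused on subsequent calls from this thread:
print(get_persistent_loop() is get_persistent_loop())  # True
```

Because tasks on a thread run sequentially, the loop is never running when `run_until_complete()` is called, which is what makes the reuse safe.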
89-93: LGTM on run_until_complete() usage. Since Dramatiq tasks execute sequentially per thread, the loop won't be running when you call run_until_complete(), so no "already running" conflicts. Clean.
echo/server/dembrane/audio_lightrag/utils/audio_utils.py (3)
68-136: LGTM on fallback decode logic. Format fallback sequence is solid, and you're reusing cached streams to avoid redundant S3 downloads. The progressive failure logging (warning → debug → error) is chef's kiss. Only question: does retrying all 5 formats add latency for truly corrupted files? Might want to limit retries to 2-3 most common formats if you hit performance issues.
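The try-each-format-in-order shape of safe_audio_decode can be shown generically. A hedged sketch: text codecs stand in for the audio formats, and `safe_decode` is a hypothetical name, not the repo's function.

```python
def safe_decode(data: bytes, decoders) -> str:
    """Try each decoder in order; raise only when every candidate fails."""
    errors = []
    for name, fn in decoders:
        try:
            return fn(data)
        except Exception as e:
            errors.append(f"{name}: {e}")
    raise ValueError("all decoders failed: " + "; ".join(errors))

# Text codecs stand in for the audio formats (wav/mp3/ogg/...) here.
decoders = [
    ("ascii", lambda b: b.decode("ascii")),
    ("utf-8", lambda b: b.decode("utf-8")),
    ("latin-1", lambda b: b.decode("latin-1")),
]
print(safe_decode("héllo".encode("utf-8"), decoders))  # héllo
```

Collecting per-format errors and raising only at the end gives callers one actionable message while letting the first working format win, which also bounds the latency cost to the number of candidates tried.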
154-171: LGTM on safe size calculation. Switching to safe_audio_decode() with proper exception propagation is the right move. Raises on total failure so callers know the file is toast.
226-244: LGTM on validation integration. Pre-filtering invalid files before size calculation prevents downstream ffmpeg explosions. Early return on empty chunk_id_2_size is clean. The error handling loop (try/except/continue) ensures one bad apple doesn't kill the whole batch.
echo/server/dembrane/tasks.py (4)
220-222: Ship it - time limit looks solid for text-only flow. 5 min budget works for transcript-only pipeline. Max retries at 3 gives enough runway for transient Directus/Claude hiccups.
224-364: THE PIVOT works - past chaining issue is obliterated. Old 3-stage pipeline with actor chaining (past review flagged calling actors as plain callables) is gone. New single-task design eliminates the serialization footgun entirely. Text-only flow with proper async wrapping via run_async_in_new_loop() matches the async_utils pattern.
394-398: LGTM - .send() is the right pattern for fire-and-forget. Using .send() to enqueue follow-up tasks is correct. Past review comment about actor chaining doesn't apply here since you're not passing data between stages.
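The fire-and-forget semantics of actor.send() can be illustrated without a broker. This is a hypothetical in-process stand-in built on the stdlib, not dramatiq itself: the caller enqueues work and returns immediately, while a worker thread drains the queue.

```python
import queue
import threading

task_queue: queue.Queue = queue.Queue()
results = []

def worker() -> None:
    while True:
        item = task_queue.get()
        if item is None:            # sentinel: shut the worker down
            task_queue.task_done()
            break
        fn, args = item
        results.append(fn(*args))
        task_queue.task_done()

def send(fn, *args) -> None:
    """Fire-and-forget enqueue, analogous in spirit to dramatiq's actor.send()."""
    task_queue.put((fn, args))

threading.Thread(target=worker, daemon=True).start()
send(lambda x: x * 2, 21)
task_queue.join()                   # demo only: block until the worker finishes
print(results)  # [42]
```

The caller never touches the result, which is why .send() composes cleanly when no data needs to flow between stages.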
273-283: Verify Directus limit: -1 usage across calls. Confirm that using limit: -1 for unlimited fetch is supported by Directus and won't cause API limits, timeouts, or OOM on large result sets.
…; add stream closing in S3 cache
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
echo/server/dembrane/api/stateless.py (1)
265-280: Exception re-wrapping bug: HTTPException(503) gets caught and converted to 500. Yo, the inner try/except at line 265 raises HTTPException(503) for RAG failures, but the outer except block at line 278 catches it (since HTTPException is a subclass of Exception) and re-raises it as HTTPException(500). Your 503 status code gets yeeted into 500-land before it hits the client. Apply this diff to let HTTPException propagate correctly:

  except Exception as e:
+     # Don't re-wrap HTTPExceptions - let them propagate with their original status
+     if isinstance(e, HTTPException):
+         raise
      logger.exception("Query streaming operation failed")
      raise HTTPException(status_code=500, detail=str(e)) from e

Or better yet, move the RAG query try/except outside the outer try block if the outer block is only meant to catch validation/param errors.
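The subclass-swallowing behavior and the isinstance guard can be demonstrated without FastAPI. `HTTPException` below is a minimal stand-in for fastapi.HTTPException (just a status-code carrier), so the example stays self-contained:

```python
class HTTPException(Exception):
    """Minimal stand-in for fastapi.HTTPException (carries a status code)."""
    def __init__(self, status_code: int, detail: str = ""):
        super().__init__(detail)
        self.status_code = status_code

def query_with_guard() -> None:
    try:
        # Inner failure path: the RAG backend is unavailable.
        raise HTTPException(503, "RAG backend unavailable")
    except Exception as e:
        # HTTPException subclasses Exception, so without this guard the
        # 503 would be re-wrapped into a generic 500 before reaching the client.
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(500, str(e)) from e

try:
    query_with_guard()
except HTTPException as e:
    print(e.status_code)  # 503
```

Deleting the two guard lines makes the same call surface 500 instead of 503, which is exactly the bug the review describes.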
♻️ Duplicate comments (1)
echo/server/dembrane/audio_lightrag/utils/audio_utils.py (1)
22-76: Still rejecting every S3 chunk. HEADing an s3:// URL still blows up ("No connection adapters…"), so you drop every S3 audio before it ever hits ffmpeg. Branch on the URI scheme and run the validation through our S3 helpers (grab metadata/length from the StreamingBody or cached bytes, enforce min-size there, close the stream). Until that happens the ETL stays bricked. Here's a concrete patch:

+ from urllib.parse import urlparse
+
  def validate_audio_file(chunk_uri: str, min_size_bytes: int = 1000) -> tuple[bool, str]:
@@
-     response = requests.head(chunk_uri, timeout=5)
+     parsed = urlparse(chunk_uri)
+     if parsed.scheme == "s3":
+         stream = None
+         try:
+             stream = get_cached_s3_stream(chunk_uri)
+             if stream is None:
+                 stream = get_stream_from_s3(chunk_uri)
+             if stream is None:
+                 return (False, "File not found in S3")
+
+             if hasattr(stream, "content_length") and stream.content_length is not None:
+                 size = stream.content_length
+             elif isinstance(stream, BytesIO):
+                 size = stream.getbuffer().nbytes
+             else:
+                 data = stream.read()
+                 size = len(data)
+
+             if size < min_size_bytes:
+                 return (False, f"File too small: {size} bytes (minimum {min_size_bytes})")
+             return (True, "")
+         finally:
+             if stream is not None and hasattr(stream, "close"):
+                 try:
+                     stream.close()
+                 except Exception:
+                     logger.debug("Failed to close S3 stream during validation", exc_info=True)
+
+     response = requests.head(chunk_uri, timeout=5)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
echo/server/dembrane/api/stateless.py (1 hunks)
echo/server/dembrane/audio_lightrag/services/contextualizer.py (1 hunks)
echo/server/dembrane/audio_lightrag/utils/audio_utils.py (3 hunks)
echo/server/dembrane/audio_lightrag/utils/s3_cache.py (1 hunks)
echo/server/run-worker-cpu.sh (1 hunks)
echo/server/scripts/test_trigger_directus_etl.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
echo/server/dembrane/audio_lightrag/utils/s3_cache.py (1)
echo/server/dembrane/s3.py (1)
get_stream_from_s3(179-183)
echo/server/dembrane/audio_lightrag/services/contextualizer.py (1)
echo/server/dembrane/audio_lightrag/utils/litellm_utils.py (1)
llm_model_func(96-117)
echo/server/dembrane/audio_lightrag/utils/audio_utils.py (2)
echo/server/dembrane/s3.py (2)
save_audio_to_s3 (205-222)
get_stream_from_s3 (179-183)
echo/server/dembrane/audio_lightrag/utils/s3_cache.py (2)
get_cached_s3_stream (142-154)
get (41-95)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Cursor Bugbot
- GitHub Check: ci-check-server
🔇 Additional comments (7)
echo/server/run-worker-cpu.sh (1)
1-4: Locked-in worker launcher. Shebang plus set -euo pipefail make this script production-grade. Concurrency flags match the new worker topology — ship it.
echo/server/scripts/test_trigger_directus_etl.py (2)
1-20: Clean setup and diagnostics. Shebang, docstring, path manipulation, and metadata printing are solid. The diagnostic output will help verify task registration. LGTM.
22-38: Timeout fix applied, execution flow looks good. The timeout=30 on line 32 is now correct (previous review caught the 30000 bug). Task send, result wait, and error handling with traceback are all dialed in. Solid diagnostic script.
echo/server/dembrane/audio_lightrag/services/contextualizer.py (4)
1-7: LGTM! Imports are clean. All necessary modules present. asyncio for threading, logging for observability, and the LLM model func hooked up.
10-38: Solid structure, clean early exit. Class design is tight. The empty transcript guard prevents wasted LLM calls. Type hints are present. The "PIVOT" comment gives helpful context about the architectural shift.
56-67: Solid fallback strategy. The broad Exception catch is fine here since you have a valid fallback path. Logging with exc_info=True gives you stack traces for debugging. The fallback document preserves the transcript with basic context, so RAG insertion doesn't fail completely.
105-114: Singleton pattern works for the worker config. Standard lazy initialization. Given the worker config (processes=8, threads=1), each process gets its own instance, so the lack of threading.Lock isn't an issue. If you ever scale to multi-threaded workers, add a lock around the initialization check.
Multithreading is hard :( LightRAG does not support it.
This PR is ready for review.
…ssful RAG insertion
- Revert contextualizer to use audio_model_system_prompt.en.jinja (same as old pipeline)
- Remove dead code (~900 lines):
  - Delete entire tests/ directory (for deleted audio ETL pipeline)
  - Delete monitoring scripts (monitor_etl_workflow.py, rag_etl_observer.py)
  - Delete unused utils (parallel_llm.py, azure_utils.py, prompts.py)
  - Clean up litellm_utils.py (remove audio processing functions)
- Add debug logging to conversation_utils.py to track unprocessed segments
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
echo/server/dembrane/audio_lightrag/utils/litellm_utils.py (1)
44-56: Critical: embedding_func blocks the event loop. This function is marked async but runs synchronous embedding() calls in a loop (line 48), blocking the event loop. Meanwhile, llm_model_func correctly offloads with asyncio.to_thread. Inconsistent patterns = event loop errors, exactly what this PR aims to fix. Apply the same thread offloading pattern:

  async def embedding_func(texts: list[str]) -> np.ndarray:
      # Bug in litellm forcing us to do this: https://github.com/BerriAI/litellm/issues/6967
      nd_arr_response = []
      for text in texts:
-         temp = embedding(
+         temp = await asyncio.to_thread(
+             embedding,
              model=f"{LIGHTRAG_LITELLM_EMBEDDING_MODEL}",
              input=text,
              api_key=str(LIGHTRAG_LITELLM_EMBEDDING_API_KEY),
              api_version=str(LIGHTRAG_LITELLM_EMBEDDING_API_VERSION),
              api_base=str(LIGHTRAG_LITELLM_EMBEDDING_API_BASE),
          )
          nd_arr_response.append(temp["data"][0]["embedding"])
      return np.array(nd_arr_response)
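The asyncio.to_thread offload can be exercised end to end with a toy blocking function. A hedged sketch: `blocking_embed` is a hypothetical stand-in for the synchronous litellm embedding() call, not the real API.

```python
import asyncio

def blocking_embed(text: str) -> list:
    """Stand-in for a synchronous embedding() call that would block the loop."""
    return [float(len(text))]

async def embedding_func(texts: list) -> list:
    # Offload each blocking call to a worker thread so the event loop
    # stays free to service other coroutines in the meantime.
    return [await asyncio.to_thread(blocking_embed, t) for t in texts]

result = asyncio.run(embedding_func(["hi", "there"]))
print(result)  # [[2.0], [5.0]]
```

asyncio.to_thread (Python 3.9+) submits the callable to the default thread pool and awaits its result, which is the same shape the diff above applies to the real embedding() call.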
♻️ Duplicate comments (1)
echo/server/dembrane/tasks.py (1)
241-260: Stop swallowing Directus fetch failures. If Directus hiccups, we log and walk away. That acks the dramatiq message, ops see "success," retries never happen, and the convo stays broken forever. Re-raise so the worker surfaces the failure and the job retries.

- try:
-     conversation_object = directus.get_item("conversation", conversation_id)
- except Exception:
-     logger.error(f"Failed to get conversation {conversation_id}")
-     return
+ try:
+     conversation_object = directus.get_item("conversation", conversation_id)
+ except Exception as e:
+     logger.error(f"Failed to get conversation {conversation_id}: {e}")
+     raise
...
- try:
-     project = directus.get_item("project", project_id)
-     is_enabled = project.get("is_enhanced_audio_processing_enabled", False)
- except Exception as e:
-     logger.error(f"Failed to get project {project_id}: {e}")
-     return
+ try:
+     project = directus.get_item("project", project_id)
+     is_enabled = project.get("is_enhanced_audio_processing_enabled", False)
+ except Exception as e:
+     logger.error(f"Failed to get project {project_id}: {e}")
+     raise
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (12)
echo/server/dembrane/audio_lightrag/services/contextualizer.py (1 hunks)
echo/server/dembrane/audio_lightrag/tests/conftest.py (0 hunks)
echo/server/dembrane/audio_lightrag/tests/e2e_test.py (0 hunks)
echo/server/dembrane/audio_lightrag/tests/unit_tests/audio_etl_pipeline_test.py (0 hunks)
echo/server/dembrane/audio_lightrag/tests/unit_tests/contextual_chunk_etl_pipeline_test.py (0 hunks)
echo/server/dembrane/audio_lightrag/tests/unit_tests/directus_etl_pipeline_test.py (0 hunks)
echo/server/dembrane/audio_lightrag/tests/unit_tests/neo4j_test.py (0 hunks)
echo/server/dembrane/audio_lightrag/utils/azure_utils.py (0 hunks)
echo/server/dembrane/audio_lightrag/utils/litellm_utils.py (2 hunks)
echo/server/dembrane/audio_lightrag/utils/prompts.py (0 hunks)
echo/server/dembrane/conversation_utils.py (1 hunks)
echo/server/dembrane/tasks.py (3 hunks)
💤 Files with no reviewable changes (8)
- echo/server/dembrane/audio_lightrag/tests/conftest.py
- echo/server/dembrane/audio_lightrag/utils/prompts.py
- echo/server/dembrane/audio_lightrag/tests/unit_tests/neo4j_test.py
- echo/server/dembrane/audio_lightrag/tests/unit_tests/contextual_chunk_etl_pipeline_test.py
- echo/server/dembrane/audio_lightrag/tests/unit_tests/directus_etl_pipeline_test.py
- echo/server/dembrane/audio_lightrag/tests/unit_tests/audio_etl_pipeline_test.py
- echo/server/dembrane/audio_lightrag/tests/e2e_test.py
- echo/server/dembrane/audio_lightrag/utils/azure_utils.py
🧰 Additional context used
🧬 Code graph analysis (2)
echo/server/dembrane/tasks.py (4)
- echo/server/dembrane/audio_lightrag/utils/echo_utils.py (1)
  - finish_conversation (24-34)
- echo/server/dembrane/api/stateless.py (2)
  - InsertRequest (77-80)
  - insert_item (89-127)
- echo/server/dembrane/audio_lightrag/utils/async_utils.py (1)
  - run_async_in_new_loop (59-95)
- echo/server/dembrane/audio_lightrag/services/contextualizer.py (2)
  - get_contextualizer (97-102)
  - contextualize (18-90)

echo/server/dembrane/audio_lightrag/services/contextualizer.py (2)
- echo/server/dembrane/prompts.py (1)
  - render_prompt (58-91)
- echo/server/dembrane/audio_lightrag/utils/litellm_utils.py (1)
  - llm_model_func (19-41)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Cursor Bugbot
- GitHub Check: ci-check-server
🔇 Additional comments (10)
echo/server/dembrane/audio_lightrag/utils/litellm_utils.py (2)
1-1: LGTM – asyncio import ships. Necessary for the async thread offloading. Clean.

32-40: Solid async offload. Ships. Threading out the blocking litellm call keeps the event loop clean. Exactly what this codebase needs. Pattern looks good.
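The offload pattern the reviewer approves can be sketched as below. This is a standalone illustration: `blocking_llm_call` is a stand-in for a synchronous SDK call such as `litellm.completion`, not the project's actual function.

```python
import asyncio

def blocking_llm_call(prompt: str) -> str:
    # Stand-in for a synchronous SDK call; in real code this
    # would block the calling thread on network I/O.
    return f"response to: {prompt}"

async def llm_model_func(prompt: str) -> str:
    # Offload the blocking call to a worker thread so the event
    # loop stays free to service other coroutines meanwhile.
    return await asyncio.to_thread(blocking_llm_call, prompt)

print(asyncio.run(llm_model_func("hello")))  # → response to: hello
```

`asyncio.to_thread` (Python 3.9+) runs the callable in the default thread pool and returns an awaitable, which is exactly what keeps a single-threaded event loop responsive around a blocking LLM client.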
echo/server/dembrane/audio_lightrag/services/contextualizer.py (8)
1-8: LGTM! Clean imports, no circular deps. Using the project-wide prompts module instead of the now-deleted audio_lightrag version. Ship it.

10-24: LGTM! Async signature is correct, params are well-documented. The docstring nails the architecture pivot. Zero notes.

38-40: LGTM! Early bailout on empty input saves token spend. Good defensive play.

64-71: LGTM! Async call is correct. Past reviews flagged the blocking issue but that's been fixed in the llm_model_func implementation. Temperature at 0.3 is reasonable for this use case. You could make it configurable (previous review mentioned this as optional) but it's not blocking.
73-84: LGTM! JSON parsing is defensive with proper fallback. The `.get("CONTEXTUAL_TRANSCRIPT", response)` handles both old and new response formats gracefully.
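The defensive parse-with-fallback described above can be sketched as follows. The function name and the exact fallback order are illustrative assumptions, not the project's implementation; only the `CONTEXTUAL_TRANSCRIPT` key and `.get(..., response)` fallback come from the review.

```python
import json

def extract_contextual_transcript(response: str, original_transcript: str) -> str:
    # Parse the LLM response as JSON when possible; fall back to the
    # raw response if the expected key is missing, and to the original
    # transcript if the response is not valid JSON at all.
    try:
        parsed = json.loads(response)
        return parsed.get("CONTEXTUAL_TRANSCRIPT", response)
    except (json.JSONDecodeError, AttributeError):
        return original_transcript
```

The `AttributeError` branch covers responses that are valid JSON but not objects (e.g. a bare string), where `.get` does not exist.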
86-90: LGTM! Broad catch is fine with the graceful fallback to original transcript. The `exc_info=True` will save you when Claude acts up. Solid resilience pattern.
93-102: LGTM! Simple singleton without locks. Good enough for single-threaded workers (which the PR mandates anyway due to LightRAG). Clean and pragmatic.
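The lock-free singleton pattern the reviewer signs off on looks roughly like this. The `Contextualizer` body here is a placeholder; only the `get_contextualizer()` shape is implied by the review.

```python
from typing import Optional

class Contextualizer:
    # Placeholder for the real service; construction is the thing
    # we want to happen only once per worker process.
    def __init__(self) -> None:
        self.calls = 0

_contextualizer: Optional[Contextualizer] = None

def get_contextualizer() -> Contextualizer:
    # Lock-free lazy singleton: safe when each worker process runs a
    # single thread (--threads 1), as this deployment mandates. With
    # multiple threads this check-then-set would need a lock.
    global _contextualizer
    if _contextualizer is None:
        _contextualizer = Contextualizer()
    return _contextualizer
```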
57-62: Prompt reference is correct. audio_model_system_prompt.en.jinja defines explicit “# Task 1” and “# Task 2” sections, so instructing the model to skip Task 1 and focus on Task 2 is valid.
…cessing is disabled
Actionable comments posted: 2
♻️ Duplicate comments (3)
echo/server/dembrane/tasks.py (3)
244-246: Still swallowing exceptions on conversation fetch. Previous review flagged this: returning without raising makes the task appear successful when it actually bailed. Ops can't distinguish between "conversation doesn't exist" and "Directus is down."
Re-raise after logging to surface the failure:
```diff
     except Exception:
-        logger.error(f"Failed to get conversation {conversation_id}")
-        return
+        logger.error(f"Failed to get conversation {conversation_id}")
+        raise
```
258-260: Same swallow pattern on project fetch.Previous review caught this too. Silent returns hide whether the project is missing or Directus is flaking.
Re-raise to make it visible:
```diff
     except Exception as e:
-        logger.error(f"Failed to get project {project_id}: {e}")
-        return
+        logger.error(f"Failed to get project {project_id}: {e}")
+        raise
```
328-332: Function-scoped imports still masking the cycle. Previous review noted this. The imports work but don't fix the root circular dependency. Consider extracting shared types (`InsertRequest`, `insert_item`) into a separate module when you have bandwidth.
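The extraction the reviewer suggests could look roughly like this. The module name `dembrane/rag_types.py` and the `InsertRequest` fields are hypothetical; the real model lives in `api/stateless.py` and its shape is not shown in this PR.

```python
# Hypothetical shared-types module (e.g. dembrane/rag_types.py) that
# neither tasks.py nor api/stateless.py imports from the other,
# breaking the import cycle at its root.
from dataclasses import dataclass

@dataclass
class InsertRequest:
    conversation_id: str
    content: str

# tasks.py and api/stateless.py would then both do:
#   from dembrane.rag_types import InsertRequest
# instead of importing from each other inside function bodies.
```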
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
echo/server/dembrane/tasks.py(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
echo/server/dembrane/tasks.py (5)
- echo/server/dembrane/audio_lightrag/utils/echo_utils.py (1)
  - finish_conversation (24-34)
- echo/server/dembrane/processing_status_utils.py (1)
  - ProcessingStatusContext (80-188)
- echo/server/dembrane/api/stateless.py (2)
  - InsertRequest (77-80)
  - insert_item (89-127)
- echo/server/dembrane/audio_lightrag/utils/async_utils.py (1)
  - run_async_in_new_loop (59-95)
- echo/server/dembrane/audio_lightrag/services/contextualizer.py (2)
  - get_contextualizer (97-102)
  - contextualize (18-90)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: ci-check-server
🔇 Additional comments (3)
echo/server/dembrane/tasks.py (3)
221-223: Ship it. 5-min cap + 3 retries for the text-only flow.Solid constraints. No more audio grinding means 5 minutes is plenty, and the retry budget handles transient Directus/Claude hiccups without infinite loops.
262-269: Clean fix for the RAG-disabled infinite loop. Previous review wanted `finish_conversation()` here to stop the scheduler from hammering this task every minute. You nailed it: the conversation gets marked done even when RAG is off, and failures are logged. Ship it.
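The guard-clause fix praised here can be sketched as below. The function and parameter names are illustrative stand-ins (the real task takes a conversation ID and looks up the project flag itself); the point is that the disabled path still calls `finish_conversation`.

```python
def finish_pending_conversation(conversation_id, rag_enabled,
                                finish_conversation, run_rag_insert):
    # When RAG is disabled for the project, still mark the conversation
    # finished; otherwise the scheduler re-enqueues it every minute,
    # producing the infinite loop the review describes.
    if not rag_enabled:
        finish_conversation(conversation_id)
        return "skipped"
    run_rag_insert(conversation_id)
    finish_conversation(conversation_id)
    return "processed"
```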
446-450: Confirm `.send()` vs `group()` is intentional. I see you replaced `group([...]).run()` with independent `.send()` calls, while we still use `group([...]).run()` at tasks.py:504; double-check you don't need its callback/completion semantics for these three tasks.
- Update test_trigger_directus_etl.py to use task_run_etl_pipeline
- Remove unused s3_cache.py (164 lines)
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
- echo/server/run-worker-cpu.sh (1 hunks)
- echo/server/scripts/test_trigger_directus_etl.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
echo/server/scripts/test_trigger_directus_etl.py (1)
- echo/server/dembrane/tasks.py (1)
  - task_run_etl_pipeline (225-415)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: ci-check-server
3c365cd to bc21589
…g Fixes (#322)

# RAG ETL Pipeline Refactor: Drop-in Replacement

## Overview

This PR replaces the existing 3-stage RAG ETL pipeline with a simplified version that uses existing transcripts from the standard Whisper pipeline. Audio processing is already handled by the main transcription workflow, so the RAG pipeline no longer needs to process audio.

**Key changes:**

- Removed 1,076 lines of audio processing code (5 pipeline files, 4 tasks)
- New simplified task: fetch transcripts → contextualize with Claude → insert into RAG
- Fixed event loop architecture to support multi-process workers
- Updated worker configuration: `--processes 8 --threads 1` (was `--processes 4 --threads 6`)

This is a drop-in replacement - old queued messages with the same task name will work without errors.

## What Changed

### Deleted Files (965 lines)

- `audio_etl_pipeline.py` (182 lines) - Audio transcription/processing
- `contextual_chunk_etl_pipeline.py` (263 lines) - Complex chunking logic
- `directus_etl_pipeline.py` (311 lines) - Multi-stage coordination
- `process_tracker.py` (67 lines) - Stage tracking
- `run_etl.py` (142 lines) - Pipeline orchestration

### Deleted Tasks (222 lines)

- `task_run_directus_etl()`
- `task_run_audio_etl()`
- `task_run_contextual_etl()`
- Old `task_run_etl_pipeline()` (3-stage orchestrator)

### New/Modified Files

**`services/contextualizer.py`** (new, 119 lines)

- Claude-based transcript contextualization
- Adds project context to transcripts for better RAG retrieval

**`tasks.py`** (+111 lines)

- New simplified `task_run_etl_pipeline()`:
  1. Fetch conversation chunks with existing transcripts
  2. Concatenate transcripts
  3. Contextualize with Claude
  4. Create segment record
  5. Insert into LightRAG via `insert_item()`

**`async_utils.py`** (modified)

- Persistent event loops per thread (was creating/destroying loops per task)
- Fixes "Future attached to different event loop" errors

**Worker Scripts**

- `run-worker-cpu.sh`: `--processes 8 --threads 1` (was `--processes 1 --threads 2`)
- `prod-worker-cpu.sh`: `--processes 8 --threads 1` (was `--processes 4 --threads 6`)
- Reason: LightRAG's internal locks only work with 1 thread per process, but multiple processes run in parallel

## Why This Approach

Audio is already transcribed by the main Whisper pipeline and stored in `conversation_chunk.transcript`. The old RAG pipeline was re-processing audio unnecessarily. This version:

- Uses existing transcripts (no duplicate work)
- Adds contextual enrichment via Claude
- Inserts into LightRAG for semantic search

Net result: Same functionality, 1,076 fewer lines, simpler architecture.

## LightRAG Rearchitecture

This PR does **not** change the LightRAG approach (Neo4j + PostgreSQL storage, entity extraction, etc.). That will be handled in a separate branch with proper planning and testing.

## Testing

- ✅ Multiple conversations processed successfully
- ✅ No event loop errors
- ✅ Claude contextualization working
- ✅ RAG insertion working
- ✅ Processing time: ~20-50 seconds per conversation (includes Claude API call)

## Deployment

Worker config change required:

```bash
dramatiq --queues cpu --processes 8 --threads 1 dembrane.tasks
```

No other infrastructure changes needed.

## Summary by CodeRabbit

- New Features
  - Added CLI tools: a RAG stats observer and a CLI test for RAG queries.
  - Added transcript contextualization service to improve inserted segment context.
- Improvements
  - RAG failures now return a clear 503 "temporarily unavailable" message for graceful degradation.
  - Task scheduler now finishes pending conversations every 1 minute (was 3 minutes).
  - Pipeline pivoted to a transcript-first, text-only RAG flow for more reliable processing.
- Removals
  - Legacy audio ETL pipelines and associated audio tests removed.
RAG ETL Pipeline Refactor: Drop-in Replacement
Overview
This PR replaces the existing 3-stage RAG ETL pipeline with a simplified version that uses existing transcripts from the standard Whisper pipeline. Audio processing is already handled by the main transcription workflow, so the RAG pipeline no longer needs to process audio.
Key changes:
- Removed 1,076 lines of audio processing code (5 pipeline files, 4 tasks)
- New simplified task: fetch transcripts → contextualize with Claude → insert into RAG
- Fixed event loop architecture to support multi-process workers
- Updated worker configuration: `--processes 8 --threads 1` (was `--processes 4 --threads 6`)

This is a drop-in replacement - old queued messages with the same task name will work without errors.
What Changed
Deleted Files (965 lines)
- `audio_etl_pipeline.py` (182 lines) - Audio transcription/processing
- `contextual_chunk_etl_pipeline.py` (263 lines) - Complex chunking logic
- `directus_etl_pipeline.py` (311 lines) - Multi-stage coordination
- `process_tracker.py` (67 lines) - Stage tracking
- `run_etl.py` (142 lines) - Pipeline orchestration

Deleted Tasks (222 lines)
- `task_run_directus_etl()`
- `task_run_audio_etl()`
- `task_run_contextual_etl()`
- Old `task_run_etl_pipeline()` (3-stage orchestrator)

New/Modified Files
`services/contextualizer.py` (new, 119 lines)
- Claude-based transcript contextualization
- Adds project context to transcripts for better RAG retrieval

`tasks.py` (+111 lines)
- New simplified `task_run_etl_pipeline()`:
  1. Fetch conversation chunks with existing transcripts
  2. Concatenate transcripts
  3. Contextualize with Claude
  4. Create segment record
  5. Insert into LightRAG via `insert_item()`

`async_utils.py` (modified)
- Persistent event loops per thread (was creating/destroying loops per task)
- Fixes "Future attached to different event loop" errors

Worker Scripts
- `run-worker-cpu.sh`: `--processes 8 --threads 1` (was `--processes 1 --threads 2`)
- `prod-worker-cpu.sh`: `--processes 8 --threads 1` (was `--processes 4 --threads 6`)
- Reason: LightRAG's internal locks only work with 1 thread per process, but multiple processes run in parallel

Why This Approach
Audio is already transcribed by the main Whisper pipeline and stored in
`conversation_chunk.transcript`. The old RAG pipeline was re-processing audio unnecessarily. This version:
- Uses existing transcripts (no duplicate work)
- Adds contextual enrichment via Claude
- Inserts into LightRAG for semantic search

Net result: Same functionality, 1,076 fewer lines, simpler architecture.
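The simplified text-only flow (fetch existing transcripts, concatenate, contextualize, insert) can be sketched as below. The helper parameters are hypothetical stand-ins injected for illustration; the real task wires these to Directus, Claude, and `insert_item()`.

```python
def run_etl_pipeline(conversation_id, fetch_chunks, contextualize, insert_into_rag):
    # 1. Fetch chunks that already carry Whisper transcripts.
    chunks = fetch_chunks(conversation_id)
    transcripts = [c["transcript"] for c in chunks if c.get("transcript")]
    if not transcripts:
        return None  # nothing to insert; no audio work is ever done here
    # 2. Concatenate into one document.
    full_transcript = "\n".join(transcripts)
    # 3. Enrich with project context (Claude in the real pipeline).
    contextual = contextualize(full_transcript)
    # 4-5. Create the segment and insert into the RAG store.
    return insert_into_rag(conversation_id, contextual)
```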
LightRAG Rearchitecture
This PR does not change the LightRAG approach (Neo4j + PostgreSQL storage, entity extraction, etc.). That will be handled in a separate branch with proper planning and testing.
Testing
- ✅ Multiple conversations processed successfully
- ✅ No event loop errors
- ✅ Claude contextualization working
- ✅ RAG insertion working
- ✅ Processing time: ~20-50 seconds per conversation (includes Claude API call)
Deployment
Worker config change required: `dramatiq --queues cpu --processes 8 --threads 1 dembrane.tasks`
No other infrastructure changes needed.
Summary by CodeRabbit

New Features
- Added CLI tools: a RAG stats observer and a CLI test for RAG queries.
- Added transcript contextualization service to improve inserted segment context.

Improvements
- RAG failures now return a clear 503 "temporarily unavailable" message for graceful degradation.
- Task scheduler now finishes pending conversations every 1 minute (was 3 minutes).
- Pipeline pivoted to a transcript-first, text-only RAG flow for more reliable processing.

Removals
- Legacy audio ETL pipelines and associated audio tests removed.