Conversation
- Update Directus public URL handling for relative paths
- Change API proxy target from localhost:8000 to localhost:8001
- Introduce new functions for saving bytes to S3 and checking file existence
- Implement incremental merging of conversation chunks for real-time viewing
- Adjust the unfinished conversation filter to 3 minutes
- Improve chunk processing with intelligent triage and streaming split
- Add missing shebang to worker scripts
Hi @dtrn2048! Thank you for contributing to Dembrane ECHO! Before we consider your Pull Request, we ask that you sign our Contributor License Agreement (CLA). This is only required for your first Pull Request. Please review the CLA and sign it by adding your GitHub username to the contributors.yml file. Thanks!
Walkthrough

Implements relative Directus URL resolution on the frontend; adds streaming S3/FFmpeg audio processing, size-aware splitting, incremental merging, batch Directus writers, async-loop utilities, ETL observer and test scripts, ProcessTracker serialization, task/workflow changes, S3 helpers, and minor script/.gitignore edits.

Changes
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs
LGTM.

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 11
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (9)
- echo/frontend/src/config.ts (1 hunks)
- echo/frontend/vite.config.ts (1 hunks)
- echo/server/dembrane/audio_utils.py (4 hunks)
- echo/server/dembrane/conversation_utils.py (1 hunks)
- echo/server/dembrane/s3.py (1 hunks)
- echo/server/dembrane/service/conversation.py (1 hunks)
- echo/server/dembrane/tasks.py (3 hunks)
- echo/server/run-worker-cpu.sh (1 hunks)
- echo/server/run-worker.sh (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
echo/server/dembrane/conversation_utils.py (1)
- echo/server/dembrane/utils.py (1): get_utc_timestamp (49-50)

echo/server/dembrane/audio_utils.py (3)
- echo/server/dembrane/s3.py (6): delete_from_s3 (186-188), get_stream_from_s3 (179-183), get_sanitized_s3_key (154-176), get_signed_url (143-151), get_file_size_bytes_from_s3 (191-194), save_bytes_to_s3 (205-226)
- echo/server/dembrane/utils.py (2): get (67-79), generate_uuid (13-14)
- echo/server/dembrane/service/conversation.py (2): delete (162-167), get_chunk_by_id_or_raise (169-201)
echo/server/dembrane/service/conversation.py (1)
- echo/server/dembrane/tasks.py (2): task_process_conversation_chunk (444-518), task_merge_conversation_chunks_incremental (224-308)

echo/server/dembrane/tasks.py (4)
- echo/server/dembrane/audio_utils.py (3): merge_multiple_audio_files_and_save_to_s3 (291-447), should_split_chunk (787-813), split_audio_chunk_streaming (816-974)
- echo/server/dembrane/s3.py (1): get_file_exists_in_s3 (229-251)
- echo/server/dembrane/service/conversation.py (2): get_by_id_or_raise (43-80), get_chunk_by_id_or_raise (169-201)
- echo/server/dembrane/processing_status_utils.py (1): ProcessingStatusContext (80-188)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: ci-check-server
🔇 Additional comments (6)
echo/frontend/vite.config.ts (1)
38-38: Ship it. Port bump aligns with backend. Clean proxy retarget to 8001. Zero issues.
echo/frontend/src/config.ts (1)
9-13: Solid. Relative URL handling ships clean. The `startsWith('/')` check correctly distinguishes relative from absolute URLs, and origin prepending works as expected. Edge cases (protocol-relative, empty) are handled gracefully given upstream env validation.

echo/server/run-worker.sh (1)

1-2: Mirror CPU worker hardening: add strict mode, exec, and env-tunable concurrency.

```diff
-#!/bin/bash
-dramatiq-gevent --queues network --processes 1 --threads 2 dembrane.tasks
+#!/bin/bash
+set -Eeuo pipefail
+PROCESSES="${WORKER_PROCESSES:-1}"
+THREADS="${WORKER_THREADS:-2}"
+exec dramatiq-gevent --queues network --processes "$PROCESSES" --threads "$THREADS" dembrane.tasks
```

File mode is already 100755 (executable). LGTM.
echo/server/dembrane/tasks.py (1)
443-444: Nice: add 10-minute time limit to bound worker runtime. Keeps queue healthy and prevents runaway chunk jobs. LGTM.
echo/server/dembrane/audio_utils.py (2)
384-389: Verify libmp3lame accepts `preset` here.

`preset` is typically an x264/x265 option, not an mp3 one. If unsupported, ffmpeg errors out. Consider removing it and relying on `q` or `b:a`. Suggested:

```diff
- output_kwargs.update({"acodec": "libmp3lame", "q": "5", "preset": "veryfast"})
+ output_kwargs.update({"acodec": "libmp3lame", "q": "5"})
```
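To make the suggestion concrete, here is a tiny sketch of building the MP3 output kwargs without the `preset` key. The helper name and the default quality are illustrative assumptions, not the repo's actual code:

```python
def build_mp3_output_kwargs(vbr_quality: int = 5) -> dict:
    # libmp3lame takes a VBR quality via -q:a (0 = best, 9 = worst);
    # it has no -preset option, so "preset" is deliberately left out.
    if not 0 <= vbr_quality <= 9:
        raise ValueError("libmp3lame VBR quality must be in 0..9")
    return {"acodec": "libmp3lame", "q": str(vbr_quality)}
```

The returned dict can be splatted into an ffmpeg-python `output(...)` call in place of the hardcoded literal.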
369-377: Streaming concat playlist approach is solid. Presigned URLs + pipe concat + S3 PUT with diagnostics. LGTM.
Also applies to: 391-405, 418-425, 439-444
Actionable comments posted: 1
♻️ Duplicate comments (1)
echo/server/dembrane/audio_utils.py (1)
759-789: Fix hardcoded suffix and return structure.

The hardcoded `.mp3` suffix misleads ffprobe for other formats, and the function returns only `probe['format']` while the docstring promises full metadata. Apply this diff:

```diff
-    with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as temp:
+    ext = get_file_format_from_file_path(s3_path)
+    with tempfile.NamedTemporaryFile(suffix=f'.{ext}', delete=False) as temp:
         temp.write(header_bytes)
         temp_path = temp.name
     try:
         probe = ffmpeg.probe(temp_path)
-        return probe['format']
+        return probe
     finally:
         os.unlink(temp_path)
```
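A minimal sketch of the suffix derivation the diff relies on (hypothetical helper; the real code uses `get_file_format_from_file_path`, whose behavior may differ):

```python
import os


def suffix_for_probe(s3_path: str, default_ext: str = "mp3") -> str:
    # Derive the container extension from the object key so ffprobe gets
    # the right format hint; fall back to a default when the key has none.
    ext = os.path.splitext(s3_path)[1].lstrip(".").lower()
    return f".{ext or default_ext}"
```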
📜 Review details
📒 Files selected for processing (4)
- .gitignore (1 hunks)
- echo/server/dembrane/audio_utils.py (5 hunks)
- echo/server/dembrane/service/conversation.py (2 hunks)
- echo/server/dembrane/tasks.py (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
echo/server/dembrane/service/conversation.py (1)
- echo/server/dembrane/tasks.py (2): task_process_conversation_chunk (459-537), task_merge_conversation_chunks_incremental (224-323)

echo/server/dembrane/audio_utils.py (3)
- echo/server/dembrane/s3.py (6): delete_from_s3 (186-188), get_signed_url (143-151), get_stream_from_s3 (179-183), get_sanitized_s3_key (154-176), get_file_size_bytes_from_s3 (191-194), save_bytes_to_s3 (205-226)
- echo/server/dembrane/utils.py (2): get (67-79), generate_uuid (13-14)
- echo/server/dembrane/service/conversation.py (2): delete (162-167), get_chunk_by_id_or_raise (169-201)
echo/server/dembrane/tasks.py (3)
- echo/server/dembrane/audio_utils.py (3): merge_multiple_audio_files_and_save_to_s3 (291-450), should_split_chunk (791-817), split_audio_chunk_streaming (820-993)
- echo/server/dembrane/service/conversation.py (2): get_by_id_or_raise (43-80), get_chunk_by_id_or_raise (169-201)
- echo/server/dembrane/processing_status_utils.py (1): ProcessingStatusContext (80-188)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: ci-check-server
🔇 Additional comments (12)
echo/server/dembrane/audio_utils.py (6)
14-21: LGTM! Clean imports for the streaming overhaul. S3 helpers are well-organized.
325-367: LGTM! Streaming probe via signed URLs fixes the latency issue. Size check before append prevents invalid keys from breaking concat. Based on past review comments, both critical issues are addressed.
369-426: LGTM! Streaming concat via presigned URLs is slick. No temp files, the protocol whitelist covers network streaming, and error handling is solid.
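As a rough illustration of the playlist the concat demuxer consumes, here is a sketch under assumptions; the repo's exact playlist format and quoting may differ:

```python
def build_concat_playlist(signed_urls: list[str]) -> str:
    # One "file '<url>'" entry per presigned URL; single quotes inside a
    # URL are escaped per the concat demuxer's quoting convention.
    lines = ["ffconcat version 1.0"]
    for url in signed_urls:
        lines.append("file '%s'" % url.replace("'", r"'\''"))
    return "\n".join(lines) + "\n"
```

The resulting text would be fed to ffmpeg via stdin (`-f concat -safe 0 -protocol_whitelist ...`), avoiding temp files entirely.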
712-713: LGTM! Proper stderr guard. No explosions on `None`.
791-817: LGTM! Smart triage via HEAD. Conservative fallback on error is solid.
820-993: LGTM! Streaming split with input `-ss` (fast seek) and parallel uploads fixes the O(n²) pipe approach. Cleanup is bulletproof. Based on past review, this is a solid improvement.

echo/server/dembrane/service/conversation.py (2)
273-293: LGTM! Proactive merge every 5 chunks enables real-time viewing. Non-fatal error handling is smart: background optimization shouldn't break chunk creation.
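The trigger cadence amounts to something like the following (hypothetical helper name, sketching the every-5-chunks rule described above):

```python
def should_trigger_incremental_merge(chunk_count: int, every_n: int = 5) -> bool:
    # Fire a background merge on every Nth chunk so listeners get a
    # near-real-time merged file without re-merging on every upload.
    return chunk_count > 0 and chunk_count % every_n == 0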
396-396: LGTM! `limit: -1` fetches all chunks, fixing the undercounting issue beyond 100 chunks. Based on past review, this prevents repeated merge triggers.
2-2: LGTM! Added `List` for type hints. Clean.
167-173: LGTM! Docstring clarifies complete vs incremental merge. Good context.
223-324: LGTM! Pagination fixes the 1000 chunk cap. Always re-merging is reasonable for now; a future optimization can track timestamps. Non-fatal errors keep the main flow intact. Based on past review, this is solid.
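A sketch of the pagination pattern in question (generic; `fetch_page` stands in for the Directus client call and is an assumption for illustration):

```python
def fetch_all_chunks(fetch_page, page_size: int = 1000) -> list:
    # Keep requesting pages until a short page signals the end, so the
    # merge sees every chunk instead of capping at one page's worth.
    items, offset = [], 0
    while True:
        page = fetch_page(offset, page_size)
        items.extend(page)
        if len(page) < page_size:
            return items
        offset += page_size
```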
458-537: LGTM! Intelligent triage, parallel transcription, and fixed diarization routing. Based on past review, diarization now targets split chunks (or the original if no split), not deleted chunks.
Actionable comments posted: 2
📜 Review details
📒 Files selected for processing (2)
- echo/frontend/vite.config.ts (1 hunks)
- echo/server/dembrane/audio_utils.py (10 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
echo/server/dembrane/audio_utils.py (3)
- echo/server/dembrane/s3.py (6): delete_from_s3 (186-188), get_signed_url (143-151), get_stream_from_s3 (179-183), get_sanitized_s3_key (154-176), get_file_size_bytes_from_s3 (191-194), save_bytes_to_s3 (205-226)
- echo/server/dembrane/utils.py (2): get (67-79), generate_uuid (13-14)
- echo/server/dembrane/service/conversation.py (2): delete (162-167), get_chunk_by_id_or_raise (169-201)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: ci-check-server
🔇 Additional comments (1)
echo/frontend/vite.config.ts (1)
38-38: LGTM, but confirm backend docs are updated for port 8001. No references to port 8001 were found in orchestration or config files; update README, docker-compose, and .env templates accordingly.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (1)
50-53: Handle conversations with no valid segment IDs before joining.

`",".join(...)` will explode the first time Stage 2 skips every chunk (e.g. validation marked them invalid) because the Series still holds `None`s. That throws a `TypeError`, bails the whole Stage 3 run, and leaves the convo locked forever. Yank the NaNs (and bail if nothing's left) before you build `segment_li`.

```diff
-        segment_li = ",".join(audio_load_tracker.sort_values("timestamp").segment).split(",")
-        segment_li = [int(x) for x in list(dict.fromkeys(segment_li)) if x != ""]  # type: ignore
+        segment_series = (
+            audio_load_tracker.sort_values("timestamp").segment.dropna().astype(str)
+        )
+        if segment_series.empty:
+            logger.warning(
+                f"No valid segments for conversation {conversation_id}; skipping contextual load."
+            )
+            continue
+
+        segment_li = ",".join(segment_series).split(",")
+        segment_li = [int(x) for x in list(dict.fromkeys(segment_li)) if x]  # type: ignore
```
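The guarded parse above reduces to the following pure-Python sketch (same drop-the-Nones, split, and order-preserving dedupe logic, shown without pandas):

```python
def parse_segment_ids(raw_segments) -> list[int]:
    # Drop None entries before joining, split the comma-packed values,
    # then dedupe while preserving first-seen order.
    parts = ",".join(str(s) for s in raw_segments if s is not None).split(",")
    return [int(x) for x in dict.fromkeys(parts) if x]
```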
📜 Review details
📒 Files selected for processing (13)
- echo/server/dembrane/api/stateless.py (1 hunks)
- echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (3 hunks)
- echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (3 hunks)
- echo/server/dembrane/audio_lightrag/utils/async_utils.py (1 hunks)
- echo/server/dembrane/audio_lightrag/utils/audio_utils.py (4 hunks)
- echo/server/dembrane/audio_lightrag/utils/process_tracker.py (2 hunks)
- echo/server/dembrane/tasks.py (7 hunks)
- echo/server/scripts/monitor_etl_workflow.py (1 hunks)
- echo/server/scripts/rag_etl_observer.py (1 hunks)
- echo/server/scripts/simple_rag_observer.py (1 hunks)
- echo/server/scripts/test_etl_stages.py (1 hunks)
- echo/server/scripts/test_rag_query.py (1 hunks)
- echo/server/scripts/test_trigger_directus_etl.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (3)
- echo/server/dembrane/audio_lightrag/utils/audio_utils.py (2): wav_to_str (164-167), safe_audio_decode (67-121)
- echo/server/dembrane/audio_lightrag/utils/litellm_utils.py (1): get_json_dict_from_audio (34-93)
- echo/server/dembrane/audio_lightrag/utils/async_utils.py (1): run_async_in_new_loop (16-77)
echo/server/scripts/test_etl_stages.py (7)
- echo/server/dembrane/audio_lightrag/utils/process_tracker.py (4): ProcessTracker (8-66), to_dict (41-51), from_dict (54-66), get_project_df (32-33)
- echo/server/dembrane/tasks.py (3): task_run_directus_etl (333-375), task_run_audio_etl (385-422), task_run_contextual_etl (431-472)
- echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (1): DirectusETLPipeline (19-312)
- echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (1): AudioETLPipeline (19-156)
- echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (1): ContextualChunkETLPipeline (28-228)
- echo/server/dembrane/audio_lightrag/utils/async_utils.py (1): run_async_in_new_loop (16-77)
- echo/server/dembrane/audio_lightrag/utils/audio_utils.py (2): validate_audio_file (21-64), safe_audio_decode (67-121)
echo/server/scripts/simple_rag_observer.py (1)
- echo/server/scripts/rag_etl_observer.py (2): main (542-728), close (215-219)

echo/server/scripts/test_trigger_directus_etl.py (1)
- echo/server/dembrane/tasks.py (1): task_run_directus_etl (333-375)
echo/server/dembrane/tasks.py (8)
- echo/server/dembrane/audio_utils.py (3): merge_multiple_audio_files_and_save_to_s3 (297-456), should_split_chunk (792-818), split_audio_chunk_streaming (821-994)
- echo/server/dembrane/service/conversation.py (2): get_by_id_or_raise (43-80), get_chunk_by_id_or_raise (169-201)
- echo/server/dembrane/processing_status_utils.py (1): ProcessingStatusContext (80-188)
- echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (3): DirectusException (15-16), DirectusETLPipeline (19-312), run (295-302)
- echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (2): run (222-228), ContextualChunkETLPipeline (28-228)
- echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (2): run (153-156), AudioETLPipeline (19-156)
- echo/server/dembrane/audio_lightrag/utils/process_tracker.py (3): to_dict (41-51), ProcessTracker (8-66), from_dict (54-66)
- echo/server/dembrane/audio_lightrag/utils/echo_utils.py (2): finish_conversation (24-34), release_redis_lock (70-82)
echo/server/scripts/test_rag_query.py (2)
- echo/server/dembrane/rag_manager.py (4): RAGManager (14-93), is_initialized (49-57), initialize (60-93), get_instance (32-46)
- echo/server/dembrane/audio_lightrag/utils/async_utils.py (1): run_async_in_new_loop (16-77)

echo/server/dembrane/audio_lightrag/utils/audio_utils.py (1)
- echo/server/dembrane/s3.py (1): get_stream_from_s3 (179-183)

echo/server/scripts/rag_etl_observer.py (2)
- echo/server/scripts/simple_rag_observer.py (1): main (141-181)
- echo/server/dembrane/service/conversation.py (1): update (123-160)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: ci-check-server
```python
# Create a brand new event loop just for this coroutine
loop = asyncio.new_event_loop()

try:
    # Set it as the current event loop for this thread
    asyncio.set_event_loop(loop)

    # Run the coroutine to completion
    logger.debug(f"Running async coroutine in new loop: {coro}")
    result = loop.run_until_complete(coro)

    logger.debug(f"Successfully completed async coroutine: {coro}")
    return result

finally:
    # Clean up: close the loop to free resources
    try:
        # Cancel any remaining tasks
        pending = asyncio.all_tasks(loop)
        for task in pending:
            task.cancel()

        # Wait for all tasks to finish cancelling
        if pending:
            loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))

        # Close the loop
        loop.close()
        logger.debug("Closed event loop successfully")

    except Exception as e:
        logger.warning(f"Error while closing event loop: {e}")
```
Restore the previous event loop before closing the temporary one.
We clobber the thread’s current loop with a brand-new one, close it, and never reinstall the original. The very next asyncio call on this thread hits RuntimeError: Event loop is closed. Cache the prior loop (if any) and put it back after cleanup.
```diff
-    loop = asyncio.new_event_loop()
-
-    try:
-        # Set it as the current event loop for this thread
-        asyncio.set_event_loop(loop)
+    loop = asyncio.new_event_loop()
+    previous_loop = None
+
+    try:
+        try:
+            previous_loop = asyncio.get_event_loop()
+        except RuntimeError:
+            previous_loop = None
+
+        # Set it as the current event loop for this thread
+        asyncio.set_event_loop(loop)
@@
-        except Exception as e:
-            logger.warning(f"Error while closing event loop: {e}")
+        except Exception as e:
+            logger.warning(f"Error while closing event loop: {e}")
+    finally:
+        asyncio.set_event_loop(previous_loop)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
# Create a brand new event loop just for this coroutine
loop = asyncio.new_event_loop()
previous_loop = None

try:
    # Cache the prior event loop (if any)
    try:
        previous_loop = asyncio.get_event_loop()
    except RuntimeError:
        previous_loop = None

    # Set the new loop as the current event loop for this thread
    asyncio.set_event_loop(loop)

    # Run the coroutine to completion
    logger.debug(f"Running async coroutine in new loop: {coro}")
    result = loop.run_until_complete(coro)

    logger.debug(f"Successfully completed async coroutine: {coro}")
    return result

finally:
    # Clean up: close the loop to free resources
    try:
        # Cancel any remaining tasks
        pending = asyncio.all_tasks(loop)
        for task in pending:
            task.cancel()

        # Wait for all tasks to finish cancelling
        if pending:
            loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))

        # Close the loop
        loop.close()
        logger.debug("Closed event loop successfully")

    except Exception as e:
        logger.warning(f"Error while closing event loop: {e}")
    finally:
        # Restore the original event loop (if there was one)
        asyncio.set_event_loop(previous_loop)
```
echo/server/dembrane/tasks.py
```python
# Stage 1: Directus ETL
logger.info(f">>> STAGE 1/3: Running Directus ETL for {conversation_id}")
process_tracker_data = task_run_directus_etl(conversation_id)
logger.info(f">>> STAGE 1/3: Directus ETL completed, got process_tracker_data: {type(process_tracker_data)}")

logger.info(f"Stage 1 complete, starting Stage 2 for {conversation_id}")

# Stage 2: Audio ETL
logger.info(f">>> STAGE 2/3: Running Audio ETL for {conversation_id}")
process_tracker_data = task_run_audio_etl(conversation_id, process_tracker_data)
logger.info(f">>> STAGE 2/3: Audio ETL completed")

logger.info(f"Stage 2 complete, starting Stage 3 for {conversation_id}")

# Stage 3: Contextual ETL
logger.info(f">>> STAGE 3/3: Running Contextual ETL for {conversation_id}")
task_run_contextual_etl(conversation_id, process_tracker_data)
logger.info(f">>> STAGE 3/3: Contextual ETL completed")
```
Call the underlying functions instead of the dramatiq actors.
Right now `task_run_directus_etl(...)` returns a `Message`, not the serialized tracker dict, because the actor decorator is still wrapping the function. We immediately feed that `Message` into Stage 2, so `ProcessTracker.from_dict(...)` blows up before touching the payload. Call the underlying `.fn` to execute synchronously (or use `.send()`/`.get_result()` if you really want async).
```diff
-        process_tracker_data = task_run_directus_etl(conversation_id)
+        process_tracker_data = task_run_directus_etl.fn(conversation_id)
@@
-        process_tracker_data = task_run_audio_etl(conversation_id, process_tracker_data)
+        process_tracker_data = task_run_audio_etl.fn(conversation_id, process_tracker_data)
@@
-        task_run_contextual_etl(conversation_id, process_tracker_data)
+        task_run_contextual_etl.fn(conversation_id, process_tracker_data)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
# Stage 1: Directus ETL
logger.info(f">>> STAGE 1/3: Running Directus ETL for {conversation_id}")
process_tracker_data = task_run_directus_etl.fn(conversation_id)
logger.info(f">>> STAGE 1/3: Directus ETL completed, got process_tracker_data: {type(process_tracker_data)}")

logger.info(f"Stage 1 complete, starting Stage 2 for {conversation_id}")

# Stage 2: Audio ETL
logger.info(f">>> STAGE 2/3: Running Audio ETL for {conversation_id}")
process_tracker_data = task_run_audio_etl.fn(conversation_id, process_tracker_data)
logger.info(f">>> STAGE 2/3: Audio ETL completed")

logger.info(f"Stage 2 complete, starting Stage 3 for {conversation_id}")

# Stage 3: Contextual ETL
logger.info(f">>> STAGE 3/3: Running Contextual ETL for {conversation_id}")
task_run_contextual_etl.fn(conversation_id, process_tracker_data)
logger.info(f">>> STAGE 3/3: Contextual ETL completed")
```
🤖 Prompt for AI Agents
In echo/server/dembrane/tasks.py around lines 531 to 548, the code is calling
actor-wrapped functions (task_run_directus_etl, task_run_audio_etl,
task_run_contextual_etl) directly which returns dramatiq Message objects instead
of actual results; replace those direct calls with their synchronous underlying
function calls (use task_run_directus_etl.fn(conversation_id),
task_run_audio_etl.fn(conversation_id, process_tracker_data), and
task_run_contextual_etl.fn(conversation_id, process_tracker_data)) so each stage
receives the real dict/result, or alternatively use the actor API
(.send()/.get_result()) if you intend async execution; capture and pass the
returned dicts to the next stage.
The proposed PR comment is not applicable: echo/server/dembrane/tasks.py does not
define or call task_run_directus_etl, task_run_audio_etl, or
task_run_contextual_etl, nor does it use Dramatiq actors. That section relies
on Celery chords/groups returning Celery AsyncResult objects, so the suggested
.fn / .send() changes do not apply.
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
# Check if the three ETL functions are defined in tasks.py
echo "=== Checking for function definitions in tasks.py ==="
rg -n "def task_run_directus_etl|def task_run_audio_etl|def task_run_contextual_etl" echo/server/dembrane/tasks.py

echo -e "\n=== Checking for any mentions of these functions in tasks.py ==="
rg -n "task_run_directus_etl|task_run_audio_etl|task_run_contextual_etl" echo/server/dembrane/tasks.py

echo -e "\n=== Checking what task framework is imported ==="
rg -n "^import dramatiq|^import celery|^from dramatiq|^from celery" echo/server/dembrane/tasks.py

echo -e "\n=== Checking for Celery-specific patterns (chord, group from celery) ==="
rg -n "from celery import|celery.chord|celery.group" echo/server/dembrane/tasks.py

echo -e "\n=== Checking the actual imports at the top of the file ==="
head -50 echo/server/dembrane/tasks.py
```
Length of output: 2323
🏁 Script executed:
#!/bin/bash
# Search the entire codebase for these function definitions
echo "=== Searching entire codebase for function definitions ==="
rg -n "def task_run_directus_etl|def task_run_audio_etl|def task_run_contextual_etl" --type py
echo -e "\n=== Check if they're defined in the audio_lightrag module ==="
fd -e py . echo/server/dembrane/audio_lightrag -x rg -l "task_run_directus_etl|task_run_audio_etl|task_run_contextual_etl" {}
echo -e "\n=== Search for any ETL stage functions in the codebase ==="
rg -n "def.*_etl\(" --type py echo/server/dembrane/audio_lightrag | head -20
Length of output: 181
@dtrn2048, you're absolutely right - I apologize for the incorrect comment. Those three functions (task_run_directus_etl, task_run_audio_etl, task_run_contextual_etl) do not exist anywhere in the codebase. The diff is calling functions that haven't been defined yet.
However, I need to clarify one point: the file does use Dramatiq (not Celery), as shown by the imports at the top:

```python
import dramatiq
from dramatiq import group
```

The actual issue is: the diff is trying to call three ETL stage functions that don't exist. The code should either:
- Continue using the existing `run_etl_pipeline([conversation_id])` function (which is what the old code did and what's imported from `dembrane.audio_lightrag.main.run_etl`), or
- Define these three stage functions first before calling them
The current diff creates broken code that references undefined functions. My original suggestion to use .fn() was based on an incorrect assumption that these functions existed as Dramatiq actors.
🧬 Code Graph Analysis Results
File: echo/server/dembrane/audio_lightrag/main/run_etl.py

Line range: 29-31

Snippet:

```python
def run_etl_pipeline(conv_id_list: list[str]) -> Optional[bool]:
    """
    Runs the complete ETL pipeline including Directus, Audio, and Contextual Chunk processes.
    Uses Redis locks to prevent the same conversation ID from being processed within 1 hour.

    Args:
        conv_id_list: List of conversation IDs to process

    Returns:
        bool: True if pipeline completes successfully, False if there's an error
        None: If input validation fails
    """
```
File: echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py

Line range: 175-182

Snippet:

```python
def run(
    self, conversation_id_list: Optional[List[str]] = None, run_timestamp: str | None = None
) -> ProcessTracker:
    """Run the full ETL pipeline: extract, transform, and load."""
    conversation, project = self.extract(conversation_id_list=conversation_id_list)
    conversation_df, project_df = self.transform(conversation, project, run_timestamp)
    process_tracker = self.load_to_process_tracker(conversation_df, project_df)
    return process_tracker
```
-
File: echo/server/dembrane/processing_status_utils.py
-
Line range: 80-188
-
Snippet:
class ProcessingStatusContext:
"""Context manager to automatically log processing status events with duration."""def __init__( self, project_id: Optional[str] = None, project_analysis_run_id: Optional[str] = None, conversation_id: Optional[str] = None, conversation_chunk_id: Optional[str] = None, message: Optional[str] = None, event_prefix: Optional[str] = None, ): """ Context manager to automatically log processing status events with duration. When entering the context, the context manager will log a STARTED event with the message and duration. When an exception occurs, the context manager will log a FAILED event with the error message and duration. When no exception occurs, the context manager will log COMPLETED event with the message and duration. Args: project_id: The ID of the project. (str) project_analysis_run_id: The ID of the project analysis run. (str) conversation_id: The ID of the conversation. (str) conversation_chunk_id: The ID of the conversation chunk. (str) message: The message to log. (str) event_prefix: The prefix of the event. (str) Conventionally, you will see this being set to method name. 
""" self.project_id = project_id self.project_analysis_run_id = project_analysis_run_id self.conversation_id = conversation_id self.conversation_chunk_id = conversation_chunk_id self.event_prefix = event_prefix self.message = message self.exit_message: Optional[str] = None # Custom exit message self.start_time: float = 0.0 self.logger = getLogger(f"status.{self.event_prefix}") self.processing_status_start_id: Optional[int] = None self.processing_status_failed_id: Optional[int] = None self.processing_status_completed_id: Optional[int] = None def set_exit_message(self, message: str) -> None: """Set a custom message to be used in the exit event.""" self.exit_message = message def __enter__(self) -> "ProcessingStatusContext": # Log start event without duration self.start_time = time.time() self.processing_status_start_id = add_processing_status( project_id=self.project_id, project_analysis_run_id=self.project_analysis_run_id, conversation_id=self.conversation_id, conversation_chunk_id=self.conversation_chunk_id, event=f"{self.event_prefix}.started", message=self.message if self.message else "", ) self.logger.info(f"{self.processing_status_start_id} {self.message}") return self def __exit__( self, exc_type: Optional[Type[BaseException]], exc_value: Optional[BaseException], traceback: Any, ) -> Literal[False]: duration_ms = int((time.time() - self.start_time) * 1000) # if exception occurs, log FAILED event with error message and duration if exc_type: if self.exit_message: message = self.exit_message + f" -\n {str(exc_value)}" else: message = str(exc_value) self.processing_status_failed_id = add_processing_status( project_id=self.project_id, project_analysis_run_id=self.project_analysis_run_id, conversation_id=self.conversation_id, conversation_chunk_id=self.conversation_chunk_id, event=f"{self.event_prefix}.failed", message=message, duration_ms=duration_ms, parent_id=self.processing_status_start_id, ) self.logger.error( f"{self.processing_status_failed_id} {message} 
(duration: {duration_ms / 1000}s) (started: {self.processing_status_start_id})" ) # if no exception occurs, log COMPLETED event with message and duration else: if self.exit_message: message = self.exit_message else: message = self.message if self.message else "" self.processing_status_completed_id = add_processing_status( project_id=self.project_id, project_analysis_run_id=self.project_analysis_run_id, conversation_id=self.conversation_id, conversation_chunk_id=self.conversation_chunk_id, event=f"{self.event_prefix}.completed", message=message, duration_ms=duration_ms, parent_id=self.processing_status_start_id, ) self.logger.info( f"{self.processing_status_completed_id} {message} (duration: {duration_ms / 1000}s) (started: {self.processing_status_start_id})" ) return False
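The pattern this snippet implements (STARTED on enter, FAILED or COMPLETED on exit, with a measured duration) can be sketched with a self-contained stand-in; here `add_processing_status` is an in-memory stub, not the real Directus-backed helper, and the event names are illustrative:

```python
import time
from typing import List, Tuple

EVENTS: List[Tuple[str, str]] = []  # in-memory stand-in for the processing_status table

def add_processing_status(event: str, message: str = "") -> int:
    # Stub: the real helper writes a row to Directus and returns its ID.
    EVENTS.append((event, message))
    return len(EVENTS)

class StatusContext:
    """Minimal sketch of the ProcessingStatusContext pattern."""

    def __init__(self, event_prefix: str, message: str = ""):
        self.event_prefix = event_prefix
        self.message = message
        self.start_time = 0.0

    def __enter__(self) -> "StatusContext":
        self.start_time = time.time()
        add_processing_status(f"{self.event_prefix}.started", self.message)
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        duration_ms = int((time.time() - self.start_time) * 1000)
        if exc_type:
            add_processing_status(f"{self.event_prefix}.failed", f"{exc_value} ({duration_ms}ms)")
        else:
            add_processing_status(f"{self.event_prefix}.completed", f"{self.message} ({duration_ms}ms)")
        return False  # never swallow the exception

with StatusContext("transcribe_chunk", "chunk 42"):
    pass  # work that should be timed and logged

try:
    with StatusContext("transcribe_chunk", "chunk 43"):
        raise ValueError("boom")
except ValueError:
    pass

print([event for event, _ in EVENTS])
# → ['transcribe_chunk.started', 'transcribe_chunk.completed', 'transcribe_chunk.started', 'transcribe_chunk.failed']
```

Returning `False` from `__exit__` is what lets the failure event be logged while the original exception still propagates to the caller.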
File: echo/server/dembrane/conversation_utils.py

Line range: 11-48

Snippet:

```python
def collect_unfinished_conversations() -> List[str]:
    # We want to collect:
    # 1. All unfinished conversations, EXCEPT
    # 2. Those that have at least one chunk in the last 5 minutes
    response = directus.get_items(
        "conversation",
        {
            "query": {
                "filter": {
                    # Must be unfinished
                    "is_finished": False,
                    # Must not have a chunk in the last 3 minutes :)
                    "chunks": {
                        "_none": {
                            "timestamp": {
                                "_gte": (get_utc_timestamp() - timedelta(minutes=3)).isoformat()
                            }
                        }
                    },
                },
                "fields": ["id"],
                "limit": -1,
            },
        },
    )

    conversation_ids = []
    for conversation in response:
        try:
            conversation_ids.append(conversation["id"])
        except Exception as e:
            logger.error(f"Error collecting conversation {conversation['id']}: {e}")

    logger.info(f"Found {len(conversation_ids)} unfinished conversations")
    return conversation_ids
```

Line range: 51-114

Snippet:

```python
def collect_unfinished_audio_processing_conversations() -> List[str]:
    unfinished_conversations = []

    # if they are already in process
    response = directus.get_items(
        "conversation",
        {
            "query": {
                "filter": {
                    "project_id": {
                        "is_enhanced_audio_processing_enabled": True,
                    },
                },
                "fields": ["id", "is_audio_processing_finished"],
            },
        },
    )

    for conversation in response:
        try:
            if not conversation["is_audio_processing_finished"]:
                unfinished_conversations.append(conversation["id"])
                continue  # and move to next conversation
        except Exception as e:
            logger.error(f"Error collecting conversation {conversation['id']}: {e}")
            continue

        # if claimed "is_audio_processing_finished" but not actually finished
        try:
            response = directus.get_items(
                "conversation_segment",
                {
                    "query": {
                        "filter": {"conversation_id": conversation["id"], "lightrag_flag": False},
                        "fields": ["id"],
                        "limit": 1,
                    },
                },
            )
            # Only add if there is at least one unprocessed segment
            if response and len(response) > 0:
                unfinished_conversations.append(conversation["id"])
        except Exception as e:
            logger.error(f"Error collecting conversation {conversation['id']}: {e}")

        try:
            total_segments = directus.get_items(
                "conversation_segment",
                {"query": {"filter": {"conversation_id": conversation["id"]}, "limit": 1}},
            )
            if len(total_segments) == 0:
                unfinished_conversations.append(conversation["id"])
                directus.update_item(
                    "conversation",
                    conversation["id"],
                    {"is_audio_processing_finished": False},
                )
        except Exception as e:
            logger.error(f"Error collecting conversation {conversation['id']}: {e}")

    return list(set(unfinished_conversations))
```
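The 3-minute window in the first snippet hinges on one computed value: an ISO-8601 cutoff passed to Directus's `_gte` operator inside a `_none` relation filter. A minimal stand-in (the `get_utc_timestamp` below is a hypothetical stub mirroring the helper the snippet uses) shows how that filter object is built:

```python
from datetime import datetime, timedelta, timezone

def get_utc_timestamp() -> datetime:
    # Stand-in for the project's helper of the same name.
    return datetime.now(timezone.utc)

def build_unfinished_filter(minutes: int = 3) -> dict:
    """Directus-style filter: unfinished conversations with no chunk newer than `minutes` ago."""
    cutoff = (get_utc_timestamp() - timedelta(minutes=minutes)).isoformat()
    return {
        "is_finished": False,
        # _none matches when no related chunk satisfies the inner condition
        "chunks": {"_none": {"timestamp": {"_gte": cutoff}}},
    }

f = build_unfinished_filter()
print(f["chunks"]["_none"]["timestamp"]["_gte"])  # e.g. 2024-01-01T12:00:00.000000+00:00
```

Keeping the cutoff computation in one place also makes the window length (5 minutes in the old comment, 3 in the new filter) a single parameter instead of two values that can drift apart.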
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (1)
62-116: Fix chunk-to-segment tracking across conversations.

Right now `chunk_id_2_segment` is reinitialized on every `(project_id, conversation_id)` iteration, yet the tracker update happens after the loop. The net effect: only the final conversation’s mappings are persisted; everything queued earlier is discarded, so those chunks stay “unprocessed” and will be re-run next cycle. Critical correctness hole.

Patch: accumulate into a list declared outside the loop (or update the tracker per conversation) so that every chunk processed in this pass gets recorded.

```diff
-    with BatchDirectusWriter(auto_flush_size=50) as batch_writer:
-        for project_id, conversation_id in zip_unique_audio:
+    chunk_id_2_segment_all: list[tuple[str, str]] = []
+    with BatchDirectusWriter(auto_flush_size=50) as batch_writer:
+        for project_id, conversation_id in zip_unique_audio:
             ...
-            chunk_id_2_segment = []
+            chunk_id_2_segment: list[tuple[str, str]] = []
             ...
-            chunk_id_2_segment.extend(chunk_id_2_segment_temp)
+            chunk_id_2_segment.extend(chunk_id_2_segment_temp)
+            chunk_id_2_segment_all.extend(chunk_id_2_segment_temp)
             ...
-    chunk_id_2_segment_dict: dict[str, list[int]] = {}
-    for chunk_id, segment_id in chunk_id_2_segment:
+    chunk_id_2_segment_dict: dict[str, list[int]] = {}
+    for chunk_id, segment_id in chunk_id_2_segment_all:
             ...
```

echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (1)

48-218: Flush the batched Directus writes.

We spin up `BatchDirectusWriter(auto_flush_size=20)` and queue a ton of updates, but we never flush. No context manager, no explicit `flush()` => every queued update dies at function exit. Result: `conversation_segment` never gets transcripts or `lightrag_flag` flips, so downstream ETL keeps reprocessing the same segments.

Wrap the whole load loop in a `with BatchDirectusWriter(...) as batch_writer:` (or call `batch_writer.flush()` in a `finally`) so we actually push the queued writes before returning.
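The flush-on-exit discipline this comment asks for is the standard context-manager shape. A stripped-down sketch (an in-memory `applied` list stands in for the Directus client; this is not the real `BatchDirectusWriter`):

```python
from typing import Any, Dict, List, Tuple

class BatchWriter:
    """Sketch of a flush-on-exit batch writer (illustrative, not the PR's class)."""

    def __init__(self, auto_flush_size: int = 20):
        self.auto_flush_size = auto_flush_size
        self.pending: List[Tuple[str, str, Dict[str, Any]]] = []
        self.applied: List[Tuple[str, str, Dict[str, Any]]] = []

    def queue_update(self, collection: str, item_id: str, payload: Dict[str, Any]) -> None:
        self.pending.append((collection, item_id, payload))
        if len(self.pending) >= self.auto_flush_size:
            self.flush()

    def flush(self) -> None:
        # A real implementation would PATCH the queued items to Directus here.
        self.applied.extend(self.pending)
        self.pending.clear()

    def __enter__(self) -> "BatchWriter":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        self.flush()  # the step the reviewed code was missing
        return False

with BatchWriter(auto_flush_size=2) as w:
    w.queue_update("conversation_segment", "a", {"lightrag_flag": True})
    w.queue_update("conversation_segment", "b", {"lightrag_flag": True})  # auto-flush fires
    w.queue_update("conversation_segment", "c", {"lightrag_flag": True})  # flushed on exit

print(len(w.applied))  # → 3
```

Because `__exit__` always flushes, the tail of the queue (anything below `auto_flush_size`) cannot be silently dropped, even when the loop body raises.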
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
- `echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py` (3 hunks)
- `echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py` (8 hunks)
- `echo/server/dembrane/audio_lightrag/utils/batch_directus.py` (1 hunks)
- `echo/server/dembrane/audio_lightrag/utils/parallel_llm.py` (1 hunks)
- `echo/server/dembrane/audio_lightrag/utils/s3_cache.py` (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
echo/server/dembrane/audio_lightrag/utils/s3_cache.py (1)
echo/server/dembrane/s3.py (1)
- get_stream_from_s3 (179-183)
echo/server/dembrane/audio_lightrag/utils/batch_directus.py (1)
echo/server/scripts/rag_etl_observer.py (1)
- gather (285-302)
echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (4)
echo/server/dembrane/audio_lightrag/utils/batch_directus.py (2)
- BatchDirectusWriter (17-136)
- queue_create (54-63)
echo/server/dembrane/audio_lightrag/utils/echo_utils.py (1)
- renew_redis_lock (37-67)
echo/server/dembrane/audio_lightrag/utils/audio_utils.py (2)
- process_audio_files (170-290)
- create_directus_segment (298-319)
echo/server/dembrane/audio_lightrag/utils/process_tracker.py (1)
- update_value_for_chunk_id (38-39)
echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (4)
echo/server/dembrane/audio_lightrag/utils/audio_utils.py (2)
- wav_to_str (164-167)
- safe_audio_decode (67-121)
echo/server/dembrane/audio_lightrag/utils/async_utils.py (1)
- run_async_in_new_loop (16-77)
echo/server/dembrane/audio_lightrag/utils/batch_directus.py (2)
- BatchDirectusWriter (17-136)
- queue_update (43-52)
echo/server/dembrane/audio_lightrag/utils/parallel_llm.py (1)
- parallel_llm_calls (116-167)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: ci-check-server
```python
try:
    stream = get_stream_from_s3(s3_uri)
    data = stream.read()

    # Check if adding this would exceed cache size
    data_size = len(data)
    if self.current_cache_bytes + data_size > self.max_cache_bytes:
        logger.warning(
            f"Cache full ({self.current_cache_bytes / 1024 / 1024:.1f}MB), "
            f"cannot cache {s3_uri} ({data_size / 1024 / 1024:.1f}MB)"
        )
        # Return stream without caching
        return BytesIO(data)

    # Add to cache
    self.cache[s3_uri] = data
    self.current_cache_bytes += data_size

    logger.debug(
        f"Cached {s3_uri} ({data_size / 1024:.1f}KB), "
        f"total cache: {self.current_cache_bytes / 1024 / 1024:.1f}MB"
    )

    return BytesIO(data)

except Exception as e:
    logger.error(f"Failed to download/cache {s3_uri}: {e}")
    return None
```
Seal the connection leak, keep the pool alive.
We pull a StreamingBody here, read it, and bail without ever closing/releasing. That leaves the underlying HTTP connection stuck in the boto3 pool, which is exactly the bottleneck we’re trying to crush—after a few cache misses the worker will start throwing MaxRetryError/connection pool exhaustion. Wrap the body in a try/finally and close it (or release_conn) once we’ve slurped the bytes.
```diff
-    try:
-        stream = get_stream_from_s3(s3_uri)
-        data = stream.read()
+    stream = None
+    try:
+        stream = get_stream_from_s3(s3_uri)
+        data = stream.read()
     ...
-    except Exception as e:
+    except Exception as e:
         logger.error(f"Failed to download/cache {s3_uri}: {e}")
         return None
+    finally:
+        if stream is not None:
+            try:
+                stream.close()
+            except Exception:
+                logger.debug("Failed to close S3 stream for %s", s3_uri, exc_info=True)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
stream = None
try:
    stream = get_stream_from_s3(s3_uri)
    data = stream.read()

    # Check if adding this would exceed cache size
    data_size = len(data)
    if self.current_cache_bytes + data_size > self.max_cache_bytes:
        logger.warning(
            f"Cache full ({self.current_cache_bytes / 1024 / 1024:.1f}MB), "
            f"cannot cache {s3_uri} ({data_size / 1024 / 1024:.1f}MB)"
        )
        # Return stream without caching
        return BytesIO(data)

    # Add to cache
    self.cache[s3_uri] = data
    self.current_cache_bytes += data_size

    logger.debug(
        f"Cached {s3_uri} ({data_size / 1024:.1f}KB), "
        f"total cache: {self.current_cache_bytes / 1024 / 1024:.1f}MB"
    )

    return BytesIO(data)

except Exception as e:
    logger.error(f"Failed to download/cache {s3_uri}: {e}")
    return None
finally:
    if stream is not None:
        try:
            stream.close()
        except Exception:
            logger.debug("Failed to close S3 stream for %s", s3_uri, exc_info=True)
```
🤖 Prompt for AI Agents
In echo/server/dembrane/audio_lightrag/utils/s3_cache.py around lines 62 to 89,
the StreamingBody returned by get_stream_from_s3 is read but never closed,
leaking HTTP connections and exhausting the boto3 connection pool; wrap the read
in a try/finally that always closes/releases the stream (call stream.close() or
stream.release_conn() if available) after reading so the connection returns to
the pool, ensure the finally runs on both success and exception paths, and keep
returning a BytesIO(data) on success or None on failure.
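Beyond the connection leak, the cache under review simply refuses new entries once the byte budget is hit. A common alternative (sketched here as a generic pattern, not code from this PR) keeps the same byte budget but evicts least-recently-used entries instead:

```python
from collections import OrderedDict
from typing import Optional

class LRUByteCache:
    """Byte-budgeted LRU cache: evicts oldest entries instead of refusing new ones."""

    def __init__(self, max_bytes: int):
        self.max_bytes = max_bytes
        self.current = 0
        self.store: "OrderedDict[str, bytes]" = OrderedDict()

    def get(self, key: str) -> Optional[bytes]:
        if key in self.store:
            self.store.move_to_end(key)  # mark as recently used
            return self.store[key]
        return None

    def put(self, key: str, data: bytes) -> None:
        if len(data) > self.max_bytes:
            return  # single object larger than the whole budget: skip caching
        if key in self.store:
            self.current -= len(self.store.pop(key))
        while self.current + len(data) > self.max_bytes:
            _, evicted = self.store.popitem(last=False)  # drop least recently used
            self.current -= len(evicted)
        self.store[key] = data
        self.current += len(data)

cache = LRUByteCache(max_bytes=10)
cache.put("s3://bucket/a", b"aaaa")    # 4 bytes cached
cache.put("s3://bucket/b", b"bbbb")    # 8 bytes total
cache.put("s3://bucket/c", b"cccccc")  # would be 14 > 10, so "a" is evicted
print(sorted(cache.store))  # → ['s3://bucket/b', 's3://bucket/c']
```

Whether eviction beats refusal depends on the access pattern; for repeated merges over the same recent chunks, LRU keeps the hot set resident.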
f8e3a50 to bfb9399
Test results: ----------------------------- live log collection ------------------------------
Summary
This branch removes the “download to temp file and re-upload” step from the hot audio pipeline. For the common case (≈99% of chunks: 30 s MP3/WebM from the portal) we now only HEAD-probe the size and dispatch transcription immediately—no download, no ffmpeg conversion, no S3 write-back. Only rare oversized blobs enter the streaming splitter, and the incremental merge path streams chunks from S3 when building the MP3 preview.
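The HEAD-probe on the fast path maps to a single S3 `head_object` call, which returns the byte size without transferring the body. A sketch of that size gate (the bucket/key names, the threshold constant, and the stub client are illustrative, not values from this PR; real code would pass `boto3.client("s3")`):

```python
from typing import Tuple

MAX_FAST_PATH_BYTES = 24 * 1024 * 1024  # illustrative threshold, not the PR's actual cutoff

def parse_s3_uri(uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/key/with/slashes' into (bucket, key)."""
    if not uri.startswith("s3://"):
        raise ValueError(f"not an s3 uri: {uri}")
    bucket, _, key = uri[len("s3://"):].partition("/")
    return bucket, key

def object_size(s3_client, uri: str) -> int:
    """HEAD the object instead of downloading it; ContentLength is the byte size."""
    bucket, key = parse_s3_uri(uri)
    return s3_client.head_object(Bucket=bucket, Key=key)["ContentLength"]

def take_fast_path(s3_client, uri: str) -> bool:
    return object_size(s3_client, uri) <= MAX_FAST_PATH_BYTES

# Usage with a stub client so the gate is testable without network access:
class StubS3:
    def head_object(self, Bucket: str, Key: str) -> dict:
        return {"ContentLength": 512_000}  # a typical 30 s portal chunk is well under the cap

print(take_fast_path(StubS3(), "s3://audio-bucket/chunks/42.mp3"))  # → True
```

Since the probe is a metadata-only request, the common case pays one round trip instead of a download, a transcode, and a re-upload.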
Details
the fly.
`test_audio_utils_tempfileless.py` covering the fast path, split path, and mixed-format merges.

Testing