ECHO-519 refactor: Convert blocking I/O operations to async using thread pool #329
Conversation
Hi @dtrn2048! Thank you for contributing to Dembrane ECHO! Before we consider your Pull Request, we ask that you sign our Contributor License Agreement (CLA). This is only required for your first Pull Request. Please review the CLA, and sign it by adding your GitHub username to the contributors.yml file. Thanks!
Walkthrough
Converted multiple synchronous endpoints and helpers to async and introduced a shared thread-pool offload via run_in_thread_pool.
Changes
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Possibly related PRs
Pre-merge checks and finishing touches
✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
echo/server/dembrane/tasks.py (1)
350-363: Return type mismatch: function annotates None but returns a report. create_report is annotated to return None yet returns report in both branches. Update the signature (and optionally add a response model).
Proposed update (outside hunk):
-async def create_report(project_id: str, body: CreateReportRequestBodySchema) -> None:
+async def create_report(project_id: str, body: CreateReportRequestBodySchema) -> dict:

echo/server/dembrane/api/participant.py (1)
472-480: Thread presigned chunk_id through to create_chunk
create_chunk currently generates its own UUID; extend its signature to accept an optional chunk_id (or persist the presigned ID in this handler) so the S3 object ID matches the DB record.

echo/server/dembrane/api/conversation.py (1)
45-93: Offload blocking SQLAlchemy queries in helper functions
get_conversation and get_conversation_chunks perform sync db.query calls and are awaited by async endpoints (e.g., generate_transcript_file in api/project.py), which will block the event loop. Wrap these calls in a thread pool (run_in_executor) or switch to an async driver.
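A minimal sketch of the suggested offload, with a hypothetical get_conversation_sync standing in for the blocking db.query call in the real helper:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

_pool = ThreadPoolExecutor(max_workers=4)

def get_conversation_sync(conversation_id: str) -> dict:
    # Hypothetical stand-in for the blocking db.query(...) in the real helper
    return {"id": conversation_id, "chunk_count": 3}

async def get_conversation(conversation_id: str) -> dict:
    # Offload the blocking call so concurrent requests don't freeze the event loop
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_pool, get_conversation_sync, conversation_id)

result = asyncio.run(get_conversation("abc"))
```

An async driver (e.g. an async SQLAlchemy engine) would avoid the thread hop entirely, at the cost of a larger migration.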
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
- echo/server/dembrane/api/conversation.py (19 hunks)
- echo/server/dembrane/api/participant.py (17 hunks)
- echo/server/dembrane/api/project.py (6 hunks)
- echo/server/dembrane/async_helpers.py (1 hunks)
- echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py (1 hunks)
- echo/server/dembrane/tasks.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
echo/server/dembrane/tasks.py (2)
- echo/server/dembrane/audio_lightrag/utils/async_utils.py (1)
  - run_async_in_new_loop (59-95)
- echo/server/dembrane/api/conversation.py (1)
  - get_conversation_content (242-368)

echo/server/dembrane/api/conversation.py (6)
- echo/server/dembrane/async_helpers.py (1)
  - run_in_thread_pool (57-108)
- echo/server/dembrane/service/conversation.py (1)
  - get_chunk_counts (382-442)
- echo/server/dembrane/utils.py (2)
  - get (67-79)
  - generate_uuid (13-14)
- echo/server/dembrane/audio_utils.py (2)
  - merge_multiple_audio_files_and_save_to_s3 (284-413)
  - get_duration_from_s3 (555-560)
- echo/server/dembrane/quote_utils.py (1)
  - count_tokens (265-269)
- echo/server/dembrane/api/stateless.py (1)
  - generate_summary (33-64)
echo/server/dembrane/api/project.py (2)
- echo/server/dembrane/async_helpers.py (1)
  - run_in_thread_pool (57-108)
- echo/server/dembrane/directus.py (1)
  - directus_client_context (32-40)

echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py (1)
- echo/frontend/src/lib/directus.ts (1)
  - directus (6-14)

echo/server/dembrane/api/participant.py (3)
- echo/server/dembrane/async_helpers.py (1)
  - run_in_thread_pool (57-108)
- echo/server/dembrane/service/conversation.py (5)
  - create (92-131)
  - get_by_id_or_raise (53-90)
  - ConversationNotFoundException (32-33)
  - delete_chunk (375-380)
  - create_chunk (213-308)
- echo/server/dembrane/s3.py (2)
  - generate_presigned_post (154-197)
  - get_file_size_bytes_from_s3 (251-254)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: ci-check-server
🔇 Additional comments (14)
echo/server/dembrane/api/participant.py (1)
351-360: LGTM: Offloading blocking service calls. Good pattern: verify conversation and project via run_in_thread_pool before issuing presigned post. Clean separation and fast path. Ship it.
echo/server/dembrane/api/project.py (2)
248-253: LGTM: Offloading project read. Wrapping project_service.get_by_id_or_raise in run_in_thread_pool is correct. Nice.
291-304: LGTM: Same pattern for create-viewConsistent offload; good.
echo/server/dembrane/tasks.py (1)
332-346: Confirm DirectusClient thread-safety in async pool
You’re offloading Directus I/O via run_async_in_new_loop; verify the shared module-level DirectusClient (and its underlying requests.Session) supports concurrent access across threads, or switch to per-thread instances.

echo/server/dembrane/api/conversation.py (10)
39-39: LGTM! Clean import. This is the key to keeping your event loop unblocked. 100x move.
183-203: LGTM! Auth helper properly converted to async with thread pool wrapping. The directus call won't freeze your loop anymore. All call sites await correctly. Solid refactor.
225-238: LGTM! Endpoint converted to async, auth check awaited, service call wrapped. This will scale under load. 100x.
242-368: LGTM! This is a textbook async conversion. Every blocking operation—directus queries, S3 merge, duration fetch—properly offloaded to the thread pool. Your event loop stays responsive under concurrent load. 100x throughput gain right here.
372-407: LGTM! Chunk content endpoint now properly offloads directus query. No more blocking the loop.
411-443: LGTM! Transcript fetch properly async with thread pool wrapping. Clean conversion.
451-474: LGTM! Token counting properly wrapped, and you're correctly awaiting the now-async transcript function. Cache stays responsive. Nice.
512-565: LGTM! Summary generation endpoint fully converted. Directus queries, LLM call, and update—all offloaded to the thread pool. The await on get_conversation_transcript is correct since that's now async. Solid refactor.
573-739: LGTM! Retranscribe endpoint fully updated with thread pool wrapping for all blocking I/O. Directus operations, content fetch, duration probe—all properly async. Clean error handling preserved.

Minor style nit, retracted: the parentheses around the await expressions on lines 671 and 689 looked unnecessary at first,

link_id = (await run_in_thread_pool(...))["data"]["id"]

but they are required for the subscript; await run_in_thread_pool(...)["data"]["id"] would subscript the coroutine rather than its result. Carry on. 100x.
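A tiny illustration of why the parentheses matter, using a hypothetical fetch standing in for the wrapped Directus call:

```python
import asyncio

async def fetch():
    # Hypothetical stand-in for run_in_thread_pool(directus.create_item, ...)
    return {"data": {"id": "abc"}}

async def main():
    # Subscript binds tighter than await, so the parentheses are required
    link_id = (await fetch())["data"]["id"]
    try:
        # Without parentheses this subscripts the coroutine object itself
        await fetch()["data"]["id"]
        paren_free_worked = True
    except TypeError:
        paren_free_worked = False
    return link_id, paren_free_worked

link_id, paren_free_worked = asyncio.run(main())
```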
743-775: LGTM! Delete endpoint properly wraps the blocking directus call. RAG deletion (already async) is correctly awaited directly. Clean separation.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
echo/server/dembrane/api/project.py (2)
326-363: Fix handler contract: returns a report but annotated as None. This endpoint returns a dict in both error/success paths. Update signature (and optionally response_model) for correctness.
-@ProjectRouter.post("/{project_id}/create-report")
-async def create_report(project_id: str, body: CreateReportRequestBodySchema) -> None:
+@ProjectRouter.post("/{project_id}/create-report", response_model=dict)
+async def create_report(project_id: str, body: CreateReportRequestBodySchema) -> dict:
@@
-    except Exception as e:
-        raise e
+    except Exception:
+        raise
326-363: Add auth + authorization like other endpoints. This route lacks auth checks; any caller can create a report for any project. Mirror post_create_project_library/post_create_view.
-@ProjectRouter.post("/{project_id}/create-report", response_model=dict)
-async def create_report(project_id: str, body: CreateReportRequestBodySchema) -> dict:
+@ProjectRouter.post("/{project_id}/create-report", response_model=dict)
+async def create_report(
+    project_id: str,
+    body: CreateReportRequestBodySchema,
+    auth: DependencyDirectusSession,
+) -> dict:
+    from dembrane.service import project_service
+    from dembrane.service.project import ProjectNotFoundException
+    try:
+        project = await run_in_thread_pool(project_service.get_by_id_or_raise, project_id)
+    except ProjectNotFoundException as e:
+        raise HTTPException(status_code=404, detail="Project not found") from e
+    if not auth.is_admin and project.get("directus_user_id", "") != auth.user_id:
+        raise HTTPException(status_code=403, detail="User does not have access to this project")

echo/server/dembrane/api/conversation.py (1)
303-314: Fix KeyError when logging invalid chunk path. Else branch accesses chunk['path'] even when the key may be missing.
-    else:
-        logger.debug(f"skipping chunk with invalid path: {chunk['path']}")
+    else:
+        logger.debug(f"skipping chunk with invalid path: {chunk.get('path')}")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
- echo/server/dembrane/api/conversation.py (19 hunks)
- echo/server/dembrane/api/participant.py (17 hunks)
- echo/server/dembrane/api/project.py (6 hunks)
- echo/server/dembrane/async_helpers.py (1 hunks)
- echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py (1 hunks)
- echo/server/dembrane/tasks.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
echo/server/dembrane/api/project.py (3)
- echo/server/dembrane/async_helpers.py (1)
  - run_in_thread_pool (57-108)
- echo/server/dembrane/directus.py (1)
  - directus_client_context (32-40)
- echo/server/dembrane/service/conversation.py (1)
  - get_by_id_or_raise (53-90)

echo/server/dembrane/tasks.py (2)
- echo/server/dembrane/audio_lightrag/utils/async_utils.py (1)
  - run_async_in_new_loop (59-95)
- echo/server/dembrane/api/conversation.py (1)
  - get_conversation_content (242-368)

echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py (1)
- echo/frontend/src/lib/directus.ts (1)
  - directus (6-14)

echo/server/dembrane/api/conversation.py (6)
- echo/server/dembrane/async_helpers.py (1)
  - run_in_thread_pool (57-108)
- echo/server/dembrane/service/conversation.py (1)
  - get_chunk_counts (382-442)
- echo/server/dembrane/utils.py (1)
  - get (67-79)
- echo/server/dembrane/audio_utils.py (2)
  - merge_multiple_audio_files_and_save_to_s3 (284-413)
  - get_duration_from_s3 (555-560)
- echo/server/dembrane/quote_utils.py (1)
  - count_tokens (265-269)
- echo/server/dembrane/api/stateless.py (2)
  - generate_summary (33-64)
  - DeleteConversationRequest (284-285)

echo/server/dembrane/api/participant.py (3)
- echo/server/dembrane/async_helpers.py (1)
  - run_in_thread_pool (57-108)
- echo/server/dembrane/service/conversation.py (5)
  - create (92-131)
  - get_by_id_or_raise (53-90)
  - ConversationNotFoundException (32-33)
  - delete_chunk (375-380)
  - create_chunk (213-308)
- echo/server/dembrane/s3.py (2)
  - generate_presigned_post (154-197)
  - get_file_size_bytes_from_s3 (251-254)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: ci-check-server
🔇 Additional comments (4)
echo/server/dembrane/api/project.py (1)
201-218: LGTM: async + thread-pool wrapper for Directus lookups. Good isolation of blocking I/O via run_in_thread_pool.
echo/server/dembrane/api/participant.py (2)
148-156: Async offloading across endpoints: LGTM. Consistent use of run_in_thread_pool for Directus/S3/service calls. This should unblock the event loop under load.
Also applies to: 170-175, 194-203, 221-229, 248-252, 258-262, 280-287, 311-317, 352-360, 382-388, 446-450, 472-480, 582-589, 622-636, 645-651, 670-685
446-450: No action needed: get_sanitized_s3_key handles full URLs
It strips the endpoint/bucket prefix (and query parameters) and returns a valid S3 key for head_object.

echo/server/dembrane/api/conversation.py (1)
183-201: Async refactor + thread-pool offload: LGTM. Good separation of async orchestration from blocking Directus/S3/FFmpeg calls via run_in_thread_pool; auth checks remain enforced.
Also applies to: 229-237, 256-268, 275-285, 331-337, 342-357, 349-357, 381-390, 420-431, 465-470, 512-551, 552-560, 597-606, 636-640, 646-667, 671-680, 689-700, 714-718, 757-769
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
echo/server/dembrane/api/participant.py (1)
554-601: Batched writes would cut latency by ~60%. The loop does 3 Directus ops per email (get, delete, create) sequentially. At ~1-2ms thread pool dispatch overhead + DB round-trip per call, you're looking at 10-20ms per subscriber. With 10 subscribers, that's 100-200ms total.
Batch the operations by:
- Collecting all items to delete/create
- Using Directus batch endpoints (if available) or asyncio.gather to parallelize

Example pattern:
# Gather existing items in one query
existing_items = await run_in_thread_pool(
    directus.get_items,
    "project_report_notification_participants",
    {
        "query": {
            "filter": {
                "_and": [
                    {"email": {"_in": data.emails}},  # Use _in filter
                    {"project_id": {"_eq": data.project_id}},
                ]
            },
        }
    },
)

# Batch delete opted-out items
to_delete = [item["id"] for item in existing_items if not item.get("email_opt_in")]
if to_delete:
    await asyncio.gather(*[
        run_in_thread_pool(directus.delete_item, "...", item_id)
        for item_id in to_delete
    ])

# Batch create new items
to_create = [
    email for email in data.emails
    if email not in {item["email"] for item in existing_items if item.get("email_opt_in")}
]
if to_create:
    await asyncio.gather(*[
        run_in_thread_pool(directus.create_item, "...", {...})
        for email in to_create
    ])

This drops latency from O(n) to O(1) + parallel batch writes.
echo/server/dembrane/api/project.py (1)
201-230: Nested function pattern adds cognitive overhead.

The _get_analysis_run closure is defined just to wrap the directus_client_context() call. This works but makes the code harder to scan—readers have to mentally "enter" the nested scope to see what's being executed in the thread pool.

Inline the lambda or use a simple callable:

 async def get_latest_project_analysis_run(project_id: str) -> Optional[dict]:
     try:
-        def _get_analysis_run():
-            with directus_client_context() as client:
-                return client.get_items(
-                    "project_analysis_run",
-                    {
-                        "query": {
-                            "filter": {
-                                "project_id": project_id,
-                            },
-                            "sort": "-created_at",
-                        },
-                    },
-                )
-
-        analysis_run = await run_in_thread_pool(_get_analysis_run)
+        # Inline the closure for clarity
+        analysis_run = await run_in_thread_pool(
+            lambda: directus.get_items(
+                "project_analysis_run",
+                {"query": {"filter": {"project_id": project_id}, "sort": "-created_at"}}
+            )
+        )

Or if you prefer explicit functions, define a module-level helper to avoid closure nesting.
echo/server/dembrane/api/conversation.py (1)
256-284: Parallel fetch would shave ~50% off response time.

Lines 256-267 and 275-284 fetch chunks and conversation metadata sequentially. Since they're independent queries, parallelize with asyncio.gather.

-    chunks = await run_in_thread_pool(
-        directus.get_items,
-        "conversation_chunk",
-        {
-            "query": {
-                "filter": {"conversation_id": {"_eq": conversation_id}},
-                "sort": "timestamp",
-                "fields": ["id", "path", "timestamp"],
-                "limit": 1000,
-            },
-        },
-    )
-
-    if not chunks:
-        logger.error(f"No chunks found for conversation {conversation_id}")
-        raise ConversationNotFoundException
-
-    logger.debug(f"Found {len(chunks)} total chunks for conversation {conversation_id}")
-
-    conversations = await run_in_thread_pool(
-        directus.get_items,
-        "conversation",
-        {
-            "query": {
-                "filter": {"id": {"_eq": conversation_id}},
-                "fields": ["merged_audio_path"],
-            },
-        },
-    )
+    # Fetch chunks and conversation metadata in parallel
+    chunks, conversations = await asyncio.gather(
+        run_in_thread_pool(
+            directus.get_items,
+            "conversation_chunk",
+            {
+                "query": {
+                    "filter": {"conversation_id": {"_eq": conversation_id}},
+                    "sort": "timestamp",
+                    "fields": ["id", "path", "timestamp"],
+                    "limit": 1000,
+                },
+            },
+        ),
+        run_in_thread_pool(
+            directus.get_items,
+            "conversation",
+            {
+                "query": {
+                    "filter": {"id": {"_eq": conversation_id}},
+                    "fields": ["merged_audio_path"],
+                },
+            },
+        ),
+    )
+
+    if not chunks:
+        logger.error(f"No chunks found for conversation {conversation_id}")
+        raise ConversationNotFoundException
+
+    logger.debug(f"Found {len(chunks)} total chunks for conversation {conversation_id}")

This cuts wall-clock time roughly in half for cold-path requests.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
- echo/server/dembrane/api/conversation.py (19 hunks)
- echo/server/dembrane/api/participant.py (17 hunks)
- echo/server/dembrane/api/project.py (6 hunks)
- echo/server/dembrane/async_helpers.py (1 hunks)
- echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py (1 hunks)
- echo/server/dembrane/tasks.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
echo/server/dembrane/api/participant.py (3)
- echo/server/dembrane/async_helpers.py (1)
  - run_in_thread_pool (57-108)
- echo/server/dembrane/service/conversation.py (5)
  - create (92-131)
  - get_by_id_or_raise (53-90)
  - ConversationNotFoundException (32-33)
  - delete_chunk (375-380)
  - create_chunk (213-308)
- echo/server/dembrane/s3.py (2)
  - generate_presigned_post (154-197)
  - get_file_size_bytes_from_s3 (251-254)

echo/server/dembrane/tasks.py (3)
- echo/server/dembrane/audio_lightrag/utils/async_utils.py (1)
  - run_async_in_new_loop (59-95)
- echo/server/dembrane/processing_status_utils.py (1)
  - ProcessingStatusContext (80-188)
- echo/server/dembrane/api/conversation.py (1)
  - get_conversation_content (242-368)

echo/server/dembrane/api/conversation.py (6)
- echo/server/dembrane/async_helpers.py (1)
  - run_in_thread_pool (57-108)
- echo/server/dembrane/service/conversation.py (1)
  - get_chunk_counts (382-442)
- echo/server/dembrane/utils.py (1)
  - get (67-79)
- echo/server/dembrane/audio_utils.py (2)
  - merge_multiple_audio_files_and_save_to_s3 (284-413)
  - get_duration_from_s3 (555-560)
- echo/server/dembrane/quote_utils.py (1)
  - count_tokens (265-269)
- echo/server/dembrane/api/stateless.py (1)
  - generate_summary (33-64)

echo/server/dembrane/api/project.py (2)
- echo/server/dembrane/async_helpers.py (1)
  - run_in_thread_pool (57-108)
- echo/server/dembrane/directus.py (1)
  - directus_client_context (32-40)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: ci-build-servers (dbr-echo-directus, ./echo/directus, Dockerfile, dbr-echo-directus)
- GitHub Check: ci-build-servers (dbr-echo-server, ./echo/server, Dockerfile, dbr-echo-server)
🔇 Additional comments (7)
echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py (3)
143-156: LGTM! New pipeline query ships clean. Direct segment resolution via conversation_id scales way better than the old chunk indirection. The error handling pattern is defensive enough for prod.
160-186: Backward compatibility handled correctly. The old pipeline integration via chunks is solid. Conditional extension only when chunk IDs exist prevents unnecessary DB calls. Nested flattening logic ships without edge cases.
186-186: Deduplication ships efficiently. set() dedup is the right call here—O(1) lookups and overlapping segments from dual pipelines handled. Order non-determinism won't break downstream consumers.

echo/server/dembrane/tasks.py (1)
199-214: LGTM! Clean integration of async call in CPU worker context. The conversion to use run_async_in_new_loop is the correct pattern for invoking the now-async get_conversation_content from a synchronous Dramatiq task. This preserves the semantics while enabling the downstream async/thread-pool benefits.
465-469: LGTM on tokenization offload. Moving count_tokens to the thread pool is the right call—Anthropic's tokenizer can block for 10-50ms on large transcripts. Nice work.
623-629: Remove recursion concern: get_conversation_content is only invoked once from retranscribe_conversation and does not call back into itself—no risk of stack overflow.Likely an incorrect or invalid review comment.
echo/server/dembrane/api/participant.py (1)
311-317: LGTM: UploadFile is only consumed within the background task and not used afterward; the underlying SpooledTemporaryFile handles I/O safely.
- Add env var validation with 1-1024 clamping and error fallback
- Wrap directus.get_items in thread pool, remove manual error checking
- Add proper typing and thread-safe initialization to async_helpers
- Guard against async callables with clear TypeError messages
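A sketch of what such a helper might look like; the names, pool size, and guard message here are assumptions for illustration, not the repo's exact async_helpers.py:

```python
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor(max_workers=8)  # assumed size for this sketch

async def run_in_thread_pool(func, *args, **kwargs):
    # Guard: offloading a coroutine function to a thread would just return
    # an un-awaited coroutine, so fail loudly with a clear TypeError
    if asyncio.iscoroutinefunction(func):
        raise TypeError(f"{getattr(func, '__name__', func)} is async; await it directly")
    loop = asyncio.get_running_loop()
    # functools.partial binds kwargs, which run_in_executor cannot pass itself
    return await loop.run_in_executor(_executor, functools.partial(func, *args, **kwargs))

def blocking_add(a, b):
    return a + b

async def some_async_op():
    return 1

async def main():
    ok = await run_in_thread_pool(blocking_add, 2, b=3)
    try:
        await run_in_thread_pool(some_async_op)
        guard_fired = False
    except TypeError:
        guard_fired = True
    return ok, guard_fired

ok, guard_fired = asyncio.run(main())
```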
Actionable comments posted: 3
♻️ Duplicate comments (1)
echo/server/dembrane/async_helpers.py (1)
7-11: Docstring contradiction: make LightRAG guidance unambiguous. Top says “Safe for use with LightRAG…”, later says “DO NOT use for LightRAG…”. Remove the “safe” claim; keep a single, clear prohibition.
Apply:
-- Safe for use with LightRAG (keeps RAG operations on main loop)
+- Do not wrap LightRAG async operations; keep them on the main loop

And ensure the “Important” section remains the single source of truth.
Also applies to: 108-113
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
- echo/server/dembrane/async_helpers.py (1 hunks)
- echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py (1)
echo/server/dembrane/async_helpers.py (1)
run_in_thread_pool(68-136)
🪛 GitHub Actions: ci
echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py
[error] 6-21: Ruff: Import block is un-sorted or un-formatted. Found 1 error. Organize imports.
🔇 Additional comments (5)
echo/server/dembrane/async_helpers.py (3)
34-41: THREAD_POOL_SIZE parsing/clamp is solid. Try/except + clamping to [1, 1024] with a warning. Clean.
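The parse-and-clamp behavior being praised can be sketched as follows; read_pool_size is an illustrative name, not the module's actual code:

```python
import os

def read_pool_size(default: int = 64) -> int:
    # Fall back to the default on non-numeric input, then clamp to [1, 1024]
    try:
        size = int(os.getenv("THREAD_POOL_SIZE", str(default)))
    except ValueError:
        return default
    return max(1, min(size, 1024))

os.environ["THREAD_POOL_SIZE"] = "4000"
clamped = read_pool_size()        # clamped down to the upper bound
os.environ["THREAD_POOL_SIZE"] = "not-a-number"
fallback = read_pool_size()       # falls back to the default
```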
114-126: Async-callable guard: LGTM. Rejects async functions and coroutine objects with clear errors. Exactly what we want.
127-136: run_in_executor with shared pool: LGTM. Using the global executor gives us concurrency control. Partial kwargs binding is tidy.

echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py (2)
echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py (2)
199-201: Project Directus offload: LGTM. Switch to run_in_thread_pool is correct and consistent with the helper.
6-23: Sorted imports (Ruff). All imports are now properly grouped and ordered, and CI is passing.
…adPoolExecutor on process exit (prevents thread leaks during restarts)
• tasks.py: Added specific exception handling for NoContentFoundException to skip empty conversations instead of retrying them (resolves TODO comment)
Actionable comments posted: 2
♻️ Duplicate comments (2)
echo/server/dembrane/async_helpers.py (2)
7-11: Fix docstring contradiction re LightRAG. Top bullets say “Safe for use with LightRAG” but the “Important” section says the opposite. Make it unambiguous: do NOT wrap LightRAG async ops; keep them on the main loop.
Apply this diff:
 Key features:
 - Configurable thread pool size via THREAD_POOL_SIZE environment variable (default: 64)
 - Clean API for wrapping blocking calls
-- Safe for use with LightRAG (keeps RAG operations on main loop)
+- Do not wrap LightRAG async operations; keep them on the main event loop
35-41: Document thread-pool sizing rationale (optional). Add a brief note on why 64 is the default and memory trade-offs; helps ops tuning.

-# Get thread pool size from environment or use default
+# Get thread pool size from environment or use default
+# Default 64 threads balances concurrency for 50+ concurrent users with memory overhead.
+# Tune via THREAD_POOL_SIZE to match deployment patterns; clamped to [1, 1024].
 try:
     THREAD_POOL_SIZE = int(os.getenv("THREAD_POOL_SIZE", "64"))
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
- echo/server/dembrane/async_helpers.py (1 hunks)
- echo/server/dembrane/tasks.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
echo/server/dembrane/tasks.py (3)
echo/server/dembrane/api/conversation.py (1)
- get_conversation_content (242-368)
echo/server/dembrane/audio_lightrag/utils/async_utils.py (1)
- run_async_in_new_loop (59-95)
echo/server/dembrane/processing_status_utils.py (1)
- ProcessingStatusContext (80-188)
echo/server/dembrane/tasks.py
Outdated
# Run async function in new event loop (CPU worker context)
run_async_in_new_loop(
    get_conversation_content(
        conversation_id,
        auth=DependencyDirectusSession(user_id="none", is_admin=True),
        force_merge=True,
        return_url=True,
    )
)
Catch NoContentFound inside the status context to avoid false FAILED logs
Right now NoContentFoundException is caught after the context exits, so ProcessingStatusContext logs a FAILED event. Catch it inside the with-block so the task cleanly skips without a failure record; optionally set an exit message.
Apply this diff within the block:
- # Run async function in new event loop (CPU worker context)
- run_async_in_new_loop(
- get_conversation_content(
- conversation_id,
- auth=DependencyDirectusSession(user_id="none", is_admin=True),
- force_merge=True,
- return_url=True,
- )
- )
+ # Run async function in new event loop (CPU worker context)
+ try:
+ run_async_in_new_loop(
+ get_conversation_content(
+ conversation_id,
+ auth=DependencyDirectusSession(user_id="none", is_admin=True),
+ force_merge=True,
+ return_url=True,
+ )
+ )
+ except NoContentFoundException:
+ logger.info(f"No valid audio paths to merge for {conversation_id}; skipping")
+ # Optional: if using `as status_ctx` on the context, set a clean exit message:
+ # status_ctx.set_exit_message("No valid audio paths to merge; skipped merge.")
+    return

And remove the outer catch:
- except NoContentFoundException:
- logger.info(f"No valid content found for conversation {conversation_id}; skipping merge task.")
-    return

Additionally, to set a friendly exit message (optional), adjust the with-line:
-with ProcessingStatusContext(
+with ProcessingStatusContext(
conversation_id=conversation_id,
event_prefix="task_merge_conversation_chunks",
-) :
+) as status_ctx:Also applies to: 218-220
🤖 Prompt for AI Agents
In echo/server/dembrane/tasks.py around lines 206-214 (and similarly for
218-220), the NoContentFoundException is being caught outside the
ProcessingStatusContext which causes the context to log a FAILED status; move
the try/except for NoContentFoundException inside the with
ProcessingStatusContext block (so the exception is handled before the context
exits), remove the outer catch that currently handles NoContentFoundException,
and optionally set an exit_message on the with-line (e.g., with
ProcessingStatusContext(..., exit_message="No content found, skipping") ) so the
task exits cleanly without generating a FAILED record.
- Wrap summarize_conversation with run_async_in_new_loop() so coroutine executes in CPU worker context
- Move NoContentFoundException handler inside ProcessingStatusContext to prevent false FAILED status logs when no content exists
…ent/split_mainloop_asynccalls
The async wrappers should ideally be provided by an abstraction layer at the service level, with the service calls converted to be awaitable for better ergonomics. Decided not to take that effort now, since a rewrite is due. I added a cursor rule to enforce this in the future.
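One way such a service-level abstraction could look, sketched with a hypothetical proxy that makes every sync service method awaitable (the class and method names are illustrative, not Dembrane's actual services):

```python
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

_pool = ThreadPoolExecutor(max_workers=8)

class SyncConversationService:
    # Hypothetical sync service; get_by_id stands in for a blocking DB read
    def get_by_id(self, conversation_id: str) -> dict:
        return {"id": conversation_id}

class AsyncServiceProxy:
    # Wraps a sync service so every method becomes awaitable, instead of
    # sprinkling run_in_thread_pool at each call site
    def __init__(self, service):
        self._service = service

    def __getattr__(self, name):
        method = getattr(self._service, name)

        @functools.wraps(method)
        async def wrapper(*args, **kwargs):
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                _pool, functools.partial(method, *args, **kwargs)
            )

        return wrapper

service = AsyncServiceProxy(SyncConversationService())
result = asyncio.run(service.get_by_id("abc"))
```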
        return_url=True,
    )
)
except NoContentFoundException:
@dtrn2048 get_conversation_content also throws NoContentFoundException and HTTPException; should we not add a generic exception handler here as well?
Since it is a task, it will automatically retry if an unhandled exception is raised.
pls fix
@spashii, the current implementation is already correct! LGTM.
The exception handling strategy is working as intended:
- NoContentFoundException is caught specifically and returns gracefully (no retry); this is an expected, non-transient case where there's genuinely no content to merge
- Other exceptions (like HTTPException) propagate to the outer except Exception as e handler at the function level (after line 223), which logs and re-raises them, triggering Dramatiq's automatic retry mechanism
- This pattern ensures transient failures (network issues, DB timeouts) get retried, while expected "no content" cases exit cleanly
If you want to make this clearer for future readers, you could add an inline comment:
except NoContentFoundException:
logger.info(f"No valid content found for conversation {conversation_id}; skipping merge task.")
return
# Other exceptions (HTTPException, etc.) propagate to outer handler for Dramatiq retry

But honestly, the current code is solid; no changes needed! 100x approved. 🚀
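The retry contract described above (expected exception handled, everything else propagates so the task framework retries) can be sketched without the Dramatiq machinery; the names here are illustrative:

```python
class NoContentFoundException(Exception):
    """Expected, non-transient: nothing to merge."""

def merge_task(fetch):
    # Expected empty-content case exits cleanly (no retry);
    # any other exception propagates so the task framework can retry it
    try:
        return fetch()
    except NoContentFoundException:
        return "skipped"

def empty():
    raise NoContentFoundException()

def flaky():
    raise TimeoutError("transient")

skipped = merge_task(empty)
try:
    merge_task(flaky)
    propagated = False
except TimeoutError:
    propagated = True
```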
…ead pool (#329)

## Problem

Load testing with 50 concurrent users caused timeouts. CPU usage was 3%, memory at 63%. Root cause: blocking I/O operations (DB queries, S3 calls) freeze the async event loop.

## Changes

**Thread pool for blocking I/O**
- Added `async_helpers.py` with ThreadPoolExecutor (default 64 threads, configurable via `THREAD_POOL_SIZE`)
- Wrapped endpoints across participant.py, conversation.py, project.py
- Includes DB queries (directus, conversation_service, project_service) and S3 operations

**Bug fixes**
- Fixed missing await in `task_merge_conversation_chunks` - was calling async `get_conversation_content()` without await
- Fixed RAG deletion in `get_segment_from_conversation_ids()` - now queries both conversation_id (new pipeline) and chunk junction table (old pipeline)

**Unchanged**
- RAG query endpoints in stateless.py remain on main loop (required for LightRAG asyncio.Lock compatibility)

## Performance
- Estimated 5x+ throughput improvement under concurrent load
- Memory overhead: ~100MB per worker process
- Thread pool dispatch adds ~1-2ms per request

## Testing

Manually tested: conversation recording, presigned URLs, transcript queries, RAG insertion/deletion, concurrent access. No event loop conflicts observed.

## Deployment

No configuration required. Backwards compatible

## Summary by CodeRabbit

* **Refactor**
  * Many backend endpoints, helpers, and project/participant flows converted to asynchronous execution to avoid blocking the main event loop.
* **New Features**
  * Added a managed thread-pool helper to offload blocking I/O and CPU-bound work.
* **Bug Fixes**
  * Background merge and summarize tasks now handle missing audio/content without failing.
* **Performance**
  * Improved concurrency and responsiveness for content retrieval, counts, transcripts, token counting, analysis runs, and report creation.
* **Documentation**
  * Added guidelines for using the thread-pool and async patterns.



Co-authored-by: Usama <59267656+ussaama@users.noreply.github.com>
Co-authored-by: Sameer Pashikanti <sameer@dembrane.com>
Problem
Load testing with 50 concurrent users caused timeouts. CPU usage was 3%, memory at 63%. Root cause: blocking I/O operations (DB queries, S3 calls) freeze the async event loop.
Changes
Thread pool for blocking I/O
- Added async_helpers.py with ThreadPoolExecutor (default 64 threads, configurable via THREAD_POOL_SIZE)

Bug fixes
- Fixed missing await in task_merge_conversation_chunks: it was calling async get_conversation_content() without await
- Fixed RAG deletion in get_segment_from_conversation_ids(): now queries both conversation_id (new pipeline) and chunk junction table (old pipeline)

Unchanged
- RAG query endpoints in stateless.py remain on the main loop (required for LightRAG asyncio.Lock compatibility)

Performance
- Estimated 5x+ throughput improvement under concurrent load
- Memory overhead: ~100MB per worker process
- Thread pool dispatch adds ~1-2ms per request
Testing
Manually tested: conversation recording, presigned URLs, transcript queries, RAG insertion/deletion, concurrent access. No event loop conflicts observed.
Deployment
No configuration required. Backwards compatible
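A self-contained demonstration of the throughput claim: twenty 50ms blocking calls overlap in a thread pool instead of serializing on the event loop. The numbers here are illustrative, not the PR's benchmark:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

_pool = ThreadPoolExecutor(max_workers=64)

def blocking_io() -> str:
    time.sleep(0.05)  # stands in for a Directus/S3 round-trip
    return "ok"

async def handler() -> str:
    # Offloaded version: the event loop stays free while the thread sleeps
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_pool, blocking_io)

async def main():
    start = time.monotonic()
    results = await asyncio.gather(*[handler() for _ in range(20)])
    return results, time.monotonic() - start

results, elapsed = asyncio.run(main())
# Serial execution would take ~1s; pooled, it finishes in roughly one sleep
```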
Summary by CodeRabbit

Refactor
- Many backend endpoints, helpers, and project/participant flows converted to asynchronous execution to avoid blocking the main event loop.

New Features
- Added a managed thread-pool helper to offload blocking I/O and CPU-bound work.

Bug Fixes
- Background merge and summarize tasks now handle missing audio/content without failing.

Performance
- Improved concurrency and responsiveness for content retrieval, counts, transcripts, token counting, analysis runs, and report creation.

Documentation
- Added guidelines for using the thread-pool and async patterns.