
ECHO-164 cron job to collect unfinished conversations #132

Merged
spashii merged 5 commits into main from
feature/echo-164-cron-job-to-collect-unfinished-conversations
May 9, 2025

Conversation

@spashii
Member

@spashii spashii commented May 9, 2025

Summary by CodeRabbit

  • New Features

    • Added new fields and metadata to support enhanced audio processing, conversation completion status, and vector embeddings in conversations and projects.
    • Introduced a scheduler for periodic background processing and maintenance tasks.
    • Migrated background task processing from Celery to Dramatiq for improved reliability and scalability.
    • Enhanced visual indicators in the conversation interface to reflect processing states more accurately.
  • Bug Fixes

    • Improved robustness of quote sampling with better error handling and fallback mechanisms.
  • Refactor

    • Reorganized and streamlined GraphQL schemas and backend logic for consistency and maintainability.
    • Updated scripts and dependencies to support the new task processing framework.
  • Chores

    • Updated development and deployment scripts, configuration files, and ignored files for improved workflow and environment setup.
  • Tests

    • Added and enhanced automated tests for audio utilities, conversation utilities, and quote sampling to ensure correctness and reliability.
  • Documentation

    • Improved inline documentation and configuration comments for better clarity and maintainability.

@linear

linear bot commented May 9, 2025

@coderabbitai
Contributor

coderabbitai bot commented May 9, 2025

Walkthrough

This PR migrates backend task processing from Celery to Dramatiq, introduces a scheduler for periodic unfinished conversation collection, and updates related shell scripts and dependencies. It adds new fields and schema changes to Directus and GraphQL, enhances frontend conversation indicators, and expands test coverage for conversation and quote utilities. Several obsolete files are deleted.

Changes

Files/Groups Change Summary
.github/workflows/*.yml Added merge_group event triggers for workflows on main branch.
echo/.gitignore Now ignores celerybeat-schedule.db.
echo/.vscode/* Updated launch configs, added new debug options, and new terminal groups for scheduler and workers.
echo/directus/directus-sync.config.js Removed exclude property for collections.
echo/directus/sync/snapshot/fields/* Added fields: centroid_embedding (aspect), is_finished (conversation), is_enhanced_audio_processing_enabled (project); updated nullability and metadata for others.
echo/directus/sync/specs/*.graphql Reorganized, reordered, and added fields/operations in GraphQL schemas for new and existing entities.
echo/frontend/src/components/conversation/ConversationAccordion.tsx Enhanced badges/indicators for conversation processing states.
echo/frontend/src/lib/typesDirectus.d.ts Added is_finished: boolean to Conversation type.
echo/server/dembrane/api/*.py Updated task invocation from Celery to Dramatiq (.delay()/.apply_async().send()), minor formatting.
echo/server/dembrane/audio_lightrag/utils/litellm_utils.py
echo/server/dembrane/chat_utils.py
echo/server/dembrane/database.py
echo/server/dembrane/directus.py
echo/server/dembrane/s3.py
echo/server/dembrane/ner.py
echo/server/dembrane/audio_utils.py
Minor formatting, import cleanup, style consistency.
echo/server/dembrane/config.py Added optional colorized logging with colorlog.
echo/server/dembrane/conversation_utils.py New utility: collect_unfinished_conversations().
echo/server/dembrane/main.py Updated OpenAPI version, minor formatting.
echo/server/dembrane/scheduler.py New APScheduler-based scheduler for periodic unfinished conversation collection.
echo/server/dembrane/sentry.py Added Dramatiq integration to Sentry.
echo/server/dembrane/quote_utils.py Improved vector similarity search and fallback logic for quote sampling.
echo/server/dembrane/tasks.py Migrated all tasks to Dramatiq actors, restructured workflows, added lz4 compression, Redis backend, and error handling.
echo/server/dembrane/chains.py
echo/server/dembrane/process_resource.py
echo/server/dembrane/tasks_config.py
echo/server/prod-worker-liveness.py
echo/server/prod-worker-readiness.py
Deleted obsolete Celery-related modules and resource processing logic.
echo/server/pyproject.toml Removed Celery, added Dramatiq, APScheduler, lz4, sentry-dramatiq, colorlog, and related deps.
echo/server/prod-worker.sh
echo/server/prod-worker-cpu.sh
echo/server/run-worker.sh
Switched worker scripts from Celery to Dramatiq (with gevent), updated concurrency and queue configs.
echo/server/prod-scheduler.sh
echo/server/run-scheduler.sh
echo/server/run-worker-cpu.sh
Added scripts for running scheduler and cpu worker with Dramatiq.
echo/server/tests/common.py Test helpers accept additional_data for entity creation.
echo/server/tests/test_audio_utils.py Expanded test coverage for audio probing, merging, and format handling.
echo/server/tests/test_conversation_utils.py New tests for chunk creation and unfinished conversation collection logic.
echo/server/tests/test_quote_utils.py New tests for quote sampling, vector search, and empty project handling.

Sequence Diagram(s)

sequenceDiagram
    participant Scheduler
    participant Tasks
    participant Directus
    participant Dramatiq
    participant Worker

    Scheduler->>Dramatiq: Triggers periodic job (every 15 min)
    Dramatiq->>Tasks: task_collect_and_finish_unfinished_conversations
    Tasks->>Directus: Query for unfinished conversations
    Directus-->>Tasks: List of unfinished conversation IDs
    loop For each conversation
        Tasks->>Dramatiq: Enqueue task_finish_conversation_hook(conversation_id)
        Dramatiq->>Worker: Process finish hook
    end
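The "query for unfinished conversations" step in the diagram above boils down to a timestamp cutoff. A minimal, self-contained sketch of that predicate (the dict shape, field names, and the 30-minute timeout are assumptions for illustration; the real `collect_unfinished_conversations()` lives in `echo/server/dembrane/conversation_utils.py` and queries Directus):

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Inactivity window after which a conversation is considered abandoned.
# The 30-minute value is an assumption for this sketch, not the real config.
INACTIVITY_TIMEOUT = timedelta(minutes=30)


def is_unfinished(conversation: dict, now: Optional[datetime] = None) -> bool:
    """True when the conversation is not marked finished and its newest
    chunk is older than the inactivity timeout."""
    now = now or datetime.now(timezone.utc)
    if conversation.get("is_finished"):
        return False
    timestamps = conversation.get("chunk_timestamps") or []
    if not timestamps:
        return False  # no chunks recorded yet; nothing to finish
    return max(timestamps) < now - INACTIVITY_TIMEOUT


def collect_unfinished(conversations: list, now: Optional[datetime] = None) -> list:
    """IDs of conversations the scheduler should enqueue finish hooks for."""
    return [c["id"] for c in conversations if is_unfinished(c, now)]
```

Each ID returned here would then be handed to `task_finish_conversation_hook`, as the loop in the diagram shows.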
sequenceDiagram
    participant API
    participant Dramatiq
    participant Worker

    API->>Dramatiq: task_process_conversation_chunk.send(chunk_id, run_finish_hook)
    Dramatiq->>Worker: Process chunk
    alt run_finish_hook is True
        Worker->>Dramatiq: Enqueue finish hook workflow
        Dramatiq->>Worker: Summarize, merge, ETL, finalize
    end
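The conditional finish-hook fan-out in this second diagram can be illustrated without a broker. In the sketch below a toy in-memory queue stands in for Dramatiq's `.send()`; all names are illustrative of the control flow only, not the project's actual API:

```python
from collections import deque

# Toy in-memory stand-in for the message broker. Real code would declare
# Dramatiq actors and enqueue work with actor.send(*args).
queue: deque = deque()


def send(task_name: str, *args) -> None:
    """Enqueue a task, as actor.send(*args) would."""
    queue.append((task_name, args))


def task_process_conversation_chunk(chunk_id: str, run_finish_hook: bool) -> None:
    # ... transcription / chunk processing would happen here ...
    if run_finish_hook:
        # Matches the "alt run_finish_hook is True" branch in the diagram.
        send("task_finish_conversation_hook", chunk_id)


def run_worker() -> list:
    """Drain the queue, returning the task names in execution order."""
    processed = []
    while queue:
        name, args = queue.popleft()
        processed.append(name)
        if name == "task_process_conversation_chunk":
            task_process_conversation_chunk(*args)
    return processed
```

Sending a chunk with `run_finish_hook=True` results in two executions (the chunk task, then the enqueued finish hook); with `False`, only the chunk task runs.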

Assessment against linked issues

Objective Addressed Explanation
Implement a CRON/scheduler job to collect "unfinished" conversations (ECHO-164)
Ensure backend can identify unfinished conversations based on chunk timestamps and project settings (ECHO-164)
Provide periodic triggering and task orchestration for finishing conversations (ECHO-164)

Possibly related PRs

Suggested reviewers

  • ussaama

Poem

🚀 Tasks now fly on Dramatiq wings,
Scheduler hums and the worker bell rings.
Chunks and conversations, unfinished no more,
With lz4 and colorlogs, the logs we adore.
Celery’s retired, the scripts are all new—
100x engineer magic, shipping right through!
LGTM.



@spashii spashii merged commit 4db30b3 into main May 9, 2025
6 of 7 checks passed
@spashii spashii deleted the feature/echo-164-cron-job-to-collect-unfinished-conversations branch May 9, 2025 08:12
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 11

🧹 Nitpick comments (17)
echo/server/prod-scheduler.sh (1)

1-2: LGTM! Simple and effective scheduler script.

Clean implementation that launches the Python scheduler module. The minimalist approach is exactly what's needed here - just a thin wrapper to execute the module. Perfect for container environments.

You might want to consider adding an exec to the command to ensure proper signal passing:

-python -m dembrane.scheduler
+exec python -m dembrane.scheduler

This ensures signals (like SIGTERM) get properly passed to the Python process instead of being caught by the shell.

echo/server/dembrane/database.py (1)

283-284: Reformatted commented out JSONB columns.

Code style improvement by removing trailing spaces from commented out code. Keeps things clean even in inactive code sections.

echo/.vscode/settings.json (1)

77-78: Duplicate pylint config - drop that redundancy.

Seems like you've got "python.linting.pylintEnabled": false twice (lines 77-78). Not breaking anything, but let's keep our config DRY.

  "python.linting.enabled": true,
  "python.linting.pylintEnabled": false,
- "python.linting.pylintEnabled": false,
  "python.linting.mypyEnabled": true,
🧰 Tools
🪛 Biome (1.9.4)

[error] 77-77: The key python.linting.pylintEnabled was already declared.

This is where the duplicated key was declared again.

If a key is defined multiple times, only the last definition takes effect. Previous definitions are ignored.

(lint/suspicious/noDuplicateObjectKeys)

echo/server/prod-worker-cpu.sh (1)

1-2: Evaluate shebang and robustness
You’ve switched to /bin/bash but aren’t yet leveraging Bash-specific features. If portability isn’t a concern, consider adding at the top:

set -euo pipefail
exec dramatiq --queues cpu --processes 2 --threads 4 dembrane.tasks

to ensure failures are caught early and signals propagate correctly.

echo/server/prod-worker.sh (1)

1-2: Adopt robust shell scripting practices
Since you’re on /bin/bash, consider:

set -euo pipefail
exec dramatiq-gevent --queues network --processes 2 --threads 50 dembrane.tasks

This ensures errors halt execution and signals are forwarded properly.

echo/server/dembrane/conversation_utils.py (1)

48-51: Remove unnecessary pass statement

The pass statement on line 51 is redundant since there's already an executable statement in the block.

 if __name__ == "__main__":
     print(collect_unfinished_conversations())
-    pass
echo/.vscode/launch.json (1)

15-25: Duplicated conversation utils debug configurations.

You have two identical configurations for debugging conversation_utils.py, one using "debugpy" and one using "python". This redundancy could cause confusion.

- {
-   "name": "Python: Conversation Utils Debug",
-   "type": "python",
-   "request": "launch",
-   "program": "dembrane/conversation_utils.py",
-   "cwd": "${workspaceFolder}/server",
-   "env": {
-     "PYTHONPATH": "${workspaceFolder}/server"
-   },
-   "console": "integratedTerminal"
- }

Standardize on debugpy which is the recommended VS Code Python debugger.

Also applies to: 70-80

echo/server/dembrane/quote_utils.py (2)

341-345: Vector literal casting looks solid – just sanity-check dimensionality.

Love the explicit literal(..., type_=Vector(EMBEDDING_DIM)); that prevents the infamous “operator does not exist for type jsonb -> vector” error. One micro-gotcha: if random_vectors is ever generated with a shape that deviates from EMBEDDING_DIM we’ll silently pass a malformed vector to Postgres and the query will error at runtime (caught later, but still). A quick assert right after vector_as_list would make this bullet-proof.

+assert len(vector_as_list) == EMBEDDING_DIM, "Vector dimensionality mismatch"

346-349: Early-exit when there are literally no quotes saves cycles.

Nit: You already guard for not all_quotes, but the guard is inside the for vector in random_vectors loop; we’ll still spin that loop num_random_vectors times doing nothing. Shift the guard above the loop to bail immediately and shave a few ms.

-if not all_quotes:
-    continue
+if not all_quotes:
+    break   # nothing to compare against, skip the rest of the vectors
echo/server/dembrane/audio_lightrag/utils/litellm_utils.py (1)

120-133: Embedding fetch loop still serial – batching would pay off.

Every call to embedding() hits the provider; for large lists this is N requests. LiteLLM supports batch inputs for Azure/OpenAI. Collect texts in chunks (≤ 16k tokens) and send one request per chunk – 10-20× speedup and cost reduction.

No change required for this PR, just flagging for future perf work.
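A hedged sketch of that batching idea: group texts under a rough per-request token budget so a single `embedding()` call can cover many texts. The 4-characters-per-token estimate and the default budget are assumptions for illustration, not actual provider limits (a real implementation might use tiktoken and the provider's documented maximums):

```python
def batch_texts(texts, max_tokens_per_batch=8000):
    """Split texts into batches whose estimated token totals stay under
    max_tokens_per_batch, preserving input order."""
    batches, current, current_tokens = [], [], 0
    for text in texts:
        est_tokens = max(1, len(text) // 4)  # crude ~4-chars-per-token estimate
        if current and current_tokens + est_tokens > max_tokens_per_batch:
            batches.append(current)  # budget exceeded: start a new batch
            current, current_tokens = [], 0
        current.append(text)
        current_tokens += est_tokens
    if current:
        batches.append(current)
    return batches
```

Each resulting batch would then be passed as the `input` list of one embedding request, turning N requests into roughly N divided by the batch size.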

echo/frontend/src/components/conversation/ConversationAccordion.tsx (2)

370-381: Auto-select logic clashes with manual checkbox when both flags true.

When ENABLE_CHAT_AUTO_SELECT is true and auto_select_bool is enabled, the checkbox is hidden (rightSection short-circuits). That means users can’t manually opt-out of an automatically added conversation. Consider keeping the checkbox but in read-only/“selected” state so the UX stays consistent.

Just a heads-up – may be intentional.


395-409: Duplicate .toLocaleLowerCase() calls – micro-perf & DRY.

You call conversation.source?.toLocaleLowerCase() three times inside the same render pass. Cache it in a const – less work for V8 and keeps it DRY.

-            {conversation.source?.toLocaleLowerCase().includes("upload") && (
+            {(() => {
+               const src = conversation.source?.toLocaleLowerCase() ?? "";
+               return src.includes("upload");
+             })() && (
...
-              !conversation.source?.toLocaleLowerCase().includes("upload") &&
+              !src.includes("upload") &&
echo/server/tests/test_quote_utils.py (1)

125-128: Consider using tiktoken for more accurate token counting.

Using len(quote.text.split()) is a simplistic approximation of token count. For production-grade token counting, consider using OpenAI's tiktoken library.

- total_tokens_small = sum(len(quote.text.split()) for quote in small_sample)
+ import tiktoken
+ encoding = tiktoken.get_encoding("cl100k_base")  # Or appropriate encoding
+ total_tokens_small = sum(len(encoding.encode(quote.text)) for quote in small_sample)
echo/server/tests/test_conversation_utils.py (1)

66-73: Insightful documentation of Directus behavior.

Documenting the Directus API's behavior with onCreate/onUpdate hooks is valuable tribal knowledge. Consider moving this to formal documentation.

-"""
-I found an extremely weird bug. 
-When using the Directus API to update fields that have onCreate/onUpdate hooks
-(or special attributes like date-created/date-updated) applied to them, 
-Directus silently ignores the values you pass in your JSON payload. 
-Instead, Directus will use its own internal logic to set these field values, 
-regardless of what you explicitly provide in your API request.
-"""
+"""
+Note: Directus API behavior with special fields
+
+When using the Directus API to update fields that have onCreate/onUpdate hooks
+(or special attributes like date-created/date-updated) applied to them, 
+Directus silently ignores the values you pass in your JSON payload. 
+Instead, Directus will use its own internal logic to set these field values, 
+regardless of what you explicitly provide in your API request.
+"""
echo/server/dembrane/tasks.py (1)

270-292: Avoid variable shadowing in list-comprehensions

Reusing chunk_id in the comprehension shadows the outer parameter and hurts readability.
Quick rename keeps things crystal:

-                            task_transcribe_chunk.message(chunk_id)
-                            for chunk_id in split_chunk_ids
-                            if chunk_id is not None
+                            task_transcribe_chunk.message(split_id)
+                            for split_id in split_chunk_ids
+                            if split_id is not None

(Same tweak applies to the else branch below.)

echo/directus/sync/specs/system.graphql (2)

410-416: Type aspect – add centroid_embedding
The aspect.centroid_embedding: String field provides embedding data. Consider marking it non-null (String!) if every aspect will always include an embedding.


1950-1953: Type project_chat – add auto_select
The optional auto_select: Boolean flag fits the model; consider documenting its behavior in the schema description.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b4642b6 and e3c234c.

⛔ Files ignored due to path filters (2)
  • echo/server/requirements-dev.lock is excluded by !**/*.lock
  • echo/server/requirements.lock is excluded by !**/*.lock
📒 Files selected for processing (51)
  • .github/workflows/ci.yml (1 hunks)
  • .github/workflows/dev-deploy-gitops-backends.yaml (1 hunks)
  • .github/workflows/dev-deploy-vercel-dashboard.yml (1 hunks)
  • .github/workflows/dev-deploy-vercel-portal.yml (1 hunks)
  • echo/.gitignore (1 hunks)
  • echo/.vscode/launch.json (4 hunks)
  • echo/.vscode/settings.json (1 hunks)
  • echo/directus/directus-sync.config.js (0 hunks)
  • echo/directus/sync/snapshot/fields/aspect/centroid_embedding.json (1 hunks)
  • echo/directus/sync/snapshot/fields/conversation/is_finished.json (1 hunks)
  • echo/directus/sync/snapshot/fields/conversation_chunk/timestamp.json (1 hunks)
  • echo/directus/sync/snapshot/fields/project/is_enhanced_audio_processing_enabled.json (1 hunks)
  • echo/directus/sync/snapshot/fields/project_chat/auto_select.json (1 hunks)
  • echo/directus/sync/specs/item.graphql (48 hunks)
  • echo/directus/sync/specs/system.graphql (28 hunks)
  • echo/frontend/src/components/conversation/ConversationAccordion.tsx (3 hunks)
  • echo/frontend/src/lib/typesDirectus.d.ts (1 hunks)
  • echo/server/dembrane/api/chat.py (1 hunks)
  • echo/server/dembrane/api/conversation.py (1 hunks)
  • echo/server/dembrane/api/participant.py (8 hunks)
  • echo/server/dembrane/api/project.py (2 hunks)
  • echo/server/dembrane/audio_lightrag/utils/litellm_utils.py (3 hunks)
  • echo/server/dembrane/audio_utils.py (1 hunks)
  • echo/server/dembrane/chains.py (0 hunks)
  • echo/server/dembrane/chat_utils.py (3 hunks)
  • echo/server/dembrane/config.py (3 hunks)
  • echo/server/dembrane/conversation_utils.py (1 hunks)
  • echo/server/dembrane/database.py (3 hunks)
  • echo/server/dembrane/directus.py (1 hunks)
  • echo/server/dembrane/main.py (3 hunks)
  • echo/server/dembrane/ner.py (1 hunks)
  • echo/server/dembrane/process_resource.py (0 hunks)
  • echo/server/dembrane/quote_utils.py (3 hunks)
  • echo/server/dembrane/s3.py (3 hunks)
  • echo/server/dembrane/scheduler.py (1 hunks)
  • echo/server/dembrane/sentry.py (2 hunks)
  • echo/server/dembrane/tasks.py (6 hunks)
  • echo/server/dembrane/tasks_config.py (0 hunks)
  • echo/server/prod-scheduler.sh (1 hunks)
  • echo/server/prod-worker-cpu.sh (1 hunks)
  • echo/server/prod-worker-liveness.py (0 hunks)
  • echo/server/prod-worker-readiness.py (0 hunks)
  • echo/server/prod-worker.sh (1 hunks)
  • echo/server/pyproject.toml (3 hunks)
  • echo/server/run-scheduler.sh (1 hunks)
  • echo/server/run-worker-cpu.sh (1 hunks)
  • echo/server/run-worker.sh (1 hunks)
  • echo/server/tests/common.py (1 hunks)
  • echo/server/tests/test_audio_utils.py (8 hunks)
  • echo/server/tests/test_conversation_utils.py (1 hunks)
  • echo/server/tests/test_quote_utils.py (1 hunks)
💤 Files with no reviewable changes (6)
  • echo/server/dembrane/tasks_config.py
  • echo/server/prod-worker-liveness.py
  • echo/server/prod-worker-readiness.py
  • echo/directus/directus-sync.config.js
  • echo/server/dembrane/process_resource.py
  • echo/server/dembrane/chains.py
🧰 Additional context used
🧬 Code Graph Analysis (8)
echo/server/dembrane/api/project.py (1)
echo/server/dembrane/tasks.py (2)
  • task_create_project_library (648-737)
  • task_create_view (521-576)
echo/server/dembrane/api/participant.py (1)
echo/server/dembrane/tasks.py (2)
  • task_process_conversation_chunk (225-296)
  • task_finish_conversation_hook (181-220)
echo/server/dembrane/conversation_utils.py (1)
echo/server/dembrane/utils.py (1)
  • get_utc_timestamp (49-50)
echo/frontend/src/components/conversation/ConversationAccordion.tsx (3)
echo/frontend/src/components/common/NavigationButton.tsx (1)
  • NavigationButton (29-119)
echo/frontend/src/config.ts (1)
  • ENABLE_CHAT_AUTO_SELECT (31-32)
echo/frontend/src/lib/utils.ts (1)
  • cn (4-6)
echo/server/tests/test_quote_utils.py (4)
echo/server/dembrane/database.py (4)
  • QuoteModel (447-490)
  • ProcessingStatusEnum (74-78)
  • ProjectAnalysisRunModel (168-200)
  • get_db (591-598)
echo/server/dembrane/quote_utils.py (1)
  • get_random_sample_quotes (266-397)
echo/server/tests/common.py (3)
  • delete_project (21-22)
  • create_conversation (25-34)
  • delete_conversation (37-38)
echo/server/dembrane/utils.py (1)
  • generate_uuid (13-14)
echo/server/dembrane/quote_utils.py (1)
echo/server/dembrane/database.py (1)
  • QuoteModel (447-490)
echo/server/dembrane/audio_lightrag/utils/litellm_utils.py (1)
echo/server/dembrane/audio_lightrag/utils/prompts.py (2)
  • Prompts (4-22)
  • text_structuring_model_system_prompt (17-22)
echo/server/tests/test_conversation_utils.py (3)
echo/server/dembrane/utils.py (1)
  • get_utc_timestamp (49-50)
echo/server/dembrane/conversation_utils.py (1)
  • collect_unfinished_conversations (11-45)
echo/server/tests/common.py (5)
  • create_project (9-18)
  • delete_project (21-22)
  • create_conversation (25-34)
  • delete_conversation (37-38)
  • delete_conversation_chunk (58-59)
🪛 Shellcheck (0.10.0)
echo/server/run-worker-cpu.sh

[error] 1-1: Tips depend on target shell and yours is unknown. Add a shebang or a 'shell' directive.

(SC2148)

echo/server/run-scheduler.sh

[error] 1-1: Tips depend on target shell and yours is unknown. Add a shebang or a 'shell' directive.

(SC2148)

echo/server/run-worker.sh

[error] 1-1: Tips depend on target shell and yours is unknown. Add a shebang or a 'shell' directive.

(SC2148)

⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: ci-build-servers (dbr-echo-server, ./echo/server, Dockerfile, dbr-echo-server)
🔇 Additional comments (116)
.github/workflows/dev-deploy-gitops-backends.yaml (1)

7-9: Add merge_group trigger alongside push.
This ensures the dev-deploy-gitops-backends workflow fires on grouped merges to main, aligning it with other deployment workflows. LGTM.

.github/workflows/ci.yml (1)

11-13: Extend CI to respond to merge_group events.
Adding merge_group for main keeps CI in sync with new merge grouping behavior without altering existing jobs. LGTM.

.github/workflows/dev-deploy-vercel-dashboard.yml (1)

9-11: Include merge_group for Vercel dashboard deploy.
Now triggers on group merges to main, matching other workflows’ event patterns. LGTM.

.github/workflows/dev-deploy-vercel-portal.yml (1)

9-11: Add merge_group trigger to portal deploy.
Enables consistent deployment on grouped merges to main, just like the dashboard and other pipelines. LGTM.

echo/.gitignore (1)

18-18: Add ignore for Celery schedule DB — LGTM!
The new celerybeat-schedule.db entry protects the repo from leftover Celery state now that we’ve migrated to Dramatiq.

echo/server/dembrane/audio_utils.py (1)

12-12: Remove unnecessary type ignore — verify ffmpeg type coverage
Dropping # type: ignore on import ffmpeg is a solid cleanup. Ensure the ffmpeg package has type stubs or adjust your mypy config so this doesn’t introduce new type errors.

echo/server/dembrane/ner.py (1)

9-9: Clean import of Pipeline — confirm trankit typings
Great removal of # type:ignore on from trankit import Pipeline. Double-check that the trankit module provides type definitions or update your mypy settings to avoid type checking failures.

echo/server/dembrane/directus.py (2)

3-3: Clean type declaration is 🔥

The removal of the # type: ignore comment for the DirectusClient import means we're now properly handling types. This improves type safety and IDE support. Nice cleanup!


7-7: Upgraded logger config for better traceability

Solid refactor using a fixed logger name "directus" instead of __name__. This makes log filtering more consistent and intentional, which is key for production observability.

echo/frontend/src/lib/typesDirectus.d.ts (1)

31-31: Proper schema extension for conversation tracking

Adding the required is_finished boolean field to the Conversation type is exactly what we need for the unfinished conversation collection functionality. This drives the UI indicators and backend filtering logic. Good call making it non-nullable.

echo/directus/sync/snapshot/fields/conversation_chunk/timestamp.json (1)

19-19: Timestamp field schema optimized

Setting special to null instead of ["date-created"] is the right call. This reflects that the timestamp field is still required (as shown in line 33's is_nullable: false) but doesn't have special date-created semantics anymore. This change aligns perfectly with the GraphQL schema updates making this a standard non-nullable timestamp field.

echo/directus/sync/snapshot/fields/project_chat/auto_select.json (1)

35-35: Schema change to make auto_select field nullable.

Making the auto_select field nullable allows more flexibility in the DB schema. This matches with the existing default value of true on line 31, giving you graceful handling of null cases while still having a sensible default behavior.

echo/server/dembrane/api/conversation.py (1)

529-529: Task queue processing migrated from Celery to Dramatiq.

Nice task queue migration. Changed from Celery's .delay() pattern to Dramatiq's .send() method while maintaining the same parameters. This is part of the broader migration from Celery to Dramatiq for async task processing mentioned in the PR description.

echo/server/dembrane/api/chat.py (1)

471-471: Removed unnecessary type ignore comment.

Solid cleanup removing the redundant # type: ignore comment. The type checker can now properly understand the asynchronous iteration over the response object. Makes the code cleaner while maintaining the same functionality.

echo/server/dembrane/database.py (2)

30-30: Removed type ignore comment from pgvector import.

Clean removal of unnecessary type annotation. Type system can now properly recognize the Vector import from pgvector.sqlalchemy.


309-309: Standardized string quotation for column name.

Switched from single quotes to double quotes for the "auto_select" column name. Maintains consistency with the project's string quotation style.

echo/directus/sync/snapshot/fields/project/is_enhanced_audio_processing_enabled.json (1)

1-45: Fire new boolean field for enhanced audio processing! 🚀

Sweet schema def for this flag. Default to true is the way to go - opt-out not opt-in. The nullable config gives flexibility while the cast-boolean handler keeps things clean. Perfect foundation for filtering convos in that unfinished collector cron job.

echo/.vscode/settings.json (2)

38-46: Terminal config for new workers-cpu is straight fire! 💯

Love the separation of worker types. This aligns perfectly with the Celery → Dramatiq migration. CPU-bound tasks getting their own worker group is a pro move for resource optimization.


49-58: Scheduler terminal group is killer! ⏱️

Adding a dedicated terminal for the new scheduler script is exactly what we need with the Dramatiq migration. Clean setup with proper env activation.

echo/directus/sync/snapshot/fields/aspect/centroid_embedding.json (1)

1-24: Vector field for centroids is clutch! 🧠

Solid schema for storing embeddings. Using the vector type is perfect for ML-driven clustering. Makes total sense this is nullable since embeddings might be generated async by your task pipeline.

One thing to note - type is set as "unknown" at meta level but "vector" at schema level. That's fine and likely due to Directus UI limitations, just flagging for awareness.

echo/directus/sync/snapshot/fields/conversation/is_finished.json (1)

1-45: Boolean flag for convo tracking is brilliant! 🎯

Perfect schema definition for tracking conversation state. Default to false is the right call as convos start unfinished. This is the cornerstone for your unfinished conversation collector cron. Clean implementation with the cast-boolean handler.

echo/server/dembrane/sentry.py (2)

4-4: Dramatiq monitoring import is on point! 📈

Smart move adding the Sentry integration for Dramatiq. This is critical for observability in the new task processing system.


53-53: Dramatiq integration added correctly! 💪

Perfect placement in the integrations list. This will ensure your new task queue system has proper error tracking and performance monitoring in Sentry.

echo/server/dembrane/main.py (3)

44-54: LGTM: Formatting refinements applied
The added trailing commas and removal of extraneous whitespace improve consistency without altering behavior.

Also applies to: 66-78


82-84: LGTM: Multi‐line initialize_pipeline_status call
Refactoring the await initialize_pipeline_status() into a multi‐line invocation enhances readability and aligns with the project’s style guidelines.


146-146: Review OpenAPI version bump
The OpenAPI version was bumped from 0.2.0 to 1.0.0, signifying a major release. Please confirm this aligns with your versioning strategy and update downstream clients if needed.

echo/server/dembrane/s3.py (2)

42-51: LGTM: Cleaned up imports and spacing
The removal of # type: ignore, standardized import ordering, and an extra blank line after the docstring boost readability.


189-198: LGTM: Streamlined get_file_size_from_s3_mb formatting
Consolidating blank lines and standardizing key quoting ("ContentLength") refines the function’s layout without impacting semantics.

echo/server/pyproject.toml (3)

39-39: Validate Sentry SDK extras
The celery extra was removed from sentry-sdk. Ensure your new sentry-dramatiq setup fully captures errors across all workers.


42-42: LGTM: Added colorlog for enhanced logs
Including colorlog aligns with recent logging improvements in dembrane.config.


83-88: LGTM: Dramatiq & scheduler dependencies
The suite of dependencies (dramatiq[redis,watch], sentry-dramatiq, apscheduler, dramatiq-workflow, lz4, gevent) correctly reflects the Celery→Dramatiq migration and scheduler introduction.

echo/server/dembrane/api/project.py (4)

328-328: Task invocation updated to Dramatiq - LGTM!

Switched from Celery's apply_async() to Dramatiq's send() method. Clean implementation with proper default value handling for the language parameter.


331-332: LGTM - Log message updated to reflect the new task creation pattern

Simple and informative logging that maintains the essential information while dropping the Celery-specific task ID tracking.


363-365: Dramatiq task invocation with proper default values - LGTM!

Solid migration from Celery to Dramatiq with proper handling of optional parameters. The use of or for default values is a clean pattern for ensuring parameters are never undefined.
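The `or`-default pattern praised here has one subtlety worth remembering: it falls back on any falsy value, not just `None`. A minimal stdlib sketch (the function and field names are illustrative, not the actual API):

```python
from typing import Optional


def send_task(conversation_id: str, language: Optional[str] = None) -> dict:
    # `or` replaces None *and* any other falsy value ("", 0, []) with the default.
    effective_language = language or "en"
    return {"conversation_id": conversation_id, "language": effective_language}


print(send_task("c1"))        # language omitted -> default applies
print(send_task("c1", "nl"))  # explicit value wins
print(send_task("c1", ""))    # empty string is falsy -> default applies too
```

That last case is usually desirable for a language parameter, but it is worth a second look whenever `""` or `0` is a legitimate value.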


367-367: LGTM - Simplified logging with all necessary context

Logging is concise and captures the essential context for observability.

echo/server/dembrane/conversation_utils.py (1)

11-46: LGTM - Solid implementation of unfinished conversation collection

This utility function elegantly handles the complex Directus query to find unfinished conversations. The filter logic is clean and performant with proper timestamp handling using UTC.
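For readers outside the codebase, the threshold logic described above can be sketched with in-memory data standing in for the Directus query (the 15-minute threshold and field names are assumptions drawn from this PR's description):

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

IDLE_THRESHOLD = timedelta(minutes=15)


def collect_unfinished(conversations: list, now: Optional[datetime] = None) -> list:
    """Return ids of conversations whose newest chunk is older than the threshold
    (or that have no chunks at all) and that are not already finished."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - IDLE_THRESHOLD
    unfinished = []
    for convo in conversations:
        if convo.get("is_finished"):
            continue
        chunk_times = convo.get("chunk_timestamps", [])
        if not chunk_times or max(chunk_times) < cutoff:
            unfinished.append(convo["id"])
    return unfinished


now = datetime.now(timezone.utc)
convos = [
    {"id": "a", "is_finished": False, "chunk_timestamps": [now - timedelta(hours=1)]},
    {"id": "b", "is_finished": False, "chunk_timestamps": [now - timedelta(minutes=10)]},
    {"id": "c", "is_finished": False, "chunk_timestamps": []},
    {"id": "d", "is_finished": True, "chunk_timestamps": [now - timedelta(hours=2)]},
]
print(collect_unfinished(convos, now=now))  # -> ['a', 'c']
```

Doing the comparison entirely in timezone-aware UTC datetimes, as the real utility does, is what keeps this safe across workers in different regions.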

echo/server/dembrane/api/participant.py (5)

359-359: Task invocation updated to Dramatiq - LGTM!

Switched from Celery's .delay() to Dramatiq's .send() method. Clean migration maintaining the same parameter structure.


373-373: Migrated task call to Dramatiq - LGTM!

Changed from Celery's .delay() to Dramatiq's .send() for task_finish_conversation_hook. Maintains the same behavior with updated task system.


419-426: LGTM - Dictionary formatting improved for readability

Multi-line format makes the code more readable while maintaining the same functionality. This is a pure style improvement.


474-477: LGTM - Consistent dictionary formatting

Applied the same multi-line dictionary formatting as elsewhere for better readability.


316-316:

✅ Verification successful

Changed default behavior for upload_conversation_chunk

Changed the default value of run_finish_hook from False to True, which means conversation chunks will now automatically trigger the finish hook by default. This change aligns with ECHO-164 for collecting unfinished conversations.


🏁 Script executed:

#!/bin/bash
# Description: Check where this flag is used and how it affects the system
rg "run_finish_hook" -A 5 -B 5

Length of output: 5598


Confirmed change to run_finish_hook default in upload_conversation_chunk

  • The upload_conversation_chunk endpoint now defaults run_finish_hook to True (was False).
  • task_process_conversation_chunk in dembrane/tasks.py still defaults to False, but the participant API now explicitly passes the flag.
  • The conversation API (dembrane/api/conversation.py) continues to override with run_finish_hook=False.
  • Front-end always sends run_finish_hook from payload.runFinishHook, so this default only applies to callers that omit the flag.

This update cleanly aligns with ECHO-164 for collecting unfinished conversations. LGTM!

echo/server/dembrane/scheduler.py (3)

1-13: Solid scheduler setup with persistence. LGTM!

Clean architecture using APScheduler with SQLAlchemy job store for persistence. The UTC timezone configuration is crucial for avoiding time-related bugs in distributed systems.


15-23: Clean job configuration! LGTM!

The periodic task for collecting unfinished conversations runs every 15 minutes with the replace_existing=True flag correctly set to handle redeployments. The commented alternative for 1-minute intervals is handy for testing.


25-28: Simple and effective entry point. LGTM!

The conditional block ensures the scheduler only starts when the module is run directly - perfect separation of concerns.
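The `replace_existing=True` semantics noted above are the part most worth internalizing: on redeploy the stored job is updated rather than duplicated or crashed on. A stdlib-only toy sketch of that behavior (APScheduler implements the same idea against its persistent job store; the class and job id here are made up):

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class Job:
    func: Callable[[], None]
    interval_minutes: int


class MiniScheduler:
    """Toy job registry mirroring APScheduler's add_job(replace_existing=True)."""

    def __init__(self) -> None:
        self.jobs: Dict[str, Job] = {}

    def add_job(self, func, job_id: str, interval_minutes: int,
                replace_existing: bool = False) -> None:
        if job_id in self.jobs and not replace_existing:
            raise ValueError(f"job {job_id!r} already exists")
        # With replace_existing=True a redeploy silently updates the stored
        # job instead of failing on the duplicate id.
        self.jobs[job_id] = Job(func, interval_minutes)


sched = MiniScheduler()
sched.add_job(lambda: None, "collect_unfinished", 15)
# Redeploy with a new interval: the job is replaced, not duplicated.
sched.add_job(lambda: None, "collect_unfinished", 1, replace_existing=True)
print(sched.jobs["collect_unfinished"].interval_minutes)  # -> 1
```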

echo/server/tests/common.py (4)

1-7: Logger addition is a 10x improvement. LGTM!

Adding debug logging will make troubleshooting test failures much easier.


9-19: Elegant extension to create_project. LGTM!

The additional_data parameter with dictionary unpacking provides flexibility without breaking existing tests.
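The `additional_data` pattern described here generalizes well to any test factory; a sketch of the idea (field names are invented for illustration):

```python
from typing import Any, Dict, Optional


def create_project(name: str,
                   additional_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a project payload with sensible defaults; extras override them."""
    payload = {"name": name, "is_enhanced_audio_processing_enabled": True}
    # Equivalent to {**payload, **additional_data}: later keys win.
    payload.update(additional_data or {})
    return payload


print(create_project("demo"))
print(create_project("demo", {"language": "nl",
                              "is_enhanced_audio_processing_enabled": False}))
```

Existing call sites that pass only `name` keep working unchanged, which is why this extension is backwards compatible.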


25-34: Same clean pattern applied to create_conversation. LGTM!

Consistent approach for extending functionality without changing existing behavior.


41-54: Debug-friendly conversation chunk creation. LGTM!

The added debug logging and default participant_name will save time during test failures. Dictionary unpacking pattern is consistent with other functions.

echo/.vscode/launch.json (2)

4-14: Improved debugger configuration for task module. LGTM!

The debugpy configuration for the tasks module is well-structured with proper working directory and PYTHONPATH settings.


35-69: Consistent terminal configuration. LGTM!

Setting console to "integratedTerminal" across all configurations ensures consistent debugging experience.

echo/server/dembrane/config.py (2)

5-45: Epic colorful logging enhancement. LGTM!

Adding colorlog with graceful fallback improves developer experience without breaking existing functionality. The color configuration for different log levels is spot on.
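A graceful-fallback import like the one described might look like this (the format strings and color mapping are illustrative, not the project's exact config):

```python
import logging

try:
    import colorlog  # optional dependency

    handler = colorlog.StreamHandler()
    handler.setFormatter(
        colorlog.ColoredFormatter(
            "%(log_color)s%(levelname)s%(reset)s %(message)s",
            log_colors={"DEBUG": "cyan", "INFO": "green", "WARNING": "yellow",
                        "ERROR": "red", "CRITICAL": "bold_red"},
        )
    )
except ImportError:
    # Fallback: plain logging, so a missing colorlog never breaks startup.
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("logger configured")
```

Either branch yields a working `logging.StreamHandler`, so downstream code never needs to know whether colors are available.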


378-384: Clean multi-line formatting for better readability. LGTM!

Consistent application of Python's multi-line statement formatting makes the config validations more readable.

Also applies to: 387-390

echo/frontend/src/components/conversation/ConversationAccordion.tsx (1)

360-363: hasContent guard LGTM.

Using optional chaining with some() is the right call and avoids null-access crashes. Solid.

echo/server/tests/test_quote_utils.py (8)

1-1: LGTM! Using noqa for F821 is appropriate.

Disabling the F821 (undefined names) checker is good practice here since some variables might be injected or defined elsewhere. 🚀


117-118: Great test implementation with proper cleanup.

Creating test quotes with randomized embeddings is a 10x move - avoids external dependencies and makes the tests more isolated. The proper cleanup after tests prevents test pollution.


120-123: Solid approach for context limit testing.

Testing with different context limits ensures the function correctly handles varying token constraints - critical for LLM applications.


188-191: Excellent multi-conversation test setup.

Creating test quotes for multiple conversations is a solid approach to verify the conversation-aware sampling logic. This tests an important feature where at least one quote from each conversation should be included.


195-210: Great distribution testing approach.

Running multiple samples and tracking selection frequency is a solid statistical approach to verify the random distribution properties of the sampling algorithm.
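The frequency-tracking idea can be sketched with a uniform stand-in sampler (the real sampler is conversation-aware; this only illustrates the statistical check):

```python
import random
from collections import Counter

random.seed(42)  # deterministic for the sketch


def sample_quotes(quotes, k):
    # Stand-in for the real sampler: uniform sampling without replacement.
    return random.sample(quotes, k)


quotes = [f"q{i}" for i in range(10)]
counts = Counter()
runs, k = 2000, 3
for _ in range(runs):
    counts.update(sample_quotes(quotes, k))

# Every quote should appear at least once over enough runs...
assert set(counts) == set(quotes)
# ...and frequencies should cluster near the expectation runs * k / n = 600.
expected = runs * k / len(quotes)
assert all(0.5 * expected < c < 1.5 * expected for c in counts.values())
print("distribution check passed")
```

The wide 0.5x-1.5x band keeps the check robust: at 2000 runs it sits many standard deviations from the mean, so a correct sampler essentially never flakes.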


215-219: Strong verification of cross-conversation sampling.

Verifying that quotes from each conversation are selected is critical for ensuring proper representation across all conversations in the project.


222-226: Good vector similarity search testing.

Testing the L2 distance functionality ensures the vector similarity search is working correctly - critical for semantic-aware quote selection.


241-267: Excellent edge case handling for empty projects.

Testing the empty project case is a crucial edge case that prevents potential null pointer exceptions or other runtime errors.

echo/server/tests/test_conversation_utils.py (6)

33-37: Smart timestamp manipulation for testing.

Creating a timestamp exactly 1h16m in the past is an excellent approach for testing the 15-minute threshold logic in the utility function. The logging of sent/received timestamps helps with debugging timestamp-related issues.


77-102: Comprehensive negative test for enhanced audio processing.

Great test that verifies conversations from projects without enhanced audio processing are correctly excluded, even when they have older chunks. This validates the filter logic works correctly.


103-114: Solid edge case testing for conversations with no chunks.

Testing that conversations with no chunks are included in the unfinished conversations list is an important edge case to cover.


117-130: Good threshold testing for older chunks.

Testing conversations with chunks older than the threshold (1hr > 15min) ensures the time-based filtering works correctly.


131-144: Critical edge case testing for multiple chunks.

Excellent test case with multiple chunks of different ages. This validates that the "most recent chunk" logic is correctly implemented.


145-160: Thorough testing of the 15-minute threshold.

Adding a 10-minute old chunk and verifying the conversation is excluded validates the crucial threshold logic. The detailed logging of timestamps helps with debugging time-based issues.

echo/server/tests/test_audio_utils.py (9)

33-33: Smart test data organization.

Defining a separate LARGE_AUDIO_SET allows for targeted testing of size-sensitive audio functionality while maintaining the ability to run faster tests with smaller files.


38-38: Extended test coverage for large audio files.

Adding LARGE_AUDIO_SET to the parametrization expands test coverage to include edge cases with larger audio files. This is excellent practice for ensuring robustness.


63-69: Clean assertion formatting.

The multiline formatting of assertions improves readability while maintaining the same logic. This is a solid refactor.


304-332: Excellent new test for byte-level audio probing.

Adding dedicated tests for probe_from_bytes ensures the core audio inspection functionality works correctly with in-memory data - critical for streaming and conversion workflows.


334-381: Solid S3 integration testing with proper cleanup.

Testing probe_from_s3 with various file formats validates the remote storage probing functionality. Using a try-finally block ensures cleanup even if assertions fail - this is production-grade testing.


383-412: Comprehensive format conversion matrix testing.

The extensive parametrization of format pairs is an excellent approach to validate all possible conversion paths. This matrix testing strategy will catch format-specific edge cases.
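A format-pair matrix like this is typically generated with `itertools.product`; a sketch under an assumed format list:

```python
from itertools import product

FORMATS = ["mp3", "wav", "ogg", "m4a"]

# All ordered (source, target) pairs, excluding identity conversions.
conversion_matrix = [(src, dst)
                     for src, dst in product(FORMATS, repeat=2)
                     if src != dst]

print(len(conversion_matrix))  # -> 12 (4 * 4 - 4 identity pairs)
assert ("mp3", "wav") in conversion_matrix
assert ("mp3", "mp3") not in conversion_matrix
```

Feeding such a list to `pytest.mark.parametrize` gives one named test case per conversion path, which is what makes format-specific regressions easy to localize.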


415-429: Smart test file selection logic.

Selecting one file per format from the available test files is an efficient way to test all format combinations without an explosion of test cases.


445-447: Informative file naming convention.

Using a descriptive naming convention that includes source and target formats in the merged file name makes debugging much easier. This is a best practice for test artifacts.


471-479: Thorough validation of merged audio files.

Comprehensive verification of the merged file properties ensures the merged output is valid and usable audio, not just a concatenation of bytes.

echo/server/dembrane/chat_utils.py (8)

94-107: Clean dictionary formatting.

The multiline dictionary formatting with consistent indentation and trailing commas is much more readable and maintainable. This follows modern Python style best practices.


150-169: Improved function signature formatting.

Breaking parameters into multiple lines with trailing commas makes the signature more readable and git-diff friendly. Using Python 3.9+ type hints (list[dict[str, str]] vs List[Dict[str, str]]) is also forward-looking.


173-175: Consistent function signature style.

Applying the same multiline parameter style to get_conversation_references maintains consistency throughout the codebase.


214-216: Clean response format specification.

The formatting changes here improve readability while maintaining the same functionality.


216-220: Better try-except block formatting.

The multiline try-except block with proper indentation is more readable and easier to maintain.


227-230: Clean function call formatting.

Breaking the function call parameters into multiple lines improves readability for this complex API call.


232-235: Improved error logging format.

Breaking the long error message into multiple lines makes the code more readable without changing functionality.


237-242: Clean dictionary formatting.

Consistent multiline dictionary formatting with proper indentation makes the code more readable and maintainable.

echo/directus/sync/specs/item.graphql (5)

22-25: LGTM: Query schema extended and reorganized.
The new query entry points for aspect, conversation_chunk, project_chat, conversation_segment_conversation_chunk, quote, conversation_reply, conversation_segment, project_chat_message_metadata, conversation, and project_report_notification_participants are grouped logically, include proper filter, pagination, versioning, and aggregation fields, and align with the updated GraphQL schema.

Also applies to: 82-93, 94-105, 106-109, 110-113, 114-117


131-132: LGTM: Create mutations added for new entities.
All create_* mutations—for aspect, conversation_chunk, project_chat, conversation_segment_conversation_chunk, quote, conversation_reply, conversation_segment, project_chat_message_metadata, conversation, and project_report_notification_participants—are defined with matching input types and fields.

Also applies to: 161-162, 163-164, 165-172, 173-174, 175-176, 177-178


194-196: LGTM: Update mutations added for new entities.
All update_* mutations corresponding to the newly introduced types are present and reference the correct input types, ensuring full CRUD coverage.

Also applies to: 239-241, 242-244, 245-247, 248-250, 251-253, 254-256, 257-259, 260-262, 263-265


276-277: LGTM: Delete mutations added for new entities.
The delete_* mutations for aspect, conversation_chunk, project_chat, conversation_segment_conversation_chunk, quote, conversation_reply, conversation_segment, project_chat_message_metadata, conversation, and project_report_notification_participants are all defined correctly.

Also applies to: 306-307, 308-309, 310-311, 312-313, 314-315, 316-317, 318-319, 320-321, 322-323


327-377: LGTM: Subscription schema extended.
New *_mutated subscription events are in place for permissions, users, aspects, conversation chunks, project chats, segments, quotes, replies, message metadata, conversations, and notification participants—completing the real-time event coverage.

echo/directus/sync/specs/system.graphql (27)

23-74: Consistent Query enhancements
The added Query fields for permissions, folders, roles, users, revisions, flows, operations, settings, and versions adhere to the established pattern of including filter, sort, limit, offset, page, and search arguments (where applicable). The grouping and ordering match existing collections, ensuring uniform client experience. LGTM.


114-236: Expanded Mutation CRUD operations
The new create_*, update_*, and delete_* mutations for permissions, folders, roles, users, flows, operations, versions, and settings correctly mirror the existing Directus CRUD naming conventions (create_x_items vs. create_x_item, batch vs. single). Arguments (filter, sort, limit, offset, page, search, data) are consistent with other entities. LGTM.


245-294: Comprehensive Subscription coverage
Subscriptions for mutated events on Directus entities (permissions, folders, roles, users, revisions, flows, operations, settings, versions, sync_id_map, aspect, conversation_chunk, project_chat, conversation_segment_conversation_chunk, quote, conversation_reply, conversation_segment, project_chat_message_metadata, conversation, project_report_notification_participants) are now in place. This ensures real-time updates across all new collections. LGTM.


447-460: Type conversation – enrich metadata
Fields processing_message, source, duration, is_finished, tags, and tags_func extend conversation metadata as designed. Ordering is coherent with existing fields. LGTM.


471-485: Type conversation_chunk – enrich metadata
Added timestamp: Date! (now non-nullable), source, processing_status, processing_message, and conversation_segments with conversation_segments_func. This aligns with the new data collection requirements. LGTM.


1896-1900: Type project – new interaction flags
Fields get_reply_prompt, is_get_reply_enabled, is_enhanced_audio_processing_enabled, and is_project_notification_subscription_allowed allow finer control over conversation features. They follow naming conventions. LGTM.


1983-1985: Type project_chat_message – add metadata relation
Fields chat_message_metadata and chat_message_metadata_func enable metadata linkage. Naming and placement are consistent. LGTM.


2293-2300: Filter aspect_filter – support centroid_embedding
Adding centroid_embedding: string_filter_operators ensures clients can filter aspects by embedding content. Consistent with other scalar filters. LGTM.


2335-2344: Filter conversation_chunk_filter – new operators
Filters for source, processing_status, processing_message, and conversation_segments (with func) align with the enriched conversation_chunk type. Ordering is consistent. LGTM.


2363-2374: Filter conversation_filter – metadata support
New filters for processing_message, source, duration, is_finished, tags, and project_chat_messages allow fine-grained querying. Matches the expanded conversation type. LGTM.


3590-3597: Filter project_chat_filter – add auto_select
Including auto_select: boolean_filter_operators covers new field filtering. Consistency maintained. LGTM.


3629-3633: Filter project_chat_message_filter – add metadata
Adding chat_message_metadata and chat_message_metadata_func to the filter matches the type. LGTM.


3668-3672: Filter project_filter – support new flags
Filters get_reply_prompt, is_get_reply_enabled, is_enhanced_audio_processing_enabled, and is_project_notification_subscription_allowed enable clients to query by these project settings. LGTM.


2452-2453: Input create_aspect_input – add centroid_embedding
Inclusion in the create input ensures new aspects can carry embeddings. Good.


2460-2472: Input create_conversation_chunk_input – enriched fields
New fields (timestamp, transcript, updated_at, source, processing_status, processing_message, conversation_segments) are correctly added. Matches the type. LGTM.


2489-2499: Input create_conversation_input – enriched fields
Fields processing_message, source, duration, is_finished, tags, and project_chat_messages are now supported on creation. Layout is logical. LGTM.


2875-2876: Input create_project_chat_input – add auto_select
Ensures auto_select can be set at create time. Good.


2902-2903: Input create_project_chat_message_input – add metadata
chat_message_metadata input list supports linking metadata on creation. LGTM.


2937-2941: Input create_project_input – add flags & prompt
New create inputs for get_reply_prompt, is_get_reply_enabled, is_enhanced_audio_processing_enabled, and is_project_notification_subscription_allowed complete the project configuration API. LGTM.


2998-2999: Input create_quote_input – add embedding
Allows embedding to be provided when creating a quote. Matches type. LGTM.


3811-3812: Input update_aspect_input – add centroid_embedding
Supports updating the embedding field on existing aspects. Good.


3825-3831: Input update_conversation_chunk_input – enriched fields
Fields for source, processing_status, processing_message, and conversation_segments are added. Matches create input and type. LGTM.


3849-3858: Input update_conversation_input – enriched fields
Supports updates to processing_message, source, duration, is_finished, tags, and project_chat_messages. Consistency confirmed.


4275-4276: Input update_project_chat_input – add auto_select
Enables toggling auto_select on existing chats. Good.


4302-4303: Input update_project_chat_message_input – add metadata
Allows metadata to be updated alongside chat messages. LGTM.


4337-4341: Input update_project_input – add flags & prompt
Mirror of create input ensures existing projects can update the new fields. LGTM.


4398-4399: Input update_quote_input – add embedding
Supports updating the embedding on quotes. Matches create and type. LGTM.

@@ -0,0 +1 @@
python -m dembrane.scheduler No newline at end of file

⚠️ Potential issue

Add shebang for script portability
Scripts without a shebang default to unknown shells, which can lead to subtle failures. Add #!/usr/bin/env bash at the top for clarity and consistency.

+#!/usr/bin/env bash
 python -m dembrane.scheduler
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
python -m dembrane.scheduler
#!/usr/bin/env bash
python -m dembrane.scheduler
🧰 Tools
🪛 Shellcheck (0.10.0)

[error] 1-1: Tips depend on target shell and yours is unknown. Add a shebang or a 'shell' directive.

(SC2148)

@@ -0,0 +1 @@
dramatiq --queues cpu --processes 2 --threads 8 dembrane.tasks No newline at end of file

⚠️ Potential issue

Include shebang in worker script
To guarantee consistent execution, prepend the script with a shebang. For example:

+#!/usr/bin/env bash
 dramatiq --queues cpu --processes 2 --threads 8 dembrane.tasks

Suggested change
dramatiq --queues cpu --processes 2 --threads 8 dembrane.tasks
#!/usr/bin/env bash
dramatiq --queues cpu --processes 2 --threads 8 dembrane.tasks

Comment on lines +3 to +5
version = "0.0.1"
description = "Python project for the Dembrane API"
requires-python = ">= 3.11"
requires-python = "== 3.11"

⚠️ Potential issue

Verify project version & Python constraint
The version was changed to 0.0.1 (a downgrade) and requires-python locked to ==3.11. Confirm these decisions to avoid release confusion and unintended compatibility restrictions.


# Wait for all background jobs to finish
wait No newline at end of file
dramatiq-gevent --queues network --processes 2 --threads 50 dembrane.tasks No newline at end of file

🛠️ Refactor suggestion

Add shebang for shell script correctness

Script needs a shebang for proper shell execution. The Dramatiq setup looks solid with optimized process/thread counts for scalability.

+#!/bin/bash
 dramatiq-gevent --queues network --processes 2 --threads 50 dembrane.tasks

Suggested change
dramatiq-gevent --queues network --processes 2 --threads 50 dembrane.tasks
#!/bin/bash
dramatiq-gevent --queues network --processes 2 --threads 50 dembrane.tasks

Comment on lines +350 to +377
try:
# First try using the native pgvector operator
closest_quote = db.scalars(
select(QuoteModel)
.filter(QuoteModel.project_analysis_run_id == project_analysis_run_id)
.order_by(QuoteModel.embedding.l2_distance(vector_param))
.limit(1)
).first()
except Exception as e:
logger.warning(f"Native pgvector operation failed: {e}")
db.rollback()
try:
# Try using SQL function approach
closest_quote = db.scalars(
select(QuoteModel)
.filter(QuoteModel.project_analysis_run_id == project_analysis_run_id)
.order_by(func.vector_l2_distance(QuoteModel.embedding, vector_param))
.limit(1)
).first()
except Exception as e2:
logger.warning(f"SQL function approach failed too: {e2}")
db.rollback()
# Fall back to random selection from the batch
if all_quotes:
closest_quote = random.choice(all_quotes)
else:
closest_quote = None


💡 Verification agent

❓ Verification inconclusive

Nice layered fallback – consider wrapping in savepoint to avoid side-effects after rollbacks.

The try-fallback-try pattern is 🔥. Two small quibbles:

  1. Repeated db.rollback() inside a larger loop may discard unrelated pending work. Wrapping each attempt in a session.begin_nested() (savepoint) isolates the failure and avoids resetting the whole transaction.
  2. func.vector_l2_distance depends on a custom SQL function; emit a DEBUG log telling the operator which extension to enable (CREATE FUNCTION vector_l2_distance …) so ops folks aren’t left guessing.

No immediate blocker, but worth polishing.


Use Savepoints for Query Fallback Isolation & Improve Logging

File: echo/server/dembrane/quote_utils.py
Lines: 350–377

Two quick polish points to harden the fallback logic:

  • Wrap each embedding‐distance lookup in its own savepoint to avoid full‐session rollbacks:
    with db.begin_nested():
        closest_quote = db.scalars(
            select(QuoteModel)
            …
        ).first()
  • When you fall back to func.vector_l2_distance, emit a DEBUG log to flag the required SQL function—e.g.:
    logger.debug(
        "Using SQL function fallback: ensure `vector_l2_distance` is defined, e.g. "
        "CREATE FUNCTION vector_l2_distance(…) RETURNS real AS $$ … $$ LANGUAGE sql;"
    )

These tweaks keep unrelated work intact and save ops folks some head-scratching. LGTM.
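The layered try-fallback flow can also be factored into a small helper; a pure-Python sketch (the strategy names and stand-in functions are hypothetical, and the real code would wrap each attempt in `db.begin_nested()` and pass `db.rollback` as the failure hook):

```python
import logging
from typing import Callable, Optional, Sequence, Tuple, TypeVar

logger = logging.getLogger(__name__)
T = TypeVar("T")


def first_successful(attempts: Sequence[Tuple[str, Callable[[], T]]],
                     on_failure: Optional[Callable[[], None]] = None) -> Optional[T]:
    """Run strategies in order; invoke on_failure after each miss, return first hit."""
    for name, attempt in attempts:
        try:
            return attempt()
        except Exception as exc:
            logger.warning("strategy %s failed: %s", name, exc)
            if on_failure is not None:
                on_failure()  # e.g. db.rollback() or releasing a savepoint
    return None


def native_op():
    raise RuntimeError("pgvector operator missing")


def sql_fn():
    return "closest-quote-via-sql-function"


result = first_successful([("native", native_op), ("sql_function", sql_fn)])
print(result)  # -> closest-quote-via-sql-function
```

Centralizing the fallback chain keeps the rollback discipline in one place instead of being re-typed at each nesting level.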


# Create a quote with embedded vector
quote = QuoteModel(
id=generate_uuid(),

⚠️ Potential issue

Need to import or define generate_uuid.

The generate_uuid() function is used but not defined or imported.

- id=generate_uuid(),
+ # Import the function from utils
+ from dembrane.utils import generate_uuid
+ id=generate_uuid(),

Better add this import at the top of the file.

Committable suggestion skipped: line range outside the PR's diff.


# Use random embedding vector instead of embedding the text
# This avoids dependency on the embedding service during tests
embedding = np.random.randn(EMBEDDING_DIM).tolist()

⚠️ Potential issue

Need to define EMBEDDING_DIM constant.

The EMBEDDING_DIM variable is undefined in this scope. Even with the noqa directive, this could cause runtime issues.

- embedding = np.random.randn(EMBEDDING_DIM).tolist()
+ # Import the constant from relevant module or define it here
+ from dembrane.database import EMBEDDING_DIM
+ embedding = np.random.randn(EMBEDDING_DIM).tolist()

Committable suggestion skipped: line range outside the PR's diff.
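A stdlib stand-in for the suggested fix (the dimension 1536 is an assumption for the sketch; use whatever the embedding column actually declares, and prefer importing the constant over redefining it):

```python
import random

# The real tests should import this from the module that owns the schema;
# it is defined inline here only to keep the sketch self-contained.
EMBEDDING_DIM = 1536  # assumed dimension


def random_embedding(dim: int = EMBEDDING_DIM) -> list:
    """Random vector, standing in for np.random.randn(dim).tolist()."""
    return [random.gauss(0.0, 1.0) for _ in range(dim)]


emb = random_embedding()
print(len(emb))  # -> 1536
```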

Comment on lines 231 to 240
try:
return split_audio_chunk(chunk_id, "mp3")
except Exception as exc:
logger.error(f"Error: {exc}")
db.rollback()
raise self.retry(exc=exc) from exc

chunk = directus.get_item("conversation_chunk", chunk_id)

@celery_app.task(
bind=True,
retry_backoff=True,
ignore_result=False, # Result needs to be stored for chord to work
base=BaseTask,
)
def task_create_transcription_chord(_self, chunk_ids: List[str], conversation_id: str):
"""
Create a chord of transcription tasks for each chunk ID,
with the finish conversation hook as the callback.
# attempt to access
logger.info(f"Chunk {chunk_id} found in conversation: {chunk['conversation_id']}")

This separates the chord creation into its own task to avoid
serialization issues with inline functions.
"""
# Create a task for each chunk ID
header = [
task_transcribe_conversation_chunk.si(chunk_id).on_error(log_error.s())
for chunk_id in chunk_ids
]
except Exception as e:
logger.error(f"Error: {e}")
return


⚠️ Potential issue

Let the worker fail fast when the chunk lookup bombs

Swallowing the exception and returning None masks systemic issues (404, network blips, auth failures).
Raising will mark the message as failed and let Dramatiq’s monitoring / Sentry catch it.

-        except Exception as e:
-            logger.error(f"Error: {e}")
-            return
+        except Exception as e:
+            logger.error(f"Error: {e}")
+            raise

Suggested change
        try:
            chunk = directus.get_item("conversation_chunk", chunk_id)
            # attempt to access
            logger.info(f"Chunk {chunk_id} found in conversation: {chunk['conversation_id']}")
        except Exception as e:
            logger.error(f"Error: {e}")
            raise
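To see why raising matters, here is a toy retry wrapper standing in for Dramatiq's Retries middleware (a hypothetical sketch, not Dramatiq's real API): a handler that swallows the error and returns looks successful to the broker and is never retried, while one that raises gets re-run.

```python
def run_with_retries(fn, max_retries=2):
    """Toy stand-in for a broker's retry middleware: re-runs fn while it raises."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except Exception:
            if attempts > max_retries:
                raise  # exhausted: surface the failure to monitoring

calls = {"n": 0}

def flaky_lookup():
    # Fails twice with a transient error, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("network blip")
    return {"conversation_id": "c-1"}

result, attempts = run_with_retries(flaky_lookup)
print(result, attempts)  # succeeds on the third attempt
```

Had `flaky_lookup` caught the error and returned `None` instead, the wrapper would have reported success on the first attempt and the transient failure would never have been retried or reported.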

Comment on lines 67 to +71
 ssl_params = ""
 if REDIS_URL.startswith("rediss://") and "?ssl_cert_reqs=" not in REDIS_URL:
     ssl_params = "?ssl_cert_reqs=CERT_NONE"
 
-celery_app = Celery(
-    "tasks",
-    broker=REDIS_URL + "/1" + ssl_params,
-    result_backend=REDIS_URL + "/1" + ssl_params,
-)
+redis_connection_string = REDIS_URL + "/1" + ssl_params

🛠️ Refactor suggestion

Make Redis SSL-param patching URL-safe

Blindly concatenating "?ssl_cert_reqs=CERT_NONE" breaks whenever the base URL already
contains query parameters (double ?) or an explicit DB index.
A tiny helper keeps this bullet-proof and future-proof:

-ssl_params = ""
-if REDIS_URL.startswith("rediss://") and "?ssl_cert_reqs=" not in REDIS_URL:
-    ssl_params = "?ssl_cert_reqs=CERT_NONE"
-
-redis_connection_string = REDIS_URL + "/1" + ssl_params
+from urllib.parse import urlsplit, urlunsplit, parse_qsl, urlencode
+
+parts = urlsplit(REDIS_URL.rstrip("/"))
+path   = parts.path or "/1"                      # default DB=1 if none supplied
+query  = dict(parse_qsl(parts.query))
+
+if parts.scheme == "rediss" and "ssl_cert_reqs" not in query:
+    query["ssl_cert_reqs"] = "CERT_NONE"
+
+redis_connection_string = urlunsplit(
+    parts._replace(path=path, query=urlencode(query))
+)
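Wrapped in a function (the name `build_redis_connection_string` is ours, not from the PR), the suggested `urlsplit` approach can be exercised directly against the edge cases the comment mentions:

```python
from urllib.parse import urlsplit, urlunsplit, parse_qsl, urlencode

def build_redis_connection_string(redis_url: str) -> str:
    """Add a default DB index and, for rediss:// URLs, an ssl_cert_reqs param."""
    parts = urlsplit(redis_url.rstrip("/"))
    path = parts.path or "/1"  # default DB=1 if none supplied
    query = dict(parse_qsl(parts.query))
    if parts.scheme == "rediss" and "ssl_cert_reqs" not in query:
        query["ssl_cert_reqs"] = "CERT_NONE"
    return urlunsplit(parts._replace(path=path, query=urlencode(query)))

# Plain URL: DB index is appended, nothing else changes
print(build_redis_connection_string("redis://localhost:6379"))
# → redis://localhost:6379/1

# TLS URL that already carries a query string: parameters merge, no double "?"
print(build_redis_connection_string("rediss://cache.example.com:6380?foo=bar"))
# → rediss://cache.example.com:6380/1?foo=bar&ssl_cert_reqs=CERT_NONE
```

An explicit DB index (`rediss://host:6380/2`) is likewise preserved rather than doubled, which is exactly where the naive string concatenation broke.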

Comment on lines +48 to 59
class DramatiqLz4JSONEncoder(JSONEncoder):
    def encode(self, data: MessageData) -> bytes:
        return lz4.frame.compress(super().encode(data))

    def decode(self, data: bytes) -> MessageData:
        try:
            decompressed = lz4.frame.decompress(data)
        except RuntimeError:
            # Uncompressed data from before the switch to lz4
            decompressed = data
        return super().decode(decompressed)


🛠️ Refactor suggestion

Catch the right exceptions when falling back from lz4-compression

lz4.frame.decompress typically raises lz4.frame.LZ4FrameError or ValueError, not a plain RuntimeError.
Catching the concrete errors prevents us from silently swallowing unrelated bugs and aligns with the “don’t hide dragons” mantra.

-        except RuntimeError:
+        # Fallback for data that was never lz4-compressed.
+        # NB: `LZ4FrameError` (or `ValueError` in old lz4 versions) is what the
+        # lib actually throws.
+        except (RuntimeError, lz4.frame.LZ4FrameError, ValueError):
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
class DramatiqLz4JSONEncoder(JSONEncoder):
    def encode(self, data: MessageData) -> bytes:
        return lz4.frame.compress(super().encode(data))

    def decode(self, data: bytes) -> MessageData:
        try:
            decompressed = lz4.frame.decompress(data)
        # Fallback for data that was never lz4-compressed.
        # NB: `LZ4FrameError` (or `ValueError` in old lz4 versions) is what the
        # lib actually throws.
        except (RuntimeError, lz4.frame.LZ4FrameError, ValueError):
            decompressed = data
        return super().decode(decompressed)
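The decode-with-fallback pattern is easy to verify in isolation. This sketch uses stdlib `zlib` in place of `lz4` so it runs without the dependency, with `zlib.error` playing the role of `LZ4FrameError`; the structure mirrors the encoder above:

```python
import json
import zlib

def encode(data: dict) -> bytes:
    """Serialize to JSON, then compress."""
    return zlib.compress(json.dumps(data).encode())

def decode(blob: bytes) -> dict:
    """Decompress if possible; fall back for payloads written before compression."""
    try:
        raw = zlib.decompress(blob)
    except zlib.error:  # not compressed: legacy message from before the switch
        raw = blob
    return json.loads(raw)

msg = {"chunk_id": "abc", "retries": 0}
round_trip = decode(encode(msg))           # compressed path
legacy = decode(json.dumps(msg).encode())  # pre-compression, plain-JSON payload
print(round_trip == msg, legacy == msg)    # both decode to the original dict
```

Because raw JSON bytes are not a valid zlib stream, the legacy path reliably takes the `except` branch; the equivalent lz4 test confirms which exception type the installed lz4 version actually raises.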
