From 55d6d6812838bfced24b509c1a582ba749ba8e28 Mon Sep 17 00:00:00 2001 From: Usama Date: Wed, 18 Mar 2026 16:11:32 +0000 Subject: [PATCH] Refactor conversation handling in tasks.py and conversation.py - Simplified the logic in `task_merge_conversation_chunks` by removing redundant checks for conversation state. - Added functionality in `ConversationService` to reset conversation state if it was previously finished and merged, ensuring all segments are processed correctly. --- echo/server/dembrane/service/conversation.py | 21 ++++++++++++++++++++ echo/server/dembrane/tasks.py | 15 +++++++------- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/echo/server/dembrane/service/conversation.py b/echo/server/dembrane/service/conversation.py index 75ccf2cd..4a8ba4b3 100644 --- a/echo/server/dembrane/service/conversation.py +++ b/echo/server/dembrane/service/conversation.py @@ -439,6 +439,27 @@ def create_chunk( conversation = self.get_by_id_or_raise(conversation_id) + # If the conversation was already finished+merged (e.g. auto-finished after + # a 5-min pause), reset its state so the finalization pipeline will run again + # once the user finishes recording. This ensures all segments are merged. + if conversation.get("is_finished") and conversation.get("merged_audio_path"): + logger.info( + f"Conversation {conversation_id} was already finished+merged. " + "Resetting state for new chunks." + ) + with self._client_context() as client: + client.update_item( + "conversation", + conversation_id, + { + "is_finished": False, + "is_all_chunks_transcribed": False, + "merged_audio_path": None, + "duration": None, + "summary": None, + }, + ) + project = self.project_service.get_by_id_or_raise(conversation["project_id"]) if project.get("is_conversation_allowed", False) is False: diff --git a/echo/server/dembrane/tasks.py b/echo/server/dembrane/tasks.py index e94a9c2c..2ab17945 100644 --- a/echo/server/dembrane/tasks.py +++ b/echo/server/dembrane/tasks.py @@ -182,7 +182,9 @@ def task_transcribe_chunk( ) return - transcribe_conversation_chunk(conversation_chunk_id, use_pii_redaction, anonymize_transcripts) + transcribe_conversation_chunk( + conversation_chunk_id, use_pii_redaction, anonymize_transcripts + ) # Transcription succeeded - decrement counter and check for finalization _on_chunk_transcription_done(conversation_id, conversation_chunk_id, logger) @@ -575,12 +577,7 @@ def task_merge_conversation_chunks(conversation_id: str) -> None: try: try: - conversation = conversation_service.get_by_id_or_raise(conversation_id) - - if conversation["is_finished"] and conversation["merged_audio_path"] is not None: - logger.info(f"Conversation {conversation_id} already merged, skipping") - return - + conversation_service.get_by_id_or_raise(conversation_id) except Exception: logger.error(f"Conversation not found: {conversation_id}") return @@ -777,7 +774,9 @@ def task_process_conversation_chunk( group( [ - task_transcribe_chunk.message(cid, conversation_id, use_pii_redaction, anonymize_transcripts) + task_transcribe_chunk.message( + cid, conversation_id, use_pii_redaction, anonymize_transcripts + ) for cid in valid_chunk_ids ] ).run()