diff --git a/.gitignore b/.gitignore index cbffd8c0..dcfc6d55 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,8 @@ echo/server/dembrane/audio_lightrag/data/* echo/server/dembrane/audio_lightrag/data/progress_tracker.csv echo/server/test.py echo/server/wandb* + + +notes_*.md +*.env +.gitignore diff --git a/echo/frontend/src/config.ts b/echo/frontend/src/config.ts index 0ea6e279..580866e3 100644 --- a/echo/frontend/src/config.ts +++ b/echo/frontend/src/config.ts @@ -6,8 +6,11 @@ export const PARTICIPANT_BASE_URL = import.meta.env.VITE_PARTICIPANT_BASE_URL ?? window.location.origin; export const API_BASE_URL = import.meta.env.VITE_API_BASE_URL ?? "/api"; -export const DIRECTUS_PUBLIC_URL = - import.meta.env.VITE_DIRECTUS_PUBLIC_URL ?? "http://localhost:8055"; +// Handle relative URLs for Directus proxy +const directusEnvUrl = import.meta.env.VITE_DIRECTUS_PUBLIC_URL ?? "http://localhost:8055"; +export const DIRECTUS_PUBLIC_URL = directusEnvUrl.startsWith('/') + ? `${window.location.origin}${directusEnvUrl}` + : directusEnvUrl; export const DIRECTUS_CONTENT_PUBLIC_URL = import.meta.env.VITE_DIRECTUS_CONTENT_PUBLIC_URL ?? diff --git a/echo/server/dembrane/audio_utils.py b/echo/server/dembrane/audio_utils.py index a155b9da..9227e9ed 100644 --- a/echo/server/dembrane/audio_utils.py +++ b/echo/server/dembrane/audio_utils.py @@ -11,7 +11,14 @@ import ffmpeg -from dembrane.s3 import s3_client, delete_from_s3, get_stream_from_s3, get_sanitized_s3_key +from dembrane.s3 import ( + s3_client, + delete_from_s3, + get_signed_url, + get_stream_from_s3, + get_sanitized_s3_key, + get_file_size_bytes_from_s3, +) from dembrane.utils import generate_uuid from dembrane.config import STORAGE_S3_BUCKET, STORAGE_S3_ENDPOINT from dembrane.service import conversation_service @@ -21,6 +28,8 @@ def sanitize_filename_component(val: str) -> str: + """Return a filesystem-safe component by stripping unsupported characters.""" + # Only allow alphanumeric, dash, and underscore. Remove other chars. return "".join(c for c in val if c.isalnum() or c in ("-", "_")) @@ -42,6 +51,8 @@ def sanitize_filename_component(val: str) -> str: def get_file_format_from_file_path(file_path: str) -> str: + """Infer the lowercase format/extension for a given file path.""" + extension = file_path.lower().split(".")[-1].split("?")[0] if extension in ACCEPTED_AUDIO_FORMATS: return extension @@ -50,6 +61,8 @@ def get_file_format_from_file_path(file_path: str) -> str: def get_mime_type_from_file_path(file_path: str) -> str: + """Map a file path to a best-effort MIME type based on its extension.""" + if file_path.endswith(".wav"): return "audio/wav" elif file_path.endswith(".mp3"): @@ -315,81 +328,109 @@ def merge_multiple_audio_files_and_save_to_s3( logger.info(f"Starting audio merge for {len(input_file_names)} files") start_time = time.time() - # Check total size of all input files and load data - total_size_mb = 0 + total_size_bytes = 0 # Process each file - probe format and convert if needed - processed_data_streams = [] + processed_keys: List[str] = [] + + def _sanitize_key(path: str) -> str: + return get_sanitized_s3_key(path) + + def _with_extension(path: str, ext: str) -> str: + base = path.rsplit(".", 1)[0] if "." in path else path + return f"{base}.{ext}" + for i_name in input_file_names: - # Probe file to determine format + sanitized_input_key = _sanitize_key(i_name) + try: - probe_result = probe_from_s3(i_name, get_file_format_from_file_path(i_name)) + signed_probe_url = get_signed_url(sanitized_input_key, expires_in_seconds=15 * 60) + probe_result = ffmpeg.probe(signed_probe_url) - # Check if format is output_format - is_output_format = False - if "format" in probe_result: - format_name = probe_result["format"].get("format_name", "").lower() - if output_format in format_name: - is_output_format = True - logger.info(f"File {i_name} is already in {output_format} format") + format_name = probe_result.get("format", {}).get("format_name", "").lower() + is_output_format = output_format in format_name if format_name else False if not is_output_format: - logger.warning(f"File {i_name} is not in {output_format} format, converting") - converted_file_name = i_name + f".{output_format}" - convert_and_save_to_s3(i_name, converted_file_name, output_format) - # Get the stream from S3 - processed_data_streams.append(get_stream_from_s3(converted_file_name)) - + logger.info( + "Converting %s to %s before merge", sanitized_input_key, output_format + ) + converted_key = _with_extension(sanitized_input_key, output_format) + convert_and_save_to_s3( + sanitized_input_key, + converted_key, + output_format, + ) + final_key = converted_key else: - # Already output_format, use as-is - processed_data_streams.append(get_stream_from_s3(i_name)) + final_key = sanitized_input_key + + size_bytes = get_file_size_bytes_from_s3(final_key) + processed_keys.append(final_key) + total_size_bytes += size_bytes except Exception as e: - logger.error(f"Error probing file {i_name}: {str(e)} - Moving on to next file") + logger.error("Failed preparing %s for merge: %s", sanitized_input_key, e, exc_info=True) + continue - if not processed_data_streams: - raise ValueError("No processed data streams") + if not processed_keys: + raise ValueError("No valid input files available for merge") - with tempfile.NamedTemporaryFile(suffix=f".{output_format}") as temp_file: - for data_stream in processed_data_streams: - temp_file.write(data_stream.read()) + # Build concat playlist using presigned URLs so ffmpeg streams directly from storage + playlist_lines: List[str] = [] + for key in processed_keys: + signed_url = get_signed_url(key, expires_in_seconds=15 * 60) + safe_url = signed_url.replace("'", "'\\''") + playlist_lines.append(f"file '{safe_url}'") - temp_file.flush() + playlist_bytes = ("\n".join(playlist_lines) + "\n").encode("utf-8") - if output_format == "ogg": - # Final processing to ensure consistent output - process = ( - ffmpeg.input(temp_file.name, format=output_format) - .output("pipe:1", f="ogg", acodec="libvorbis", q="5") - .global_args("-hide_banner", "-loglevel", "warning") - .overwrite_output() - .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) - ) - elif output_format == "mp3": - process = ( - ffmpeg.input(temp_file.name, format=output_format) - .output( - "pipe:1", - f="mp3", - acodec="libmp3lame", - q="5", - preset="veryfast", - ) - .global_args("-hide_banner", "-loglevel", "warning") - .overwrite_output() - .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) - ) - else: - raise ValueError(f"Not implemented for file format: {output_format}") + logger.info("Starting streaming merge via ffmpeg concat for %d files", len(processed_keys)) - output, err = process.communicate(input=None) + output_kwargs = { + "format": output_format, + } + + if output_format == "ogg": + output_kwargs.update({"acodec": "libvorbis", "q": "5"}) + elif output_format == "mp3": + output_kwargs.update({"acodec": "libmp3lame", "q": "5", "preset": "veryfast"}) + else: + raise ValueError(f"Not implemented for file format: {output_format}") + + process = ( + ffmpeg.input( + "pipe:0", + format="concat", + safe=0, + protocol_whitelist="file,pipe,data,http,https,tcp,tls,crypto", + ) + .output("pipe:1", **output_kwargs) + .global_args( + "-hide_banner", + "-loglevel", + "warning", + ) + .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) + ) + + output = None + err = None + + try: + output, err = process.communicate(input=playlist_bytes) + finally: + if process.stdin: + process.stdin.close() if process.returncode != 0: error_message = err.decode() if err else "Unknown FFmpeg error" + logger.error("FFmpeg concat failed: %s", error_message) raise FFmpegError(f"FFmpeg final processing failed: {error_message}") - # Save to S3 - logger.info(f"Saving merged audio to S3 as {output_file_name}") + if not output: + raise ConversionError("FFmpeg produced empty output during merge") + + logger.info("Saving merged audio to S3 as %s", output_file_name) s3_client.put_object( Bucket=STORAGE_S3_BUCKET, Key=get_sanitized_s3_key(output_file_name), @@ -400,13 +441,15 @@ def merge_multiple_audio_files_and_save_to_s3( info = s3_client.head_object( Bucket=STORAGE_S3_BUCKET, Key=get_sanitized_s3_key(output_file_name) ) - logger.debug(f"Head object from S3: {info}") + logger.debug("Head object from S3: %s", info) duration = time.time() - start_time logger.info( - f"Completed merging {len(input_file_names)} files in {duration:.2f}s. " - f"Total input size: {total_size_mb:.1f}MB" + "Completed streaming merge of %d files in %.2fs. Total input size: %.1fMB", + len(processed_keys), + duration, + total_size_bytes / (1024 * 1024), ) public_url = f"{STORAGE_S3_ENDPOINT}/{STORAGE_S3_BUCKET}/{output_file_name}" @@ -549,10 +592,14 @@ def probe_from_bytes(file_bytes: bytes, input_format: str) -> dict: def probe_from_s3(file_name: str, input_format: str) -> dict: + """Probe metadata for an S3 object by streaming it into ``probe_from_bytes``.""" + return probe_from_bytes(get_stream_from_s3(file_name).read(), input_format) def get_duration_from_s3(file_name: str) -> float: + """Return the media duration in seconds for an object stored in S3.""" + probe_data = probe_from_s3(file_name, get_file_format_from_file_path(file_name)) if "format" in probe_data and "duration" in probe_data["format"]: return float(probe_data["format"]["duration"]) @@ -569,6 +616,8 @@ def split_audio_chunk( chunk_size_bytes: int = MAX_CHUNK_SIZE, delete_original: bool = True, ) -> List[str]: + """Split a stored chunk by approximate byte size using temporary files.""" + logger = logging.getLogger("audio_utils.pre_process_audio") original_chunk = conversation_service.get_chunk_by_id_or_raise(original_chunk_id) @@ -672,7 +721,8 @@ def split_audio_chunk( chunk_output, err = process.communicate(input=None) if process.returncode != 0: - raise FFmpegError(f"ffmpeg splitting failed: {err.decode().strip()}") + err_msg = err.decode().strip() if err else "unknown error" + raise FFmpegError(f"ffmpeg splitting failed: {err_msg}") s3_client.put_object( Bucket=STORAGE_S3_BUCKET, @@ -711,3 +761,234 @@ def split_audio_chunk( logger.debug(f"Successfully split file into {number_chunks} chunks.") return new_ids + + +# ============================================================================ +# STREAMING AUDIO PROCESSING FUNCTIONS (Optimized) +# ============================================================================ + + +def probe_audio_from_s3(s3_path: str) -> dict: + """Probe full media metadata for an S3 object using a presigned URL. + + This respects the source format (no hardcoded suffix) and returns the full + ffprobe payload including both ``format`` and ``streams``. + + Args: + s3_path: S3 key or public URL to the object. + + Returns: + dict: Full ffprobe output with keys like ``format`` and ``streams``. + """ + try: + sanitized_key = get_sanitized_s3_key(s3_path) + signed_url = get_signed_url(sanitized_key, expires_in_seconds=15 * 60) + return ffmpeg.probe(signed_url) + except Exception as e: + logger.error(f"Failed to probe audio from S3: {s3_path}, error: {e}") + raise + + +def should_split_chunk(chunk_id: str, max_size_mb: int = 20) -> bool: + """ + Determine if chunk needs splitting based on file size. + Uses HEAD request (no download). + + Args: + chunk_id: Chunk ID to check + max_size_mb: Maximum size before splitting + + Returns: + True if chunk should be split + """ + from dembrane.s3 import get_file_size_bytes_from_s3 + + chunk = conversation_service.get_chunk_by_id_or_raise(chunk_id) + s3_path = chunk["path"] + + try: + file_size_bytes = get_file_size_bytes_from_s3(s3_path) + file_size_mb = file_size_bytes / (1024 * 1024) + + logger.info(f"Chunk {chunk_id} size: {file_size_mb:.2f}MB") + return file_size_mb > max_size_mb + + except Exception as e: + logger.warning(f"Could not determine chunk size, assuming needs split: {e}") + return True # Conservative: split if unsure + + +def split_audio_chunk_streaming( + original_chunk_id: str, + output_format: str = "mp3", + target_duration_seconds: int = 300, + delete_original: bool = False, +) -> List[str]: + """ + Split audio chunk using optimized streaming operations. + + Improvements over old split_audio_chunk(): + - Downloads once and seeks efficiently for each segment + - Parallelizes chunk uploads + - Better error handling + + Args: + original_chunk_id: ID of chunk to split + output_format: Target format (mp3, webm, etc.) + target_duration_seconds: Target duration per split chunk + delete_original: Whether to delete original after splitting + + Returns: + List of new chunk IDs created + """ + from concurrent.futures import ThreadPoolExecutor, as_completed + + from dembrane.s3 import save_bytes_to_s3 + + logger.info(f"Starting streaming split for chunk {original_chunk_id}") + + # Get chunk metadata + chunk = conversation_service.get_chunk_by_id_or_raise(original_chunk_id) + conversation_id = chunk["conversation_id"] + original_path = chunk["path"] + + # Probe file to get duration + try: + probe_info = probe_from_s3(original_path, get_file_format_from_file_path(original_path)) + total_duration = float(probe_info['format']['duration']) + logger.info(f"Chunk {original_chunk_id} duration: {total_duration}s") + except Exception as e: + logger.error(f"Failed to probe chunk {original_chunk_id}: {e}") + raise + + # Calculate number of chunks needed + num_chunks = math.ceil(total_duration / target_duration_seconds) + + if num_chunks <= 1: + logger.info(f"Chunk {original_chunk_id} doesn't need splitting (duration: {total_duration}s)") + return [original_chunk_id] + + logger.info(f"Splitting chunk {original_chunk_id} into {num_chunks} chunks") + + temp_input_path = None + original_format = get_file_format_from_file_path(original_path) + new_chunk_ids: List[str] = [] # Track only successfully processed chunk IDs + upload_tasks = [] # Tuples of (s3_key, data, chunk_id) + successful_chunks = [] # Tuples of (chunk_id, start_time_offset) + + try: + with tempfile.NamedTemporaryFile(suffix=f".{original_format}", delete=False) as temp_input: + temp_input_path = temp_input.name + audio_stream = get_stream_from_s3(original_path) + while True: + chunk_bytes = audio_stream.read(1024 * 1024) + if not chunk_bytes: + break + temp_input.write(chunk_bytes) + temp_input.flush() + + codec_kwargs = {} + if output_format == "mp3": + codec_kwargs = {"acodec": "libmp3lame", "q": "5", "preset": "veryfast"} + elif output_format == "ogg": + codec_kwargs = {"acodec": "libvorbis", "q": "5"} + + for i in range(num_chunks): + start_time = i * target_duration_seconds + duration = max(0, min(target_duration_seconds, total_duration - start_time)) + if duration <= 0: + continue + + try: + new_chunk_id = generate_uuid() + process = ( + ffmpeg.input(temp_input_path, ss=start_time) + .output( + "pipe:1", + format=output_format, + t=duration, + **codec_kwargs, + ) + .global_args("-hide_banner", "-loglevel", "error") + .run_async(pipe_stdout=True, pipe_stderr=True) + ) + + chunk_output, err = process.communicate() + + if process.returncode != 0: + err_msg = err.decode() if err else "unknown error" + logger.error(f"FFmpeg error for chunk {i}: {err_msg}") + raise Exception(f"FFmpeg failed: {err_msg}") + + s3_key = f"conversation/{conversation_id}/chunks/{new_chunk_id}.{output_format}" + upload_tasks.append((s3_key, chunk_output, new_chunk_id)) + new_chunk_ids.append(new_chunk_id) + successful_chunks.append((new_chunk_id, start_time)) + + except Exception as e: + logger.error(f"Failed to process chunk {i}: {e}") + raise + + finally: + if temp_input_path and os.path.exists(temp_input_path): + try: + os.unlink(temp_input_path) + except OSError as cleanup_err: + logger.warning(f"Failed to remove temp file {temp_input_path}: {cleanup_err}") + + # Upload all chunks in parallel (using threads) + logger.info(f"Uploading {len(upload_tasks)} split chunks in parallel") + + def upload_chunk(s3_key: str, data: bytes, chunk_id: str): + """Upload a single chunk to S3""" + try: + save_bytes_to_s3(data, s3_key, public=False) + logger.info(f"Uploaded chunk {chunk_id} ({len(data)} bytes)") + return True + except Exception as e: + logger.error(f"Failed to upload chunk {chunk_id}: {e}") + raise + + with ThreadPoolExecutor(max_workers=5) as executor: + futures = [ + executor.submit(upload_chunk, s3_key, data, chunk_id) + for s3_key, data, chunk_id in upload_tasks + ] + + # Wait for all uploads to complete + for future in as_completed(futures): + future.result() # Raises exception if upload failed + + # Create database records for new chunks + logger.info(f"Creating database records for {len(successful_chunks)} split chunks") + + for new_chunk_id, start_time_offset in successful_chunks: + s3_key = f"conversation/{conversation_id}/chunks/{new_chunk_id}.{output_format}" + + # Calculate timestamp for this chunk + original_timestamp = datetime.datetime.fromisoformat(chunk["timestamp"]) + new_timestamp = original_timestamp + timedelta(seconds=start_time_offset) + + # Create chunk record + directus.create_item( + "conversation_chunk", + item_data={ + "id": new_chunk_id, + "conversation_id": conversation_id, + "timestamp": new_timestamp.isoformat(), + "path": f"{STORAGE_S3_ENDPOINT}/{STORAGE_S3_BUCKET}/{s3_key}", + "source": chunk["source"], + }, + ) + + # Delete original chunk if requested + if delete_original: + logger.info(f"Deleting original chunk {original_chunk_id}") + try: + delete_from_s3(original_path) + directus.delete_item("conversation_chunk", original_chunk_id) + except Exception as e: + logger.warning(f"Failed to delete original chunk: {e}") + + logger.info(f"Split complete: {original_chunk_id} → {len(new_chunk_ids)} chunks") + return new_chunk_ids diff --git a/echo/server/dembrane/conversation_utils.py b/echo/server/dembrane/conversation_utils.py index 9ae5fb12..cad5b576 100644 --- a/echo/server/dembrane/conversation_utils.py +++ b/echo/server/dembrane/conversation_utils.py @@ -20,11 +20,11 @@ def collect_unfinished_conversations() -> List[str]: "filter": { # Must be unfinished "is_finished": False, - # Must not have a chunk in the last 5 minutes :) + # Must not have a chunk in the last 3 minutes :) "chunks": { "_none": { "timestamp": { - "_gte": (get_utc_timestamp() - timedelta(minutes=5)).isoformat() + "_gte": (get_utc_timestamp() - timedelta(minutes=3)).isoformat() } } }, diff --git a/echo/server/dembrane/s3.py b/echo/server/dembrane/s3.py index d34e0f00..cc51181e 100644 --- a/echo/server/dembrane/s3.py +++ b/echo/server/dembrane/s3.py @@ -202,6 +202,55 @@ def get_file_size_from_s3_mb(file_name: str) -> float: return response["ContentLength"] / (1024 * 1024) +def save_bytes_to_s3(data: bytes, file_name: str, public: bool = False) -> str: + """ + Save bytes directly to S3 without temp file. + + Args: + data: Bytes to upload + file_name: S3 key + public: Whether to make file public + + Returns: + Full S3 URL + """ + file_name = get_sanitized_s3_key(file_name) + + s3_client.put_object( + Bucket=STORAGE_S3_BUCKET, + Key=file_name, + Body=data, + ACL="public-read" if public else "private", + ) + + return f"{STORAGE_S3_ENDPOINT}/{STORAGE_S3_BUCKET}/{file_name}" + + +def get_file_exists_in_s3(file_name: str) -> bool: + """ + Check if file exists in S3 without downloading. + Uses HEAD request. + + Args: + file_name: S3 key + + Returns: + True if file exists + """ + from botocore.exceptions import ClientError + + try: + s3_client.head_object( + Bucket=STORAGE_S3_BUCKET, + Key=get_sanitized_s3_key(file_name) + ) + return True + except ClientError as e: + if e.response['Error']['Code'] == '404': + return False + raise + + def save_audio_to_s3(audio: AudioSegment, file_name: str, public: bool = False) -> str: """ Save an AudioSegment object directly to S3. diff --git a/echo/server/dembrane/service/conversation.py b/echo/server/dembrane/service/conversation.py index 1c6a7321..9e1f79af 100644 --- a/echo/server/dembrane/service/conversation.py +++ b/echo/server/dembrane/service/conversation.py @@ -270,7 +270,27 @@ def create_chunk( # ) # ) + # Trigger background processing task_process_conversation_chunk.send(chunk_id) + + # NEW: Proactive merge every 5 chunks for real-time viewing + from logging import getLogger + + from dembrane.tasks import task_merge_conversation_chunks_incremental + + logger = getLogger("dembrane.service.conversation") + + try: + chunk_count = self.get_chunk_counts(conversation_id)["total"] + logger.info(f"Conversation {conversation_id} now has {chunk_count} chunks") + + # Trigger incremental merge every 5 chunks (adjustable) + if chunk_count % 5 == 0 and chunk_count > 0: + logger.info(f"Triggering proactive merge for conversation {conversation_id} (chunk count: {chunk_count})") + task_merge_conversation_chunks_incremental.send(conversation_id) + except Exception as e: + # Don't fail chunk creation if merge trigger fails + logger.warning(f"Could not trigger proactive merge for {conversation_id}: {e}") return chunk @@ -373,6 +393,7 @@ def get_chunk_counts( "query": { "filter": {"conversation_id": conversation_id}, "fields": ["id", "error", "transcript"], + "limit": -1, } }, ) diff --git a/echo/server/dembrane/tasks.py b/echo/server/dembrane/tasks.py index eb4236ed..38dc37dc 100644 --- a/echo/server/dembrane/tasks.py +++ b/echo/server/dembrane/tasks.py @@ -1,5 +1,5 @@ from json import JSONDecodeError -from typing import Optional +from typing import List, Optional from logging import getLogger import dramatiq @@ -166,7 +166,10 @@ def task_summarize_conversation(conversation_id: str) -> None: @dramatiq.actor(store_results=True, queue_name="cpu", priority=10) def task_merge_conversation_chunks(conversation_id: str) -> None: """ - Merge conversation chunks. + Merge conversation chunks (called when conversation finishes). + + This is the "complete merge" that happens when conversation is marked finished. + For real-time/incremental merging, see task_merge_conversation_chunks_incremental. """ logger = getLogger("dembrane.tasks.task_merge_conversation_chunks") @@ -217,6 +220,109 @@ def task_merge_conversation_chunks(conversation_id: str) -> None: raise e from e +@dramatiq.actor(queue_name="cpu", priority=15, time_limit=15 * 60 * 1000) +def task_merge_conversation_chunks_incremental(conversation_id: str) -> None: + """ + Incrementally merge conversation chunks (called proactively during recording). + + This enables real-time viewing of ongoing conversations by maintaining + an up-to-date merged audio file. Runs in the background every N chunks. + + Approach: + - Check if merged audio already exists + - If exists and recent, skip (merge already up to date) + - Otherwise, merge all chunks and cache result + """ + from dembrane.service import conversation_service + from dembrane.directus import directus + from dembrane.audio_utils import merge_multiple_audio_files_and_save_to_s3 + + logger = getLogger("dembrane.tasks.task_merge_conversation_chunks_incremental") + logger.info(f"Incremental merge for conversation {conversation_id}") + + try: + conversation = conversation_service.get_by_id_or_raise(conversation_id) + + # Fetch chunks using directus directly (handle conversations with >1000 chunks) + chunks: List[dict] = [] + page = 1 + page_size = 500 + + while True: + batch = directus.get_items( + "conversation_chunk", + { + "query": { + "filter": {"conversation_id": {"_eq": conversation_id}}, + "sort": "timestamp", + "fields": ["id", "path", "timestamp"], + "limit": page_size, + "page": page, + }, + }, + ) + + if not batch: + break + + chunks.extend(batch) + + if len(batch) < page_size: + break + + page += 1 + + if not chunks: + logger.info(f"No chunks to merge for conversation {conversation_id}") + return + + # Check if merged file already exists + merged_key = f"audio-conversations/merged-{conversation_id}.mp3" + merged_path_in_db = conversation.get("merged_audio_path") + + # If we have a recent merge, check if it needs updating + if merged_path_in_db: + logger.info(f"Merged file path exists in DB for {conversation_id}") + + # For now, always re-merge to keep it fresh + # Future optimization: track last merge timestamp and only merge if new chunks added + logger.info(f"Re-merging to include latest chunks for {conversation_id}") + + # Merge all chunks + chunk_paths = [c["path"] for c in chunks if c.get("path")] + + if not chunk_paths: + logger.warning(f"No valid chunk paths for conversation {conversation_id}") + return + + logger.info(f"Merging {len(chunk_paths)} chunks for conversation {conversation_id}") + + with ProcessingStatusContext( + conversation_id=conversation_id, + event_prefix="task_merge_conversation_chunks_incremental", + ): + merged_path = merge_multiple_audio_files_and_save_to_s3( + input_file_names=chunk_paths, + output_file_name=merged_key, + output_format="mp3", + ) + + # Update conversation metadata with merged path + from dembrane.directus import directus + directus.update_item( + "conversation", + conversation_id, + {"merged_audio_path": merged_path} + ) + + logger.info(f"Incremental merge complete for conversation {conversation_id}: {merged_path}") + + except Exception as e: + logger.error(f"Error in incremental merge for {conversation_id}: {e}", exc_info=True) + # Don't re-raise - this is a background optimization, failures shouldn't break the main flow + # The final merge on conversation finish will ensure the file exists + + @dramatiq.actor( queue_name="cpu", priority=50, @@ -349,49 +455,85 @@ def task_finish_conversation_hook(conversation_id: str) -> None: # cpu because it is also bottlenecked by the cpu queue due to the split_audio_chunk task -@dramatiq.actor(queue_name="cpu", priority=0) +@dramatiq.actor(queue_name="cpu", priority=0, time_limit=10 * 60 * 1000) def task_process_conversation_chunk(chunk_id: str) -> None: """ - Process a conversation chunk. + Process a conversation chunk with intelligent triage. + + Improvements: + - Checks file size before deciding to split + - Small files skip splitting (fast path) + - Parallel dispatch of transcription and diarization + - Better error handling + + Old flow: Always split → transcribe + New flow: Triage → split if needed → transcribe in parallel """ + from dembrane.service import conversation_service + from dembrane.audio_utils import should_split_chunk, split_audio_chunk_streaming + logger = getLogger("dembrane.tasks.task_process_conversation_chunk") + logger.info(f"Processing conversation chunk: {chunk_id}") + try: - from dembrane.service import conversation_service - chunk = conversation_service.get_chunk_by_id_or_raise(chunk_id) - logger.debug(f"Chunk {chunk_id} found in conversation: {chunk['conversation_id']}") - - # critical section - with ProcessingStatusContext( - conversation_id=chunk["conversation_id"], - event_prefix="task_process_conversation_chunk.split_audio_chunk", - message=f"for chunk {chunk_id}", - ): - from dembrane.audio_utils import split_audio_chunk - - split_chunk_ids = split_audio_chunk(chunk_id, "mp3", delete_original=True) - - if split_chunk_ids is None: - logger.error(f"Split audio chunk result is None for chunk: {chunk_id}") - raise ValueError(f"Split audio chunk result is None for chunk: {chunk_id}") - - if "upload" not in str(chunk["source"]).lower(): - group([task_get_runpod_diarization.message(chunk_id)]).run() + conversation_id = chunk["conversation_id"] - logger.info(f"Split audio chunk result: {split_chunk_ids}") + split_chunk_ids: Optional[List[str]] = None - group( - [ - task_transcribe_chunk.message(cid, chunk["conversation_id"]) + # Intelligent triage: check if splitting is needed + needs_split = should_split_chunk(chunk_id, max_size_mb=20) + + if needs_split: + logger.info(f"Chunk {chunk_id} needs splitting") + + # Split using streaming method (faster) + with ProcessingStatusContext( + conversation_id=conversation_id, + event_prefix="task_process_conversation_chunk.split_audio_chunk", + message=f"for chunk {chunk_id}", + ): + split_chunk_ids = split_audio_chunk_streaming( + original_chunk_id=chunk_id, + output_format="mp3", + delete_original=True + ) + + if split_chunk_ids is None: + logger.error(f"Split audio chunk result is None for chunk: {chunk_id}") + raise ValueError(f"Split audio chunk result is None for chunk: {chunk_id}") + + logger.info(f"Chunk {chunk_id} split into {len(split_chunk_ids)} chunks") + + # Dispatch transcription for all split chunks IN PARALLEL + transcription_tasks = [ + task_transcribe_chunk.message(cid, conversation_id) for cid in split_chunk_ids if cid is not None ] - ).run() - - return - + + group(transcription_tasks).run() + logger.info(f"Dispatched {len(split_chunk_ids)} transcription tasks") + + else: + logger.info(f"Chunk {chunk_id} is small enough, skipping split") + + # Fast path: directly transcribe without splitting + task_transcribe_chunk.send(chunk_id, conversation_id) + logger.info(f"Dispatched direct transcription for {chunk_id}") + + # Diarization (if not from upload source) + source = str(chunk.get("source", "")).lower() + if "upload" not in source: + diarization_targets = split_chunk_ids if split_chunk_ids else [chunk_id] + for diarization_chunk_id in diarization_targets: + logger.info(f"Dispatching diarization for {diarization_chunk_id}") + task_get_runpod_diarization.send(diarization_chunk_id) + + logger.info(f"Processing complete for chunk {chunk_id}") + except Exception as e: - logger.error(f"Error processing conversation chunk@[{chunk_id}]: {e}") + logger.error(f"Error processing chunk {chunk_id}: {e}", exc_info=True) raise e from e diff --git a/echo/server/run-worker-cpu.sh b/echo/server/run-worker-cpu.sh index 9e72a8bd..fa6fb2ed 100755 --- a/echo/server/run-worker-cpu.sh +++ b/echo/server/run-worker-cpu.sh @@ -1 +1,2 @@ +#!/bin/bash dramatiq --queues cpu --processes 1 --threads 2 dembrane.tasks \ No newline at end of file diff --git a/echo/server/run-worker.sh b/echo/server/run-worker.sh index e56b3707..f6359889 100755 --- a/echo/server/run-worker.sh +++ b/echo/server/run-worker.sh @@ -1 +1,2 @@ +#!/bin/bash dramatiq-gevent --queues network --processes 1 --threads 2 dembrane.tasks \ No newline at end of file