From 170f3bd6592f9e88f2efd480acb1ee3efada30b2 Mon Sep 17 00:00:00 2001 From: Yash Hayaran Date: Sat, 16 Aug 2025 13:33:42 +0530 Subject: [PATCH 1/5] Realtime ASR microphone fix --- riva/client/realtime.py | 4 +- scripts/asr/realtime_asr_client.py | 113 +++++++++++++++++++++++++---- 2 files changed, 101 insertions(+), 16 deletions(-) diff --git a/riva/client/realtime.py b/riva/client/realtime.py index 7ff2ad6..21d0362 100644 --- a/riva/client/realtime.py +++ b/riva/client/realtime.py @@ -350,7 +350,7 @@ async def _send_message(self, message: Dict[str, Any]): """Send a JSON message to the WebSocket server.""" await self.websocket.send(json.dumps(message)) - async def send_audio_chunks(self, audio_chunks): + async def send_audio_chunks(self, audio_chunks, delay: float = 0.01): """Send audio chunks to the server for transcription.""" logger.info("Sending audio chunks...") @@ -368,6 +368,8 @@ async def send_audio_chunks(self, audio_chunks): "type": "input_audio_buffer.commit", }) + # Sleep for delay to give time to receive responses, only incase of microphone input + await asyncio.sleep(delay) logger.info("All chunks sent") # Tell the server that we are done sending chunks diff --git a/scripts/asr/realtime_asr_client.py b/scripts/asr/realtime_asr_client.py index eb13c7b..bf625b7 100644 --- a/scripts/asr/realtime_asr_client.py +++ b/scripts/asr/realtime_asr_client.py @@ -106,12 +106,12 @@ def parse_args() -> argparse.Namespace: args = parser.parse_args() - # Validate input configuration - if not args.mic and not args.input_file: - parser.error("Either --input-file or --mic must be specified") - - if args.mic and args.input_file: - parser.error("Cannot specify both --input-file and --mic") + if not args.list_devices: + if not args.mic and not args.input_file: + parser.error("Either --input-file or --mic must be specified") + + if args.mic and args.input_file: + parser.error("Cannot specify both --input-file and --mic") return args @@ -143,7 +143,9 @@ async def 
create_audio_iterator(args): Returns: Audio iterator for streaming audio data """ - if args.mic: + delay = 0 + if args.mic: + delay = 0.01 # 10ms delay to give time to receive responses, only incase of microphone input # Only import when using microphone from riva.client.audio_io import MicrophoneStream @@ -152,11 +154,49 @@ async def create_audio_iterator(args): if device_index is None: device_index = get_default_device_index() - audio_chunk_iterator = MicrophoneStream( - args.sample_rate_hz, - args.file_streaming_chunk, + mic_stream = MicrophoneStream( + args.sample_rate_hz, + args.file_streaming_chunk, device=device_index ) + + # Initialize the stream (this starts the microphone) + audio_chunk_iterator = mic_stream.__enter__() + # Store the stream object for cleanup later + args._mic_stream = mic_stream + print("Recording indefinitely (press Ctrl+C to stop gracefully)...") + + class ImmediateAudioIterator: + def __init__(self, audio_iterator): + self.audio_iterator = audio_iterator + self._stop_requested = False + self.chunk_count = 0 + + def __iter__(self): + return self + + def __next__(self): + if self._stop_requested: + print("Stop requested, raising StopIteration") + raise StopIteration + + try: + chunk = next(self.audio_iterator) + self.chunk_count += 1 + return chunk + except StopIteration: + print(f"Audio iterator exhausted after {self.chunk_count} chunks") + raise + except Exception as e: + print(f"Error getting audio chunk #{self.chunk_count + 1}: {e}") + raise + + def stop(self): + self._stop_requested = True + + audio_chunk_iterator = ImmediateAudioIterator(audio_chunk_iterator) + # Store reference for signal handler + args._interruptible_iterator = audio_chunk_iterator args.num_channels = 1 else: wav_parameters = get_wav_file_parameters(args.input_file) @@ -169,7 +209,7 @@ async def create_audio_iterator(args): delay_callback=None ) - return audio_chunk_iterator + return audio_chunk_iterator, delay async def run_transcription(args): @@ -179,17 
+219,19 @@ async def run_transcription(args): args: Command line arguments containing all configuration """ client = RealtimeClient(args=args) + send_task = None + receive_task = None try: # Create audio iterator - audio_chunk_iterator = await create_audio_iterator(args) + audio_chunk_iterator, delay = await create_audio_iterator(args) # Connect and start transcription await client.connect() # Run send and receive tasks concurrently send_task = asyncio.create_task( - client.send_audio_chunks(audio_chunk_iterator) + client.send_audio_chunks(audio_chunk_iterator, delay) ) receive_task = asyncio.create_task( client.receive_responses() @@ -200,11 +242,52 @@ async def run_transcription(args): # Save results if output file specified if args.output_text: client.save_responses(args.output_text) - + + except KeyboardInterrupt: + if hasattr(args, '_interruptible_iterator'): + args._interruptible_iterator.stop() + print("Audio input stopped") + + # Cancel the send task and wait for it to finish + if send_task and not send_task.done(): + print("Cancelling send task...") + send_task.cancel() + try: + await send_task + except asyncio.CancelledError: + pass + print("Send task cancelled") + + # Wait a bit for the receive task to process any remaining audio + if receive_task and not receive_task.done(): + print("Processing remaining audio...") + try: + await asyncio.wait_for(receive_task, timeout=5.0) + print("Receive task completed") + except asyncio.TimeoutError: + print("Receive task timeout, cancelling...") + receive_task.cancel() + try: + await receive_task + except asyncio.CancelledError: + pass + print("Receive task cancelled") + + print("Transcription stopped gracefully.") + except Exception as e: - print(f"Error: {e}") + print(f"Error during realtime transcription: {e}") raise + finally: + # Clean up microphone stream if it was created + if hasattr(args, '_mic_stream') and args._mic_stream is not None: + try: + args._mic_stream.close() + print("Microphone stream closed") + 
except Exception as e: + print(f"Warning: Error closing microphone stream: {e}") + await client.disconnect() From 5f2da7a7a59e7f0c990b6f1684c95dbde8144904 Mon Sep 17 00:00:00 2001 From: Yash Hayaran Date: Sun, 17 Aug 2025 14:04:40 +0530 Subject: [PATCH 2/5] Set default server to localhost for realtime ASR client --- scripts/asr/realtime_asr_client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/scripts/asr/realtime_asr_client.py b/scripts/asr/realtime_asr_client.py index bf625b7..9d84421 100644 --- a/scripts/asr/realtime_asr_client.py +++ b/scripts/asr/realtime_asr_client.py @@ -95,6 +95,9 @@ def parse_args() -> argparse.Namespace: # Add connection parameters parser = add_connection_argparse_parameters(parser) + # Override default server for realtime ASR (WebSocket endpoint, not gRPC) + parser.set_defaults(server="localhost:9090") + # Add ASR and realtime configuration parameters parser = add_asr_config_argparse_parameters( parser, From 21cb76dec48f4b236fba4158ea40ccad68f77eae Mon Sep 17 00:00:00 2001 From: Yash Hayaran Date: Mon, 18 Aug 2025 14:41:26 +0530 Subject: [PATCH 3/5] refactor argument parsing in realtime ASR client to use mutually exclusive input options and update default server port to 9000 --- scripts/asr/realtime_asr_client.py | 31 ++++++++++++------------------ 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/scripts/asr/realtime_asr_client.py b/scripts/asr/realtime_asr_client.py index 9d84421..f8ff617 100644 --- a/scripts/asr/realtime_asr_client.py +++ b/scripts/asr/realtime_asr_client.py @@ -30,17 +30,22 @@ def parse_args() -> argparse.Namespace: ) # Input configuration - parser.add_argument( + input_group = parser.add_mutually_exclusive_group(required=True) + input_group.add_argument( "--input-file", - required=False, - help="Input audio file (required when not using --mic)" + help="Input audio file" ) - parser.add_argument( + input_group.add_argument( "--mic", action="store_true", - help="Use microphone input instead of 
file input", - default=False + help="Use microphone input instead of file input" ) + input_group.add_argument( + "--list-devices", + action="store_true", + help="List available input audio device indices" + ) + parser.add_argument( "--duration", type=int, @@ -54,11 +59,6 @@ def parse_args() -> argparse.Namespace: default=None, help="Input audio device index to use (only used with --mic). If not specified, will use default device." ) - parser.add_argument( - "--list-devices", - action="store_true", - help="List available input audio device indices" - ) # Audio parameters parser.add_argument( @@ -96,7 +96,7 @@ def parse_args() -> argparse.Namespace: parser = add_connection_argparse_parameters(parser) # Override default server for realtime ASR (WebSocket endpoint, not gRPC) - parser.set_defaults(server="localhost:9090") + parser.set_defaults(server="localhost:9000") # Add ASR and realtime configuration parameters parser = add_asr_config_argparse_parameters( @@ -109,13 +109,6 @@ def parse_args() -> argparse.Namespace: args = parser.parse_args() - if not args.list_devices: - if not args.mic and not args.input_file: - parser.error("Either --input-file or --mic must be specified") - - if args.mic and args.input_file: - parser.error("Cannot specify both --input-file and --mic") - return args From 45a9e7a8962592060f7dcb4d3b2554ac7df7e326 Mon Sep 17 00:00:00 2001 From: Yash Hayaran Date: Fri, 22 Aug 2025 11:40:34 +0530 Subject: [PATCH 4/5] refactor logging in RealtimeClient to use debug level for detailed internal state and error messages --- riva/client/realtime.py | 62 +++++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 34 deletions(-) diff --git a/riva/client/realtime.py b/riva/client/realtime.py index 21d0362..cd7a3b5 100644 --- a/riva/client/realtime.py +++ b/riva/client/realtime.py @@ -38,10 +38,6 @@ def __init__(self, args: argparse.Namespace): self.input_playback_thread = None self.is_input_playing = False self.input_buffer_size = 1024 # 
Buffer size for input audio playback - - # Transcription results - self.delta_transcripts: List[str] = [] - self.interim_final_transcripts: List[str] = [] self.final_transcript: str = "" self.is_config_updated = False @@ -58,13 +54,13 @@ async def connect(self): await self._initialize_session() except requests.exceptions.RequestException as e: - logger.error(f"HTTP request failed: {e}") + logger.error("HTTP request failed: %s", e) raise except WebSocketException as e: - logger.error(f"WebSocket connection failed: {e}") + logger.error("WebSocket connection failed: %s", e) raise except Exception as e: - logger.error(f"Unexpected error during connection: {e}") + logger.error("Unexpected error during connection: %s", e) raise async def _initialize_http_session(self) -> Dict[str, Any]: @@ -73,7 +69,7 @@ async def _initialize_http_session(self) -> Dict[str, Any]: uri = f"http://{self.args.server}/v1/realtime/transcription_sessions" if self.args.use_ssl: uri = f"https://{self.args.server}/v1/realtime/transcription_sessions" - logger.info(f"Initializing session via HTTP POST request to: {uri}") + logger.debug("Initializing session via HTTP POST request to: %s", uri) response = requests.post( uri, headers=headers, @@ -89,7 +85,7 @@ async def _initialize_http_session(self) -> Dict[str, Any]: ) session_data = response.json() - logger.info(f"Session initialized: {session_data}") + logger.debug("Session initialized: %s", session_data) return session_data async def _connect_websocket(self): @@ -110,7 +106,7 @@ async def _connect_websocket(self): ssl_context.check_hostname = False # ssl_context.verify_mode = ssl.CERT_REQUIRED - logger.info(f"Connecting to WebSocket: {ws_url}") + logger.debug("Connecting to WebSocket: %s", ws_url) self.websocket = await websockets.connect(ws_url, ssl=ssl_context) async def _initialize_session(self): @@ -119,14 +115,14 @@ async def _initialize_session(self): # Handle first response: "conversation.created" response = await self.websocket.recv() 
response_data = json.loads(response) - logger.info("Session created: %s", response_data) + logger.debug("Session created: %s", response_data) event_type = response_data.get("type", "") if event_type == "conversation.created": - logger.info("Conversation created successfully") + logger.debug("Conversation created successfully") logger.debug("Response structure: %s", list(response_data.keys())) else: - logger.warning(f"Unexpected first response type: {event_type}") + logger.warning("Unexpected first response type: %s", event_type) logger.debug("Full response: %s", response_data) # Update session configuration @@ -135,16 +131,16 @@ async def _initialize_session(self): logger.error("Failed to update session") raise Exception("Failed to update session") - logger.info("Session initialization complete") + logger.debug("Session initialization complete") except json.JSONDecodeError as e: - logger.error(f"Failed to parse JSON response: {e}") + logger.error("Failed to parse JSON response: %s", e) raise except KeyError as e: - logger.error(f"Missing expected key in response: {e}") + logger.error("Missing expected key in response: %s", e) raise except Exception as e: - logger.error(f"Unexpected error during session initialization: {e}") + logger.error("Unexpected error during session initialization: %s", e) raise def _safe_update_config(self, config: Dict[str, Any], key: str, value: Any, section: str = None): @@ -160,10 +156,10 @@ def _safe_update_config(self, config: Dict[str, Any], key: str, value: Any, sect if section not in config: config[section] = {} config[section][key] = value - logger.debug(f"Updated {section}.{key} = {value}") + logger.debug("Updated %s.%s = %s", section, key, value) else: config[key] = value - logger.debug(f"Updated {key} = {value}") + logger.debug("Updated %s = %s", key, value) async def _update_session(self) -> bool: """Update session configuration by selectively overriding server defaults. 
@@ -171,8 +167,8 @@ async def _update_session(self) -> bool: Returns: True if session was updated successfully, False otherwise """ - logger.info("Updating session configuration...") - logger.info(f"Server default config: {self.session_config}") + logger.debug("Updating session configuration...") + logger.debug("Server default config: %s", self.session_config) # Create a copy of the session config from server defaults session_config = self.session_config.copy() @@ -260,11 +256,11 @@ async def _update_session(self) -> bool: overrides.append("custom_configuration") if overrides: - logger.info(f"Overriding server defaults for: {', '.join(overrides)}") + logger.debug("Overriding server defaults for: %s", ', '.join(overrides)) else: - logger.info("Using server default configuration (no overrides)") + logger.debug("Using server default configuration (no overrides)") - logger.info(f"Final session config: {session_config}") + logger.debug("Final session config: %s", session_config) # Send update request update_session_request = { @@ -333,16 +329,16 @@ async def _handle_session_update_response(self) -> bool: """ response = await self.websocket.recv() response_data = json.loads(response) - logger.info("Session updated: %s", response_data) + logger.info("Current Session Config: %s", response_data) event_type = response_data.get("type", "") if event_type == "transcription_session.updated": - logger.info("Transcription session updated successfully") + logger.debug("Transcription session updated successfully") logger.debug("Response structure: %s", list(response_data.keys())) self.session_config = response_data["session"] return True else: - logger.warning(f"Unexpected response type: {event_type}") + logger.warning("Unexpected response type: %s", event_type) logger.debug("Full response: %s", response_data) return False @@ -352,7 +348,7 @@ async def _send_message(self, message: Dict[str, Any]): async def send_audio_chunks(self, audio_chunks, delay: float = 0.01): """Send audio 
chunks to the server for transcription.""" - logger.info("Sending audio chunks...") + logger.debug("Sending audio chunks...") for chunk in audio_chunks: chunk_base64 = base64.b64encode(chunk).decode("utf-8") @@ -370,7 +366,7 @@ async def send_audio_chunks(self, audio_chunks, delay: float = 0.01): # Sleep for delay to give time to receive responses, only incase of microphone input await asyncio.sleep(delay) - logger.info("All chunks sent") + logger.debug("All chunks sent") # Tell the server that we are done sending chunks await self._send_message({ @@ -379,7 +375,7 @@ async def send_audio_chunks(self, audio_chunks, delay: float = 0.01): async def receive_responses(self): """Receive and process transcription responses from the server.""" - logger.info("Listening for responses...") + logger.debug("Listening for responses...") received_final_response = False while not received_final_response: @@ -391,12 +387,10 @@ async def receive_responses(self): if event_type == "conversation.item.input_audio_transcription.delta": delta = event.get("delta", "") logger.info("Transcript: %s", delta) - self.delta_transcripts.append(delta) elif event_type == "conversation.item.input_audio_transcription.completed": is_last_result = event.get("is_last_result", False) interim_final_transcript = event.get("transcript", "") - self.interim_final_transcripts.append(interim_final_transcript) self.final_transcript = interim_final_transcript if is_last_result: @@ -419,7 +413,7 @@ async def receive_responses(self): except asyncio.TimeoutError: continue except Exception as e: - logger.error(f"Error: {e}") + logger.error("Error: %s", e) break def save_responses(self, output_text_file: str): @@ -433,7 +427,7 @@ def save_responses(self, output_text_file: str): with open(output_text_file, "w") as f: f.write(self.final_transcript) except Exception as e: - logger.error(f"Error saving text: {e}") + logger.error("Error saving text: %s", e) async def disconnect(self): """Close the WebSocket connection.""" 
From 3d78ce6bed5e8303ce66a891a8a8f450493a2211 Mon Sep 17 00:00:00 2001 From: Yash Hayaran Date: Fri, 22 Aug 2025 17:58:12 +0530 Subject: [PATCH 5/5] Enhance RealtimeClient to support microphone input with PCM16 encoding and improve audio chunk handling with async iteration. Update logging for word information formatting and handle timeouts during audio processing. --- riva/client/realtime.py | 89 ++++++++++++++++++++++++------ scripts/asr/realtime_asr_client.py | 37 +++++++------ 2 files changed, 91 insertions(+), 35 deletions(-) diff --git a/riva/client/realtime.py b/riva/client/realtime.py index cd7a3b5..1bd382a 100644 --- a/riva/client/realtime.py +++ b/riva/client/realtime.py @@ -175,6 +175,14 @@ async def _update_session(self) -> bool: # Track what we're overriding overrides = [] + + # Check if the input is microphone, then set the encoding to pcm16 + if hasattr(self.args, 'mic') and self.args.mic: + self._safe_update_config(session_config, "input_audio_format", "pcm16") + overrides.append("input_audio_format") + else: + self._safe_update_config(session_config, "input_audio_format", "none") + overrides.append("input_audio_format") # Update input audio transcription - only override if args are provided if hasattr(self.args, 'language_code') and self.args.language_code: @@ -346,26 +354,50 @@ async def _send_message(self, message: Dict[str, Any]): """Send a JSON message to the WebSocket server.""" await self.websocket.send(json.dumps(message)) - async def send_audio_chunks(self, audio_chunks, delay: float = 0.01): + async def send_audio_chunks(self, audio_chunks): """Send audio chunks to the server for transcription.""" logger.debug("Sending audio chunks...") - for chunk in audio_chunks: - chunk_base64 = base64.b64encode(chunk).decode("utf-8") - - # Send chunk to the server - await self._send_message({ - "type": "input_audio_buffer.append", - "audio": chunk_base64, - }) - - # Commit the chunk - await self._send_message({ - "type": "input_audio_buffer.commit", - 
}) - - # Sleep for delay to give time to receive responses, only incase of microphone input - await asyncio.sleep(delay) + # Check if the audio_chunks supports async iteration + if hasattr(audio_chunks, '__aiter__'): + # Use async for for async iterators - this allows proper task switching + async for chunk in audio_chunks: + try: + chunk_base64 = base64.b64encode(chunk).decode("utf-8") + + # Send chunk to the server + await self._send_message({ + "type": "input_audio_buffer.append", + "audio": chunk_base64, + }) + + # Commit the chunk + await self._send_message({ + "type": "input_audio_buffer.commit", + }) + except TimeoutError: + # Handle timeout from AsyncAudioIterator - no audio available, continue + logger.debug("No audio chunk available within timeout, continuing...") + continue + except Exception as e: + logger.error(f"Error processing audio chunk: {e}") + continue + else: + # Fallback for regular iterators + for chunk in audio_chunks: + chunk_base64 = base64.b64encode(chunk).decode("utf-8") + + # Send chunk to the server + await self._send_message({ + "type": "input_audio_buffer.append", + "audio": chunk_base64, + }) + + # Commit the chunk + await self._send_message({ + "type": "input_audio_buffer.commit", + }) + logger.debug("All chunks sent") # Tell the server that we are done sending chunks @@ -401,7 +433,28 @@ async def receive_responses(self): else: logger.info("Interim Transcript: %s", interim_final_transcript) - logger.info("Words Info: %s", event.get("words_info", "")) + # Format Words Info similar to print_streaming function + words_info = event.get("words_info", {}) + if words_info and "words" in words_info: + print("Words Info:") + + # Create header format similar to print_streaming + header_format = '{: <40s}{: <16s}{: <16s}{: <16s}{: <16s}' + header_values = ['Word', 'Start (ms)', 'End (ms)', 'Confidence', 'Speaker'] + print(header_format.format(*header_values)) + + # Print each word with formatted information + for word_data in 
words_info["words"]: + word = word_data.get("word", "") + start_time = word_data.get("start_time", 0) + end_time = word_data.get("end_time", 0) + confidence = word_data.get("confidence", 0.0) + speaker_tag = word_data.get("speaker_tag", 0) + + # Format the word info line similar to print_streaming + word_format = '{: <40s}{: <16.0f}{: <16.0f}{: <16.4f}{: <16d}' + word_values = [word, start_time, end_time, confidence, speaker_tag] + print(word_format.format(*word_values)) elif "error" in event_type.lower(): logger.error( diff --git a/scripts/asr/realtime_asr_client.py b/scripts/asr/realtime_asr_client.py index f8ff617..045595d 100644 --- a/scripts/asr/realtime_asr_client.py +++ b/scripts/asr/realtime_asr_client.py @@ -139,10 +139,7 @@ async def create_audio_iterator(args): Returns: Audio iterator for streaming audio data """ - delay = 0 if args.mic: - delay = 0.01 # 10ms delay to give time to receive responses, only incase of microphone input - # Only import when using microphone from riva.client.audio_io import MicrophoneStream # Get default device index if not specified @@ -161,28 +158,35 @@ async def create_audio_iterator(args): # Store the stream object for cleanup later args._mic_stream = mic_stream print("Recording indefinitely (press Ctrl+C to stop gracefully)...") - - class ImmediateAudioIterator: + + class AsyncAudioIterator: + """Async wrapper for blocking audio iterators to prevent event loop starvation.""" def __init__(self, audio_iterator): self.audio_iterator = audio_iterator self._stop_requested = False self.chunk_count = 0 - def __iter__(self): + def __aiter__(self): return self - def __next__(self): + async def __anext__(self): if self._stop_requested: - print("Stop requested, raising StopIteration") - raise StopIteration + raise StopAsyncIteration try: - chunk = next(self.audio_iterator) + # Add timeout to prevent hanging when no audio is available + chunk = await asyncio.wait_for( + asyncio.to_thread(lambda: next(self.audio_iterator)), + 
timeout=1.0 # 1 second timeout + ) self.chunk_count += 1 return chunk + except asyncio.TimeoutError: + # Return empty chunk or raise custom exception + raise TimeoutError("No audio chunk available within timeout") except StopIteration: print(f"Audio iterator exhausted after {self.chunk_count} chunks") - raise + raise StopAsyncIteration except Exception as e: print(f"Error getting audio chunk #{self.chunk_count + 1}: {e}") raise @@ -190,9 +194,8 @@ def __next__(self): def stop(self): self._stop_requested = True - audio_chunk_iterator = ImmediateAudioIterator(audio_chunk_iterator) - # Store reference for signal handler - args._interruptible_iterator = audio_chunk_iterator + # Use async iterator to prevent event loop starvation + audio_chunk_iterator = AsyncAudioIterator(audio_chunk_iterator) args.num_channels = 1 else: wav_parameters = get_wav_file_parameters(args.input_file) @@ -205,7 +208,7 @@ def stop(self): delay_callback=None ) - return audio_chunk_iterator, delay + return audio_chunk_iterator async def run_transcription(args): @@ -220,14 +223,14 @@ async def run_transcription(args): try: # Create audio iterator - audio_chunk_iterator, delay = await create_audio_iterator(args) + audio_chunk_iterator = await create_audio_iterator(args) # Connect and start transcription await client.connect() # Run send and receive tasks concurrently send_task = asyncio.create_task( - client.send_audio_chunks(audio_chunk_iterator, delay) + client.send_audio_chunks(audio_chunk_iterator) ) receive_task = asyncio.create_task( client.receive_responses()