From 1ca1da3e5a6490a3f08ae24b6872003eebb556a9 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Thu, 20 Nov 2025 05:40:29 +0200 Subject: [PATCH 1/5] add tts and stt --- dimos/agents2/cli/web.py | 84 ++++++++++++++++ dimos/agents2/skills/speak_skill.py | 98 +++++++++++++++++++ dimos/robot/all_blueprints.py | 2 + .../unitree_webrtc/unitree_go2_blueprints.py | 4 + .../unitree_webrtc/unitree_skill_container.py | 5 - 5 files changed, 188 insertions(+), 5 deletions(-) create mode 100644 dimos/agents2/cli/web.py create mode 100644 dimos/agents2/skills/speak_skill.py diff --git a/dimos/agents2/cli/web.py b/dimos/agents2/cli/web.py new file mode 100644 index 0000000000..d58951207c --- /dev/null +++ b/dimos/agents2/cli/web.py @@ -0,0 +1,84 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from threading import Thread + +import reactivex as rx +from reactivex.disposable import Disposable +import reactivex.operators as ops + +from dimos.core import Module, rpc +from dimos.core.transport import pLCMTransport +from dimos.stream.audio.node_normalizer import AudioNormalizer +from dimos.stream.audio.stt.node_whisper import WhisperNode +from dimos.utils.logging_config import setup_logger +from dimos.web.robot_web_interface import RobotWebInterface + +logger = setup_logger(__name__) + + +class WebInput(Module): + _web_interface: RobotWebInterface = None + _thread: Thread = None + _human_transport: pLCMTransport | None = None + + @rpc + def start(self) -> None: + super().start() + + self._human_transport = pLCMTransport("/human_input") + + audio_subject = rx.subject.Subject() + + self._web_interface = RobotWebInterface( + port=5555, + text_streams={"agent_responses": rx.subject.Subject()}, + audio_subject=audio_subject, + ) + + normalizer = AudioNormalizer() + stt_node = WhisperNode() + + # Connect audio pipeline: browser audio → normalizer → whisper + normalizer.consume_audio(audio_subject.pipe(ops.share())) + stt_node.consume_audio(normalizer.emit_audio()) + + # Subscribe to both text input sources + # 1. Direct text from web interface + unsub = self._web_interface.query_stream.subscribe(self._human_transport.publish) + self._disposables.add(Disposable(unsub)) + + # 2. Transcribed text from STT + unsub = stt_node.emit_text().subscribe(self._human_transport.publish) + self._disposables.add(Disposable(unsub)) + + self._thread = Thread(target=self._web_interface.run, daemon=True) + self._thread.start() + + logger.info("Web interface started at http://localhost:5555") + + @rpc + def stop(self) -> None: + if self._web_interface: + self._web_interface.stop() + if self._thread: + self._thread.join(timeout=1.0) + if self._human_transport: + self._human_transport.close() + super().stop() + + +web_input = WebInput.blueprint + +__all__ = ["web_input", "WebInput"] \ No newline at end of file diff --git a/dimos/agents2/skills/speak_skill.py b/dimos/agents2/skills/speak_skill.py new file mode 100644 index 0000000000..1a26eb7439 --- /dev/null +++ b/dimos/agents2/skills/speak_skill.py @@ -0,0 +1,98 @@ +# Copyright 2025 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import time + +from reactivex import Subject + +from dimos.core.core import rpc +from dimos.core.skill_module import SkillModule +from dimos.protocol.skill.skill import skill +from dimos.stream.audio.node_output import SounddeviceAudioOutput +from dimos.stream.audio.tts.node_openai import OpenAITTSNode, Voice +from dimos.utils.logging_config import setup_logger + +logger = setup_logger("dimos.agents2.skills.speak_skill") + + +class SpeakSkill(SkillModule): + _tts_node: OpenAITTSNode | None = None + _audio_output: SounddeviceAudioOutput | None = None + _audio_lock: threading.Lock = threading.Lock() + + @rpc + def start(self) -> None: + super().start() + self._tts_node = OpenAITTSNode(speed=1.2, voice=Voice.ONYX) + self._audio_output = SounddeviceAudioOutput(sample_rate=24000) + self._audio_output.consume_audio(self._tts_node.emit_audio()) + + @rpc + def stop(self) -> None: + if self._tts_node: + self._tts_node.dispose() + self._tts_node = None + if self._audio_output: + self._audio_output.stop() + self._audio_output = None + super().stop() + + @skill() + def speak(self, text: str) -> str: + """Speak text out loud through the robot's speakers. + + USE THIS TOOL AS OFTEN AS NEEDED. People can't normally see what you say in text, but can hear what you speak. + + Try to be as concise as possible. Remember that speaking takes time, so get to the point quickly. + + Example usage: + + speak("Hello, I am your robot assistant.") + """ + if self._tts_node is None: + return "Error: TTS not initialized" + + # Use lock to prevent simultaneous speech + with self._audio_lock: + text_subject = Subject() + audio_complete = threading.Event() + self._tts_node.consume_text(text_subject) + subscription = self._tts_node.emit_text().subscribe( + on_next=lambda t: logger.debug(f"TTS processing: {t}"), + on_completed=audio_complete.set, + on_error=audio_complete.set, + ) + + text_subject.on_next(text) + text_subject.on_completed() + + timeout = max(5, len(text) * 0.1) + + if not audio_complete.wait(timeout=timeout): + logger.warning(f"TTS timeout reached for: {text}") + subscription.dispose() + return f"Warning: TTS timeout while speaking: {text}" + else: + # Small delay to ensure buffers flush + time.sleep(0.3) + + subscription.dispose() + + return f"Spoke: {text}" + + +speak_skill = SpeakSkill.blueprint + +__all__ = ["SpeakSkill", "speak_skill"] diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 6121a8bc4d..177a9aaa2c 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -62,10 +62,12 @@ "osm_skill": "dimos.agents2.skills.osm", "ros_nav": "dimos.navigation.rosnav", "spatial_memory": "dimos.perception.spatial_perception", + "speak_skill": "dimos.agents2.skills.speak_skill", "unitree_skills": "dimos.robot.unitree_webrtc.unitree_skill_container", "utilization": "dimos.utils.monitoring", "wavefront_frontier_explorer": "dimos.navigation.frontier_exploration.wavefront_frontier_goal_selector", "websocket_vis": "dimos.web.websocket_vis.websocket_vis_module", + "web_input": "dimos.agents2.cli.web", } diff --git a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py index 8973f6cd68..36565a80f7 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py +++ b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py @@ -18,7 +18,9 @@ from dimos.agents2.agent import llm_agent from dimos.agents2.cli.human import human_input +from dimos.agents2.cli.web import web_input from dimos.agents2.skills.navigation import navigation_skill +from dimos.agents2.skills.speak_skill import speak_skill from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE from dimos.core.blueprints import autoconnect from dimos.core.transport import JpegLcmTransport, JpegShmTransport, LCMTransport, pSHMTransport @@ -113,4 +115,6 @@ human_input(), navigation_skill(), unitree_skills(), + web_input(), + speak_skill(), ) diff --git a/dimos/robot/unitree_webrtc/unitree_skill_container.py b/dimos/robot/unitree_webrtc/unitree_skill_container.py index 8fca216d04..1a1c80bc35 100644 --- a/dimos/robot/unitree_webrtc/unitree_skill_container.py +++ b/dimos/robot/unitree_webrtc/unitree_skill_container.py @@ -105,11 +105,6 @@ def current_time(self): yield str(datetime.datetime.now()) time.sleep(1) - @skill() - def speak(self, text: str) -> str: - """Speak text out loud through the robot's speakers.""" - return f"This is being said aloud: {text}" - @skill() def execute_sport_command(self, command_name: str) -> str: if self._publish_request is None: From 1b002fb0856f62c81d7ea35e3da7f1a256ec5a33 Mon Sep 17 00:00:00 2001 From: paul-nechifor <1262969+paul-nechifor@users.noreply.github.com> Date: Thu, 20 Nov 2025 06:52:44 +0000 Subject: [PATCH 2/5] CI code cleanup --- dimos/agents2/cli/web.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dimos/agents2/cli/web.py b/dimos/agents2/cli/web.py index d58951207c..515c0260aa 100644 --- a/dimos/agents2/cli/web.py +++ b/dimos/agents2/cli/web.py @@ -81,4 +81,4 @@ def stop(self) -> None: web_input = WebInput.blueprint -__all__ = ["web_input", "WebInput"] \ No newline at end of file +__all__ = ["WebInput", "web_input"] From bb2958602d7d01a1093c94286968095de3245201 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Sun, 23 Nov 2025 02:46:19 +0200 Subject: [PATCH 3/5] fix code review comment --- dimos/agents2/skills/speak_skill.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/dimos/agents2/skills/speak_skill.py b/dimos/agents2/skills/speak_skill.py index 1a26eb7439..2b0178200f 100644 --- a/dimos/agents2/skills/speak_skill.py +++ b/dimos/agents2/skills/speak_skill.py @@ -69,10 +69,13 @@ def speak(self, text: str) -> str: text_subject = Subject() audio_complete = threading.Event() self._tts_node.consume_text(text_subject) + + def set_as_complete(t: str): + audio_complete.set() + subscription = self._tts_node.emit_text().subscribe( - on_next=lambda t: logger.debug(f"TTS processing: {t}"), - on_completed=audio_complete.set, - on_error=audio_complete.set, + on_next=set_as_complete, + on_error=set_as_complete, ) text_subject.on_next(text) From 75895146014d21cc10d2829707a9667cbadb26b8 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Thu, 27 Nov 2025 03:45:40 +0200 Subject: [PATCH 4/5] fix mypy --- dimos/agents2/cli/web.py | 21 +++++++++++---------- dimos/agents2/skills/speak_skill.py | 9 ++++++--- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/dimos/agents2/cli/web.py b/dimos/agents2/cli/web.py index 515c0260aa..026bbe915d 100644 --- a/dimos/agents2/cli/web.py +++ b/dimos/agents2/cli/web.py @@ -13,9 +13,9 @@ # limitations under the License. from threading import Thread +from typing import TYPE_CHECKING import reactivex as rx -from reactivex.disposable import Disposable import reactivex.operators as ops from dimos.core import Module, rpc @@ -25,13 +25,16 @@ from dimos.utils.logging_config import setup_logger from dimos.web.robot_web_interface import RobotWebInterface +if TYPE_CHECKING: + from dimos.stream.audio.base import AudioEvent + logger = setup_logger(__name__) class WebInput(Module): - _web_interface: RobotWebInterface = None - _thread: Thread = None - _human_transport: pLCMTransport | None = None + _web_interface: RobotWebInterface | None = None + _thread: Thread | None = None + _human_transport: pLCMTransport[str] | None = None @rpc def start(self) -> None: @@ -39,7 +42,7 @@ def start(self) -> None: self._human_transport = pLCMTransport("/human_input") - audio_subject = rx.subject.Subject() + audio_subject: rx.subject.Subject[AudioEvent] = rx.subject.Subject() self._web_interface = RobotWebInterface( port=5555, @@ -57,11 +60,11 @@ def start(self) -> None: # Subscribe to both text input sources # 1. Direct text from web interface unsub = self._web_interface.query_stream.subscribe(self._human_transport.publish) - self._disposables.add(Disposable(unsub)) + self._disposables.add(unsub) # 2. Transcribed text from STT unsub = stt_node.emit_text().subscribe(self._human_transport.publish) - self._disposables.add(Disposable(unsub)) + self._disposables.add(unsub) self._thread = Thread(target=self._web_interface.run, daemon=True) self._thread.start() @@ -70,12 +73,10 @@ def start(self) -> None: @rpc def stop(self) -> None: - if self._web_interface: - self._web_interface.stop() if self._thread: self._thread.join(timeout=1.0) if self._human_transport: - self._human_transport.close() + self._human_transport.lcm.stop() super().stop() diff --git a/dimos/agents2/skills/speak_skill.py b/dimos/agents2/skills/speak_skill.py index 2b0178200f..0623824266 100644 --- a/dimos/agents2/skills/speak_skill.py +++ b/dimos/agents2/skills/speak_skill.py @@ -66,16 +66,19 @@ def speak(self, text: str) -> str: # Use lock to prevent simultaneous speech with self._audio_lock: - text_subject = Subject() + text_subject: Subject[str] = Subject() audio_complete = threading.Event() self._tts_node.consume_text(text_subject) - def set_as_complete(t: str): + def set_as_complete(_t: str) -> None: + audio_complete.set() + + def set_as_complete_e(_e: Exception) -> None: audio_complete.set() subscription = self._tts_node.emit_text().subscribe( on_next=set_as_complete, - on_error=set_as_complete, + on_error=set_as_complete_e, ) text_subject.on_next(text) From 0c9932fbf31692e3b48d87d2612826e1bae993b6 Mon Sep 17 00:00:00 2001 From: Paul Nechifor Date: Thu, 27 Nov 2025 03:55:44 +0200 Subject: [PATCH 5/5] fix mypy --- dimos/agents2/cli/web.py | 2 ++ dimos/web/dimos_interface/api/server.py | 17 +++++++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/dimos/agents2/cli/web.py b/dimos/agents2/cli/web.py index 026bbe915d..cb589ac5ce 100644 --- a/dimos/agents2/cli/web.py +++ b/dimos/agents2/cli/web.py @@ -73,6 +73,8 @@ def start(self) -> None: @rpc def stop(self) -> None: + if self._web_interface: + self._web_interface.shutdown() if self._thread: self._thread.join(timeout=1.0) if self._human_transport: diff --git a/dimos/web/dimos_interface/api/server.py b/dimos/web/dimos_interface/api/server.py index 5db477dbca..faa787851e 100644 --- a/dimos/web/dimos_interface/api/server.py +++ b/dimos/web/dimos_interface/api/server.py @@ -68,6 +68,7 @@ def __init__( # type: ignore[no-untyped-def] print("Starting FastAPIServer initialization...") # Debug print super().__init__(dev_name, edge_type) self.app = FastAPI() + self._server: uvicorn.Server | None = None # Add CORS middleware with more permissive settings for development self.app.add_middleware( @@ -353,10 +354,18 @@ async def text_stream(key: str): # type: ignore[no-untyped-def] self.app.get(f"/video_feed/{key}")(self.create_video_feed_route(key)) # type: ignore[no-untyped-call] def run(self) -> None: - """Run the FastAPI server.""" - uvicorn.run( - self.app, host=self.host, port=self.port - ) # TODO: Translate structure to enable in-built workers' + config = uvicorn.Config( + self.app, + host=self.host, + port=self.port, + log_level="error", # Reduce verbosity + ) + self._server = uvicorn.Server(config) + self._server.run() + + def shutdown(self) -> None: + if self._server is not None: + self._server.should_exit = True if __name__ == "__main__":