From 1ec549cccf8651c21de2768d44c3a29585dc2dd1 Mon Sep 17 00:00:00 2001 From: stash Date: Wed, 7 Jan 2026 12:17:17 -0800 Subject: [PATCH 1/9] Working VLM agent via Agentspec on go2 replay --- dimos/agents/__init__.py | 3 +- dimos/agents/agent.py | 7 + dimos/agents/modules/vlm_agent.py | 154 +++++++++++++++++ dimos/agents/modules/vlm_stream_tester.py | 156 ++++++++++++++++++ dimos/agents/spec.py | 5 + dimos/robot/all_blueprints.py | 1 + .../unitree_webrtc/unitree_go2_blueprints.py | 8 + 7 files changed, 333 insertions(+), 1 deletion(-) create mode 100644 dimos/agents/modules/vlm_agent.py create mode 100644 dimos/agents/modules/vlm_stream_tester.py diff --git a/dimos/agents/__init__.py b/dimos/agents/__init__.py index 9e1dd2df77..a3c73e852d 100644 --- a/dimos/agents/__init__.py +++ b/dimos/agents/__init__.py @@ -8,8 +8,9 @@ ) from dimos.agents.agent import Agent, deploy +from dimos.agents.modules.vlm_agent import VLMAgent from dimos.agents.spec import AgentSpec from dimos.protocol.skill.skill import skill from dimos.protocol.skill.type import Output, Reducer, Stream -__all__ = ["Agent", "AgentSpec", "Output", "Reducer", "Stream", "deploy", "skill"] +__all__ = ["Agent", "AgentSpec", "Output", "Reducer", "Stream", "VLMAgent", "deploy", "skill"] diff --git a/dimos/agents/agent.py b/dimos/agents/agent.py index bf5ded4f00..b9f17cf984 100644 --- a/dimos/agents/agent.py +++ b/dimos/agents/agent.py @@ -33,6 +33,7 @@ from dimos.agents.spec import AgentSpec, Model, Provider from dimos.agents.system_prompt import SYSTEM_PROMPT from dimos.core import DimosCluster, rpc +from dimos.msgs.sensor_msgs import Image from dimos.protocol.skill.coordinator import SkillCoordinator, SkillState, SkillStateDict from dimos.protocol.skill.skill import SkillContainer from dimos.protocol.skill.type import Output @@ -362,6 +363,12 @@ def query(self, query: str): # type: ignore[no-untyped-def] # return sync(self._loop, self.agent_loop, query) return asyncio.run_coroutine_threadsafe(self.agent_loop(query), self._loop).result() # type: ignore[arg-type] + @rpc + def query_image(self, image: Image, query: str): # type: ignore[no-untyped-def] + content = [{"type": "text", "text": query}, *image.agent_encode()] + self.append_history(HumanMessage(content=content)) # type: ignore[arg-type] + return asyncio.run_coroutine_threadsafe(self.agent_loop(), self._loop).result() # type: ignore[arg-type] + async def query_async(self, query: str): # type: ignore[no-untyped-def] return await self.agent_loop(query) diff --git a/dimos/agents/modules/vlm_agent.py b/dimos/agents/modules/vlm_agent.py new file mode 100644 index 0000000000..ed0c0b6691 --- /dev/null +++ b/dimos/agents/modules/vlm_agent.py @@ -0,0 +1,154 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +from langchain.chat_models import init_chat_model +from langchain_core.messages import AIMessage, HumanMessage, SystemMessage +from langchain_huggingface import ChatHuggingFace, HuggingFacePipeline + +from dimos.agents.ollama_agent import ensure_ollama_model +from dimos.agents.spec import AgentSpec +from dimos.agents.system_prompt import SYSTEM_PROMPT +from dimos.core import rpc +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from dimos.core.stream import In, Out + from dimos.msgs.sensor_msgs import Image + +logger = setup_logger() + + +class VLMAgent(AgentSpec): + """Stream-first agent for vision queries with optional RPC access.""" + + color_image: In[Image] + query: In[HumanMessage] + answer: Out[AIMessage] + + def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] + super().__init__(*args, **kwargs) + if self.config.model_instance: + self._llm = self.config.model_instance + else: + if self.config.provider.value.lower() == "ollama": + ensure_ollama_model(self.config.model) + + if self.config.provider.value.lower() == "huggingface": + llm = HuggingFacePipeline.from_model_id( + model_id=self.config.model, + task="text-generation", + pipeline_kwargs={ + "max_new_tokens": 512, + "temperature": 0.7, + }, + ) + self._llm = ChatHuggingFace(llm=llm, model_id=self.config.model) + else: + self._llm = init_chat_model( # type: ignore[call-overload] + model_provider=self.config.provider, model=self.config.model + ) + self._latest_image: Image | None = None + self._history: list[AIMessage | HumanMessage] = [] + + if self.config.system_prompt: + if isinstance(self.config.system_prompt, str): + self._system_message = SystemMessage(self.config.system_prompt) + else: + self._system_message = self.config.system_prompt + else: + self._system_message = SystemMessage(SYSTEM_PROMPT) + + self.publish(self._system_message) + + @rpc + def start(self) -> None: + super().start() + self._disposables.add(self.color_image.subscribe(self._on_image)) # type: ignore[arg-type] + self._disposables.add(self.query.subscribe(self._on_query)) # type: ignore[arg-type] + + @rpc + def stop(self) -> None: + super().stop() + + def _on_image(self, image: Image) -> None: + self._latest_image = image + + def _on_query(self, msg: HumanMessage) -> None: + if not self._latest_image: + self.answer.publish(AIMessage(content="No image available yet.")) + return + + query_text = self._extract_text(msg) + response = self._invoke_image(self._latest_image, query_text) + self.answer.publish(response) + + def _extract_text(self, msg: HumanMessage) -> str: + content = msg.content + if isinstance(content, str): + return content + if isinstance(content, list): + for part in content: + if isinstance(part, dict) and part.get("type") == "text": + return str(part.get("text", "")) + return str(content) + + def _invoke(self, msg: HumanMessage) -> AIMessage: + messages = [self._system_message, msg] + response = self._llm.invoke(messages) + self.append_history(msg, response) # type: ignore[arg-type] + return response # type: ignore[return-value] + + def _invoke_image(self, image: Image, query: str) -> AIMessage: + content = [{"type": "text", "text": query}, *image.agent_encode()] + return self._invoke(HumanMessage(content=content)) + + @rpc + def clear_history(self): # type: ignore[no-untyped-def] + self._history.clear() + + def append_history(self, *msgs: list[AIMessage | HumanMessage]) -> None: + for msg in msgs: + self.publish(msg) # type: ignore[arg-type] + self._history.extend(msgs) + + def history(self) -> list[SystemMessage | AIMessage | HumanMessage]: + return [self._system_message, *self._history] + + @rpc + def register_skills( # type: ignore[no-untyped-def] + self, container, run_implicit_name: str | None = None + ) -> None: + logger.warning( + "VLMAgent does not manage skills; register_skills is a no-op", + container=str(container), + run_implicit_name=run_implicit_name, + ) + + @rpc + def query(self, query: str): # type: ignore[no-untyped-def] + response = self._invoke(HumanMessage(query)) + return response.content + + @rpc + def query_image(self, image: Image, query: str): # type: ignore[no-untyped-def] + response = self._invoke_image(image, query) + return response.content + + +vlm_agent = VLMAgent.blueprint + +__all__ = ["VLMAgent", "vlm_agent"] diff --git a/dimos/agents/modules/vlm_stream_tester.py b/dimos/agents/modules/vlm_stream_tester.py new file mode 100644 index 0000000000..c649e86583 --- /dev/null +++ b/dimos/agents/modules/vlm_stream_tester.py @@ -0,0 +1,156 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import threading +import time +from typing import TYPE_CHECKING + +from langchain_core.messages import AIMessage, HumanMessage + +from dimos.core import Module, rpc +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from dimos.core.stream import In, Out + from dimos.msgs.sensor_msgs import Image + +logger = setup_logger() + + +class VlmStreamTester(Module): + """Smoke-test VLMAgent with replayed images and stream queries.""" + + color_image: In[Image] + query: Out[HumanMessage] + answer: In[AIMessage] + + rpc_calls: list[str] = [ + "VLMAgent.query_image", + ] + + def __init__( # type: ignore[no-untyped-def] + self, + prompt: str = "What do you see?", + num_queries: int = 3, + query_interval_s: float = 2.0, + max_image_age_s: float = 1.5, + max_image_gap_s: float = 1.5, + ) -> None: + super().__init__() + self._prompt = prompt + self._num_queries = num_queries + self._query_interval_s = query_interval_s + self._max_image_age_s = max_image_age_s + self._max_image_gap_s = max_image_gap_s + self._latest_image: Image | None = None + self._latest_image_wall_ts: float | None = None + self._last_image_wall_ts: float | None = None + self._max_gap_seen_s = 0.0 + self._answer_count = 0 + self._stop_event = threading.Event() + self._worker: threading.Thread | None = None + + @rpc + def start(self) -> None: + super().start() + self._disposables.add(self.color_image.subscribe(self._on_image)) # type: ignore[arg-type] + self._disposables.add(self.answer.subscribe(self._on_answer)) # type: ignore[arg-type] + self._worker = threading.Thread(target=self._run_queries, daemon=True) + self._worker.start() + + @rpc + def stop(self) -> None: + self._stop_event.set() + if self._worker and self._worker.is_alive(): + self._worker.join(timeout=1.0) + super().stop() + + def _on_image(self, image: Image) -> None: + now = time.time() + if self._last_image_wall_ts is not None: + gap = now - self._last_image_wall_ts + if gap > self._max_gap_seen_s: + self._max_gap_seen_s = gap + self._last_image_wall_ts = now + self._latest_image_wall_ts = now + self._latest_image = image + + def _on_answer(self, msg: AIMessage) -> None: + self._answer_count += 1 + logger.info( + "VLMAgent stream answer", + count=self._answer_count, + content=msg.content, + ) + + def _run_queries(self) -> None: + try: + while not self._stop_event.is_set() and self._latest_image is None: + time.sleep(0.05) + + for idx in range(self._num_queries): + if self._stop_event.is_set(): + break + if self._latest_image is None: + logger.warning("No image available for stream query.") + break + + image_age = None + if self._latest_image_wall_ts is not None: + image_age = time.time() - self._latest_image_wall_ts + if image_age > self._max_image_age_s: + logger.warning( + "Latest image is stale", + age_s=image_age, + max_age_s=self._max_image_age_s, + ) + + logger.info("Sending stream query", index=idx + 1) + self.query.publish( + HumanMessage( + content=f"{self._prompt} (stream query {idx + 1}/{self._num_queries})" + ) + ) + + try: + rpc_query = self.get_rpc_calls("VLMAgent.query_image") + response = rpc_query( + self._latest_image, + f"{self._prompt} (rpc query {idx + 1}/{self._num_queries})", + ) + logger.info( + "VLMAgent RPC answer", + query_index=idx + 1, + image_age_s=image_age, + content=response, + ) + except Exception as exc: + logger.warning("RPC query_image failed", error=str(exc)) + + time.sleep(self._query_interval_s) + except Exception as exc: + logger.exception("VlmStreamTester query loop failed", error=str(exc)) + finally: + if self._max_gap_seen_s > self._max_image_gap_s: + logger.warning( + "Image stream gap exceeded threshold", + max_gap_s=self._max_gap_seen_s, + threshold_s=self._max_image_gap_s, + ) + + +vlm_stream_tester = VlmStreamTester.blueprint + +__all__ = ["VlmStreamTester", "vlm_stream_tester"] diff --git a/dimos/agents/spec.py b/dimos/agents/spec.py index b0a0324e89..3a282aac3e 100644 --- a/dimos/agents/spec.py +++ b/dimos/agents/spec.py @@ -20,6 +20,7 @@ from typing import TYPE_CHECKING, Any, Union if TYPE_CHECKING: + from dimos.msgs.sensor_msgs import Image from dimos.protocol.skill.skill import SkillContainer from langchain.chat_models.base import _SUPPORTED_PROVIDERS @@ -190,6 +191,10 @@ def register_skills( @abstractmethod def query(self, query: str): ... # type: ignore[no-untyped-def] + @rpc + @abstractmethod + def query_image(self, image: "Image", query: str): ... # type: ignore[no-untyped-def] + def __str__(self) -> str: console = Console(force_terminal=True, legacy_windows=False) table = Table(show_header=True) diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 9b118cbd60..f989098f05 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -25,6 +25,7 @@ "unitree-go2-agentic-mcp": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:agentic_mcp", "unitree-go2-agentic-ollama": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:agentic_ollama", "unitree-go2-agentic-huggingface": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:agentic_huggingface", + "unitree-go2-vlm-stream-test": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:vlm_stream_test", "unitree-g1": "dimos.robot.unitree_webrtc.unitree_g1_blueprints:standard", "unitree-g1-sim": "dimos.robot.unitree_webrtc.unitree_g1_blueprints:standard_sim", "unitree-g1-basic": "dimos.robot.unitree_webrtc.unitree_g1_blueprints:basic_ros", diff --git a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py index 46d951650c..566667c690 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py +++ b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py @@ -23,6 +23,8 @@ from dimos.agents.agent import llm_agent from dimos.agents.cli.human import human_input from dimos.agents.cli.web import web_input +from dimos.agents.modules.vlm_agent import vlm_agent +from dimos.agents.modules.vlm_stream_tester import vlm_stream_tester from dimos.agents.ollama_agent import ollama_installed from dimos.agents.skills.navigation import navigation_skill from dimos.agents.skills.speak_skill import speak_skill @@ -190,3 +192,9 @@ ), _common_agentic, ) + +vlm_stream_test = autoconnect( + basic, + vlm_agent(), + vlm_stream_tester(), +) From 5c6ee9fc4338302e44f0f53a34e636d6edf4506d Mon Sep 17 00:00:00 2001 From: stash Date: Wed, 7 Jan 2026 13:00:40 -0800 Subject: [PATCH 2/9] Fix type checking bug --- dimos/agents/modules/vlm_agent.py | 9 ++------- dimos/agents/modules/vlm_stream_tester.py | 2 +- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/dimos/agents/modules/vlm_agent.py b/dimos/agents/modules/vlm_agent.py index ed0c0b6691..ebb6988974 100644 --- a/dimos/agents/modules/vlm_agent.py +++ b/dimos/agents/modules/vlm_agent.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import annotations - -from typing import TYPE_CHECKING from langchain.chat_models import init_chat_model from langchain_core.messages import AIMessage, HumanMessage, SystemMessage @@ -23,12 +20,10 @@ from dimos.agents.spec import AgentSpec from dimos.agents.system_prompt import SYSTEM_PROMPT from dimos.core import rpc +from dimos.core.stream import In, Out +from dimos.msgs.sensor_msgs import Image from dimos.utils.logging_config import setup_logger -if TYPE_CHECKING: - from dimos.core.stream import In, Out - from dimos.msgs.sensor_msgs import Image - logger = setup_logger() diff --git a/dimos/agents/modules/vlm_stream_tester.py b/dimos/agents/modules/vlm_stream_tester.py index c649e86583..41d5d24e86 100644 --- a/dimos/agents/modules/vlm_stream_tester.py +++ b/dimos/agents/modules/vlm_stream_tester.py @@ -43,7 +43,7 @@ class VlmStreamTester(Module): def __init__( # type: ignore[no-untyped-def] self, prompt: str = "What do you see?", - num_queries: int = 3, + num_queries: int = 10, query_interval_s: float = 2.0, max_image_age_s: float = 1.5, max_image_gap_s: float = 1.5, From 1486f48ad3c05bff8caeeb3748a62a34fbb36289 Mon Sep 17 00:00:00 2001 From: stash Date: Wed, 7 Jan 2026 13:04:59 -0800 Subject: [PATCH 3/9] Fix type checking bug --- dimos/agents/modules/vlm_stream_tester.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/dimos/agents/modules/vlm_stream_tester.py b/dimos/agents/modules/vlm_stream_tester.py index 41d5d24e86..ebeeec0e7a 100644 --- a/dimos/agents/modules/vlm_stream_tester.py +++ b/dimos/agents/modules/vlm_stream_tester.py @@ -11,21 +11,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import annotations import threading import time -from typing import TYPE_CHECKING from langchain_core.messages import AIMessage, HumanMessage from dimos.core import Module, rpc +from dimos.core.stream import In, Out +from dimos.msgs.sensor_msgs import Image from dimos.utils.logging_config import setup_logger -if TYPE_CHECKING: - from dimos.core.stream import In, Out - from dimos.msgs.sensor_msgs import Image - logger = setup_logger() From 7ef72b77ea8bd6c336b8f2c15e47381b1c36aed6 Mon Sep 17 00:00:00 2001 From: stash Date: Wed, 7 Jan 2026 13:15:31 -0800 Subject: [PATCH 4/9] revert changes to spec and agent not needed for vlm agent --- dimos/agents/agent.py | 7 ------- dimos/agents/spec.py | 5 ----- 2 files changed, 12 deletions(-) diff --git a/dimos/agents/agent.py b/dimos/agents/agent.py index b9f17cf984..bf5ded4f00 100644 --- a/dimos/agents/agent.py +++ b/dimos/agents/agent.py @@ -33,7 +33,6 @@ from dimos.agents.spec import AgentSpec, Model, Provider from dimos.agents.system_prompt import SYSTEM_PROMPT from dimos.core import DimosCluster, rpc -from dimos.msgs.sensor_msgs import Image from dimos.protocol.skill.coordinator import SkillCoordinator, SkillState, SkillStateDict from dimos.protocol.skill.skill import SkillContainer from dimos.protocol.skill.type import Output @@ -363,12 +362,6 @@ def query(self, query: str): # type: ignore[no-untyped-def] # return sync(self._loop, self.agent_loop, query) return asyncio.run_coroutine_threadsafe(self.agent_loop(query), self._loop).result() # type: ignore[arg-type] - @rpc - def query_image(self, image: Image, query: str): # type: ignore[no-untyped-def] - content = [{"type": "text", "text": query}, *image.agent_encode()] - self.append_history(HumanMessage(content=content)) # type: ignore[arg-type] - return asyncio.run_coroutine_threadsafe(self.agent_loop(), self._loop).result() # type: ignore[arg-type] - async def query_async(self, query: str): # type: ignore[no-untyped-def] return await self.agent_loop(query) diff --git a/dimos/agents/spec.py b/dimos/agents/spec.py index 3a282aac3e..b0a0324e89 100644 --- a/dimos/agents/spec.py +++ b/dimos/agents/spec.py @@ -20,7 +20,6 @@ from typing import TYPE_CHECKING, Any, Union if TYPE_CHECKING: - from dimos.msgs.sensor_msgs import Image from dimos.protocol.skill.skill import SkillContainer from langchain.chat_models.base import _SUPPORTED_PROVIDERS @@ -191,10 +190,6 @@ def register_skills( @abstractmethod def query(self, query: str): ... # type: ignore[no-untyped-def] - @rpc - @abstractmethod - def query_image(self, image: "Image", query: str): ... # type: ignore[no-untyped-def] - def __str__(self) -> str: console = Console(force_terminal=True, legacy_windows=False) table = Table(show_header=True) From a704e201098f49332711f978aeb2316d5d3a349d Mon Sep 17 00:00:00 2001 From: stash Date: Wed, 7 Jan 2026 13:46:52 -0800 Subject: [PATCH 5/9] Added LLMinitmixin for cleaner / generic llm initialization --- dimos/agents/agent.py | 43 +++------------------- dimos/agents/llm_init_mixin.py | 59 +++++++++++++++++++++++++++++++ dimos/agents/modules/vlm_agent.py | 39 +++----------------- 3 files changed, 67 insertions(+), 74 deletions(-) create mode 100644 dimos/agents/llm_init_mixin.py diff --git a/dimos/agents/agent.py b/dimos/agents/agent.py index bf5ded4f00..2c1f16ebd4 100644 --- a/dimos/agents/agent.py +++ b/dimos/agents/agent.py @@ -19,7 +19,6 @@ from typing import Any, TypedDict import uuid -from langchain.chat_models import init_chat_model from langchain_core.messages import ( AIMessage, HumanMessage, @@ -27,11 +26,9 @@ ToolCall, ToolMessage, ) -from langchain_huggingface import ChatHuggingFace, HuggingFacePipeline -from dimos.agents.ollama_agent import ensure_ollama_model +from dimos.agents.llm_init_mixin import LlmInitMixin from dimos.agents.spec import AgentSpec, Model, Provider -from dimos.agents.system_prompt import SYSTEM_PROMPT from dimos.core import DimosCluster, rpc from dimos.protocol.skill.coordinator import SkillCoordinator, SkillState, SkillStateDict from dimos.protocol.skill.skill import SkillContainer @@ -158,7 +155,7 @@ def snapshot_to_messages( # Agent class job is to glue skill coordinator state to an agent, builds langchain messages -class Agent(AgentSpec): +class Agent(LlmInitMixin, AgentSpec): system_message: SystemMessage state_messages: list[AIMessage | HumanMessage] @@ -175,40 +172,8 @@ def __init__( # type: ignore[no-untyped-def] self._agent_id = str(uuid.uuid4()) self._agent_stopped = False - if self.config.system_prompt: - if isinstance(self.config.system_prompt, str): - self.system_message = SystemMessage(self.config.system_prompt + SYSTEM_MSG_APPEND) - else: - self.config.system_prompt.content += SYSTEM_MSG_APPEND # type: ignore[operator] - self.system_message = self.config.system_prompt - else: - self.system_message = SystemMessage(SYSTEM_PROMPT + SYSTEM_MSG_APPEND) - - self.publish(self.system_message) - - # Use provided model instance if available, otherwise initialize from config - if self.config.model_instance: - self._llm = self.config.model_instance - else: - # For Ollama provider, ensure the model is available before initializing - if self.config.provider.value.lower() == "ollama": - ensure_ollama_model(self.config.model) - - # For HuggingFace, we need to create a pipeline and wrap it in ChatHuggingFace - if self.config.provider.value.lower() == "huggingface": - llm = HuggingFacePipeline.from_model_id( - model_id=self.config.model, - task="text-generation", - pipeline_kwargs={ - "max_new_tokens": 512, - "temperature": 0.7, - }, - ) - self._llm = ChatHuggingFace(llm=llm, model_id=self.config.model) - else: - self._llm = init_chat_model( # type: ignore[call-overload] - model_provider=self.config.provider, model=self.config.model - ) + self.system_message = self._init_system_message(append=SYSTEM_MSG_APPEND) + self._llm = self._init_llm() @rpc def get_agent_id(self) -> str: diff --git a/dimos/agents/llm_init_mixin.py b/dimos/agents/llm_init_mixin.py new file mode 100644 index 0000000000..c07f604119 --- /dev/null +++ b/dimos/agents/llm_init_mixin.py @@ -0,0 +1,59 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from langchain.chat_models import init_chat_model +from langchain_core.language_models.chat_models import BaseChatModel +from langchain_core.messages import SystemMessage +from langchain_huggingface import ChatHuggingFace, HuggingFacePipeline + +from dimos.agents.ollama_agent import ensure_ollama_model +from dimos.agents.system_prompt import SYSTEM_PROMPT + + +class LlmInitMixin: + def _init_llm(self) -> BaseChatModel: + if self.config.model_instance: + return self.config.model_instance + + if self.config.provider.value.lower() == "ollama": + ensure_ollama_model(self.config.model) + + if self.config.provider.value.lower() == "huggingface": + llm = HuggingFacePipeline.from_model_id( + model_id=self.config.model, + task="text-generation", + pipeline_kwargs={ + "max_new_tokens": 512, + "temperature": 0.7, + }, + ) + return ChatHuggingFace(llm=llm, model_id=self.config.model) + + return init_chat_model( # type: ignore[call-overload] + model_provider=self.config.provider, + model=self.config.model, + ) + + def _init_system_message(self, *, append: str = "") -> SystemMessage: + if self.config.system_prompt: + if isinstance(self.config.system_prompt, str): + system_message = SystemMessage(self.config.system_prompt + append) + else: + if append: + self.config.system_prompt.content += append # type: ignore[operator] + system_message = self.config.system_prompt + else: + system_message = SystemMessage(SYSTEM_PROMPT + append) + + self.publish(system_message) + return system_message diff --git a/dimos/agents/modules/vlm_agent.py b/dimos/agents/modules/vlm_agent.py index ebb6988974..fe854ffb11 100644 --- a/dimos/agents/modules/vlm_agent.py +++ b/dimos/agents/modules/vlm_agent.py @@ -12,13 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from langchain.chat_models import init_chat_model from langchain_core.messages import AIMessage, HumanMessage, SystemMessage -from langchain_huggingface import ChatHuggingFace, HuggingFacePipeline -from dimos.agents.ollama_agent import ensure_ollama_model +from dimos.agents.llm_init_mixin import LlmInitMixin from dimos.agents.spec import AgentSpec -from dimos.agents.system_prompt import SYSTEM_PROMPT from dimos.core import rpc from dimos.core.stream import In, Out from dimos.msgs.sensor_msgs import Image @@ -27,7 +24,7 @@ logger = setup_logger() -class VLMAgent(AgentSpec): +class VLMAgent(LlmInitMixin, AgentSpec): """Stream-first agent for vision queries with optional RPC access.""" color_image: In[Image] @@ -36,38 +33,10 @@ class VLMAgent(AgentSpec): def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(*args, **kwargs) - if self.config.model_instance: - self._llm = self.config.model_instance - else: - if self.config.provider.value.lower() == "ollama": - ensure_ollama_model(self.config.model) - - if self.config.provider.value.lower() == "huggingface": - llm = HuggingFacePipeline.from_model_id( - model_id=self.config.model, - task="text-generation", - pipeline_kwargs={ - "max_new_tokens": 512, - "temperature": 0.7, - }, - ) - self._llm = ChatHuggingFace(llm=llm, model_id=self.config.model) - else: - self._llm = init_chat_model( # type: ignore[call-overload] - model_provider=self.config.provider, model=self.config.model - ) + self._llm = self._init_llm() self._latest_image: Image | None = None self._history: list[AIMessage | HumanMessage] = [] - - if self.config.system_prompt: - if isinstance(self.config.system_prompt, str): - self._system_message = SystemMessage(self.config.system_prompt) - else: - self._system_message = self.config.system_prompt - else: - self._system_message = SystemMessage(SYSTEM_PROMPT) - - self.publish(self._system_message) + self._system_message = self._init_system_message() @rpc def start(self) -> None: From 671c0228381024314f5d6f19f5409e561871b3ee Mon Sep 17 00:00:00 2001 From: stash Date: Wed, 7 Jan 2026 14:08:05 -0800 Subject: [PATCH 6/9] Changed mixin --> helper function --- dimos/agents/agent.py | 9 ++--- dimos/agents/llm_init.py | 57 +++++++++++++++++++++++++++++ dimos/agents/llm_init_mixin.py | 59 ------------------------------- dimos/agents/modules/vlm_agent.py | 9 ++--- 4 files changed, 67 insertions(+), 67 deletions(-) create mode 100644 dimos/agents/llm_init.py delete mode 100644 dimos/agents/llm_init_mixin.py diff --git a/dimos/agents/agent.py b/dimos/agents/agent.py index 2c1f16ebd4..e9c7c5d7b9 100644 --- a/dimos/agents/agent.py +++ b/dimos/agents/agent.py @@ -27,7 +27,7 @@ ToolMessage, ) -from dimos.agents.llm_init_mixin import LlmInitMixin +from dimos.agents.llm_init import build_llm, build_system_message from dimos.agents.spec import AgentSpec, Model, Provider from dimos.core import DimosCluster, rpc from dimos.protocol.skill.coordinator import SkillCoordinator, SkillState, SkillStateDict @@ -155,7 +155,7 @@ def snapshot_to_messages( # Agent class job is to glue skill coordinator state to an agent, builds langchain messages -class Agent(LlmInitMixin, AgentSpec): +class Agent(AgentSpec): system_message: SystemMessage state_messages: list[AIMessage | HumanMessage] @@ -172,8 +172,9 @@ def __init__( # type: ignore[no-untyped-def] self._agent_id = str(uuid.uuid4()) self._agent_stopped = False - self.system_message = self._init_system_message(append=SYSTEM_MSG_APPEND) - self._llm = self._init_llm() + self.system_message = build_system_message(self.config, append=SYSTEM_MSG_APPEND) + self.publish(self.system_message) + self._llm = build_llm(self.config) @rpc def get_agent_id(self) -> str: diff --git a/dimos/agents/llm_init.py b/dimos/agents/llm_init.py new file mode 100644 index 0000000000..7576dc59ab --- /dev/null +++ b/dimos/agents/llm_init.py @@ -0,0 +1,57 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from langchain.chat_models import init_chat_model +from langchain_core.language_models.chat_models import BaseChatModel +from langchain_core.messages import SystemMessage +from langchain_huggingface import ChatHuggingFace, HuggingFacePipeline + +from dimos.agents.ollama_agent import ensure_ollama_model +from dimos.agents.spec import AgentConfig +from dimos.agents.system_prompt import SYSTEM_PROMPT + + +def build_llm(config: AgentConfig) -> BaseChatModel: + if config.model_instance: + return config.model_instance + + if config.provider.value.lower() == "ollama": + ensure_ollama_model(config.model) + + if config.provider.value.lower() == "huggingface": + llm = HuggingFacePipeline.from_model_id( + model_id=config.model, + task="text-generation", + pipeline_kwargs={ + "max_new_tokens": 512, + "temperature": 0.7, + }, + ) + return ChatHuggingFace(llm=llm, model_id=config.model) + + return init_chat_model( # type: ignore[call-overload] + model_provider=config.provider, + model=config.model, + ) + + +def build_system_message(config: AgentConfig, *, append: str = "") -> SystemMessage: + if config.system_prompt: + if isinstance(config.system_prompt, str): + return SystemMessage(config.system_prompt + append) + if append: + config.system_prompt.content += append # type: ignore[operator] + return config.system_prompt + + return SystemMessage(SYSTEM_PROMPT + append) diff --git a/dimos/agents/llm_init_mixin.py b/dimos/agents/llm_init_mixin.py deleted file mode 100644 index c07f604119..0000000000 --- a/dimos/agents/llm_init_mixin.py +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from langchain.chat_models import init_chat_model -from langchain_core.language_models.chat_models import BaseChatModel -from langchain_core.messages import SystemMessage -from langchain_huggingface import ChatHuggingFace, HuggingFacePipeline - -from dimos.agents.ollama_agent import ensure_ollama_model -from dimos.agents.system_prompt import SYSTEM_PROMPT - - -class LlmInitMixin: - def _init_llm(self) -> BaseChatModel: - if self.config.model_instance: - return self.config.model_instance - - if self.config.provider.value.lower() == "ollama": - ensure_ollama_model(self.config.model) - - if self.config.provider.value.lower() == "huggingface": - llm = HuggingFacePipeline.from_model_id( - model_id=self.config.model, - task="text-generation", - pipeline_kwargs={ - "max_new_tokens": 512, - "temperature": 0.7, - }, - ) - return ChatHuggingFace(llm=llm, model_id=self.config.model) - - return init_chat_model( # type: ignore[call-overload] - model_provider=self.config.provider, - model=self.config.model, - ) - - def _init_system_message(self, *, append: str = "") -> SystemMessage: - if self.config.system_prompt: - if isinstance(self.config.system_prompt, str): - system_message = SystemMessage(self.config.system_prompt + append) - else: - if append: - self.config.system_prompt.content += append # type: ignore[operator] - system_message = self.config.system_prompt - else: - system_message = SystemMessage(SYSTEM_PROMPT + append) - - self.publish(system_message) - return system_message diff --git a/dimos/agents/modules/vlm_agent.py b/dimos/agents/modules/vlm_agent.py index fe854ffb11..25c783722d 100644 --- a/dimos/agents/modules/vlm_agent.py +++ b/dimos/agents/modules/vlm_agent.py @@ -14,7 +14,7 @@ from langchain_core.messages import AIMessage, HumanMessage, SystemMessage -from dimos.agents.llm_init_mixin import LlmInitMixin +from dimos.agents.llm_init import build_llm, build_system_message from dimos.agents.spec import AgentSpec from dimos.core import rpc from dimos.core.stream import In, Out @@ -24,7 +24,7 @@ logger = setup_logger() -class VLMAgent(LlmInitMixin, AgentSpec): +class VLMAgent(AgentSpec): """Stream-first agent for vision queries with optional RPC access.""" color_image: In[Image] @@ -33,10 +33,11 @@ class VLMAgent(LlmInitMixin, AgentSpec): def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(*args, **kwargs) - self._llm = self._init_llm() + self._llm = build_llm(self.config) self._latest_image: Image | None = None self._history: list[AIMessage | HumanMessage] = [] - self._system_message = self._init_system_message() + self._system_message = build_system_message(self.config) + self.publish(self._system_message) @rpc def start(self) -> None: From 8f21f22efb759844ecccd61f675853607aae54b5 Mon Sep 17 00:00:00 2001 From: stash Date: Wed, 7 Jan 2026 14:17:00 -0800 Subject: [PATCH 7/9] File rename in agents/modules --- dimos/agents/__init__.py | 15 +++++++++++++-- dimos/agents/{modules => }/vlm_agent.py | 0 dimos/agents/{modules => }/vlm_stream_tester.py | 0 .../unitree_webrtc/unitree_go2_blueprints.py | 4 ++-- 4 files changed, 15 insertions(+), 4 deletions(-) rename dimos/agents/{modules => }/vlm_agent.py (100%) rename dimos/agents/{modules => }/vlm_stream_tester.py (100%) diff --git a/dimos/agents/__init__.py b/dimos/agents/__init__.py index a3c73e852d..2bac584249 100644 --- a/dimos/agents/__init__.py +++ b/dimos/agents/__init__.py @@ -8,9 +8,20 @@ ) from dimos.agents.agent import Agent, deploy -from dimos.agents.modules.vlm_agent import VLMAgent from dimos.agents.spec import AgentSpec +from dimos.agents.vlm_agent import VLMAgent +from dimos.agents.vlm_stream_tester import VlmStreamTester from dimos.protocol.skill.skill import skill from dimos.protocol.skill.type import Output, Reducer, Stream -__all__ = ["Agent", "AgentSpec", "Output", "Reducer", "Stream", "VLMAgent", "deploy", "skill"] +__all__ = [ + "Agent", + "AgentSpec", + "Output", + "Reducer", + "Stream", + "VLMAgent", + "VlmStreamTester", + "deploy", + "skill", +] diff --git a/dimos/agents/modules/vlm_agent.py b/dimos/agents/vlm_agent.py similarity index 100% rename from dimos/agents/modules/vlm_agent.py rename to dimos/agents/vlm_agent.py diff --git a/dimos/agents/modules/vlm_stream_tester.py b/dimos/agents/vlm_stream_tester.py similarity index 100% rename from dimos/agents/modules/vlm_stream_tester.py rename to dimos/agents/vlm_stream_tester.py diff --git a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py index 566667c690..7629644ed6 100644 --- a/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py +++ b/dimos/robot/unitree_webrtc/unitree_go2_blueprints.py @@ -23,12 +23,12 @@ from dimos.agents.agent import llm_agent from dimos.agents.cli.human import human_input from dimos.agents.cli.web import web_input -from dimos.agents.modules.vlm_agent import vlm_agent -from dimos.agents.modules.vlm_stream_tester import vlm_stream_tester from dimos.agents.ollama_agent import ollama_installed from dimos.agents.skills.navigation import navigation_skill from dimos.agents.skills.speak_skill import speak_skill from dimos.agents.spec import Provider +from dimos.agents.vlm_agent import vlm_agent +from dimos.agents.vlm_stream_tester import vlm_stream_tester from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE from dimos.core.blueprints import autoconnect from dimos.core.transport import JpegLcmTransport, JpegShmTransport, LCMTransport, pSHMTransport From e70b8a16fc7edec5d17b2dbe4c944998347c5d5e Mon Sep 17 00:00:00 2001 From: stash Date: Wed, 7 Jan 2026 14:27:28 -0800 Subject: [PATCH 8/9] VLM stream example added both rpc and stream examples --- dimos/agents/vlm_stream_tester.py | 107 +++++++++++++++++++----------- 1 file changed, 67 insertions(+), 40 deletions(-) diff --git a/dimos/agents/vlm_stream_tester.py b/dimos/agents/vlm_stream_tester.py index ebeeec0e7a..8fb5b8b58c 100644 --- a/dimos/agents/vlm_stream_tester.py +++ b/dimos/agents/vlm_stream_tester.py @@ -96,46 +96,8 @@ def _run_queries(self) -> None: while not self._stop_event.is_set() and self._latest_image is None: time.sleep(0.05) - for idx in range(self._num_queries): - if self._stop_event.is_set(): - break - if self._latest_image is None: - logger.warning("No image available for stream query.") - break - - image_age = None - if self._latest_image_wall_ts is not None: - image_age = time.time() - self._latest_image_wall_ts - if image_age > self._max_image_age_s: - logger.warning( - "Latest image is stale", - age_s=image_age, - max_age_s=self._max_image_age_s, - ) - - logger.info("Sending stream query", index=idx + 1) - self.query.publish( - HumanMessage( - content=f"{self._prompt} (stream query {idx + 1}/{self._num_queries})" - ) - ) - - try: - rpc_query = self.get_rpc_calls("VLMAgent.query_image") - response = rpc_query( - self._latest_image, - f"{self._prompt} (rpc query {idx + 1}/{self._num_queries})", - ) - logger.info( - "VLMAgent RPC answer", - query_index=idx + 1, - image_age_s=image_age, - content=response, - ) - except Exception as exc: - logger.warning("RPC query_image failed", error=str(exc)) - - time.sleep(self._query_interval_s) + self._run_stream_queries() + self._run_rpc_queries() except Exception as exc: logger.exception("VlmStreamTester query loop failed", error=str(exc)) finally: @@ -146,6 +108,71 @@ def _run_queries(self) -> None: threshold_s=self._max_image_gap_s, ) + def _run_stream_queries(self) -> None: + for idx in range(self._num_queries): + if self._stop_event.is_set(): + break + if self._latest_image is None: + logger.warning("No image available for stream query.") + break + + image_age = None + if self._latest_image_wall_ts is not None: + image_age = time.time() - self._latest_image_wall_ts + if image_age > self._max_image_age_s: + logger.warning( + "Latest image is stale", + age_s=image_age, + max_age_s=self._max_image_age_s, + ) + + logger.info("Sending stream query", index=idx + 1, total=self._num_queries) + self.query.publish( + HumanMessage(content=f"{self._prompt} (stream query {idx + 1}/{self._num_queries})") + ) + time.sleep(self._query_interval_s) + + def _run_rpc_queries(self) -> None: + rpc_query = None + try: + rpc_query = self.get_rpc_calls("VLMAgent.query_image") + except Exception as exc: + logger.warning("RPC query_image lookup failed", error=str(exc)) + return + + for idx in range(self._num_queries): + if self._stop_event.is_set(): + break + if self._latest_image is None: + logger.warning("No image available for RPC query.") + break + + image_age = None + if self._latest_image_wall_ts is not None: + image_age = time.time() - self._latest_image_wall_ts + if image_age > self._max_image_age_s: + logger.warning( + "Latest image is stale", + age_s=image_age, + max_age_s=self._max_image_age_s, + ) + + logger.info("Sending RPC query", index=idx + 1, total=self._num_queries) + try: + response = rpc_query( + self._latest_image, + f"{self._prompt} (rpc query {idx + 1}/{self._num_queries})", + ) + logger.info( + "VLMAgent RPC answer", + query_index=idx + 1, + image_age_s=image_age, + content=response, + ) + except Exception as exc: + logger.warning("RPC query_image failed", error=str(exc)) + time.sleep(self._query_interval_s) + vlm_stream_tester = VlmStreamTester.blueprint From f2502f86fd6cfb267b5edaa8e6bda9b78343eef2 Mon Sep 17 00:00:00 2001 From: stash Date: Wed, 7 Jan 2026 14:38:05 -0800 Subject: [PATCH 9/9] Passing mypy --- dimos/agents/llm_init.py | 11 ++++++++--- dimos/agents/vlm_agent.py | 23 ++++++++++++----------- dimos/agents/vlm_stream_tester.py | 8 ++++---- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/dimos/agents/llm_init.py b/dimos/agents/llm_init.py index 7576dc59ab..eb8c33c631 100644 --- a/dimos/agents/llm_init.py +++ b/dimos/agents/llm_init.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import cast + from langchain.chat_models import init_chat_model from langchain_core.language_models.chat_models import BaseChatModel from langchain_core.messages import SystemMessage @@ -40,9 +42,12 @@ def build_llm(config: AgentConfig) -> BaseChatModel: ) return ChatHuggingFace(llm=llm, model_id=config.model) - return init_chat_model( # type: ignore[call-overload] - model_provider=config.provider, - model=config.model, + return cast( + "BaseChatModel", + init_chat_model( # type: ignore[call-overload] + model_provider=config.provider, + model=config.model, + ), ) diff --git a/dimos/agents/vlm_agent.py b/dimos/agents/vlm_agent.py index 25c783722d..0757a59d22 100644 --- a/dimos/agents/vlm_agent.py +++ b/dimos/agents/vlm_agent.py @@ -15,7 +15,7 @@ from langchain_core.messages import AIMessage, HumanMessage, SystemMessage from dimos.agents.llm_init import build_llm, build_system_message -from dimos.agents.spec import AgentSpec +from dimos.agents.spec import AgentSpec, AnyMessage from dimos.core import rpc from dimos.core.stream import In, Out from dimos.msgs.sensor_msgs import Image @@ -28,8 +28,8 @@ class VLMAgent(AgentSpec): """Stream-first agent for vision queries with optional RPC access.""" color_image: In[Image] - query: In[HumanMessage] - answer: Out[AIMessage] + query_stream: In[HumanMessage] + answer_stream: Out[AIMessage] def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(*args, **kwargs) @@ -43,7 +43,7 @@ def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] def start(self) -> None: super().start() self._disposables.add(self.color_image.subscribe(self._on_image)) # type: ignore[arg-type] - self._disposables.add(self.query.subscribe(self._on_query)) # type: ignore[arg-type] + self._disposables.add(self.query_stream.subscribe(self._on_query)) # type: ignore[arg-type] @rpc def stop(self) -> None: @@ -54,12 +54,12 @@ def _on_image(self, image: Image) -> None: def _on_query(self, msg: HumanMessage) -> None: if not self._latest_image: - self.answer.publish(AIMessage(content="No image available yet.")) + self.answer_stream.publish(AIMessage(content="No image available yet.")) return query_text = self._extract_text(msg) response = self._invoke_image(self._latest_image, query_text) - self.answer.publish(response) + self.answer_stream.publish(response) def _extract_text(self, msg: HumanMessage) -> str: content = msg.content @@ -74,7 +74,7 @@ def _extract_text(self, msg: HumanMessage) -> str: def _invoke(self, msg: HumanMessage) -> AIMessage: messages = [self._system_message, msg] response = self._llm.invoke(messages) - self.append_history(msg, response) # type: ignore[arg-type] + self.append_history([msg, response]) # type: ignore[arg-type] return response # type: ignore[return-value] def _invoke_image(self, image: Image, query: str) -> AIMessage: @@ -86,11 +86,12 @@ def clear_history(self): # type: ignore[no-untyped-def] self._history.clear() def append_history(self, *msgs: list[AIMessage | HumanMessage]) -> None: - for msg in msgs: - self.publish(msg) # type: ignore[arg-type] - self._history.extend(msgs) + for msg_list in msgs: + for msg in msg_list: + self.publish(msg) # type: ignore[arg-type] + self._history.extend(msg_list) - def history(self) -> list[SystemMessage | AIMessage | HumanMessage]: + def history(self) -> list[AnyMessage]: return [self._system_message, *self._history] @rpc diff --git a/dimos/agents/vlm_stream_tester.py b/dimos/agents/vlm_stream_tester.py index 8fb5b8b58c..79bb802a03 100644 --- a/dimos/agents/vlm_stream_tester.py +++ b/dimos/agents/vlm_stream_tester.py @@ -29,8 +29,8 @@ class VlmStreamTester(Module): """Smoke-test VLMAgent with replayed images and stream queries.""" color_image: In[Image] - query: Out[HumanMessage] - answer: In[AIMessage] + query_stream: Out[HumanMessage] + answer_stream: In[AIMessage] rpc_calls: list[str] = [ "VLMAgent.query_image", @@ -62,7 +62,7 @@ def __init__( # type: ignore[no-untyped-def] def start(self) -> None: super().start() self._disposables.add(self.color_image.subscribe(self._on_image)) # type: ignore[arg-type] - self._disposables.add(self.answer.subscribe(self._on_answer)) # type: ignore[arg-type] + self._disposables.add(self.answer_stream.subscribe(self._on_answer)) # type: ignore[arg-type] self._worker = threading.Thread(target=self._run_queries, daemon=True) self._worker.start() @@ -127,7 +127,7 @@ def _run_stream_queries(self) -> None: ) logger.info("Sending stream query", index=idx + 1, total=self._num_queries) - self.query.publish( + self.query_stream.publish( HumanMessage(content=f"{self._prompt} (stream query {idx + 1}/{self._num_queries})") ) time.sleep(self._query_interval_s)