diff --git a/dimos/agents/agent.py b/dimos/agents/agent.py index 62765ef706..599a8966ec 100644 --- a/dimos/agents/agent.py +++ b/dimos/agents/agent.py @@ -889,22 +889,5 @@ def _send_query(self, messages: list) -> Any: logger.error(f"Unexpected error in API call: {e}") raise - def stream_query(self, query_text: str) -> Observable: - """Creates an observable that processes a text query and emits the response. - - This method provides a simple way to send a text query and get an observable - stream of the response. It's designed for one-off queries rather than - continuous processing of input streams. - - Args: - query_text (str): The query text to process. - - Returns: - Observable: An observable that emits the response as a string. - """ - return create( - lambda observer, _: self._observable_query(observer, incoming_query=query_text) - ) - # endregion OpenAIAgent Subclass (OpenAI-Specific Implementation) diff --git a/dimos/agents/agent_config.py b/dimos/agents/agent_config.py index 5b9027b072..bc61736813 100644 --- a/dimos/agents/agent_config.py +++ b/dimos/agents/agent_config.py @@ -25,31 +25,3 @@ def __init__(self, agents: list[Agent] | None = None) -> None: agents (List[Agent], optional): List of Agent instances. Defaults to empty list. """ self.agents = agents if agents is not None else [] - - def add_agent(self, agent: Agent) -> None: - """ - Add an agent to the configuration. - - Args: - agent (Agent): Agent instance to add - """ - self.agents.append(agent) - - def remove_agent(self, agent: Agent) -> None: - """ - Remove an agent from the configuration. - - Args: - agent (Agent): Agent instance to remove - """ - if agent in self.agents: - self.agents.remove(agent) - - def get_agents(self) -> list[Agent]: - """ - Get the list of configured agents. - - Returns: - List[Agent]: List of configured agents - """ - return self.agents diff --git a/dimos/agents/agent_message.py b/dimos/agents/agent_message.py index cecd8092c1..c9404d6b5b 100644 --- a/dimos/agents/agent_message.py +++ b/dimos/agents/agent_message.py @@ -18,7 +18,6 @@ import time from dimos.agents.agent_types import AgentImage -from dimos.msgs.sensor_msgs.Image import Image @dataclass @@ -42,22 +41,6 @@ def add_text(self, text: str) -> None: if text: # Only add non-empty text self.messages.append(text) - def add_image(self, image: Image | AgentImage) -> None: - """Add an image. Converts Image to AgentImage if needed.""" - if isinstance(image, Image): - # Convert to AgentImage - agent_image = AgentImage( - base64_jpeg=image.agent_encode(), - width=image.width, - height=image.height, - metadata={"format": image.format.value, "frame_id": image.frame_id}, - ) - self.images.append(agent_image) - elif isinstance(image, AgentImage): - self.images.append(image) - else: - raise TypeError(f"Expected Image or AgentImage, got {type(image)}") - def has_text(self) -> bool: """Check if message contains text.""" # Check if we have any non-empty messages @@ -71,14 +54,6 @@ def is_multimodal(self) -> bool: """Check if message contains both text and images.""" return self.has_text() and self.has_images() - def get_primary_text(self) -> str | None: - """Get the first text message, if any.""" - return self.messages[0] if self.messages else None - - def get_primary_image(self) -> AgentImage | None: - """Get the first image, if any.""" - return self.images[0] if self.images else None - def get_combined_text(self) -> str: """Get all text messages combined into a single string.""" # Filter out any empty strings and join diff --git a/dimos/agents/agent_types.py b/dimos/agents/agent_types.py index db41acbafb..0e11ed50a6 100644 --- a/dimos/agents/agent_types.py +++ b/dimos/agents/agent_types.py @@ -185,42 +185,6 @@ def add_tool_result(self, tool_call_id: str, content: str, name: str | None = No ) self._trim() - def add_raw_message(self, message: dict[str, Any]) -> None: - """Add a raw message dict to history. - - Args: - message: Message dict with role and content - """ - with self._lock: - # Extract fields from raw message - role = message.get("role", "user") - content = message.get("content", "") - - # Handle tool calls if present - tool_calls = None - if "tool_calls" in message: - tool_calls = [ - ToolCall( - id=tc["id"], - name=tc["function"]["name"], - arguments=json.loads(tc["function"]["arguments"]) - if isinstance(tc["function"]["arguments"], str) - else tc["function"]["arguments"], - status="completed", - ) - for tc in message["tool_calls"] - ] - - # Handle tool_call_id for tool responses - tool_call_id = message.get("tool_call_id") - - self._messages.append( - ConversationMessage( - role=role, content=content, tool_calls=tool_calls, tool_call_id=tool_call_id - ) - ) - self._trim() - def to_openai_format(self) -> list[dict[str, Any]]: """Export history in OpenAI format. diff --git a/dimos/agents/memory/base.py b/dimos/agents/memory/base.py index eb48dcca44..d2aa56fbd8 100644 --- a/dimos/agents/memory/base.py +++ b/dimos/agents/memory/base.py @@ -107,15 +107,6 @@ def query(self, query_texts, n_results: int = 4, similarity_threshold=None): ConnectionError: If database connection fails during query. """ - ## Update ## - @abstractmethod - def update_vector(self, vector_id, new_vector_data): - """Update an existing vector in the database. - Args: - vector_id (any): The identifier of the vector to update. - new_vector_data (any): The new data to replace the existing vector data. - """ - ## Delete ## @abstractmethod def delete_vector(self, vector_id): diff --git a/dimos/agents/memory/chroma_impl.py b/dimos/agents/memory/chroma_impl.py index b238b616d8..faee04cee4 100644 --- a/dimos/agents/memory/chroma_impl.py +++ b/dimos/agents/memory/chroma_impl.py @@ -12,12 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections.abc import Sequence import os from langchain_chroma import Chroma from langchain_openai import OpenAIEmbeddings -import torch from dimos.agents.memory.base import AbstractAgentSemanticMemory @@ -71,10 +69,6 @@ def query(self, query_texts, n_results: int = 4, similarity_threshold=None): documents = self.db_connection.similarity_search(query=query_texts, k=n_results) return [(doc, None) for doc in documents] - def update_vector(self, vector_id, new_vector_data): - # TODO - return super().connect() - def delete_vector(self, vector_id): """Delete a vector from the ChromaDB using its identifier.""" if not self.db_connection: @@ -122,53 +116,3 @@ def create(self): embedding_function=self.embeddings, collection_metadata={"hnsw:space": "cosine"}, ) - - -class LocalSemanticMemory(ChromaAgentSemanticMemory): - """Semantic memory implementation using local models.""" - - def __init__( - self, - collection_name: str = "my_collection", - model_name: str = "sentence-transformers/all-MiniLM-L6-v2", - ) -> None: - """Initialize the local semantic memory using SentenceTransformer. - - Args: - collection_name (str): Name of the Chroma collection - model_name (str): Embeddings model - """ - - self.model_name = model_name - super().__init__(collection_name=collection_name) - - def create(self) -> None: - """Create local embedding model and initialize the ChromaDB client.""" - # Load the sentence transformer model - # Use CUDA if available, otherwise fall back to CPU - device = "cuda" if torch.cuda.is_available() else "cpu" - print(f"Using device: {device}") - self.model = SentenceTransformer(self.model_name, device=device) - - # Create a custom embedding class that implements the embed_query method - class SentenceTransformerEmbeddings: - def __init__(self, model) -> None: - self.model = model - - def embed_query(self, text: str): - """Embed a single query text.""" - return self.model.encode(text, normalize_embeddings=True).tolist() - - def embed_documents(self, texts: Sequence[str]): - """Embed multiple documents/texts.""" - return self.model.encode(texts, normalize_embeddings=True).tolist() - - # Create an instance of our custom embeddings class - self.embeddings = SentenceTransformerEmbeddings(self.model) - - # Create the database - self.db_connection = Chroma( - collection_name=self.collection_name, - embedding_function=self.embeddings, - collection_metadata={"hnsw:space": "cosine"}, - ) diff --git a/dimos/agents/memory/visual_memory.py b/dimos/agents/memory/visual_memory.py index 90f1272fef..9f494c53c0 100644 --- a/dimos/agents/memory/visual_memory.py +++ b/dimos/agents/memory/visual_memory.py @@ -100,18 +100,6 @@ def get(self, image_id: str) -> np.ndarray | None: logger.warning(f"Failed to decode image for ID {image_id}: {e!s}") return None - def contains(self, image_id: str) -> bool: - """ - Check if an image ID exists in visual memory. - - Args: - image_id: Unique identifier for the image - - Returns: - True if the image exists, False otherwise - """ - return image_id in self.images - def count(self) -> int: """ Get the number of images in visual memory. diff --git a/dimos/agents/modules/base_agent.py b/dimos/agents/modules/base_agent.py index 0bceb1112e..77aefe2b36 100644 --- a/dimos/agents/modules/base_agent.py +++ b/dimos/agents/modules/base_agent.py @@ -15,7 +15,6 @@ """Base agent module that wraps BaseAgent for DimOS module usage.""" import threading -from typing import Any from dimos.agents.agent_message import AgentMessage from dimos.agents.agent_types import AgentResponse @@ -147,13 +146,6 @@ def stop(self) -> None: logger.info("Agent module stopped") super().stop() - @rpc - def clear_history(self) -> None: - """Clear conversation history.""" - with self._history_lock: - self.history = [] - logger.info("Conversation history cleared") - @rpc def add_skill(self, skill: AbstractSkill) -> None: """Add a skill to the agent.""" @@ -166,12 +158,6 @@ def set_system_prompt(self, prompt: str) -> None: self.system_prompt = prompt logger.info("System prompt updated") - @rpc - def get_conversation_history(self) -> list[dict[str, Any]]: - """Get current conversation history.""" - with self._history_lock: - return self.history.copy() - def _handle_agent_message(self, message: AgentMessage) -> None: """Handle AgentMessage from module input.""" # Process through BaseAgent query method @@ -182,30 +168,3 @@ def _handle_agent_message(self, message: AgentMessage) -> None: except Exception as e: logger.error(f"Agent message processing error: {e}") self.response_subject.on_error(e) - - def _handle_module_query(self, query: str) -> None: - """Handle legacy query from module input.""" - # For simple text queries, just convert to AgentMessage - agent_msg = AgentMessage() - agent_msg.add_text(query) - - # Process through unified handler - self._handle_agent_message(agent_msg) - - def _update_latest_data(self, data: dict[str, Any]) -> None: - """Update latest data context.""" - with self._data_lock: - self._latest_data = data - - def _update_latest_image(self, img: Any) -> None: - """Update latest image.""" - with self._image_lock: - self._latest_image = img - - def _format_data_context(self, data: dict[str, Any]) -> str: - """Format data dictionary as context string.""" - # Simple formatting - can be customized - parts = [] - for key, value in data.items(): - parts.append(f"{key}: {value}") - return "\n".join(parts) diff --git a/dimos/agents/modules/gateway/client.py b/dimos/agents/modules/gateway/client.py index 6d8abf5e14..a53224f101 100644 --- a/dimos/agents/modules/gateway/client.py +++ b/dimos/agents/modules/gateway/client.py @@ -21,7 +21,6 @@ from types import TracebackType from typing import Any -import httpx from tenacity import retry, stop_after_attempt, wait_exponential from .tensorzero_embedded import TensorZeroEmbeddedGateway @@ -61,26 +60,6 @@ def __init__( logger.error(f"Failed to initialize TensorZero: {e}") raise - def _get_client(self) -> httpx.Client: - """Get or create sync HTTP client.""" - if self._client is None: - self._client = httpx.Client( - base_url=self.gateway_url, - timeout=self.timeout, - headers={"Content-Type": "application/json"}, - ) - return self._client - - def _get_async_client(self) -> httpx.AsyncClient: - """Get or create async HTTP client.""" - if self._async_client is None: - self._async_client = httpx.AsyncClient( - base_url=self.gateway_url, - timeout=self.timeout, - headers={"Content-Type": "application/json"}, - ) - return self._async_client - @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) def inference( self, diff --git a/dimos/agents/modules/gateway/utils.py b/dimos/agents/modules/gateway/utils.py index ac9dc3e364..9824f90c31 100644 --- a/dimos/agents/modules/gateway/utils.py +++ b/dimos/agents/modules/gateway/utils.py @@ -100,57 +100,3 @@ def parse_streaming_response(chunk: dict[str, Any]) -> dict[str, Any]: # Default fallback return {"type": "unknown", "content": chunk, "metadata": {}} - - -def create_tool_response(tool_id: str, result: Any, is_error: bool = False) -> dict[str, Any]: - """Create a properly formatted tool response. - - Args: - tool_id: The ID of the tool call - result: The result from executing the tool - is_error: Whether this is an error response - - Returns: - Formatted tool response message - """ - content = str(result) if not isinstance(result, str) else result - - return { - "role": "tool", - "tool_call_id": tool_id, - "content": content, - "name": None, # Will be filled by the calling code - } - - -def extract_image_from_message(message: dict[str, Any]) -> dict[str, Any] | None: - """Extract image data from a message if present. - - Args: - message: Message dict that may contain image data - - Returns: - Dict with image data and metadata, or None if no image - """ - content = message.get("content", []) - - # Handle list content (multimodal) - if isinstance(content, list): - for item in content: - if isinstance(item, dict): - # OpenAI format - if item.get("type") == "image_url": - return { - "format": "openai", - "data": item["image_url"]["url"], - "detail": item["image_url"].get("detail", "auto"), - } - # Anthropic format - elif item.get("type") == "image": - return { - "format": "anthropic", - "data": item["source"]["data"], - "media_type": item["source"].get("media_type", "image/jpeg"), - } - - return None diff --git a/dimos/agents2/temp/webcam_agent.py b/dimos/agents2/temp/webcam_agent.py index 485684d9e0..49526525c5 100644 --- a/dimos/agents2/temp/webcam_agent.py +++ b/dimos/agents2/temp/webcam_agent.py @@ -18,81 +18,18 @@ This is the migrated version using the new LangChain-based agent system. """ -from threading import Thread import time -import reactivex as rx -import reactivex.operators as ops - -from dimos.agents2 import Agent, Output, Reducer, Stream, skill +from dimos.agents2 import Agent from dimos.agents2.cli.human import HumanInput from dimos.agents2.spec import Model, Provider -from dimos.core import LCMTransport, Module, rpc, start +from dimos.core import LCMTransport, start from dimos.hardware.camera import zed from dimos.hardware.camera.module import CameraModule from dimos.hardware.camera.webcam import Webcam from dimos.msgs.geometry_msgs import Quaternion, Transform, Vector3 from dimos.msgs.sensor_msgs import CameraInfo, Image from dimos.protocol.skill.test_coordinator import SkillContainerTest -from dimos.web.robot_web_interface import RobotWebInterface - - -class WebModule(Module): - web_interface: RobotWebInterface = None - human_query: rx.subject.Subject = None - agent_response: rx.subject.Subject = None - - thread: Thread = None - - _human_messages_running = False - - def __init__(self) -> None: - super().__init__() - self.agent_response = rx.subject.Subject() - self.human_query = rx.subject.Subject() - - @rpc - def start(self) -> None: - super().start() - - text_streams = { - "agent_responses": self.agent_response, - } - - self.web_interface = RobotWebInterface( - port=5555, - text_streams=text_streams, - audio_subject=rx.subject.Subject(), - ) - - unsub = self.web_interface.query_stream.subscribe(self.human_query.on_next) - self._disposables.add(unsub) - - self.thread = Thread(target=self.web_interface.run, daemon=True) - self.thread.start() - - @rpc - def stop(self) -> None: - if self.web_interface: - self.web_interface.stop() - if self.thread: - # TODO, you can't just wait for a server to close, you have to signal it to end. - self.thread.join(timeout=1.0) - - super().stop() - - @skill(stream=Stream.call_agent, reducer=Reducer.all, output=Output.human) - def human_messages(self): - """Provide human messages from web interface. Don't use this tool, it's running implicitly already""" - if self._human_messages_running: - print("human_messages already running, not starting another") - return "already running" - self._human_messages_running = True - while True: - print("Waiting for human message...") - message = self.human_query.pipe(ops.first()).run() - print(f"Got human message: {message}") - yield message def main() -> None: diff --git a/dimos/exceptions/agent_memory_exceptions.py b/dimos/exceptions/agent_memory_exceptions.py index 073e56c643..c2e1d3d49f 100644 --- a/dimos/exceptions/agent_memory_exceptions.py +++ b/dimos/exceptions/agent_memory_exceptions.py @@ -75,19 +75,3 @@ def __init__( self, message: str = "Error in retrieving data during AgentMemory operation" ) -> None: super().__init__(message) - - -class DataNotFoundError(DataRetrievalError): - """ - Exception raised when the requested data is not found in the database. - This is used when a query completes successfully but returns no result for the specified identifier. - - Args: - vector_id (int or str): The identifier for the vector that was not found. - message (str, optional): Human-readable message providing more detail. If not provided, a default message is generated. - """ - - def __init__(self, vector_id, message=None) -> None: - message = message or f"Requested data for vector ID {vector_id} was not found." - super().__init__(message) - self.vector_id = vector_id diff --git a/dimos/hardware/end_effector.py b/dimos/hardware/end_effector.py index 1c5eb08281..859ea97682 100644 --- a/dimos/hardware/end_effector.py +++ b/dimos/hardware/end_effector.py @@ -16,6 +16,3 @@ class EndEffector: def __init__(self, effector_type=None) -> None: self.effector_type = effector_type - - def get_effector_type(self): - return self.effector_type diff --git a/dimos/hardware/piper_arm.py b/dimos/hardware/piper_arm.py index e6e9da693d..82196ceace 100644 --- a/dimos/hardware/piper_arm.py +++ b/dimos/hardware/piper_arm.py @@ -488,38 +488,4 @@ def get_key(timeout: float = 0.1): finally: termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) - def teleop_linear_vel(arm) -> None: - print("Use arrow keys to control linear velocity (x/y/z). Press 'q' to quit.") - print("Up/Down: +x/-x, Left/Right: +y/-y, 'w'/'s': +z/-z") - x_dot, y_dot, z_dot = 0.0, 0.0, 0.0 - while True: - key = get_key(timeout=0.1) - if key == "\x1b[A": # Up arrow - x_dot += 0.01 - elif key == "\x1b[B": # Down arrow - x_dot -= 0.01 - elif key == "\x1b[C": # Right arrow - y_dot += 0.01 - elif key == "\x1b[D": # Left arrow - y_dot -= 0.01 - elif key == "w": - z_dot += 0.01 - elif key == "s": - z_dot -= 0.01 - elif key == "q": - logger.info("Exiting teleop") - arm.disable() - break - - # Optionally, clamp velocities to reasonable limits - x_dot = max(min(x_dot, 0.5), -0.5) - y_dot = max(min(y_dot, 0.5), -0.5) - z_dot = max(min(z_dot, 0.5), -0.5) - - # Only linear velocities, angular set to zero - arm.cmd_vel_ee(x_dot, y_dot, z_dot, 0, 0, 0) - logger.debug( - f"Current linear velocity: x={x_dot:.3f} m/s, y={y_dot:.3f} m/s, z={z_dot:.3f} m/s" - ) - run_velocity_controller() diff --git a/dimos/models/depth/metric3d.py b/dimos/models/depth/metric3d.py index 0c10f31e63..3eeac985ba 100644 --- a/dimos/models/depth/metric3d.py +++ b/dimos/models/depth/metric3d.py @@ -77,13 +77,6 @@ def infer_depth(self, img, debug: bool=False): return depth_image.cpu().numpy() - def save_depth(self, pred_depth) -> None: - # Save the depth map to a file - pred_depth_np = pred_depth.cpu().numpy() - output_depth_file = "output_depth_map.png" - cv2.imwrite(output_depth_file, pred_depth_np) - print(f"Depth map saved to {output_depth_file}") - # Adjusts input size to fit pretrained ViT model def rescale_input(self, rgb, rgb_origin): #### ajust input size to fit pretrained model @@ -155,13 +148,3 @@ def unpad_transform_depth(self, pred_depth): def update_intrinsic(self, intrinsic) -> None: self.intrinsic = intrinsic - def eval_predicted_depth(self, depth_file, pred_depth) -> None: - if depth_file is not None: - gt_depth = cv2.imread(depth_file, -1) - gt_depth = gt_depth / self.gt_depth_scale - gt_depth = torch.from_numpy(gt_depth).float().cuda() - assert gt_depth.shape == pred_depth.shape - - mask = gt_depth > 1e-8 - abs_rel_err = (torch.abs(pred_depth[mask] - gt_depth[mask]) / gt_depth[mask]).mean() - print("abs_rel_err:", abs_rel_err.item()) diff --git a/dimos/models/pointcloud/pointcloud_utils.py b/dimos/models/pointcloud/pointcloud_utils.py index 33b4b59607..8858142a2e 100644 --- a/dimos/models/pointcloud/pointcloud_utils.py +++ b/dimos/models/pointcloud/pointcloud_utils.py @@ -14,86 +14,6 @@ import random -import numpy as np -import open3d as o3d - - -def save_pointcloud(pcd, file_path) -> None: - """ - Save a point cloud to a file using Open3D. - """ - o3d.io.write_point_cloud(file_path, pcd) - - -def restore_pointclouds(pointcloud_paths): - restored_pointclouds = [] - for path in pointcloud_paths: - restored_pointclouds.append(o3d.io.read_point_cloud(path)) - return restored_pointclouds - - -def create_point_cloud_from_rgbd(rgb_image, depth_image, intrinsic_parameters): - rgbd_image = o3d.geometry.RGBDImage.create_from_color_and_depth( - o3d.geometry.Image(rgb_image), - o3d.geometry.Image(depth_image), - depth_scale=0.125, # 1000.0, - depth_trunc=10.0, # 10.0, - convert_rgb_to_intensity=False, - ) - intrinsic = o3d.camera.PinholeCameraIntrinsic() - intrinsic.set_intrinsics( - intrinsic_parameters["width"], - intrinsic_parameters["height"], - intrinsic_parameters["fx"], - intrinsic_parameters["fy"], - intrinsic_parameters["cx"], - intrinsic_parameters["cy"], - ) - pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd_image, intrinsic) - return pcd - - -def canonicalize_point_cloud(pcd, canonicalize_threshold: float=0.3): - # Segment the largest plane, assumed to be the floor - plane_model, inliers = pcd.segment_plane( - distance_threshold=0.01, ransac_n=3, num_iterations=1000 - ) - - canonicalized = False - if len(inliers) / len(pcd.points) > canonicalize_threshold: - canonicalized = True - - # Ensure the plane normal points upwards - if np.dot(plane_model[:3], [0, 1, 0]) < 0: - plane_model = -plane_model - - # Normalize the plane normal vector - normal = plane_model[:3] / np.linalg.norm(plane_model[:3]) - - # Compute the new basis vectors - new_y = normal - new_x = np.cross(new_y, [0, 0, -1]) - new_x /= np.linalg.norm(new_x) - new_z = np.cross(new_x, new_y) - - # Create the transformation matrix - transformation = np.identity(4) - transformation[:3, :3] = np.vstack((new_x, new_y, new_z)).T - transformation[:3, 3] = -np.dot(transformation[:3, :3], pcd.points[inliers[0]]) - - # Apply the transformation - pcd.transform(transformation) - - # Additional 180-degree rotation around the Z-axis - rotation_z_180 = np.array( - [[np.cos(np.pi), -np.sin(np.pi), 0], [np.sin(np.pi), np.cos(np.pi), 0], [0, 0, 1]] - ) - pcd.rotate(rotation_z_180, center=(0, 0, 0)) - - return pcd, canonicalized, transformation - else: - return pcd, canonicalized, None - # Distance calculations def human_like_distance(distance_meters) -> str: @@ -152,36 +72,6 @@ def human_like_distance(distance_meters) -> str: return f"{choices[-1][0]} {choices[-1][1]}" -def calculate_distances_between_point_clouds(A, B): - dist_pcd1_to_pcd2 = np.asarray(A.compute_point_cloud_distance(B)) - dist_pcd2_to_pcd1 = np.asarray(B.compute_point_cloud_distance(A)) - combined_distances = np.concatenate((dist_pcd1_to_pcd2, dist_pcd2_to_pcd1)) - avg_dist = np.mean(combined_distances) - return human_like_distance(avg_dist) - - -def calculate_centroid(pcd): - """Calculate the centroid of a point cloud.""" - points = np.asarray(pcd.points) - centroid = np.mean(points, axis=0) - return centroid - - -def calculate_relative_positions(centroids): - """Calculate the relative positions between centroids of point clouds.""" - num_centroids = len(centroids) - relative_positions_info = [] - - for i in range(num_centroids): - for j in range(i + 1, num_centroids): - relative_vector = centroids[j] - centroids[i] - - distance = np.linalg.norm(relative_vector) - relative_positions_info.append( - {"pcd_pair": (i, j), "relative_vector": relative_vector, "distance": distance} - ) - - return relative_positions_info def get_bounding_box_height(pcd): @@ -198,18 +88,3 @@ def get_bounding_box_height(pcd): return aabb.get_extent()[1] # Assuming the Y-axis is the up-direction -def compare_bounding_box_height(pcd_i, pcd_j): - """ - Compare the bounding box heights of two point clouds. - - Parameters: - pcd_i (open3d.geometry.PointCloud): The first point cloud. - pcd_j (open3d.geometry.PointCloud): The second point cloud. - - Returns: - bool: True if the bounding box of pcd_i is taller than that of pcd_j, False otherwise. - """ - height_i = get_bounding_box_height(pcd_i) - height_j = get_bounding_box_height(pcd_j) - - return height_i > height_j diff --git a/dimos/models/qwen/video_query.py b/dimos/models/qwen/video_query.py index 80bb078bac..5cecc71a36 100644 --- a/dimos/models/qwen/video_query.py +++ b/dimos/models/qwen/video_query.py @@ -201,41 +201,3 @@ def get_bbox_from_qwen( return None -def get_bbox_from_qwen_frame(frame, object_name: str | None = None) -> BBox | None: - """Get bounding box coordinates from Qwen for a specific object or any object using a single frame. - - Args: - frame: A single image frame (numpy array in RGB format) - object_name: Optional name of object to detect - - Returns: - BBox: Bounding box as (x1, y1, x2, y2) or None if no detection - """ - # Ensure frame is numpy array - if not isinstance(frame, np.ndarray): - raise ValueError("Frame must be a numpy array") - - prompt = ( - f"Look at this image and find the {object_name if object_name else 'most prominent object'}. " - "Return ONLY a JSON object with format: {'name': 'object_name', 'bbox': [x1, y1, x2, y2]} " - "where x1,y1 is the top-left and x2,y2 is the bottom-right corner of the bounding box. If not found, return None." - ) - - response = query_single_frame(frame, prompt) - - try: - # Extract JSON from response - start_idx = response.find("{") - end_idx = response.rfind("}") + 1 - if start_idx >= 0 and end_idx > start_idx: - json_str = response[start_idx:end_idx] - result = json.loads(json_str) - - # Extract and validate bbox - if "bbox" in result and len(result["bbox"]) == 4: - return tuple(result["bbox"]) # Convert list to tuple - except Exception as e: - print(f"Error parsing Qwen response: {e}") - print(f"Raw response: {response}") - - return None diff --git a/dimos/models/segmentation/segment_utils.py b/dimos/models/segmentation/segment_utils.py index 59a805afaa..7153b3cf80 100644 --- a/dimos/models/segmentation/segment_utils.py +++ b/dimos/models/segmentation/segment_utils.py @@ -13,7 +13,6 @@ # limitations under the License. import numpy as np -import torch def find_medoid_and_closest_points(points, num_closest: int=5): @@ -37,37 +36,3 @@ def find_medoid_and_closest_points(points, num_closest: int=5): return medoid, points[closest_indices] -def sample_points_from_heatmap(heatmap, original_size: int, num_points: int=5, percentile: float=0.95): - """ - Sample points from the given heatmap, focusing on areas with higher values. - """ - width, height = original_size - threshold = np.percentile(heatmap.numpy(), percentile) - masked_heatmap = torch.where(heatmap > threshold, heatmap, torch.tensor(0.0)) - probabilities = torch.softmax(masked_heatmap.flatten(), dim=0) - - attn = torch.sigmoid(heatmap) - w = attn.shape[0] - sampled_indices = torch.multinomial( - torch.tensor(probabilities.ravel()), num_points, replacement=True - ) - - sampled_coords = np.array(np.unravel_index(sampled_indices, attn.shape)).T - _medoid, sampled_coords = find_medoid_and_closest_points(sampled_coords) - pts = [] - for pt in sampled_coords.tolist(): - x, y = pt - x = height * x / w - y = width * y / w - pts.append([y, x]) - return pts - - -def apply_mask_to_image(image, mask): - """ - Apply a binary mask to an image. The mask should be a binary array where the regions to keep are True. - """ - masked_image = image.copy() - for c in range(masked_image.shape[2]): - masked_image[:, :, c] = masked_image[:, :, c] * mask - return masked_image diff --git a/dimos/models/vl/moondream_hosted.py b/dimos/models/vl/moondream_hosted.py index 1bf0f43e67..5f0decf794 100644 --- a/dimos/models/vl/moondream_hosted.py +++ b/dimos/models/vl/moondream_hosted.py @@ -1,6 +1,6 @@ +from functools import cached_property import os import warnings -from functools import cached_property import moondream as md import numpy as np @@ -34,19 +34,19 @@ def _to_pil_image(self, image: Image | np.ndarray) -> PILImage.Image: stacklevel=3, ) image = Image.from_numpy(image) - + rgb_image = image.to_rgb() return PILImage.fromarray(rgb_image.data) def query(self, image: Image | np.ndarray, query: str, **kwargs) -> str: pil_image = self._to_pil_image(image) - + result = self._client.query(pil_image, query) return result.get("answer", str(result)) def caption(self, image: Image | np.ndarray, length: str = "normal") -> str: """Generate a caption for the image. - + Args: image: Input image length: Caption length ("normal", "short", "long") @@ -61,14 +61,14 @@ def query_detections(self, image: Image, query: str, **kwargs) -> ImageDetection Args: image: Input image query: Object query (e.g., "person", "car") - max_objects: Maximum number of objects to detect (not directly supported by hosted API args in docs, + max_objects: Maximum number of objects to detect (not directly supported by hosted API args in docs, but we handle the output) Returns: ImageDetections2D containing detected bounding boxes """ pil_image = self._to_pil_image(image) - + # API docs: detect(image, object) -> {"objects": [...]} result = self._client.detect(pil_image, query) objects = result.get("objects", []) @@ -109,25 +109,25 @@ def query_detections(self, image: Image, query: str, **kwargs) -> ImageDetection def point(self, image: Image, query: str) -> list[tuple[float, float]]: """Get coordinates of specific objects in an image. - + Args: image: Input image query: Object query - + Returns: List of (x, y) pixel coordinates """ pil_image = self._to_pil_image(image) result = self._client.point(pil_image, query) points = result.get("points", []) - + pixel_points = [] height, width = image.height, image.width - + for p in points: x_norm = p.get("x", 0.0) y_norm = p.get("y", 0.0) pixel_points.append((x_norm * width, y_norm * height)) - + return pixel_points diff --git a/dimos/models/vl/test_moondream_hosted.py b/dimos/models/vl/test_moondream_hosted.py index dd18b993a6..1f3d59d1b9 100644 --- a/dimos/models/vl/test_moondream_hosted.py +++ b/dimos/models/vl/test_moondream_hosted.py @@ -1,13 +1,15 @@ import os import time + import pytest + from dimos.models.vl.moondream_hosted import MoondreamHostedVlModel from dimos.msgs.sensor_msgs import Image from dimos.perception.detection.type import ImageDetections2D # Skip all tests in this module if API key is missing pytestmark = pytest.mark.skipif( - not os.getenv("MOONDREAM_API_KEY"), + not os.getenv("MOONDREAM_API_KEY"), reason="MOONDREAM_API_KEY not set" ) @@ -22,7 +24,7 @@ def test_image(): pytest.skip(f"Test image not found at {image_path}") return Image.from_file(image_path) -def test_caption(model, test_image): +def test_caption(model, test_image) -> None: """Test generating a caption.""" print("\n--- Testing Caption ---") caption = model.caption(test_image) @@ -30,7 +32,7 @@ def test_caption(model, test_image): assert isinstance(caption, str) assert len(caption) > 0 -def test_query(model, test_image): +def test_query(model, test_image) -> None: """Test querying the image.""" print("\n--- Testing Query ---") question = "Is there an xbox controller in the image?" @@ -42,35 +44,35 @@ def test_query(model, test_image): # The answer should likely be positive given the user's prompt assert "yes" in answer.lower() or "controller" in answer.lower() -def test_query_latency(model, test_image): +def test_query_latency(model, test_image) -> None: """Test that a simple query returns in under 1 second.""" print("\n--- Testing Query Latency ---") question = "What is this?" - + # Warmup (optional, but good practice if first call establishes connection) - # model.query(test_image, "warmup") - + # model.query(test_image, "warmup") + start_time = time.perf_counter() model.query(test_image, question) end_time = time.perf_counter() - + duration = end_time - start_time print(f"Query took {duration:.4f} seconds") - + assert duration < 1.0, f"Query took too long: {duration:.4f}s > 1.0s" @pytest.mark.parametrize("subject", ["xbox controller", "lip balm"]) -def test_detect(model, test_image, subject): +def test_detect(model, test_image, subject: str) -> None: """Test detecting objects.""" print(f"\n--- Testing Detect: {subject} ---") detections = model.query_detections(test_image, subject) - + assert isinstance(detections, ImageDetections2D) print(f"Found {len(detections.detections)} detections for {subject}") - + # We expect to find at least one of each in the provided test image assert len(detections.detections) > 0 - + for det in detections.detections: assert det.is_valid() assert det.name == subject @@ -80,15 +82,15 @@ def test_detect(model, test_image, subject): assert 0 <= y1 < y2 <= test_image.height @pytest.mark.parametrize("subject", ["xbox controller", "lip balm"]) -def test_point(model, test_image, subject): +def test_point(model, test_image, subject: str) -> None: """Test pointing at objects.""" print(f"\n--- Testing Point: {subject} ---") points = model.point(test_image, subject) - + print(f"Found {len(points)} points for {subject}: {points}") assert isinstance(points, list) assert len(points) > 0 - + for x, y in points: assert isinstance(x, (int, float)) assert isinstance(y, (int, float)) diff --git a/dimos/navigation/frontier_exploration/utils.py b/dimos/navigation/frontier_exploration/utils.py index d307749531..04c0118b13 100644 --- a/dimos/navigation/frontier_exploration/utils.py +++ b/dimos/navigation/frontier_exploration/utils.py @@ -17,9 +17,8 @@ """ import numpy as np -from PIL import Image, ImageDraw +from PIL import Image -from dimos.msgs.geometry_msgs import Vector3 from dimos.msgs.nav_msgs import CostValues, OccupancyGrid @@ -62,77 +61,3 @@ def costmap_to_pil_image(costmap: OccupancyGrid, scale_factor: int = 2) -> Image img = img.resize(new_size, Image.NEAREST) # Use NEAREST to keep sharp pixels return img - - -def draw_frontiers_on_image( - image: Image.Image, - costmap: OccupancyGrid, - frontiers: list[Vector3], - scale_factor: int = 2, - unfiltered_frontiers: list[Vector3] | None = None, -) -> Image.Image: - """ - Draw frontier points on the costmap image. - - Args: - image: PIL Image to draw on - costmap: Original costmap for coordinate conversion - frontiers: List of frontier centroids (top 5) - scale_factor: Scaling factor used for the image - unfiltered_frontiers: All unfiltered frontier results (light green) - - Returns: - PIL Image with frontiers drawn - """ - img_copy = image.copy() - draw = ImageDraw.Draw(img_copy) - - def world_to_image_coords(world_pos: Vector3) -> tuple[int, int]: - """Convert world coordinates to image pixel coordinates.""" - grid_pos = costmap.world_to_grid(world_pos) - # Flip Y coordinate and apply scaling - img_x = int(grid_pos.x * scale_factor) - img_y = int((costmap.height - grid_pos.y) * scale_factor) # Flip Y - return img_x, img_y - - # Draw all unfiltered frontiers as light green circles - if unfiltered_frontiers: - for frontier in unfiltered_frontiers: - x, y = world_to_image_coords(frontier) - radius = 3 * scale_factor - draw.ellipse( - [x - radius, y - radius, x + radius, y + radius], - fill=(144, 238, 144), - outline=(144, 238, 144), - ) # Light green - - # Draw top 5 frontiers as green circles - for i, frontier in enumerate(frontiers[1:]): # Skip the best one for now - x, y = world_to_image_coords(frontier) - radius = 4 * scale_factor - draw.ellipse( - [x - radius, y - radius, x + radius, y + radius], - fill=(0, 255, 0), - outline=(0, 128, 0), - width=2, - ) # Green - - # Add number label - draw.text((x + radius + 2, y - radius), str(i + 2), fill=(0, 255, 0)) - - # Draw best frontier as red circle - if frontiers: - best_frontier = frontiers[0] - x, y = world_to_image_coords(best_frontier) - radius = 6 * scale_factor - draw.ellipse( - [x - radius, y - radius, x + radius, y + radius], - fill=(255, 0, 0), - outline=(128, 0, 0), - width=3, - ) # Red - - # Add "BEST" label - draw.text((x + radius + 2, y - radius), "BEST", fill=(255, 0, 0)) - - return img_copy diff --git a/dimos/navigation/local_planner/holonomic_local_planner.py b/dimos/navigation/local_planner/holonomic_local_planner.py index acb8dcec98..cdbd66dbdf 100644 --- a/dimos/navigation/local_planner/holonomic_local_planner.py +++ b/dimos/navigation/local_planner/holonomic_local_planner.py @@ -264,10 +264,6 @@ def _find_lookahead_point(self, path: np.ndarray, start_idx: int) -> np.ndarray: return path[-1] - def _clip(self, v: np.ndarray) -> np.ndarray: - """Instance method to clip velocity with access to v_max.""" - return np.clip(v, -self.v_max, self.v_max) - holonomic_local_planner = HolonomicLocalPlanner.blueprint diff --git a/dimos/navigation/rosnav.py b/dimos/navigation/rosnav.py index bd67fbd532..2e189f0c24 100644 --- a/dimos/navigation/rosnav.py +++ b/dimos/navigation/rosnav.py @@ -51,7 +51,6 @@ ) from dimos.msgs.nav_msgs import Path from dimos.msgs.sensor_msgs import PointCloud2 -from dimos.msgs.std_msgs import Bool from dimos.msgs.tf2_msgs.TFMessage import TFMessage from dimos.navigation.base import NavigationInterface, NavigationState from dimos.utils.logging_config import setup_logger @@ -236,10 +235,6 @@ def _on_ros_tf(self, msg: ROSTFMessage) -> None: def _on_goal_pose(self, msg: PoseStamped) -> None: self.navigate_to(msg) - def _on_cancel_goal(self, msg: Bool) -> None: - if msg.data: - self.stop() - def _set_autonomy_mode(self) -> None: joy_msg = ROSJoy() joy_msg.axes = [ diff --git a/dimos/perception/common/ibvs.py b/dimos/perception/common/ibvs.py index 2978aff84f..3e2423ac72 100644 --- a/dimos/perception/common/ibvs.py +++ b/dimos/perception/common/ibvs.py @@ -181,15 +181,6 @@ def estimate_object_size(self, bbox: tuple, distance: float): return estimated_size - def set_estimated_object_size(self, size: float) -> None: - """ - Set the estimated object size for future distance calculations. - - Args: - size: Estimated physical size of the object (in meters) - """ - self.estimated_object_size = size - def estimate_distance_angle(self, bbox: tuple): """ Estimate distance and angle to object using size-based estimation. diff --git a/dimos/perception/common/utils.py b/dimos/perception/common/utils.py index 2676206bd7..9e14033ac5 100644 --- a/dimos/perception/common/utils.py +++ b/dimos/perception/common/utils.py @@ -21,7 +21,6 @@ import torch import yaml -from dimos.msgs.geometry_msgs import Pose, Quaternion, Vector3 from dimos.msgs.sensor_msgs import Image from dimos.msgs.std_msgs import Header from dimos.types.manipulation import ObjectData @@ -860,22 +859,6 @@ def combine_object_data( return combined -def point_in_bbox(point: tuple[int, int], bbox: list[float]) -> bool: - """ - Check if a point is inside a bounding box. - - Args: - point: (x, y) coordinates - bbox: Bounding box [x1, y1, x2, y2] - - Returns: - True if point is inside bbox - """ - x, y = point - x1, y1, x2, y2 = bbox - return x1 <= x <= x2 and y1 <= y <= y2 - - def bbox2d_to_corners(bbox_2d: BoundingBox2D) -> tuple[float, float, float, float]: """ Convert BoundingBox2D from center format to corner format. @@ -923,27 +906,3 @@ def find_clicked_detection( return detections_3d[i] return None - - -def extract_pose_from_detection3d(detection3d: Detection3D): - """Extract PoseStamped from Detection3D message. - - Args: - detection3d: Detection3D message - - Returns: - Pose or None if no valid detection - """ - if not detection3d or not detection3d.bbox or not detection3d.bbox.center: - return None - - # Extract position - pos = detection3d.bbox.center.position - position = Vector3(pos.x, pos.y, pos.z) - - # Extract orientation - orient = detection3d.bbox.center.orientation - orientation = Quaternion(orient.x, orient.y, orient.z, orient.w) - - pose = Pose(position=position, orientation=orientation) - return pose diff --git a/dimos/perception/detection/moduleDB.py b/dimos/perception/detection/moduleDB.py index 88547ed427..5669937231 100644 --- a/dimos/perception/detection/moduleDB.py +++ b/dimos/perception/detection/moduleDB.py @@ -266,18 +266,10 @@ def agent_encode(self) -> str: # return ret[0] if ret else None - def lookup(self, label: str) -> list[Detection3DPC]: - """Look up a detection by label.""" - return [] - @rpc def stop(self): return super().stop() - def goto_object(self, object_id: str) -> Object3D | None: - """Go to object by id.""" - return self.objects.get(object_id, None) - def to_foxglove_scene_update(self) -> "SceneUpdate": """Convert all detections to a Foxglove SceneUpdate message. diff --git a/dimos/perception/pointcloud/cuboid_fit.py b/dimos/perception/pointcloud/cuboid_fit.py index 376ae08da0..ea2787abc7 100644 --- a/dimos/perception/pointcloud/cuboid_fit.py +++ b/dimos/perception/pointcloud/cuboid_fit.py @@ -103,20 +103,6 @@ def fit_cuboid( return None -def fit_cuboid_simple(points: np.ndarray | o3d.geometry.PointCloud) -> dict | None: - """ - Simple wrapper for minimal oriented bounding box fitting. - - Args: - points: Nx3 array of points or Open3D PointCloud - - Returns: - Dictionary with center, dimensions, rotation, and bounding_box, - or None if insufficient points - """ - return fit_cuboid(points, method="minimal") - - def _compute_fitting_error( points: np.ndarray, center: np.ndarray, dimensions: np.ndarray, rotation: np.ndarray ) -> float: @@ -352,63 +338,3 @@ def compute_cuboid_surface_area(cuboid_params: dict) -> float: dims = cuboid_params["dimensions"] return 2.0 * (dims[0] * dims[1] + dims[1] * dims[2] + dims[2] * dims[0]) - - -def check_cuboid_quality(cuboid_params: dict, points: np.ndarray) -> dict: - """ - Assess the quality of a cuboid fit. - - Args: - cuboid_params: Dictionary containing cuboid parameters - points: Original points used for fitting - - Returns: - Dictionary with quality metrics - """ - if len(points) == 0: - return {"error": "No points provided"} - - # Basic metrics - volume = compute_cuboid_volume(cuboid_params) - surface_area = compute_cuboid_surface_area(cuboid_params) - error = cuboid_params.get("error", 0.0) - - # Aspect ratio analysis - dims = cuboid_params["dimensions"] - aspect_ratios = [ - dims[0] / dims[1] if dims[1] > 0 else float("inf"), - dims[1] / dims[2] if dims[2] > 0 else float("inf"), - dims[2] / dims[0] if dims[0] > 0 else float("inf"), - ] - max_aspect_ratio = max(aspect_ratios) - - # Volume ratio (cuboid volume vs convex hull volume) - try: - pcd = o3d.geometry.PointCloud() - pcd.points = o3d.utility.Vector3dVector(points) - hull, _ = pcd.compute_convex_hull() - hull_volume = hull.get_volume() - volume_ratio = volume / hull_volume if hull_volume > 0 else float("inf") - except: - volume_ratio = None - - return { - "fitting_error": error, - "volume": volume, - "surface_area": surface_area, - "max_aspect_ratio": max_aspect_ratio, - "volume_ratio": volume_ratio, - "num_points": len(points), - "method": cuboid_params.get("method", "unknown"), - } - - -# Backward compatibility -def visualize_fit(image, cuboid_params, camera_matrix, R=None, t=None): - """ - Legacy function for backward compatibility. - Use visualize_cuboid_on_image instead. - """ - return visualize_cuboid_on_image( - image, cuboid_params, camera_matrix, R, t, show_dimensions=True - ) diff --git a/dimos/perception/segmentation/utils.py b/dimos/perception/segmentation/utils.py index 24d6ce4bf2..4efceaa573 100644 --- a/dimos/perception/segmentation/utils.py +++ b/dimos/perception/segmentation/utils.py @@ -57,10 +57,6 @@ def update(self, track_ids): # Return IDs that appear often enough return [track_id for track_id, count in id_counts.items() if count >= self.min_count] - def get_total_counts(self): - """Returns the total count of each tracking ID seen over time, limited to history size.""" - return self.total_counts - def extract_masks_bboxes_probs_names(result, max_size: float = 0.7): """ diff --git a/dimos/robot/position_stream.py b/dimos/robot/position_stream.py index 8cb5966b24..476b325223 100644 --- a/dimos/robot/position_stream.py +++ b/dimos/robot/position_stream.py @@ -145,15 +145,6 @@ def get_position_stream(self) -> Observable: ops.share() # Share the stream among multiple subscribers ) - def get_current_position(self) -> tuple[float, float] | None: - """ - Get the most recent position. - - Returns: - Tuple of (x, y) coordinates, or None if no position has been received - """ - return self.last_position - def cleanup(self) -> None: """Clean up resources.""" if hasattr(self, "subscription") and self.subscription: diff --git a/dimos/robot/ros_command_queue.py b/dimos/robot/ros_command_queue.py index 770f44e1a6..078df660b7 100644 --- a/dimos/robot/ros_command_queue.py +++ b/dimos/robot/ros_command_queue.py @@ -41,17 +41,6 @@ class CommandType(Enum): ACTION = auto() # Any action client or function call -class WebRTCRequest(NamedTuple): - """Class to represent a WebRTC request in the queue""" - - id: str # Unique ID for tracking - api_id: int # API ID for the command - topic: str # Topic to publish to - parameter: str # Optional parameter string - priority: int # Priority level - timeout: float # How long to wait for this request to complete - - class ROSCommand(NamedTuple): """Class to represent a command in the queue""" diff --git a/dimos/robot/ros_control.py b/dimos/robot/ros_control.py index 2e9eb95204..025fcfed46 100644 --- a/dimos/robot/ros_control.py +++ b/dimos/robot/ros_control.py @@ -265,24 +265,6 @@ def __init__( logger.info(f"{node_name} initialized with multi-threaded executor") print(f"{node_name} initialized with multi-threaded executor") - def get_global_costmap(self) -> OccupancyGrid | None: - """ - Get current global_costmap data - - Returns: - Optional[OccupancyGrid]: Current global_costmap data or None if not available - """ - if not self._global_costmap_topic: - logger.warning( - "No global_costmap topic provided - global_costmap data tracking will be unavailable" - ) - return None - - if self._global_costmap_data: - return self._global_costmap_data - else: - return None - def _global_costmap_callback(self, msg) -> None: """Callback for costmap data""" self._global_costmap_data = msg @@ -348,21 +330,6 @@ def get_state(self) -> Any | None: return self._robot_state - def get_imu_state(self) -> Any | None: - """ - Get current IMU state - - Base implementation provides common state fields. Child classes should - extend this method to include their specific state information. - - Returns: - ROS msg containing the IMU state information - """ - if not self._imu_topic: - logger.warning("No IMU topic provided - IMU data tracking will be unavailable") - return None - return self._imu_state - def get_odometry(self) -> Odometry | None: """ Get current odometry data diff --git a/dimos/robot/unitree/g1/g1zed.py b/dimos/robot/unitree/g1/g1zed.py index 6b6336cd0d..68db71ec8d 100644 --- a/dimos/robot/unitree/g1/g1zed.py +++ b/dimos/robot/unitree/g1/g1zed.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TypedDict, cast +from typing import cast from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE from dimos.core import DimosCluster, LCMTransport, pSHMTransport @@ -26,22 +26,13 @@ ) from dimos.msgs.sensor_msgs import CameraInfo from dimos.navigation import rosnav -from dimos.navigation.rosnav import ROSNav from dimos.robot import foxglove_bridge from dimos.robot.unitree.connection import g1 -from dimos.robot.unitree.connection.g1 import G1Connection from dimos.utils.logging_config import setup_logger logger = setup_logger(__name__) -class G1ZedDeployResult(TypedDict): - nav: ROSNav - connection: G1Connection - camera: CameraModule - camerainfo: CameraInfo - - def deploy_g1_monozed(dimos: DimosCluster) -> CameraModule: camera = cast( "CameraModule", diff --git a/dimos/robot/unitree_webrtc/testing/helpers.py b/dimos/robot/unitree_webrtc/testing/helpers.py index 5159deab4c..5f4845176e 100644 --- a/dimos/robot/unitree_webrtc/testing/helpers.py +++ b/dimos/robot/unitree_webrtc/testing/helpers.py @@ -14,10 +14,9 @@ from collections.abc import Callable, Iterable import time -from typing import Any, Protocol +from typing import Protocol import open3d as o3d -from reactivex.observable import Observable color1 = [1, 0.706, 0] color2 = [0, 0.651, 0.929] @@ -72,99 +71,3 @@ def show3d(*components: Iterable[Drawable], title: str = "open3d") -> o3d.visual vis.poll_events() vis.update_renderer() return vis - - -def multivis(*vis: o3d.visualization.Visualizer) -> None: - while True: - for v in vis: - v.poll_events() - v.update_renderer() - - -def show3d_stream( - geometry_observable: Observable[Any], - clearframe: bool = False, - title: str = "open3d", -) -> o3d.visualization.Visualizer: - """ - Visualize a stream of geometries using Open3D. The first geometry initializes the visualizer. - Subsequent geometries update the visualizer. If no new geometry, just poll events. - geometry_observable: Observable of objects with .o3d_geometry or Open3D geometry - """ - import queue - import threading - import time - from typing import Any - - q: queue.Queue[Any] = queue.Queue() - stop_flag = threading.Event() - - def on_next(geometry: O3dDrawable) -> None: - q.put(geometry) - - def on_error(e: Exception) -> None: - print(f"Visualization error: {e}") - stop_flag.set() - - def on_completed() -> None: - print("Geometry stream completed") - stop_flag.set() - - subscription = geometry_observable.subscribe( - on_next=on_next, - on_error=on_error, - on_completed=on_completed, - ) - - def geom(geometry: Drawable) -> O3dDrawable: - """Extracts the Open3D geometry from the given object.""" - return geometry.o3d_geometry if hasattr(geometry, "o3d_geometry") else geometry - - # Wait for the first geometry - first_geometry = None - while first_geometry is None and not stop_flag.is_set(): - try: - first_geometry = q.get(timeout=100) - except queue.Empty: - print("No geometry received to visualize.") - return - - scene_geometries = [] - first_geom_obj = geom(first_geometry) - - scene_geometries.append(first_geom_obj) - - vis = show3d(first_geom_obj, title=title) - - try: - while not stop_flag.is_set(): - try: - geometry = q.get_nowait() - geom_obj = geom(geometry) - if clearframe: - scene_geometries = [] - vis.clear_geometries() - - vis.add_geometry(geom_obj) - scene_geometries.append(geom_obj) - else: - if geom_obj in scene_geometries: - print("updating existing geometry") - vis.update_geometry(geom_obj) - else: - print("new geometry") - vis.add_geometry(geom_obj) - scene_geometries.append(geom_obj) - except queue.Empty: - pass - vis.poll_events() - vis.update_renderer() - time.sleep(0.1) - - except KeyboardInterrupt: - print("closing visualizer...") - stop_flag.set() - vis.destroy_window() - subscription.dispose() - - return vis diff --git a/dimos/robot/unitree_webrtc/type/map.py b/dimos/robot/unitree_webrtc/type/map.py index 65ad6ce0d9..ad276c15b5 100644 --- a/dimos/robot/unitree_webrtc/type/map.py +++ b/dimos/robot/unitree_webrtc/type/map.py @@ -22,7 +22,6 @@ from dimos.core import DimosCluster, In, LCMTransport, Module, Out, rpc from dimos.core.global_config import GlobalConfig from dimos.msgs.nav_msgs import OccupancyGrid -from dimos.msgs.sensor_msgs import PointCloud2 from dimos.robot.unitree.connection.go2 import Go2ConnectionProtocol from dimos.robot.unitree_webrtc.type.lidar import LidarMessage @@ -86,12 +85,6 @@ def publish(_) -> None: def stop(self) -> None: super().stop() - def to_PointCloud2(self) -> PointCloud2: - return PointCloud2( - pointcloud=self.pointcloud, - ts=time.time(), - ) - def to_lidar_message(self) -> LidarMessage: return LidarMessage( pointcloud=self.pointcloud, diff --git a/dimos/robot/unitree_webrtc/type/vector.py b/dimos/robot/unitree_webrtc/type/vector.py index be00e3403c..1d326329d7 100644 --- a/dimos/robot/unitree_webrtc/type/vector.py +++ b/dimos/robot/unitree_webrtc/type/vector.py @@ -365,41 +365,6 @@ def to_list(value: VectorLike) -> list[float]: return [float(value[i]) for i in range(len(value))] -# Helper functions to check dimensionality -def is_2d(value: VectorLike) -> bool: - """Check if a vector-compatible value is 2D. - - Args: - value: Any vector-like object (Vector, numpy array, tuple, list) - - Returns: - True if the value is 2D - """ - if isinstance(value, Vector): - return len(value) == 2 - elif isinstance(value, np.ndarray): - return value.shape[-1] == 2 or value.size == 2 - else: - return len(value) == 2 - - -def is_3d(value: VectorLike) -> bool: - """Check if a vector-compatible value is 3D. - - Args: - value: Any vector-like object (Vector, numpy array, tuple, list) - - Returns: - True if the value is 3D - """ - if isinstance(value, Vector): - return len(value) == 3 - elif isinstance(value, np.ndarray): - return value.shape[-1] == 3 or value.size == 3 - else: - return len(value) == 3 - - # Extraction functions for XYZ components def x(value: VectorLike) -> float: """Get the X component of a vector-compatible value. diff --git a/dimos/skills/skills.py b/dimos/skills/skills.py index 196fcf07b5..094956ab26 100644 --- a/dimos/skills/skills.py +++ b/dimos/skills/skills.py @@ -319,14 +319,6 @@ def __init__(self, *args, robot: Robot | None = None, **kwargs) -> None: f"{Colors.BLUE_PRINT_COLOR}Robot Skill Initialized with Robot: {robot}{Colors.RESET_COLOR}" ) - def set_robot(self, robot: Robot) -> None: - """Set the robot reference for this skills instance. - - Args: - robot: The robot instance to associate with these skills. - """ - self._robot = robot - def __call__(self): if self._robot is None: raise RuntimeError( diff --git a/dimos/spec/nav.py b/dimos/spec/nav.py index feb98aebf4..3979b21eb4 100644 --- a/dimos/spec/nav.py +++ b/dimos/spec/nav.py @@ -24,8 +24,3 @@ class Nav(Protocol): goal_active: Out[PoseStamped] path_active: Out[Path] ctrl: Out[Twist] - - # identity quaternion (Quaternion(0,0,0,1)) represents "no rotation requested" - def navigate_to_target(self, target: PoseStamped) -> None: ... - - def stop_navigating(self) -> None: ... diff --git a/dimos/stream/audio/node_microphone.py b/dimos/stream/audio/node_microphone.py index 1f4bf13499..39c518a5d2 100644 --- a/dimos/stream/audio/node_microphone.py +++ b/dimos/stream/audio/node_microphone.py @@ -14,7 +14,6 @@ # limitations under the License. import time -from typing import Any import numpy as np from reactivex import Observable, create, disposable @@ -118,10 +117,6 @@ def dispose() -> None: return create(on_subscribe) - def get_available_devices(self) -> list[dict[str, Any]]: - """Get a list of available audio input devices.""" - return sd.query_devices() - if __name__ == "__main__": from dimos.stream.audio.node_volume_monitor import monitor diff --git a/dimos/stream/audio/node_output.py b/dimos/stream/audio/node_output.py index 3dc93d3757..0d682c6784 100644 --- a/dimos/stream/audio/node_output.py +++ b/dimos/stream/audio/node_output.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any import numpy as np from reactivex import Observable @@ -164,10 +163,6 @@ def _handle_completion(self) -> None: self._stream.close() self._stream = None - def get_available_devices(self) -> list[dict[str, Any]]: - """Get a list of available audio output devices.""" - return sd.query_devices() - if __name__ == "__main__": from dimos.stream.audio.node_microphone import ( diff --git a/dimos/stream/data_provider.py b/dimos/stream/data_provider.py index f931857fda..25ec34f44c 100644 --- a/dimos/stream/data_provider.py +++ b/dimos/stream/data_provider.py @@ -48,59 +48,6 @@ def dispose(self) -> None: self._data_subject.dispose() -class ROSDataProvider(AbstractDataProvider): - """ReactiveX data provider for ROS topics.""" - - def __init__(self, dev_name: str = "ros_provider") -> None: - super().__init__(dev_name) - self.logger = logging.getLogger(dev_name) - - def push_data(self, data) -> None: - """Push new data to the stream.""" - print(f"ROSDataProvider pushing data of type: {type(data)}") - super().push_data(data) - print("Data pushed to subject") - - def capture_data_as_observable(self, fps: int | None = None) -> Observable: - """Get the data stream as an observable. - - Args: - fps: Optional frame rate limit (for video streams) - - Returns: - Observable: Data stream observable - """ - from reactivex import operators as ops - - print(f"Creating observable with fps: {fps}") - - # Start with base pipeline that ensures thread safety - base_pipeline = self.data_stream.pipe( - # Ensure emissions are handled on thread pool - ops.observe_on(pool_scheduler), - # Add debug logging to track data flow - ops.do_action( - on_next=lambda x: print(f"Got frame in pipeline: {type(x)}"), - on_error=lambda e: print(f"Pipeline error: {e}"), - on_completed=lambda: print("Pipeline completed"), - ), - ) - - # If fps is specified, add rate limiting - if fps and fps > 0: - print(f"Adding rate limiting at {fps} FPS") - return base_pipeline.pipe( - # Use scheduler for time-based operations - ops.sample(1.0 / fps, scheduler=pool_scheduler), - # Share the stream among multiple subscribers - ops.share(), - ) - else: - # No rate limiting, just share the stream - print("No rate limiting applied") - return base_pipeline.pipe(ops.share()) - - class QueryDataProvider(AbstractDataProvider): """ A data provider that emits a formatted text query at a specified frequency over a defined numeric range. diff --git a/dimos/stream/frame_processor.py b/dimos/stream/frame_processor.py index fda13ece61..05f3391c60 100644 --- a/dimos/stream/frame_processor.py +++ b/dimos/stream/frame_processor.py @@ -161,16 +161,6 @@ def process_stream_edge_detection(self, frame_stream): ops.map(self.edge_detection), ) - def process_stream_resize(self, frame_stream): - return frame_stream.pipe( - ops.map(self.resize), - ) - - def process_stream_to_greyscale(self, frame_stream): - return frame_stream.pipe( - ops.map(self.to_grayscale), - ) - def process_stream_optical_flow(self, frame_stream: Observable) -> Observable: """Processes video stream to compute and visualize optical flow. @@ -260,42 +250,3 @@ def process_stream_optical_flow_with_relevancy(self, frame_stream: Observable) - ), ops.filter(lambda result: result[0] is not None), # Ensure valid visualization ) - - def process_stream_with_jpeg_export( - self, frame_stream: Observable, suffix: str = "", loop: bool = False - ) -> Observable: - """Processes stream by saving frames as JPEGs while passing them through. - - Saves each frame from the stream as a JPEG file and passes the frame - downstream unmodified. Files are saved sequentially with optional suffix - in the configured output directory (self.output_dir). If loop is True, - it will cycle back and overwrite images starting from the first one - after reaching the save_limit. - - Args: - frame_stream: An Observable emitting video frames as numpy arrays. - Each frame should be in BGR format with shape (height, width, 3). - suffix: Optional string to append to filename before index. - Defaults to empty string. Example: "optical" -> "optical_1.jpg" - loop: If True, reset the image counter to 1 after reaching - save_limit, effectively looping the saves. Defaults to False. - - Returns: - An Observable emitting the same frames that were saved. Returns None - for frames that could not be saved due to format issues or save_limit - (unless loop is True). - - Raises: - TypeError: If frame_stream is not an Observable. - ValueError: If frames have invalid format or output directory - is not writable. - OSError: If there are file system permission issues. - - Note: - Frames are saved as '{suffix}_{index}.jpg' where index - increments for each saved frame. Saving stops after reaching - the configured save_limit (default: 100) unless loop is True. - """ - return frame_stream.pipe( - ops.map(lambda frame: self.export_to_jpeg(frame, suffix=suffix, loop=loop)), - ) diff --git a/dimos/stream/video_operators.py b/dimos/stream/video_operators.py index d7299f3dce..77e6899a11 100644 --- a/dimos/stream/video_operators.py +++ b/dimos/stream/video_operators.py @@ -16,12 +16,11 @@ from collections.abc import Callable from datetime import datetime, timedelta from enum import Enum -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING import cv2 import numpy as np from reactivex import Observable, Observer, create, operators as ops -import zmq if TYPE_CHECKING: from dimos.stream.frame_processor import FrameProcessor @@ -189,50 +188,6 @@ def with_optical_flow_filtering(threshold: float = 1.0) -> Callable[[Observable] ops.map(lambda result: result[0]), ) - @staticmethod - def with_edge_detection( - frame_processor: "FrameProcessor", - ) -> Callable[[Observable], Observable]: - return lambda source: source.pipe( - ops.map(lambda frame: frame_processor.edge_detection(frame)) - ) - - @staticmethod - def with_optical_flow( - frame_processor: "FrameProcessor", - ) -> Callable[[Observable], Observable]: - return lambda source: source.pipe( - ops.scan( - lambda acc, frame: frame_processor.compute_optical_flow( - acc, frame, compute_relevancy=False - ), - (None, None, None), - ), - ops.map(lambda result: result[1]), # Extract flow component - ops.filter(lambda flow: flow is not None), - ops.map(frame_processor.visualize_flow), - ) - - @staticmethod - def with_zmq_socket( - socket: zmq.Socket, scheduler: Any | None = None - ) -> Callable[[Observable], Observable]: - def send_frame(frame, socket) -> None: - _, img_encoded = cv2.imencode(".jpg", frame) - socket.send(img_encoded.tobytes()) - # print(f"Frame received: {frame.shape}") - - # Use a default scheduler if none is provided - if scheduler is None: - from reactivex.scheduler import ThreadPoolScheduler - - scheduler = ThreadPoolScheduler(1) # Single-threaded pool for isolation - - return lambda source: source.pipe( - ops.observe_on(scheduler), # Ensure this part runs on its own thread - ops.do_action(lambda frame: send_frame(frame, socket)), - ) - @staticmethod def encode_image() -> Callable[[Observable], Observable]: """ @@ -260,299 +215,7 @@ def _encode_image(image: np.ndarray) -> tuple[str, tuple[int, int]]: return _operator -from threading import Lock - -from reactivex import Observable -from reactivex.disposable import Disposable - - class Operators: - @staticmethod - def exhaust_lock(process_item): - """ - For each incoming item, call `process_item(item)` to get an Observable. - - If we're busy processing the previous one, skip new items. - - Use a lock to ensure concurrency safety across threads. - """ - - def _exhaust_lock(source: Observable) -> Observable: - def _subscribe(observer, scheduler=None): - in_flight = False - lock = Lock() - upstream_done = False - - upstream_disp = None - active_inner_disp = None - - def dispose_all() -> None: - if upstream_disp: - upstream_disp.dispose() - if active_inner_disp: - active_inner_disp.dispose() - - def on_next(value) -> None: - nonlocal in_flight, active_inner_disp - lock.acquire() - try: - if not in_flight: - in_flight = True - print("Processing new item.") - else: - print("Skipping item, already processing.") - return - finally: - lock.release() - - # We only get here if we grabbed the in_flight slot - try: - inner_source = process_item(value) - except Exception as ex: - observer.on_error(ex) - return - - def inner_on_next(ivalue) -> None: - observer.on_next(ivalue) - - def inner_on_error(err) -> None: - nonlocal in_flight - with lock: - in_flight = False - observer.on_error(err) - - def inner_on_completed() -> None: - nonlocal in_flight - with lock: - in_flight = False - if upstream_done: - observer.on_completed() - - # Subscribe to the inner observable - nonlocal active_inner_disp - active_inner_disp = inner_source.subscribe( - on_next=inner_on_next, - on_error=inner_on_error, - on_completed=inner_on_completed, - scheduler=scheduler, - ) - - def on_error(err) -> None: - dispose_all() - observer.on_error(err) - - def on_completed() -> None: - nonlocal upstream_done - with lock: - upstream_done = True - # If we're not busy, we can end now - if not in_flight: - observer.on_completed() - - upstream_disp = source.subscribe( - on_next, on_error, on_completed, scheduler=scheduler - ) - return dispose_all - - return create(_subscribe) - - return _exhaust_lock - - @staticmethod - def exhaust_lock_per_instance(process_item, lock: Lock): - """ - - For each item from upstream, call process_item(item) -> Observable. - - If a frame arrives while one is "in flight", discard it. - - 'lock' ensures we safely check/modify the 'in_flight' state in a multithreaded environment. - """ - - def _exhaust_lock(source: Observable) -> Observable: - def _subscribe(observer, scheduler=None): - in_flight = False - upstream_done = False - - upstream_disp = None - active_inner_disp = None - - def dispose_all() -> None: - if upstream_disp: - upstream_disp.dispose() - if active_inner_disp: - active_inner_disp.dispose() - - def on_next(value) -> None: - nonlocal in_flight, active_inner_disp - with lock: - # If not busy, claim the slot - if not in_flight: - in_flight = True - print("\033[34mProcessing new item.\033[0m") - else: - # Already processing => drop - print("\033[34mSkipping item, already processing.\033[0m") - return - - # We only get here if we acquired the slot - try: - inner_source = process_item(value) - except Exception as ex: - observer.on_error(ex) - return - - def inner_on_next(ivalue) -> None: - observer.on_next(ivalue) - - def inner_on_error(err) -> None: - nonlocal in_flight - with lock: - in_flight = False - print("\033[34mError in inner on error.\033[0m") - observer.on_error(err) - - def inner_on_completed() -> None: - nonlocal in_flight - with lock: - in_flight = False - print("\033[34mInner on completed.\033[0m") - if upstream_done: - observer.on_completed() - - # Subscribe to the inner Observable - nonlocal active_inner_disp - active_inner_disp = inner_source.subscribe( - on_next=inner_on_next, - on_error=inner_on_error, - on_completed=inner_on_completed, - scheduler=scheduler, - ) - - def on_error(e) -> None: - dispose_all() - observer.on_error(e) - - def on_completed() -> None: - nonlocal upstream_done - with lock: - upstream_done = True - print("\033[34mOn completed.\033[0m") - if not in_flight: - observer.on_completed() - - upstream_disp = source.subscribe( - on_next=on_next, - on_error=on_error, - on_completed=on_completed, - scheduler=scheduler, - ) - - return Disposable(dispose_all) - - return create(_subscribe) - - return _exhaust_lock - - @staticmethod - def exhaust_map(project): - def _exhaust_map(source: Observable): - def subscribe(observer, scheduler=None): - is_processing = False - - def on_next(item) -> None: - nonlocal is_processing - if not is_processing: - is_processing = True - print("\033[35mProcessing item.\033[0m") - try: - inner_observable = project(item) # Create the inner observable - inner_observable.subscribe( - on_next=observer.on_next, - on_error=observer.on_error, - on_completed=lambda: set_not_processing(), - scheduler=scheduler, - ) - except Exception as e: - observer.on_error(e) - else: - print("\033[35mSkipping item, already processing.\033[0m") - - def set_not_processing() -> None: - nonlocal is_processing - is_processing = False - print("\033[35mItem processed.\033[0m") - - return source.subscribe( - on_next=on_next, - on_error=observer.on_error, - on_completed=observer.on_completed, - scheduler=scheduler, - ) - - return create(subscribe) - - return _exhaust_map - - @staticmethod - def with_lock(lock: Lock): - def operator(source: Observable): - def subscribe(observer, scheduler=None): - def on_next(item) -> None: - if not lock.locked(): # Check if the lock is free - if lock.acquire(blocking=False): # Non-blocking acquire - try: - print("\033[32mAcquired lock, processing item.\033[0m") - observer.on_next(item) - finally: # Ensure lock release even if observer.on_next throws - lock.release() - else: - print("\033[34mLock busy, skipping item.\033[0m") - else: - print("\033[34mLock busy, skipping item.\033[0m") - - def on_error(error) -> None: - observer.on_error(error) - - def on_completed() -> None: - observer.on_completed() - - return source.subscribe( - on_next=on_next, - on_error=on_error, - on_completed=on_completed, - scheduler=scheduler, - ) - - return Observable(subscribe) - - return operator - - @staticmethod - def with_lock_check(lock: Lock): # Renamed for clarity - def operator(source: Observable): - def subscribe(observer, scheduler=None): - def on_next(item) -> None: - if not lock.locked(): # Check if the lock is held WITHOUT acquiring - print(f"\033[32mLock is free, processing item: {item}\033[0m") - observer.on_next(item) - else: - print(f"\033[34mLock is busy, skipping item: {item}\033[0m") - # observer.on_completed() - - def on_error(error) -> None: - observer.on_error(error) - - def on_completed() -> None: - observer.on_completed() - - return source.subscribe( - on_next=on_next, - on_error=on_error, - on_completed=on_completed, - scheduler=scheduler, - ) - - return Observable(subscribe) - - return operator - - # PrintColor enum for standardized color formatting class PrintColor(Enum): RED = "\033[31m" GREEN = "\033[32m" diff --git a/dimos/types/manipulation.py b/dimos/types/manipulation.py index 0df62362a4..01146827fe 100644 --- a/dimos/types/manipulation.py +++ b/dimos/types/manipulation.py @@ -14,7 +14,6 @@ from abc import ABC from dataclasses import dataclass, field -from enum import Enum import time from typing import TYPE_CHECKING, Any, Literal, TypedDict import uuid @@ -27,14 +26,6 @@ import open3d as o3d -class ConstraintType(Enum): - """Types of manipulation constraints.""" - - TRANSLATION = "translation" - ROTATION = "rotation" - FORCE = "force" - - @dataclass class AbstractConstraint(ABC): """Base class for all manipulation constraints.""" diff --git a/dimos/types/sample.py b/dimos/types/sample.py index fdb29cf174..3b2db43066 100644 --- a/dimos/types/sample.py +++ b/dimos/types/sample.py @@ -14,7 +14,6 @@ import builtins from collections import OrderedDict -from collections.abc import Sequence from enum import Enum import json import logging @@ -27,7 +26,7 @@ from mbodied.data.utils import to_features from mbodied.utils.import_utils import smart_import import numpy as np -from pydantic import BaseModel, ConfigDict, ValidationError +from pydantic import BaseModel, ConfigDict from pydantic.fields import FieldInfo from pydantic_core import from_json import torch @@ -340,15 +339,6 @@ def to(self, container: Any) -> Any: return to_features(self.dict()) raise ValueError(f"Unsupported container type: {container}") - @classmethod - def default_value(cls) -> "Sample": - """Get the default value for the Sample instance. - - Returns: - Sample: The default value for the Sample instance. - """ - return cls() - @classmethod def space_for( cls, @@ -408,56 +398,6 @@ def space_for( ) raise ValueError(f"Unsupported object {value} of type: {type(value)} for space generation") - @classmethod - def init_from(cls, d: Any, pack: bool = False) -> "Sample": - if isinstance(d, spaces.Space): - return cls.from_space(d) - if isinstance(d, Union[Sequence, np.ndarray]): - if pack: - return cls.pack_from(d) - return cls.unflatten(d) - if isinstance(d, dict): - try: - return cls.model_validate(d) - except ValidationError as e: - logging.info(f" Unable to validate {d} as {cls} {e}. Attempting to unflatten.") - - try: - return cls.unflatten(d) - except Exception as e: - logging.info(f" Unable to unflatten {d} as {cls} {e}. Attempting to read.") - return cls.read(d) - return cls(d) - - @classmethod - def from_flat_dict( - cls, flat_dict: builtins.dict[str, Any], schema: builtins.dict | None = None - ) -> "Sample": - """Initialize a Sample instance from a flattened dictionary.""" - """ - Reconstructs the original JSON object from a flattened dictionary using the provided schema. - - Args: - flat_dict (dict): A flattened dictionary with keys like "key1.nestedkey1". - schema (dict): A dictionary representing the JSON schema. - - Returns: - dict: The reconstructed JSON object. - """ - schema = schema or replace_refs(cls.model_json_schema()) - reconstructed = {} - - for flat_key, value in flat_dict.items(): - keys = flat_key.split(".") - current = reconstructed - for key in keys[:-1]: - if key not in current: - current[key] = {} - current = current[key] - current[keys[-1]] = value - - return reconstructed - @classmethod def from_space(cls, space: spaces.Space) -> "Sample": """Generate a Sample instance from a Gym space.""" @@ -524,23 +464,6 @@ def unpack(self, to_dicts: bool = False) -> list[Union["Sample", builtins.dict]] for i in range(list_size) ] - @classmethod - def default_space(cls) -> spaces.Dict: - """Return the Gym space for the Sample class based on its class attributes.""" - return cls().space() - - @classmethod - def default_sample( - cls, output_type: str = "Sample" - ) -> Union["Sample", builtins.dict[str, Any]]: - """Generate a default Sample instance from its class attributes. Useful for padding. - - This is the "no-op" instance and should be overriden as needed. - """ - if output_type == "Sample": - return cls() - return cls().dict() - def model_field_info(self, key: str) -> FieldInfo: """Get the FieldInfo for a given attribute key.""" if self.model_extra and self.model_extra.get(key) is not None: @@ -567,13 +490,6 @@ def space(self) -> spaces.Dict: ) return spaces.Dict(space_dict) - def random_sample(self) -> "Sample": - """Generate a random Sample instance based on its instance attributes. Omits None values. - - Override this method in subclasses to customize the sample generation. - """ - return self.__class__.model_validate(self.space().sample()) - if __name__ == "__main__": sample = Sample(x=1, y=2, z={"a": 3, "b": 4}, extra_field=5) diff --git a/dimos/types/timestamped.py b/dimos/types/timestamped.py index 0045c73ef4..8eb04e790d 100644 --- a/dimos/types/timestamped.py +++ b/dimos/types/timestamped.py @@ -107,12 +107,6 @@ def __init__(self, ts: float) -> None: def dt(self) -> datetime: return datetime.fromtimestamp(self.ts, tz=timezone.utc).astimezone() - def ros_timestamp(self) -> list[int]: - """Convert timestamp to ROS-style list [sec, nanosec].""" - sec = int(self.ts) - nanosec = int((self.ts - sec) * 1_000_000_000) - return [sec, nanosec] - T = TypeVar("T", bound=Timestamped) diff --git a/dimos/types/vector.py b/dimos/types/vector.py index 161084fc2c..f5eb35e5e5 100644 --- a/dimos/types/vector.py +++ b/dimos/types/vector.py @@ -106,10 +106,6 @@ def getArrow(): return f"{getArrow()} Vector {self.__repr__()}" - def serialize(self) -> builtins.tuple: - """Serialize the vector to a tuple.""" - return {"type": "vector", "c": self._data.tolist()} - def __eq__(self, other) -> bool: """Check if two vectors are equal using numpy's allclose for floating point comparison.""" if not isinstance(other, Vector): @@ -370,45 +366,6 @@ def to_list(value: VectorLike) -> list[float]: return list(value) -# Helper functions to check dimensionality -def is_2d(value: VectorLike) -> bool: - """Check if a vector-compatible value is 2D. - - Args: - value: Any vector-like object (Vector, numpy array, tuple, list) - - Returns: - True if the value is 2D - """ - if isinstance(value, Vector3): - return False - elif isinstance(value, Vector): - return len(value) == 2 - elif isinstance(value, np.ndarray): - return value.shape[-1] == 2 or value.size == 2 - else: - return len(value) == 2 - - -def is_3d(value: VectorLike) -> bool: - """Check if a vector-compatible value is 3D. - - Args: - value: Any vector-like object (Vector, numpy array, tuple, list) - - Returns: - True if the value is 3D - """ - if isinstance(value, Vector): - return len(value) == 3 - elif isinstance(value, Vector3): - return True - elif isinstance(value, np.ndarray): - return value.shape[-1] == 3 or value.size == 3 - else: - return len(value) == 3 - - # Extraction functions for XYZ components def x(value: VectorLike) -> float: """Get the X component of a vector-compatible value. diff --git a/dimos/utils/simple_controller.py b/dimos/utils/simple_controller.py index dd92ae0c55..0a572f4948 100644 --- a/dimos/utils/simple_controller.py +++ b/dimos/utils/simple_controller.py @@ -109,64 +109,3 @@ def _apply_output_deadband_compensation(self, output): return output - self.output_deadband else: return output - - def _apply_deadband_compensation(self, error): - """ - Apply deadband compensation to the error. - - This maintains the original error value, as the deadband compensation - will be applied to the output, not the error. - """ - return error - - -# ---------------------------- -# Visual Servoing Controller Class -# ---------------------------- -class VisualServoingController: - def __init__(self, distance_pid_params, angle_pid_params) -> None: - """ - Initialize the visual servoing controller using enhanced PID controllers. - - Args: - distance_pid_params (tuple): (kp, ki, kd, output_limits, integral_limit, deadband) for distance. - angle_pid_params (tuple): (kp, ki, kd, output_limits, integral_limit, deadband) for angle. - """ - self.distance_pid = PIDController(*distance_pid_params) - self.angle_pid = PIDController(*angle_pid_params) - self.prev_measured_angle = 0.0 # Used for angular feed-forward damping - - def compute_control( - self, measured_distance, measured_angle, desired_distance, desired_angle, dt - ): - """ - Compute the forward (x) and angular (z) commands. - - Args: - measured_distance (float): Current distance to target (from camera). - measured_angle (float): Current angular offset to target (radians). - desired_distance (float): Desired distance to target. - desired_angle (float): Desired angular offset (e.g., 0 for centered). - dt (float): Timestep. - - Returns: - tuple: (forward_command, angular_command) - """ - # Compute the errors. - error_distance = measured_distance - desired_distance - error_angle = normalize_angle(measured_angle - desired_angle) - - # Get raw PID outputs. - forward_command_raw = self.distance_pid.update(error_distance, dt) - angular_command_raw = self.angle_pid.update(error_angle, dt) - - # print("forward: {} angular: {}".format(forward_command_raw, angular_command_raw)) - - angular_command = angular_command_raw - - # Couple forward command to angular error: - # scale the forward command smoothly. - scaling_factor = max(0.0, min(1.0, math.exp(-2.0 * abs(error_angle)))) - forward_command = forward_command_raw * scaling_factor - - return forward_command, angular_command diff --git a/dimos/utils/testing.py b/dimos/utils/testing.py index 5e3725bc81..089254cda0 100644 --- a/dimos/utils/testing.py +++ b/dimos/utils/testing.py @@ -134,10 +134,6 @@ def __init__(self, name: str, autocast: Callable[[T], Any] | None = None) -> Non # Create the directory self.root_dir.mkdir(parents=True, exist_ok=True) - def consume_stream(self, observable: Observable[T | Any]) -> None: - """Consume an observable stream of sensor data without saving.""" - return observable.subscribe(self.save_one) - def save_stream(self, observable: Observable[T | Any]) -> Observable[int]: """Save an observable stream of sensor data to pickle files.""" return observable.pipe(ops.map(lambda frame: self.save_one(frame))) diff --git a/dimos/utils/threadpool.py b/dimos/utils/threadpool.py index 45625e9980..eeb0de181a 100644 --- a/dimos/utils/threadpool.py +++ b/dimos/utils/threadpool.py @@ -60,18 +60,6 @@ def get_scheduler() -> ThreadPoolScheduler: return scheduler -def make_single_thread_scheduler() -> ThreadPoolScheduler: - """Create a new ThreadPoolScheduler with a single worker. - - This provides a dedicated scheduler for tasks that should run serially - on their own thread rather than using the shared thread pool. - - Returns: - ThreadPoolScheduler: A scheduler instance with a single worker thread. - """ - return ThreadPoolScheduler(max_workers=1) - - # Example usage: # scheduler = get_scheduler() # # Use the scheduler for parallel tasks