From 0a24b6275d14b3d0916f93b77b3ea1a75f612e4e Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Thu, 15 Jan 2026 16:44:17 -0800 Subject: [PATCH 1/8] added HardwareComponent defining universal schema fo robot type and joint type classes --- dimos/control/components.py | 106 ++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 dimos/control/components.py diff --git a/dimos/control/components.py b/dimos/control/components.py new file mode 100644 index 0000000000..9094db3ae5 --- /dev/null +++ b/dimos/control/components.py @@ -0,0 +1,106 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Hardware component schema for the ControlOrchestrator.""" + +from dataclasses import dataclass +from enum import Enum + +from dimos.hardware.manipulators.spec import ControlMode + +HardwareId = str +JointName = str + + +class HardwareType(Enum): + MANIPULATOR = "manipulator" + BASE = "base" + GRIPPER = "gripper" + + +class JointType(Enum): + REVOLUTE = "revolute" # Rotary with limits (radians) + PRISMATIC = "prismatic" # Linear with limits (meters) + CONTINUOUS = "continuous" # Rotary no limits (wheels) + VELOCITY = "velocity" # Velocity-only (base vx/vy/wz) + + +@dataclass(frozen=True) +class JointConfig: + joint_name: JointName + joint_type: JointType + supported_modes: tuple[ControlMode, ...] + limits: tuple[float, float] | None = None + default_on_timeout: float | None = None # None=hold, 0.0=zero velocity + + +@dataclass +class HardwareComponent: + """Configuration for a hardware component. + + Attributes: + hardware_id: Unique hardware identifier (e.g., "arm", "left_arm") + hardware_type: Type of hardware (MANIPULATOR, BASE, GRIPPER) + joints: List of joint configurations + backend_type: Backend type ("mock", "xarm", "piper") + address: Connection address - IP for TCP backends, port name for CAN backends + auto_enable: Whether to auto-enable servos (default True) + description: Optional description + """ + + hardware_id: HardwareId + hardware_type: HardwareType + joints: list[JointConfig] + backend_type: str = "mock" + address: str | None = None + auto_enable: bool = True + description: str = "" + + +def make_joints( + hardware_id: HardwareId, + dof: int, + joint_type: JointType = JointType.REVOLUTE, + supported_modes: tuple[ControlMode, ...] = (ControlMode.POSITION, ControlMode.SERVO_POSITION), +) -> list[JointConfig]: + """Create joint configs for hardware. + + Args: + hardware_id: The hardware identifier (e.g., "left_arm") + dof: Degrees of freedom + joint_type: Type of joints (default: REVOLUTE) + supported_modes: Control modes the joints support + + Returns: + List of JointConfig with names like "left_arm_joint1", "left_arm_joint2", ... + """ + return [ + JointConfig( + joint_name=f"{hardware_id}_joint{i + 1}", + joint_type=joint_type, + supported_modes=supported_modes, + ) + for i in range(dof) + ] + + +__all__ = [ + "HardwareComponent", + "HardwareId", + "HardwareType", + "JointConfig", + "JointName", + "JointType", + "make_joints", +] From bff40b0732afb2bff85bab9a86c8abbfaca1d7ef Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Thu, 15 Jan 2026 16:45:46 -0800 Subject: [PATCH 2/8] removed simple HardwareConfig and replaced with new HardwareComponent dataclass --- dimos/control/__init__.py | 14 ++++- dimos/control/hardware_interface.py | 28 +++++----- dimos/control/orchestrator.py | 86 +++++++++++------------------ 3 files changed, 58 insertions(+), 70 deletions(-) diff --git a/dimos/control/__init__.py b/dimos/control/__init__.py index 3d7d647cd4..48f2ad4329 100644 --- a/dimos/control/__init__.py +++ b/dimos/control/__init__.py @@ -49,6 +49,13 @@ >>> orch.start() """ +from dimos.control.components import ( + HardwareComponent, + HardwareType, + JointConfig, + JointType, + make_joints, +) from dimos.control.hardware_interface import ( BackendHardwareInterface, HardwareInterface, @@ -56,7 +63,6 @@ from dimos.control.orchestrator import ( ControlOrchestrator, ControlOrchestratorConfig, - HardwareConfig, TaskConfig, control_orchestrator, ) @@ -79,14 +85,18 @@ "ControlOrchestratorConfig", # Task protocol and types "ControlTask", - "HardwareConfig", + "HardwareComponent", "HardwareInterface", + "HardwareType", "JointCommandOutput", + "JointConfig", "JointStateSnapshot", + "JointType", "OrchestratorState", "ResourceClaim", "TaskConfig", # Tick loop "TickLoop", "control_orchestrator", + "make_joints", ] diff --git a/dimos/control/hardware_interface.py b/dimos/control/hardware_interface.py index e4fba70e1c..cf90e0b8e4 100644 --- a/dimos/control/hardware_interface.py +++ b/dimos/control/hardware_interface.py @@ -28,6 +28,9 @@ from dimos.hardware.manipulators.spec import ControlMode, ManipulatorBackend +if TYPE_CHECKING: + from dimos.control.components import HardwareComponent + logger = logging.getLogger(__name__) @@ -82,7 +85,7 @@ class BackendHardwareInterface: """Concrete implementation wrapping a ManipulatorBackend. Features: - - Generates namespaced joint names (prefix_joint1, prefix_joint2, ...) + - Uses joint names from HardwareComponent - Holds last commanded value for partial commands - On first tick, reads current position from hardware for missing joints """ @@ -90,26 +93,20 @@ class BackendHardwareInterface: def __init__( self, backend: ManipulatorBackend, - hardware_id: str, - joint_prefix: str | None = None, + component: HardwareComponent, ) -> None: """Initialize hardware interface. Args: backend: ManipulatorBackend instance (XArmBackend, PiperBackend, etc.) - hardware_id: Unique identifier for this hardware - joint_prefix: Prefix for joint names (defaults to hardware_id) + component: Hardware component with joints config """ if not isinstance(backend, ManipulatorBackend): raise TypeError("backend must implement ManipulatorBackend") self._backend = backend - self._hardware_id = hardware_id - self._prefix = joint_prefix or hardware_id - self._dof = backend.get_dof() - - # Generate joint names: prefix_joint1, prefix_joint2, ... - self._joint_names = [f"{self._prefix}_joint{i + 1}" for i in range(self._dof)] + self._component = component + self._joint_names = [j.joint_name for j in component.joints] # Track last commanded values for hold-last behavior self._last_commanded: dict[str, float] = {} @@ -120,17 +117,22 @@ def __init__( @property def hardware_id(self) -> str: """Unique ID for this hardware.""" - return self._hardware_id + return self._component.hardware_id @property def joint_names(self) -> list[str]: """Ordered list of joint names.""" return self._joint_names + @property + def component(self) -> HardwareComponent: + """The hardware component config.""" + return self._component + @property def dof(self) -> int: """Degrees of freedom.""" - return self._dof + return len(self._joint_names) def disconnect(self) -> None: """Disconnect the underlying backend.""" diff --git a/dimos/control/orchestrator.py b/dimos/control/orchestrator.py index e7ea147dcf..619969fa89 100644 --- a/dimos/control/orchestrator.py +++ b/dimos/control/orchestrator.py @@ -33,6 +33,7 @@ import time from typing import TYPE_CHECKING, Any +from dimos.control.components import HardwareComponent from dimos.control.hardware_interface import BackendHardwareInterface, HardwareInterface from dimos.control.task import ControlTask from dimos.control.tick_loop import TickLoop @@ -55,29 +56,6 @@ # ============================================================================= -@dataclass -class HardwareConfig: - """Configuration for a hardware backend. - - Attributes: - id: Unique hardware identifier (e.g., "arm", "left_arm") - type: Backend type ("mock", "xarm", "piper") - dof: Degrees of freedom - joint_prefix: Prefix for joint names (defaults to id) - ip: IP address (required for xarm) - can_port: CAN port (for piper, default "can0") - auto_enable: Whether to auto-enable servos (default True) - """ - - id: str - type: str = "mock" - dof: int = 7 - joint_prefix: str | None = None - ip: str | None = None - can_port: str | None = None - auto_enable: bool = True - - @dataclass class TaskConfig: """Configuration for a control task. @@ -127,7 +105,7 @@ class ControlOrchestratorConfig(ModuleConfig): publish_joint_state: bool = True joint_state_frame_id: str = "orchestrator" log_ticks: bool = False - hardware: list[HardwareConfig] = field(default_factory=lambda: []) + hardware: list[HardwareComponent] = field(default_factory=lambda: []) tasks: list[TaskConfig] = field(default_factory=lambda: []) @@ -198,9 +176,9 @@ def _setup_from_config(self) -> None: hardware_added: list[str] = [] try: - for hw_cfg in self.config.hardware: - self._setup_hardware(hw_cfg) - hardware_added.append(hw_cfg.id) + for component in self.config.hardware: + self._setup_hardware(component) + hardware_added.append(component.hardware_id) for task_cfg in self.config.tasks: task = self._create_task_from_config(task_cfg) @@ -215,44 +193,42 @@ def _setup_from_config(self) -> None: pass raise - def _setup_hardware(self, hw_cfg: HardwareConfig) -> None: + def _setup_hardware(self, component: HardwareComponent) -> None: """Connect and add a single hardware backend.""" - backend = self._create_backend_from_config(hw_cfg) + backend = self._create_backend(component) if not backend.connect(): - raise RuntimeError(f"Failed to connect to {hw_cfg.type} backend") + raise RuntimeError(f"Failed to connect to {component.backend_type} backend") try: - if hw_cfg.auto_enable and hasattr(backend, "write_enable"): + if component.auto_enable and hasattr(backend, "write_enable"): backend.write_enable(True) - self.add_hardware( - hw_cfg.id, - backend, - joint_prefix=hw_cfg.joint_prefix or hw_cfg.id, - ) + + self.add_hardware(backend, component) except Exception: backend.disconnect() raise - def _create_backend_from_config(self, cfg: HardwareConfig) -> ManipulatorBackend: - """Create a manipulator backend from config.""" - match cfg.type.lower(): + def _create_backend(self, component: HardwareComponent) -> ManipulatorBackend: + """Create a manipulator backend from component config.""" + dof = len(component.joints) + match component.backend_type.lower(): case "mock": from dimos.hardware.manipulators.mock import MockBackend - return MockBackend(dof=cfg.dof) + return MockBackend(dof=dof) case "xarm": - if cfg.ip is None: - raise ValueError("ip is required for xarm backend") + if component.address is None: + raise ValueError("address (IP) is required for xarm backend") from dimos.hardware.manipulators.xarm import XArmBackend - return XArmBackend(ip=cfg.ip, dof=cfg.dof) + return XArmBackend(ip=component.address, dof=dof) case "piper": from dimos.hardware.manipulators.piper import PiperBackend - return PiperBackend(can_port=cfg.can_port or "can0", dof=cfg.dof) + return PiperBackend(can_port=component.address or "can0", dof=dof) case _: - raise ValueError(f"Unknown backend type: {cfg.type}") + raise ValueError(f"Unknown backend type: {component.backend_type}") def _create_task_from_config(self, cfg: TaskConfig) -> ControlTask: """Create a control task from config.""" @@ -279,27 +255,27 @@ def _create_task_from_config(self, cfg: TaskConfig) -> ControlTask: @rpc def add_hardware( self, - hardware_id: str, backend: ManipulatorBackend, - joint_prefix: str | None = None, + component: HardwareComponent, ) -> bool: """Register a hardware backend with the orchestrator.""" with self._hardware_lock: - if hardware_id in self._hardware: - logger.warning(f"Hardware {hardware_id} already registered") + if component.hardware_id in self._hardware: + logger.warning(f"Hardware {component.hardware_id} already registered") return False interface = BackendHardwareInterface( backend=backend, - hardware_id=hardware_id, - joint_prefix=joint_prefix, + component=component, ) - self._hardware[hardware_id] = interface + self._hardware[component.hardware_id] = interface for joint_name in interface.joint_names: - self._joint_to_hardware[joint_name] = hardware_id + self._joint_to_hardware[joint_name] = component.hardware_id - logger.info(f"Added hardware {hardware_id} with joints: {interface.joint_names}") + logger.info( + f"Added hardware {component.hardware_id} with joints: {interface.joint_names}" + ) return True @rpc @@ -533,7 +509,7 @@ def get_tick_count(self) -> int: __all__ = [ "ControlOrchestrator", "ControlOrchestratorConfig", - "HardwareConfig", + "HardwareComponent", "TaskConfig", "control_orchestrator", ] From 2a91ca801cbf99b7414aeffa04d6b223c0b0f920 Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Thu, 15 Jan 2026 16:45:57 -0800 Subject: [PATCH 3/8] updated blueprints and test files --- dimos/control/blueprints.py | 148 +++++++++--------- dimos/control/test_control.py | 29 +++- dimos/e2e_tests/test_control_orchestrator.py | 8 +- .../control/orchestrator_client.py | 12 +- 4 files changed, 107 insertions(+), 90 deletions(-) diff --git a/dimos/control/blueprints.py b/dimos/control/blueprints.py index 5cbc109ece..8c8b0e62c5 100644 --- a/dimos/control/blueprints.py +++ b/dimos/control/blueprints.py @@ -37,23 +37,19 @@ from __future__ import annotations +from dimos.control.components import HardwareComponent, HardwareType, make_joints from dimos.control.orchestrator import ( ControlOrchestrator, - HardwareConfig, TaskConfig, control_orchestrator, ) from dimos.core.transport import LCMTransport from dimos.msgs.sensor_msgs import JointState -# ============================================================================= -# Helper function to generate joint names -# ============================================================================= - -def _joint_names(prefix: str, dof: int) -> list[str]: - """Generate joint names with prefix.""" - return [f"{prefix}_joint{i + 1}" for i in range(dof)] +def _joint_names(hardware_id: str, dof: int) -> list[str]: + """Generate joint names for a hardware component.""" + return [f"{hardware_id}_joint{i + 1}" for i in range(dof)] # ============================================================================= @@ -66,11 +62,11 @@ def _joint_names(prefix: str, dof: int) -> list[str]: publish_joint_state=True, joint_state_frame_id="orchestrator", hardware=[ - HardwareConfig( - id="arm", - type="mock", - dof=7, - joint_prefix="arm", + HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 7), + backend_type="mock", ), ], tasks=[ @@ -93,12 +89,12 @@ def _joint_names(prefix: str, dof: int) -> list[str]: publish_joint_state=True, joint_state_frame_id="orchestrator", hardware=[ - HardwareConfig( - id="arm", - type="xarm", - dof=7, - joint_prefix="arm", - ip="192.168.2.235", # Default IP, override via env or config + HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 7), + backend_type="xarm", + address="192.168.2.235", # Default IP, override via env or config auto_enable=True, ), ], @@ -122,12 +118,12 @@ def _joint_names(prefix: str, dof: int) -> list[str]: publish_joint_state=True, joint_state_frame_id="orchestrator", hardware=[ - HardwareConfig( - id="arm", - type="xarm", - dof=6, - joint_prefix="arm", - ip="192.168.1.210", + HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 6), + backend_type="xarm", + address="192.168.1.210", auto_enable=True, ), ], @@ -151,12 +147,12 @@ def _joint_names(prefix: str, dof: int) -> list[str]: publish_joint_state=True, joint_state_frame_id="orchestrator", hardware=[ - HardwareConfig( - id="arm", - type="piper", - dof=6, - joint_prefix="arm", - can_port="can0", + HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 6), + backend_type="piper", + address="can0", auto_enable=True, ), ], @@ -184,30 +180,30 @@ def _joint_names(prefix: str, dof: int) -> list[str]: publish_joint_state=True, joint_state_frame_id="orchestrator", hardware=[ - HardwareConfig( - id="left_arm", - type="mock", - dof=7, - joint_prefix="left", + HardwareComponent( + hardware_id="left_arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("left_arm", 7), + backend_type="mock", ), - HardwareConfig( - id="right_arm", - type="mock", - dof=6, - joint_prefix="right", + HardwareComponent( + hardware_id="right_arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("right_arm", 6), + backend_type="mock", ), ], tasks=[ TaskConfig( name="traj_left", type="trajectory", - joint_names=_joint_names("left", 7), + joint_names=_joint_names("left_arm", 7), priority=10, ), TaskConfig( name="traj_right", type="trajectory", - joint_names=_joint_names("right", 6), + joint_names=_joint_names("right_arm", 6), priority=10, ), ], @@ -223,20 +219,20 @@ def _joint_names(prefix: str, dof: int) -> list[str]: publish_joint_state=True, joint_state_frame_id="orchestrator", hardware=[ - HardwareConfig( - id="left_arm", - type="xarm", - dof=7, - joint_prefix="left", - ip="192.168.2.235", + HardwareComponent( + hardware_id="left_arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("left_arm", 7), + backend_type="xarm", + address="192.168.2.235", auto_enable=True, ), - HardwareConfig( - id="right_arm", - type="xarm", - dof=6, - joint_prefix="right", - ip="192.168.1.210", + HardwareComponent( + hardware_id="right_arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("right_arm", 6), + backend_type="xarm", + address="192.168.1.210", auto_enable=True, ), ], @@ -244,13 +240,13 @@ def _joint_names(prefix: str, dof: int) -> list[str]: TaskConfig( name="traj_left", type="trajectory", - joint_names=_joint_names("left", 7), + joint_names=_joint_names("left_arm", 7), priority=10, ), TaskConfig( name="traj_right", type="trajectory", - joint_names=_joint_names("right", 6), + joint_names=_joint_names("right_arm", 6), priority=10, ), ], @@ -266,20 +262,20 @@ def _joint_names(prefix: str, dof: int) -> list[str]: publish_joint_state=True, joint_state_frame_id="orchestrator", hardware=[ - HardwareConfig( - id="xarm_arm", - type="xarm", - dof=6, - joint_prefix="xarm", - ip="192.168.1.210", + HardwareComponent( + hardware_id="xarm_arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("xarm_arm", 6), + backend_type="xarm", + address="192.168.1.210", auto_enable=True, ), - HardwareConfig( - id="piper_arm", - type="piper", - dof=6, - joint_prefix="piper", - can_port="can0", + HardwareComponent( + hardware_id="piper_arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("piper_arm", 6), + backend_type="piper", + address="can0", auto_enable=True, ), ], @@ -287,13 +283,13 @@ def _joint_names(prefix: str, dof: int) -> list[str]: TaskConfig( name="traj_xarm", type="trajectory", - joint_names=_joint_names("xarm", 6), + joint_names=_joint_names("xarm_arm", 6), priority=10, ), TaskConfig( name="traj_piper", type="trajectory", - joint_names=_joint_names("piper", 6), + joint_names=_joint_names("piper_arm", 6), priority=10, ), ], @@ -313,11 +309,11 @@ def _joint_names(prefix: str, dof: int) -> list[str]: publish_joint_state=True, joint_state_frame_id="orchestrator", hardware=[ - HardwareConfig( - id="arm", - type="mock", - dof=7, - joint_prefix="arm", + HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 7), + backend_type="mock", ), ], tasks=[ diff --git a/dimos/control/test_control.py b/dimos/control/test_control.py index e35a9853d5..58dffd05e7 100644 --- a/dimos/control/test_control.py +++ b/dimos/control/test_control.py @@ -22,6 +22,7 @@ import pytest +from dimos.control.components import HardwareComponent, HardwareType, make_joints from dimos.control.hardware_interface import BackendHardwareInterface from dimos.control.task import ( ControlMode, @@ -61,11 +62,12 @@ def mock_backend(): @pytest.fixture def hardware_interface(mock_backend): """Create a BackendHardwareInterface with mock backend.""" - return BackendHardwareInterface( - backend=mock_backend, + component = HardwareComponent( hardware_id="test_arm", - joint_prefix="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 6), ) + return BackendHardwareInterface(backend=mock_backend, component=component) @pytest.fixture @@ -429,7 +431,12 @@ def test_non_overlapping_joints(self): class TestTickLoop: def test_tick_loop_starts_and_stops(self, mock_backend): - hw = BackendHardwareInterface(mock_backend, "arm", "arm") + component = HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 6), + ) + hw = BackendHardwareInterface(mock_backend, component) hardware = {"arm": hw} tasks: dict = {} joint_to_hardware = {f"arm_joint{i + 1}": "arm" for i in range(6)} @@ -453,7 +460,12 @@ def test_tick_loop_starts_and_stops(self, mock_backend): assert tick_loop.tick_count == final_count def test_tick_loop_calls_compute(self, mock_backend): - hw = BackendHardwareInterface(mock_backend, "arm", "arm") + component = HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 6), + ) + hw = BackendHardwareInterface(mock_backend, component) hardware = {"arm": hw} mock_task = MagicMock() @@ -495,7 +507,12 @@ def test_tick_loop_calls_compute(self, mock_backend): class TestIntegration: def test_full_trajectory_execution(self, mock_backend): - hw = BackendHardwareInterface(mock_backend, "arm", "arm") + component = HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 6), + ) + hw = BackendHardwareInterface(mock_backend, component) hardware = {"arm": hw} config = JointTrajectoryTaskConfig( diff --git a/dimos/e2e_tests/test_control_orchestrator.py b/dimos/e2e_tests/test_control_orchestrator.py index a97c045ce6..ae2bb17352 100644 --- a/dimos/e2e_tests/test_control_orchestrator.py +++ b/dimos/e2e_tests/test_control_orchestrator.py @@ -222,8 +222,8 @@ def test_dual_arm_orchestrator(self, lcm_spy, start_blueprint) -> None: try: # Verify both arms present joints = client.list_joints() - assert "left_joint1" in joints - assert "right_joint1" in joints + assert "left_arm_joint1" in joints + assert "right_arm_joint1" in joints tasks = client.list_tasks() assert "traj_left" in tasks @@ -231,7 +231,7 @@ def test_dual_arm_orchestrator(self, lcm_spy, start_blueprint) -> None: # Create trajectories for both arms left_trajectory = JointTrajectory( - joint_names=[f"left_joint{i + 1}" for i in range(7)], + joint_names=[f"left_arm_joint{i + 1}" for i in range(7)], points=[ TrajectoryPoint(time_from_start=0.0, positions=[0.0] * 7), TrajectoryPoint(time_from_start=0.5, positions=[0.2] * 7), @@ -239,7 +239,7 @@ def test_dual_arm_orchestrator(self, lcm_spy, start_blueprint) -> None: ) right_trajectory = JointTrajectory( - joint_names=[f"right_joint{i + 1}" for i in range(6)], + joint_names=[f"right_arm_joint{i + 1}" for i in range(6)], points=[ TrajectoryPoint(time_from_start=0.0, positions=[0.0] * 6), TrajectoryPoint(time_from_start=0.5, positions=[0.3] * 6), diff --git a/dimos/manipulation/control/orchestrator_client.py b/dimos/manipulation/control/orchestrator_client.py index 84e85dfb3d..7f30e6ba8c 100644 --- a/dimos/manipulation/control/orchestrator_client.py +++ b/dimos/manipulation/control/orchestrator_client.py @@ -155,14 +155,18 @@ def select_task(self, task_name: str) -> bool: self._current_task = task_name # Get joints for this task (infer from task name pattern) - # e.g., "traj_left" -> joints starting with "left_" + # e.g., "traj_left" -> joints starting with "left_arm_" (hardware_id based naming) # e.g., "traj_arm" -> joints starting with "arm_" all_joints = self.list_joints() - # Try to infer prefix from task name + # Try to infer hardware_id from task name if "_" in task_name: - prefix = task_name.split("_", 1)[1] # "traj_left" -> "left" - task_joints = [j for j in all_joints if j.startswith(prefix + "_")] + suffix = task_name.split("_", 1)[1] # "traj_left" -> "left" + # Try both patterns: exact suffix (e.g., "arm_") and with "_arm" suffix (e.g., "left_arm_") + task_joints = [j for j in all_joints if j.startswith(suffix + "_")] + if not task_joints: + # Try with "_arm" suffix for dual-arm setups (left -> left_arm) + task_joints = [j for j in all_joints if j.startswith(suffix + "_arm_")] else: task_joints = all_joints From 2a351697a8085cc145a2b06da6b92a7b903a2f53 Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Thu, 15 Jan 2026 18:55:07 -0800 Subject: [PATCH 4/8] added JointState dataclass and JointName str alias for better readability --- dimos/control/__init__.py | 6 +-- dimos/control/components.py | 60 ++++++++--------------------- dimos/control/hardware_interface.py | 32 ++++++++------- dimos/control/test_control.py | 8 ++-- dimos/control/tick_loop.py | 8 ++-- 5 files changed, 46 insertions(+), 68 deletions(-) diff --git a/dimos/control/__init__.py b/dimos/control/__init__.py index 48f2ad4329..b41daa2cc8 100644 --- a/dimos/control/__init__.py +++ b/dimos/control/__init__.py @@ -52,8 +52,7 @@ from dimos.control.components import ( HardwareComponent, HardwareType, - JointConfig, - JointType, + JointState, make_joints, ) from dimos.control.hardware_interface import ( @@ -89,9 +88,8 @@ "HardwareInterface", "HardwareType", "JointCommandOutput", - "JointConfig", + "JointState", "JointStateSnapshot", - "JointType", "OrchestratorState", "ResourceClaim", "TaskConfig", diff --git a/dimos/control/components.py b/dimos/control/components.py index 9094db3ae5..ddc81b83c1 100644 --- a/dimos/control/components.py +++ b/dimos/control/components.py @@ -14,11 +14,9 @@ """Hardware component schema for the ControlOrchestrator.""" -from dataclasses import dataclass +from dataclasses import dataclass, field from enum import Enum -from dimos.hardware.manipulators.spec import ControlMode - HardwareId = str JointName = str @@ -29,20 +27,13 @@ class HardwareType(Enum): GRIPPER = "gripper" -class JointType(Enum): - REVOLUTE = "revolute" # Rotary with limits (radians) - PRISMATIC = "prismatic" # Linear with limits (meters) - CONTINUOUS = "continuous" # Rotary no limits (wheels) - VELOCITY = "velocity" # Velocity-only (base vx/vy/wz) - - @dataclass(frozen=True) -class JointConfig: - joint_name: JointName - joint_type: JointType - supported_modes: tuple[ControlMode, ...] - limits: tuple[float, float] | None = None - default_on_timeout: float | None = None # None=hold, 0.0=zero velocity +class JointState: + """State of a single joint.""" + + position: float + velocity: float + effort: float @dataclass @@ -50,57 +41,40 @@ class HardwareComponent: """Configuration for a hardware component. Attributes: - hardware_id: Unique hardware identifier (e.g., "arm", "left_arm") + hardware_id: Unique identifier, also used as joint name prefix hardware_type: Type of hardware (MANIPULATOR, BASE, GRIPPER) - joints: List of joint configurations + joints: List of joint names (e.g., ["arm_joint1", "arm_joint2", ...]) backend_type: Backend type ("mock", "xarm", "piper") - address: Connection address - IP for TCP backends, port name for CAN backends - auto_enable: Whether to auto-enable servos (default True) - description: Optional description + address: Connection address - IP for TCP, port for CAN + auto_enable: Whether to auto-enable servos """ hardware_id: HardwareId hardware_type: HardwareType - joints: list[JointConfig] + joints: list[JointName] = field(default_factory=list) backend_type: str = "mock" address: str | None = None auto_enable: bool = True - description: str = "" -def make_joints( - hardware_id: HardwareId, - dof: int, - joint_type: JointType = JointType.REVOLUTE, - supported_modes: tuple[ControlMode, ...] = (ControlMode.POSITION, ControlMode.SERVO_POSITION), -) -> list[JointConfig]: - """Create joint configs for hardware. +def make_joints(hardware_id: HardwareId, dof: int) -> list[JointName]: + """Create joint names for hardware. Args: hardware_id: The hardware identifier (e.g., "left_arm") dof: Degrees of freedom - joint_type: Type of joints (default: REVOLUTE) - supported_modes: Control modes the joints support Returns: - List of JointConfig with names like "left_arm_joint1", "left_arm_joint2", ... + List of joint names like ["left_arm_joint1", "left_arm_joint2", ...] """ - return [ - JointConfig( - joint_name=f"{hardware_id}_joint{i + 1}", - joint_type=joint_type, - supported_modes=supported_modes, - ) - for i in range(dof) - ] + return [f"{hardware_id}_joint{i + 1}" for i in range(dof)] __all__ = [ "HardwareComponent", "HardwareId", "HardwareType", - "JointConfig", "JointName", - "JointType", + "JointState", "make_joints", ] diff --git a/dimos/control/hardware_interface.py b/dimos/control/hardware_interface.py index cf90e0b8e4..afe53c73f2 100644 --- a/dimos/control/hardware_interface.py +++ b/dimos/control/hardware_interface.py @@ -29,7 +29,7 @@ from dimos.hardware.manipulators.spec import ControlMode, ManipulatorBackend if TYPE_CHECKING: - from dimos.control.components import HardwareComponent + from dimos.control.components import HardwareComponent, HardwareId, JointName, JointState logger = logging.getLogger(__name__) @@ -45,24 +45,24 @@ class HardwareInterface(Protocol): """ @property - def hardware_id(self) -> str: + def hardware_id(self) -> HardwareId: """Unique ID for this hardware (e.g., 'left_arm').""" ... @property - def joint_names(self) -> list[str]: + def joint_names(self) -> list[JointName]: """Ordered list of fully-qualified joint names this hardware controls.""" ... - def read_state(self) -> dict[str, tuple[float, float, float]]: + def read_state(self) -> dict[JointName, JointState]: """Read current state. Returns: - Dict of joint_name -> (position, velocity, effort) + Dict of joint_name -> JointState(position, velocity, effort) """ ... - def write_command(self, commands: dict[str, float], mode: ControlMode) -> bool: + def write_command(self, commands: dict[JointName, float], mode: ControlMode) -> bool: """Write commands to hardware. IMPORTANT: Accepts partial joint sets. Missing joints hold last value. @@ -106,7 +106,7 @@ def __init__( self._backend = backend self._component = component - self._joint_names = [j.joint_name for j in component.joints] + self._joint_names = component.joints # Track last commanded values for hold-last behavior self._last_commanded: dict[str, float] = {} @@ -115,12 +115,12 @@ def __init__( self._current_mode: ControlMode | None = None @property - def hardware_id(self) -> str: + def hardware_id(self) -> HardwareId: """Unique ID for this hardware.""" return self._component.hardware_id @property - def joint_names(self) -> list[str]: + def joint_names(self) -> list[JointName]: """Ordered list of joint names.""" return self._joint_names @@ -138,18 +138,24 @@ def disconnect(self) -> None: """Disconnect the underlying backend.""" self._backend.disconnect() - def read_state(self) -> dict[str, tuple[float, float, float]]: - """Read state as {joint_name: (position, velocity, effort)}. + def read_state(self) -> dict[JointName, JointState]: + """Read state as {joint_name: JointState}. Returns: - Dict mapping joint name to (position, velocity, effort) tuple + Dict mapping joint name to JointState with position, velocity, effort """ + from dimos.control.components import JointState + positions = self._backend.read_joint_positions() velocities = self._backend.read_joint_velocities() efforts = self._backend.read_joint_efforts() return { - name: (positions[i], velocities[i], efforts[i]) + name: JointState( + position=positions[i], + velocity=velocities[i], + effort=efforts[i], + ) for i, name in enumerate(self._joint_names) } diff --git a/dimos/control/test_control.py b/dimos/control/test_control.py index 58dffd05e7..cf41775d49 100644 --- a/dimos/control/test_control.py +++ b/dimos/control/test_control.py @@ -192,10 +192,10 @@ def test_read_state(self, hardware_interface): state = hardware_interface.read_state() assert "arm_joint1" in state assert len(state) == 6 - pos, vel, eff = state["arm_joint1"] - assert pos == 0.0 - assert vel == 0.0 - assert eff == 0.0 + joint_state = state["arm_joint1"] + assert joint_state.position == 0.0 + assert joint_state.velocity == 0.0 + assert joint_state.effort == 0.0 def test_write_command(self, hardware_interface, mock_backend): commands = { diff --git a/dimos/control/tick_loop.py b/dimos/control/tick_loop.py index 03e4e0ebd0..a15388f31e 100644 --- a/dimos/control/tick_loop.py +++ b/dimos/control/tick_loop.py @@ -213,10 +213,10 @@ def _read_all_hardware(self) -> JointStateSnapshot: for hw in self._hardware.values(): try: state = hw.read_state() - for joint_name, (pos, vel, eff) in state.items(): - joint_positions[joint_name] = pos - joint_velocities[joint_name] = vel - joint_efforts[joint_name] = eff + for joint_name, joint_state in state.items(): + joint_positions[joint_name] = joint_state.position + joint_velocities[joint_name] = joint_state.velocity + joint_efforts[joint_name] = joint_state.effort except Exception as e: logger.error(f"Failed to read {hw.hardware_id}: {e}") From 01989c7e828f728200d8f144dcf001d0c4939f1e Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Thu, 15 Jan 2026 19:26:20 -0800 Subject: [PATCH 5/8] renamed control orchestrator to control coordinator as its easier to say --- dimos/control/__init__.py | 42 ++++---- dimos/control/blueprints.py | 100 +++++++++--------- .../{orchestrator.py => coordinator.py} | 58 +++++----- dimos/control/hardware_interface.py | 16 +-- dimos/control/task.py | 40 +++---- dimos/control/tasks/trajectory_task.py | 24 ++--- dimos/control/test_control.py | 38 ++++--- dimos/control/tick_loop.py | 16 +-- ...strator.py => test_control_coordinator.py} | 80 +++++++------- ...trator_client.py => coordinator_client.py} | 78 +++++++------- dimos/robot/all_blueprints.py | 20 ++-- 11 files changed, 255 insertions(+), 257 deletions(-) rename dimos/control/{orchestrator.py => coordinator.py} (91%) rename dimos/e2e_tests/{test_control_orchestrator.py => test_control_coordinator.py} (74%) rename dimos/manipulation/control/{orchestrator_client.py => coordinator_client.py} (90%) diff --git a/dimos/control/__init__.py b/dimos/control/__init__.py index b41daa2cc8..b35f86c9e6 100644 --- a/dimos/control/__init__.py +++ b/dimos/control/__init__.py @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""ControlOrchestrator - Centralized control for multi-arm coordination. +"""ControlCoordinator - Centralized control for multi-arm coordination. -This module provides a centralized control orchestrator that replaces +This module provides a centralized control coordinator that replaces per-driver/per-controller loops with a single deterministic tick-based system. Features: @@ -25,28 +25,28 @@ - Aggregated preemption notifications Example: - >>> from dimos.control import ControlOrchestrator + >>> from dimos.control import ControlCoordinator >>> from dimos.control.tasks import JointTrajectoryTask, JointTrajectoryTaskConfig >>> from dimos.hardware.manipulators.xarm import XArmBackend >>> - >>> # Create orchestrator - >>> orch = ControlOrchestrator(tick_rate=100.0) + >>> # Create coordinator + >>> coord = ControlCoordinator(tick_rate=100.0) >>> >>> # Add hardware >>> backend = XArmBackend(ip="192.168.1.185", dof=7) >>> backend.connect() - >>> orch.add_hardware("left_arm", backend, joint_prefix="left") + >>> coord.add_hardware("left_arm", backend) >>> >>> # Add task - >>> joints = [f"left_joint{i+1}" for i in range(7)] + >>> joints = [f"left_arm_joint{i+1}" for i in range(7)] >>> task = JointTrajectoryTask( ... "traj_left", ... JointTrajectoryTaskConfig(joint_names=joints, priority=10), ... ) - >>> orch.add_task(task) + >>> coord.add_task(task) >>> >>> # Start - >>> orch.start() + >>> coord.start() """ from dimos.control.components import ( @@ -55,22 +55,22 @@ JointState, make_joints, ) +from dimos.control.coordinator import ( + ControlCoordinator, + ControlCoordinatorConfig, + TaskConfig, + control_coordinator, +) from dimos.control.hardware_interface import ( BackendHardwareInterface, HardwareInterface, ) -from dimos.control.orchestrator import ( - ControlOrchestrator, - ControlOrchestratorConfig, - TaskConfig, - control_orchestrator, -) from dimos.control.task import ( ControlMode, ControlTask, + CoordinatorState, JointCommandOutput, JointStateSnapshot, - OrchestratorState, ResourceClaim, ) from dimos.control.tick_loop import TickLoop @@ -78,23 +78,23 @@ __all__ = [ # Hardware interface "BackendHardwareInterface", + # Coordinator + "ControlCoordinator", + "ControlCoordinatorConfig", "ControlMode", - # Orchestrator - "ControlOrchestrator", - "ControlOrchestratorConfig", # Task protocol and types "ControlTask", + "CoordinatorState", "HardwareComponent", "HardwareInterface", "HardwareType", "JointCommandOutput", "JointState", "JointStateSnapshot", - "OrchestratorState", "ResourceClaim", "TaskConfig", # Tick loop "TickLoop", - "control_orchestrator", + "control_coordinator", "make_joints", ] diff --git a/dimos/control/blueprints.py b/dimos/control/blueprints.py index 8c8b0e62c5..f2365fbef6 100644 --- a/dimos/control/blueprints.py +++ b/dimos/control/blueprints.py @@ -12,36 +12,36 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Pre-configured blueprints for the ControlOrchestrator. +"""Pre-configured blueprints for the ControlCoordinator. -This module provides ready-to-use orchestrator blueprints for common setups. +This module provides ready-to-use coordinator blueprints for common setups. Usage: # Run via CLI: - dimos run orchestrator-mock # Mock 7-DOF arm - dimos run orchestrator-xarm7 # XArm7 real hardware - dimos run orchestrator-dual-mock # Dual mock arms + dimos run coordinator-mock # Mock 7-DOF arm + dimos run coordinator-xarm7 # XArm7 real hardware + dimos run coordinator-dual-mock # Dual mock arms # Or programmatically: - from dimos.control.blueprints import orchestrator_mock - coordinator = orchestrator_mock.build() + from dimos.control.blueprints import coordinator_mock + coordinator = coordinator_mock.build() coordinator.loop() Example with trajectory setter: - # Terminal 1: Run the orchestrator - dimos run orchestrator-mock + # Terminal 1: Run the coordinator + dimos run coordinator-mock # Terminal 2: Send trajectories via RPC - python -m dimos.control.examples.orchestrator_trajectory_setter --task traj_arm + python -m dimos.control.examples.coordinator_trajectory_setter --task traj_arm """ from __future__ import annotations from dimos.control.components import HardwareComponent, HardwareType, make_joints -from dimos.control.orchestrator import ( - ControlOrchestrator, +from dimos.control.coordinator import ( + ControlCoordinator, TaskConfig, - control_orchestrator, + control_coordinator, ) from dimos.core.transport import LCMTransport from dimos.msgs.sensor_msgs import JointState @@ -57,10 +57,10 @@ def _joint_names(hardware_id: str, dof: int) -> list[str]: # ============================================================================= # Mock 7-DOF arm (for testing) -orchestrator_mock = control_orchestrator( +coordinator_mock = control_coordinator( tick_rate=100.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", hardware=[ HardwareComponent( hardware_id="arm", @@ -79,15 +79,15 @@ def _joint_names(hardware_id: str, dof: int) -> list[str]: ], ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) # XArm7 real hardware (requires IP configuration) -orchestrator_xarm7 = control_orchestrator( +coordinator_xarm7 = control_coordinator( tick_rate=100.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", hardware=[ HardwareComponent( hardware_id="arm", @@ -108,15 +108,15 @@ def _joint_names(hardware_id: str, dof: int) -> list[str]: ], ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) # XArm6 real hardware -orchestrator_xarm6 = control_orchestrator( +coordinator_xarm6 = control_coordinator( tick_rate=100.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", hardware=[ HardwareComponent( hardware_id="arm", @@ -137,15 +137,15 @@ def _joint_names(hardware_id: str, dof: int) -> list[str]: ], ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) # Piper arm (6-DOF, CAN bus) -orchestrator_piper = control_orchestrator( +coordinator_piper = control_coordinator( tick_rate=100.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", hardware=[ HardwareComponent( hardware_id="arm", @@ -166,7 +166,7 @@ def _joint_names(hardware_id: str, dof: int) -> list[str]: ], ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) @@ -175,10 +175,10 @@ def _joint_names(hardware_id: str, dof: int) -> list[str]: # ============================================================================= # Dual mock arms (7-DOF left, 6-DOF right) for testing -orchestrator_dual_mock = control_orchestrator( +coordinator_dual_mock = control_coordinator( tick_rate=100.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", hardware=[ HardwareComponent( hardware_id="left_arm", @@ -209,15 +209,15 @@ def _joint_names(hardware_id: str, dof: int) -> list[str]: ], ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) # Dual XArm setup (XArm7 left, XArm6 right) -orchestrator_dual_xarm = control_orchestrator( +coordinator_dual_xarm = control_coordinator( tick_rate=100.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", hardware=[ HardwareComponent( hardware_id="left_arm", @@ -252,15 +252,15 @@ def _joint_names(hardware_id: str, dof: int) -> list[str]: ], ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) # Dual Arm setup (XArm6 , Piper ) -orchestrator_piper_xarm = control_orchestrator( +coordinator_piper_xarm = control_coordinator( tick_rate=100.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", hardware=[ HardwareComponent( hardware_id="xarm_arm", @@ -295,7 +295,7 @@ def _joint_names(hardware_id: str, dof: int) -> list[str]: ], ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) @@ -304,10 +304,10 @@ def _joint_names(hardware_id: str, dof: int) -> list[str]: # ============================================================================= # High-frequency mock for demanding applications -orchestrator_highfreq_mock = control_orchestrator( +coordinator_highfreq_mock = control_coordinator( tick_rate=200.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", hardware=[ HardwareComponent( hardware_id="arm", @@ -326,7 +326,7 @@ def _joint_names(hardware_id: str, dof: int) -> list[str]: ], ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) @@ -334,30 +334,30 @@ def _joint_names(hardware_id: str, dof: int) -> list[str]: # Raw Blueprints (no hardware/tasks configured - for programmatic setup) # ============================================================================= -# Basic orchestrator with transport only (add hardware/tasks programmatically) -orchestrator_basic = control_orchestrator( +# Basic coordinator with transport only (add hardware/tasks programmatically) +coordinator_basic = control_coordinator( tick_rate=100.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) __all__ = [ # Raw blueprints (for programmatic setup) - "orchestrator_basic", + "coordinator_basic", # Dual arm blueprints - "orchestrator_dual_mock", - "orchestrator_dual_xarm", + "coordinator_dual_mock", + "coordinator_dual_xarm", # High-frequency blueprints - "orchestrator_highfreq_mock", + "coordinator_highfreq_mock", # Single arm blueprints - "orchestrator_mock", - "orchestrator_piper", - "orchestrator_piper_xarm", - "orchestrator_xarm6", - "orchestrator_xarm7", + "coordinator_mock", + "coordinator_piper", + "coordinator_piper_xarm", + "coordinator_xarm6", + "coordinator_xarm7", ] diff --git a/dimos/control/orchestrator.py b/dimos/control/coordinator.py similarity index 91% rename from dimos/control/orchestrator.py rename to dimos/control/coordinator.py index 619969fa89..8976ed9e66 100644 --- a/dimos/control/orchestrator.py +++ b/dimos/control/coordinator.py @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""ControlOrchestrator module. +"""ControlCoordinator module. -Centralized control orchestrator that replaces per-driver/per-controller +Centralized control coordinator that replaces per-driver/per-controller loops with a single deterministic tick-based system. Features: @@ -89,8 +89,8 @@ class TaskStatus: @dataclass -class ControlOrchestratorConfig(ModuleConfig): - """Configuration for the ControlOrchestrator. +class ControlCoordinatorConfig(ModuleConfig): + """Configuration for the ControlCoordinator. Attributes: tick_rate: Control loop frequency in Hz (default: 100) @@ -103,19 +103,19 @@ class ControlOrchestratorConfig(ModuleConfig): tick_rate: float = 100.0 publish_joint_state: bool = True - joint_state_frame_id: str = "orchestrator" + joint_state_frame_id: str = "coordinator" log_ticks: bool = False hardware: list[HardwareComponent] = field(default_factory=lambda: []) tasks: list[TaskConfig] = field(default_factory=lambda: []) # ============================================================================= -# ControlOrchestrator Module +# ControlCoordinator Module # ============================================================================= -class ControlOrchestrator(Module[ControlOrchestratorConfig]): - """Centralized control orchestrator with per-joint arbitration. +class ControlCoordinator(Module[ControlCoordinatorConfig]): + """Centralized control coordinator with per-joint arbitration. Single tick loop that: 1. Reads state from all hardware @@ -132,10 +132,10 @@ class ControlOrchestrator(Module[ControlOrchestratorConfig]): - Aggregated preemption (one notification per task per tick) Example: - >>> from dimos.control import ControlOrchestrator + >>> from dimos.control import ControlCoordinator >>> from dimos.hardware.manipulators.xarm import XArmBackend >>> - >>> orch = ControlOrchestrator(tick_rate=100.0) + >>> orch = ControlCoordinator(tick_rate=100.0) >>> backend = XArmBackend(ip="192.168.1.185", dof=7) >>> backend.connect() >>> orch.add_hardware("left_arm", backend, joint_prefix="left") @@ -145,8 +145,8 @@ class ControlOrchestrator(Module[ControlOrchestratorConfig]): # Output: Aggregated joint state for external consumers joint_state: Out[JointState] - config: ControlOrchestratorConfig - default_config = ControlOrchestratorConfig + config: ControlCoordinatorConfig + default_config = ControlCoordinatorConfig def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) @@ -165,7 +165,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: # Tick loop (created on start) self._tick_loop: TickLoop | None = None - logger.info(f"ControlOrchestrator initialized at {self.config.tick_rate}Hz") + logger.info(f"ControlCoordinator initialized at {self.config.tick_rate}Hz") # ========================================================================= # Config-based Setup @@ -258,7 +258,7 @@ def add_hardware( backend: ManipulatorBackend, component: HardwareComponent, ) -> bool: - """Register a hardware backend with the orchestrator.""" + """Register a hardware backend with the coordinator.""" with self._hardware_lock: if component.hardware_id in self._hardware: logger.warning(f"Hardware {component.hardware_id} already registered") @@ -283,7 +283,7 @@ def remove_hardware(self, hardware_id: str) -> bool: """Remove a hardware interface. Note: For safety, call this only when no tasks are actively using this - hardware. Consider stopping the orchestrator before removing hardware. + hardware. Consider stopping the coordinator before removing hardware. """ with self._hardware_lock: if hardware_id not in self._hardware: @@ -330,9 +330,9 @@ def get_joint_positions(self) -> dict[str, float]: with self._hardware_lock: positions: dict[str, float] = {} for hw in self._hardware.values(): - state = hw.read_state() # {joint_name: (pos, vel, effort)} - for joint_name, (pos, _vel, _effort) in state.items(): - positions[joint_name] = pos + state = hw.read_state() # {joint_name: JointState} + for joint_name, joint_state in state.items(): + positions[joint_name] = joint_state.position return positions # ========================================================================= @@ -341,7 +341,7 @@ def get_joint_positions(self) -> dict[str, float]: @rpc def add_task(self, task: ControlTask) -> bool: - """Register a task with the orchestrator.""" + """Register a task with the coordinator.""" if not isinstance(task, ControlTask): raise TypeError("task must implement ControlTask") @@ -448,9 +448,9 @@ def cancel_trajectory(self, task_name: str) -> bool: @rpc def start(self) -> None: - """Start the orchestrator control loop.""" + """Start the coordinator control loop.""" if self._tick_loop and self._tick_loop.is_running: - logger.warning("Orchestrator already running") + logger.warning("Coordinator already running") return super().start() @@ -474,12 +474,12 @@ def start(self) -> None: ) self._tick_loop.start() - logger.info(f"ControlOrchestrator started at {self.config.tick_rate}Hz") + logger.info(f"ControlCoordinator started at {self.config.tick_rate}Hz") @rpc def stop(self) -> None: - """Stop the orchestrator.""" - logger.info("Stopping ControlOrchestrator...") + """Stop the coordinator.""" + logger.info("Stopping ControlCoordinator...") if self._tick_loop: self._tick_loop.stop() @@ -494,7 +494,7 @@ def stop(self) -> None: logger.error(f"Error disconnecting hardware {hw_id}: {e}") super().stop() - logger.info("ControlOrchestrator stopped") + logger.info("ControlCoordinator stopped") @rpc def get_tick_count(self) -> int: @@ -503,13 +503,13 @@ def get_tick_count(self) -> int: # Blueprint export -control_orchestrator = ControlOrchestrator.blueprint +control_coordinator = ControlCoordinator.blueprint __all__ = [ - "ControlOrchestrator", - "ControlOrchestratorConfig", + "ControlCoordinator", + "ControlCoordinatorConfig", "HardwareComponent", "TaskConfig", - "control_orchestrator", + "control_coordinator", ] diff --git a/dimos/control/hardware_interface.py b/dimos/control/hardware_interface.py index afe53c73f2..ed320abec5 100644 --- a/dimos/control/hardware_interface.py +++ b/dimos/control/hardware_interface.py @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Hardware interface for the ControlOrchestrator. +"""Hardware interface for the ControlCoordinator. -Wraps ManipulatorBackend with orchestrator-specific features: +Wraps ManipulatorBackend with coordinator-specific features: - Namespaced joint names (e.g., "left_joint1") - Unified read/write interface - Hold-last-value for partial commands @@ -36,9 +36,9 @@ @runtime_checkable class HardwareInterface(Protocol): - """Protocol for hardware that the orchestrator can control. + """Protocol for hardware that the coordinator can control. - This wraps ManipulatorBackend with orchestrator-specific features: + This wraps ManipulatorBackend with coordinator-specific features: - Namespaced joint names (e.g., "left_arm_joint1") - Unified read/write interface - State caching @@ -184,7 +184,7 @@ def write_command(self, commands: dict[str, float], mode: ControlMode) -> bool: self._last_commanded[joint_name] = value elif joint_name not in self._warned_unknown_joints: logger.warning( - f"Hardware {self._hardware_id} received command for unknown joint " + f"Hardware {self.hardware_id} received command for unknown joint " f"{joint_name}. Valid joints: {self._joint_names}" ) self._warned_unknown_joints.add(joint_name) @@ -195,7 +195,7 @@ def write_command(self, commands: dict[str, float], mode: ControlMode) -> bool: # Switch control mode if needed if mode != self._current_mode: if not self._backend.set_control_mode(mode): - logger.warning(f"Hardware {self._hardware_id} failed to switch to {mode.name}") + logger.warning(f"Hardware {self.hardware_id} failed to switch to {mode.name}") return False self._current_mode = mode @@ -206,7 +206,7 @@ def write_command(self, commands: dict[str, float], mode: ControlMode) -> bool: case ControlMode.VELOCITY: return self._backend.write_joint_velocities(ordered) case ControlMode.TORQUE: - logger.warning(f"Hardware {self._hardware_id} does not support torque mode") + logger.warning(f"Hardware {self.hardware_id} does not support torque mode") return False case _: return False @@ -224,7 +224,7 @@ def _initialize_last_commanded(self) -> None: time.sleep(0.01) raise RuntimeError( - f"Hardware {self._hardware_id} failed to read initial positions after retries" + f"Hardware {self.hardware_id} failed to read initial positions after retries" ) def _build_ordered_command(self) -> list[float]: diff --git a/dimos/control/task.py b/dimos/control/task.py index 49589188d9..36e764453a 100644 --- a/dimos/control/task.py +++ b/dimos/control/task.py @@ -12,17 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""ControlTask protocol and types for the ControlOrchestrator. +"""ControlTask protocol and types for the ControlCoordinator. This module defines: -- Data types used by tasks and the orchestrator (ResourceClaim, JointStateSnapshot, etc.) +- Data types used by tasks and the coordinator (ResourceClaim, JointStateSnapshot, etc.) - ControlTask protocol that all tasks must implement -Tasks are "passive" - they don't own threads. The orchestrator calls +Tasks are "passive" - they don't own threads. The coordinator calls compute() at each tick, passing current state and time. CRITICAL: Tasks must NEVER call time.time() directly. -Use the t_now passed in OrchestratorState. +Use the t_now passed in CoordinatorState. """ from __future__ import annotations @@ -41,7 +41,7 @@ class ResourceClaim: """Declares which joints a task wants to control. - Used by the orchestrator to determine resource ownership and + Used by the coordinator to determine resource ownership and resolve conflicts between competing tasks. Attributes: @@ -94,7 +94,7 @@ def get_effort(self, joint_name: str) -> float | None: @dataclass -class OrchestratorState: +class CoordinatorState: """Complete state snapshot for tasks to read. Passed to each task's compute() method every tick. Contains @@ -109,7 +109,7 @@ class OrchestratorState: """ joints: JointStateSnapshot - t_now: float # Orchestrator time (perf_counter) - USE THIS, NOT time.time()! + t_now: float # Coordinator time (perf_counter) - USE THIS, NOT time.time()! dt: float # Time since last tick @@ -118,7 +118,7 @@ class JointCommandOutput: """Joint-centric command output from a task. Commands are addressed by joint name, NOT by hardware ID. - The orchestrator routes commands to the appropriate hardware. + The coordinator routes commands to the appropriate hardware. This design enables: - WBC spanning multiple hardware interfaces @@ -170,14 +170,14 @@ def get_values(self) -> list[float] | None: @runtime_checkable class ControlTask(Protocol): - """Protocol for passive tasks that run within the orchestrator. + """Protocol for passive tasks that run within the coordinator. - Tasks are "passive" - they don't own threads. The orchestrator + Tasks are "passive" - they don't own threads. The coordinator calls compute() at each tick, passing current state and time. Lifecycle: - 1. Task is added to orchestrator via add_task() - 2. Orchestrator calls claim() to understand resource needs + 1. Task is added to coordinator via add_task() + 2. Coordinator calls claim() to understand resource needs 3. Each tick: is_active() → compute() → output merged via arbitration 4. Task removed via remove_task() or transitions to inactive @@ -199,7 +199,7 @@ class ControlTask(Protocol): ... def is_active(self) -> bool: ... return self._executing ... - ... def compute(self, state: OrchestratorState) -> JointCommandOutput | None: + ... def compute(self, state: CoordinatorState) -> JointCommandOutput | None: ... # Use state.t_now, NOT time.time()! ... t_elapsed = state.t_now - self._start_time ... positions = self._trajectory.sample(t_elapsed) @@ -217,14 +217,14 @@ def name(self) -> str: """Unique identifier for this task instance. Used for logging, debugging, and task management. - Must be unique across all tasks in the orchestrator. + Must be unique across all tasks in the coordinator. """ ... def claim(self) -> ResourceClaim: """Declare resource requirements. - Called by orchestrator to determine: + Called by coordinator to determine: - Which joints this task wants to control - Priority for conflict resolution - Control mode (position/velocity/effort) @@ -233,7 +233,7 @@ def claim(self) -> ResourceClaim: ResourceClaim with joints (frozenset) and priority (int) Note: - The claim can change dynamically - orchestrator calls this + The claim can change dynamically - coordinator calls this every tick for active tasks. """ ... @@ -251,16 +251,16 @@ def is_active(self) -> bool: """ ... - def compute(self, state: OrchestratorState) -> JointCommandOutput | None: + def compute(self, state: CoordinatorState) -> JointCommandOutput | None: """Compute output command given current state. - Called by orchestrator for active tasks each tick. + Called by coordinator for active tasks each tick. CRITICAL: Use state.t_now for timing, NEVER time.time()! This ensures deterministic behavior and enables simulation. Args: - state: OrchestratorState containing: + state: CoordinatorState containing: - joints: JointStateSnapshot with all joint states - t_now: Current tick time (use this for all timing!) - dt: Time since last tick @@ -292,8 +292,8 @@ def on_preempted(self, by_task: str, joints: frozenset[str]) -> None: "ControlMode", # Protocol "ControlTask", + "CoordinatorState", "JointCommandOutput", "JointStateSnapshot", - "OrchestratorState", "ResourceClaim", ] diff --git a/dimos/control/tasks/trajectory_task.py b/dimos/control/tasks/trajectory_task.py index 08e3ae337e..676f1dac70 100644 --- a/dimos/control/tasks/trajectory_task.py +++ b/dimos/control/tasks/trajectory_task.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Joint trajectory task for the ControlOrchestrator. +"""Joint trajectory task for the ControlCoordinator. -Passive trajectory execution - called by orchestrator each tick. +Passive trajectory execution - called by coordinator each tick. Unlike JointTrajectoryController which owns a thread, this task -is compute-only and relies on the orchestrator for timing. +is compute-only and relies on the coordinator for timing. -CRITICAL: Uses t_now from OrchestratorState, never calls time.time() +CRITICAL: Uses t_now from CoordinatorState, never calls time.time() """ from __future__ import annotations @@ -28,8 +28,8 @@ from dimos.control.task import ( ControlMode, ControlTask, + CoordinatorState, JointCommandOutput, - OrchestratorState, ResourceClaim, ) from dimos.msgs.trajectory_msgs import JointTrajectory, TrajectoryState @@ -55,9 +55,9 @@ class JointTrajectoryTask(ControlTask): """Passive trajectory execution task. Unlike JointTrajectoryController which owns a thread, this task - is called by the orchestrator at each tick. + is called by the coordinator at each tick. - CRITICAL: Uses t_now from OrchestratorState, never calls time.time() + CRITICAL: Uses t_now from CoordinatorState, never calls time.time() State Machine: IDLE ──execute()──► EXECUTING ──done──► COMPLETED @@ -74,8 +74,8 @@ class JointTrajectoryTask(ControlTask): ... priority=10, ... ), ... ) - >>> orchestrator.add_task(task) - >>> task.execute(my_trajectory, t_now=orchestrator_t_now) + >>> coordinator.add_task(task) + >>> task.execute(my_trajectory, t_now=coordinator_t_now) """ def __init__(self, name: str, config: JointTrajectoryTaskConfig) -> None: @@ -117,13 +117,13 @@ def is_active(self) -> bool: """Check if task should run this tick.""" return self._state == TrajectoryState.EXECUTING - def compute(self, state: OrchestratorState) -> JointCommandOutput | None: + def compute(self, state: CoordinatorState) -> JointCommandOutput | None: """Compute trajectory output for this tick. CRITICAL: Uses state.t_now for timing, NOT time.time()! Args: - state: Current orchestrator state + state: Current coordinator state Returns: JointCommandOutput with positions, or None if not executing @@ -244,7 +244,7 @@ def get_progress(self, t_now: float) -> float: """Get execution progress (0.0 to 1.0). Args: - t_now: Current orchestrator time + t_now: Current coordinator time Returns: Progress as fraction, or 0.0 if not executing diff --git a/dimos/control/test_control.py b/dimos/control/test_control.py index cf41775d49..a1633a3a98 100644 --- a/dimos/control/test_control.py +++ b/dimos/control/test_control.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Tests for the Control Orchestrator module.""" +"""Tests for the Control Coordinator module.""" from __future__ import annotations @@ -26,9 +26,9 @@ from dimos.control.hardware_interface import BackendHardwareInterface from dimos.control.task import ( ControlMode, + CoordinatorState, JointCommandOutput, JointStateSnapshot, - OrchestratorState, ResourceClaim, ) from dimos.control.tasks.trajectory_task import ( @@ -101,15 +101,15 @@ def simple_trajectory(): @pytest.fixture -def orchestrator_state(): - """Create a sample OrchestratorState.""" +def coordinator_state(): + """Create a sample CoordinatorState.""" joints = JointStateSnapshot( joint_positions={"arm_joint1": 0.0, "arm_joint2": 0.0, "arm_joint3": 0.0}, joint_velocities={"arm_joint1": 0.0, "arm_joint2": 0.0, "arm_joint3": 0.0}, joint_efforts={"arm_joint1": 0.0, "arm_joint2": 0.0, "arm_joint3": 0.0}, timestamp=time.perf_counter(), ) - return OrchestratorState(joints=joints, t_now=time.perf_counter(), dt=0.01) + return CoordinatorState(joints=joints, t_now=time.perf_counter(), dt=0.01) # ============================================================================= @@ -231,23 +231,21 @@ def test_execute_trajectory(self, trajectory_task, simple_trajectory): assert trajectory_task.is_active() assert trajectory_task.get_state() == TrajectoryState.EXECUTING - def test_compute_during_trajectory( - self, trajectory_task, simple_trajectory, orchestrator_state - ): + def test_compute_during_trajectory(self, trajectory_task, simple_trajectory, coordinator_state): t_start = time.perf_counter() trajectory_task.execute(simple_trajectory) # First compute sets start time (deferred start) - state0 = OrchestratorState( - joints=orchestrator_state.joints, + state0 = CoordinatorState( + joints=coordinator_state.joints, t_now=t_start, dt=0.01, ) trajectory_task.compute(state0) # Compute at 0.5s into trajectory - state = OrchestratorState( - joints=orchestrator_state.joints, + state = CoordinatorState( + joints=coordinator_state.joints, t_now=t_start + 0.5, dt=0.01, ) @@ -258,21 +256,21 @@ def test_compute_during_trajectory( assert len(output.positions) == 3 assert 0.4 < output.positions[0] < 0.6 - def test_trajectory_completes(self, trajectory_task, simple_trajectory, orchestrator_state): + def test_trajectory_completes(self, trajectory_task, simple_trajectory, coordinator_state): t_start = time.perf_counter() trajectory_task.execute(simple_trajectory) # First compute sets start time (deferred start) - state0 = OrchestratorState( - joints=orchestrator_state.joints, + state0 = CoordinatorState( + joints=coordinator_state.joints, t_now=t_start, dt=0.01, ) trajectory_task.compute(state0) # Compute past trajectory duration - state = OrchestratorState( - joints=orchestrator_state.joints, + state = CoordinatorState( + joints=coordinator_state.joints, t_now=t_start + 1.5, dt=0.01, ) @@ -299,13 +297,13 @@ def test_preemption(self, trajectory_task, simple_trajectory): assert trajectory_task.get_state() == TrajectoryState.ABORTED assert not trajectory_task.is_active() - def test_progress(self, trajectory_task, simple_trajectory, orchestrator_state): + def test_progress(self, trajectory_task, simple_trajectory, coordinator_state): t_start = time.perf_counter() trajectory_task.execute(simple_trajectory) # First compute sets start time (deferred start) - state0 = OrchestratorState( - joints=orchestrator_state.joints, + state0 = CoordinatorState( + joints=coordinator_state.joints, t_now=t_start, dt=0.01, ) diff --git a/dimos/control/tick_loop.py b/dimos/control/tick_loop.py index a15388f31e..7812325ddf 100644 --- a/dimos/control/tick_loop.py +++ b/dimos/control/tick_loop.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Tick loop for the ControlOrchestrator. +"""Tick loop for the ControlCoordinator. This module contains the core control loop logic: - Read state from all hardware @@ -21,7 +21,7 @@ - Route commands to hardware - Publish aggregated joint state -Separated from orchestrator.py following the DimOS pattern of +Separated from coordinator.py following the DimOS pattern of splitting coordination logic from module wrapper. """ @@ -33,9 +33,9 @@ from dimos.control.task import ( ControlTask, + CoordinatorState, JointCommandOutput, JointStateSnapshot, - OrchestratorState, ResourceClaim, ) from dimos.msgs.sensor_msgs import JointState @@ -60,7 +60,7 @@ class JointWinner(NamedTuple): class TickLoop: - """Core tick loop for the control orchestrator. + """Core tick loop for the control coordinator. Runs the deterministic control cycle: 1. READ: Collect joint state from all hardware @@ -92,7 +92,7 @@ def __init__( task_lock: threading.Lock, joint_to_hardware: dict[str, str], publish_callback: Callable[[JointState], None] | None = None, - frame_id: str = "orchestrator", + frame_id: str = "coordinator", log_ticks: bool = False, ) -> None: self._tick_rate = tick_rate @@ -133,7 +133,7 @@ def start(self) -> None: self._tick_thread = threading.Thread( target=self._loop, - name="ControlOrchestrator-Tick", + name="ControlCoordinator-Tick", daemon=True, ) self._tick_thread.start() @@ -173,7 +173,7 @@ def _tick(self) -> None: # === PHASE 1: READ ALL HARDWARE === joint_states = self._read_all_hardware() - state = OrchestratorState(joints=joint_states, t_now=t_now, dt=dt) + state = CoordinatorState(joints=joint_states, t_now=t_now, dt=dt) # === PHASE 2: COMPUTE ALL ACTIVE TASKS === commands = self._compute_all_tasks(state) @@ -228,7 +228,7 @@ def _read_all_hardware(self) -> JointStateSnapshot: ) def _compute_all_tasks( - self, state: OrchestratorState + self, state: CoordinatorState ) -> list[tuple[ControlTask, ResourceClaim, JointCommandOutput | None]]: """Compute outputs from all active tasks.""" results: list[tuple[ControlTask, ResourceClaim, JointCommandOutput | None]] = [] diff --git a/dimos/e2e_tests/test_control_orchestrator.py b/dimos/e2e_tests/test_control_coordinator.py similarity index 74% rename from dimos/e2e_tests/test_control_orchestrator.py rename to dimos/e2e_tests/test_control_coordinator.py index ae2bb17352..2347b7344d 100644 --- a/dimos/e2e_tests/test_control_orchestrator.py +++ b/dimos/e2e_tests/test_control_coordinator.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""End-to-end tests for the ControlOrchestrator. +"""End-to-end tests for the ControlCoordinator. -These tests start a real orchestrator process and communicate via LCM/RPC. +These tests start a real coordinator process and communicate via LCM/RPC. Unlike unit tests, these verify the full system integration. Run with: - pytest dimos/e2e_tests/test_control_orchestrator.py -v -s + pytest dimos/e2e_tests/test_control_coordinator.py -v -s """ import os @@ -26,26 +26,26 @@ import pytest -from dimos.control.orchestrator import ControlOrchestrator +from dimos.control.coordinator import ControlCoordinator from dimos.core.rpc_client import RPCClient from dimos.msgs.sensor_msgs import JointState from dimos.msgs.trajectory_msgs import JointTrajectory, TrajectoryPoint, TrajectoryState @pytest.mark.skipif(bool(os.getenv("CI")), reason="LCM doesn't work in CI.") -class TestControlOrchestratorE2E: - """End-to-end tests for ControlOrchestrator.""" +class TestControlCoordinatorE2E: + """End-to-end tests for ControlCoordinator.""" - def test_orchestrator_starts_and_responds_to_rpc(self, lcm_spy, start_blueprint) -> None: - """Test that orchestrator starts and responds to RPC queries.""" + def test_coordinator_starts_and_responds_to_rpc(self, lcm_spy, start_blueprint) -> None: + """Test that coordinator starts and responds to RPC queries.""" # Save topics we care about (LCM topics include type suffix) - joint_state_topic = "/orchestrator/joint_state#sensor_msgs.JointState" + joint_state_topic = "/coordinator/joint_state#sensor_msgs.JointState" lcm_spy.save_topic(joint_state_topic) - lcm_spy.save_topic("/rpc/ControlOrchestrator/list_joints/res") - lcm_spy.save_topic("/rpc/ControlOrchestrator/list_tasks/res") + lcm_spy.save_topic("/rpc/ControlCoordinator/list_joints/res") + lcm_spy.save_topic("/rpc/ControlCoordinator/list_tasks/res") - # Start the mock orchestrator blueprint - start_blueprint("orchestrator-mock") + # Start the mock coordinator blueprint + start_blueprint("coordinator-mock") # Wait for joint state to be published (proves tick loop is running) lcm_spy.wait_for_saved_topic( @@ -54,7 +54,7 @@ def test_orchestrator_starts_and_responds_to_rpc(self, lcm_spy, start_blueprint) ) # Create RPC client and query - client = RPCClient(None, ControlOrchestrator) + client = RPCClient(None, ControlCoordinator) try: # Test list_joints RPC joints = client.list_joints() @@ -74,23 +74,23 @@ def test_orchestrator_starts_and_responds_to_rpc(self, lcm_spy, start_blueprint) finally: client.stop_rpc_client() - def test_orchestrator_executes_trajectory(self, lcm_spy, start_blueprint) -> None: - """Test that orchestrator executes a trajectory via RPC.""" + def test_coordinator_executes_trajectory(self, lcm_spy, start_blueprint) -> None: + """Test that coordinator executes a trajectory via RPC.""" # Save topics - lcm_spy.save_topic("/orchestrator/joint_state#sensor_msgs.JointState") - lcm_spy.save_topic("/rpc/ControlOrchestrator/execute_trajectory/res") - lcm_spy.save_topic("/rpc/ControlOrchestrator/get_trajectory_status/res") + lcm_spy.save_topic("/coordinator/joint_state#sensor_msgs.JointState") + lcm_spy.save_topic("/rpc/ControlCoordinator/execute_trajectory/res") + lcm_spy.save_topic("/rpc/ControlCoordinator/get_trajectory_status/res") - # Start orchestrator - start_blueprint("orchestrator-mock") + # Start coordinator + start_blueprint("coordinator-mock") # Wait for it to be ready lcm_spy.wait_for_saved_topic( - "/orchestrator/joint_state#sensor_msgs.JointState", timeout=10.0 + "/coordinator/joint_state#sensor_msgs.JointState", timeout=10.0 ) # Create RPC client - client = RPCClient(None, ControlOrchestrator) + client = RPCClient(None, ControlCoordinator) try: # Get initial joint positions initial_positions = client.get_joint_positions() @@ -133,13 +133,13 @@ def test_orchestrator_executes_trajectory(self, lcm_spy, start_blueprint) -> Non finally: client.stop_rpc_client() - def test_orchestrator_joint_state_published(self, lcm_spy, start_blueprint) -> None: + def test_coordinator_joint_state_published(self, lcm_spy, start_blueprint) -> None: """Test that joint state messages are published at expected rate.""" - joint_state_topic = "/orchestrator/joint_state#sensor_msgs.JointState" + joint_state_topic = "/coordinator/joint_state#sensor_msgs.JointState" lcm_spy.save_topic(joint_state_topic) - # Start orchestrator - start_blueprint("orchestrator-mock") + # Start coordinator + start_blueprint("coordinator-mock") # Wait for initial message lcm_spy.wait_for_saved_topic(joint_state_topic, timeout=10.0) @@ -163,17 +163,17 @@ def test_orchestrator_joint_state_published(self, lcm_spy, start_blueprint) -> N assert len(joint_state.position) == 7 assert "arm_joint1" in joint_state.name - def test_orchestrator_cancel_trajectory(self, lcm_spy, start_blueprint) -> None: + def test_coordinator_cancel_trajectory(self, lcm_spy, start_blueprint) -> None: """Test that a running trajectory can be cancelled.""" - lcm_spy.save_topic("/orchestrator/joint_state#sensor_msgs.JointState") + lcm_spy.save_topic("/coordinator/joint_state#sensor_msgs.JointState") - # Start orchestrator - start_blueprint("orchestrator-mock") + # Start coordinator + start_blueprint("coordinator-mock") lcm_spy.wait_for_saved_topic( - "/orchestrator/joint_state#sensor_msgs.JointState", timeout=10.0 + "/coordinator/joint_state#sensor_msgs.JointState", timeout=10.0 ) - client = RPCClient(None, ControlOrchestrator) + client = RPCClient(None, ControlCoordinator) try: # Create a long trajectory (5 seconds) trajectory = JointTrajectory( @@ -208,17 +208,17 @@ def test_orchestrator_cancel_trajectory(self, lcm_spy, start_blueprint) -> None: finally: client.stop_rpc_client() - def test_dual_arm_orchestrator(self, lcm_spy, start_blueprint) -> None: - """Test dual-arm orchestrator with independent trajectories.""" - lcm_spy.save_topic("/orchestrator/joint_state#sensor_msgs.JointState") + def test_dual_arm_coordinator(self, lcm_spy, start_blueprint) -> None: + """Test dual-arm coordinator with independent trajectories.""" + lcm_spy.save_topic("/coordinator/joint_state#sensor_msgs.JointState") - # Start dual-arm mock orchestrator - start_blueprint("orchestrator-dual-mock") + # Start dual-arm mock coordinator + start_blueprint("coordinator-dual-mock") lcm_spy.wait_for_saved_topic( - "/orchestrator/joint_state#sensor_msgs.JointState", timeout=10.0 + "/coordinator/joint_state#sensor_msgs.JointState", timeout=10.0 ) - client = RPCClient(None, ControlOrchestrator) + client = RPCClient(None, ControlCoordinator) try: # Verify both arms present joints = client.list_joints() diff --git a/dimos/manipulation/control/orchestrator_client.py b/dimos/manipulation/control/coordinator_client.py similarity index 90% rename from dimos/manipulation/control/orchestrator_client.py rename to dimos/manipulation/control/coordinator_client.py index 7f30e6ba8c..b30c1f46f7 100644 --- a/dimos/manipulation/control/orchestrator_client.py +++ b/dimos/manipulation/control/coordinator_client.py @@ -14,30 +14,30 @@ # limitations under the License. """ -Interactive client for the ControlOrchestrator. +Interactive client for the ControlCoordinator. -Interfaces with a running ControlOrchestrator via RPC to: +Interfaces with a running ControlCoordinator via RPC to: - Query hardware and task status - Plan and execute trajectories on single or multiple arms - Monitor execution progress Usage: - # Terminal 1: Start the orchestrator - dimos run orchestrator-mock # Single arm - dimos run orchestrator-dual-mock # Dual arm + # Terminal 1: Start the coordinator + dimos run coordinator-mock # Single arm + dimos run coordinator-dual-mock # Dual arm # Terminal 2: Run this client - python -m dimos.manipulation.control.orchestrator_client - python -m dimos.manipulation.control.orchestrator_client --task traj_left - python -m dimos.manipulation.control.orchestrator_client --task traj_right + python -m dimos.manipulation.control.coordinator_client + python -m dimos.manipulation.control.coordinator_client --task traj_left + python -m dimos.manipulation.control.coordinator_client --task traj_right How it works: - 1. Connects to ControlOrchestrator via LCM RPC + 1. Connects to ControlCoordinator via LCM RPC 2. Queries available hardware/tasks/joints 3. You add waypoints (joint positions) 4. Generates trajectory with trapezoidal velocity profile - 5. Sends trajectory to orchestrator via execute_trajectory() RPC - 6. Orchestrator's tick loop executes it at 100Hz + 5. Sends trajectory to coordinator via execute_trajectory() RPC + 6. Coordinator's tick loop executes it at 100Hz """ from __future__ import annotations @@ -47,7 +47,7 @@ import time from typing import TYPE_CHECKING, Any -from dimos.control.orchestrator import ControlOrchestrator +from dimos.control.coordinator import ControlCoordinator from dimos.core.rpc_client import RPCClient from dimos.manipulation.planning import JointTrajectoryGenerator @@ -55,17 +55,17 @@ from dimos.msgs.trajectory_msgs import JointTrajectory -class OrchestratorClient: +class CoordinatorClient: """ - RPC client for the ControlOrchestrator. + RPC client for the ControlCoordinator. - Connects to a running orchestrator and provides methods to: + Connects to a running coordinator and provides methods to: - Query state (joints, tasks, hardware) - Execute trajectories on any task - Monitor progress Example: - client = OrchestratorClient() + client = CoordinatorClient() # Query state print(client.list_hardware()) # ['left_arm', 'right_arm'] @@ -84,8 +84,8 @@ class OrchestratorClient: """ def __init__(self) -> None: - """Initialize connection to orchestrator via RPC.""" - self._rpc = RPCClient(None, ControlOrchestrator) + """Initialize connection to coordinator via RPC.""" + self._rpc = RPCClient(None, ControlCoordinator) # Per-task state self._current_task: str | None = None @@ -144,7 +144,7 @@ def select_task(self, task_name: str) -> bool: """ Select a task and setup its trajectory generator. - This queries the orchestrator to find which joints the task controls, + This queries the coordinator to find which joints the task controls, then creates a trajectory generator for those joints. """ tasks = self.list_tasks() @@ -317,7 +317,7 @@ def preview_trajectory(trajectory: JointTrajectory, joint_names: list[str]) -> N print("=" * 70) -def wait_for_completion(client: OrchestratorClient, task_name: str, timeout: float = 60.0) -> bool: +def wait_for_completion(client: CoordinatorClient, task_name: str, timeout: float = 60.0) -> bool: """Wait for trajectory to complete with progress display.""" start = time.time() last_progress = -1.0 @@ -343,10 +343,10 @@ def wait_for_completion(client: OrchestratorClient, task_name: str, timeout: flo return False -class OrchestratorShell: - """IPython shell interface for orchestrator control.""" +class CoordinatorShell: + """IPython shell interface for coordinator control.""" - def __init__(self, client: OrchestratorClient, initial_task: str) -> None: + def __init__(self, client: CoordinatorClient, initial_task: str) -> None: self._client = client self._current_task = initial_task self._waypoints: list[list[float]] = [] @@ -363,7 +363,7 @@ def _num_joints(self) -> int: def help(self) -> None: """Show available commands.""" - print("\nOrchestrator Client Commands:") + print("\nCoordinator Client Commands:") print("=" * 60) print("Waypoint Commands:") print(" here() - Add current position as waypoint") @@ -561,14 +561,14 @@ def accel(self, value: float | None = None) -> None: print(f"Max acceleration: {value:.2f} rad/s^2") -def interactive_mode(client: OrchestratorClient, initial_task: str) -> None: +def interactive_mode(client: CoordinatorClient, initial_task: str) -> None: """Start IPython interactive mode.""" import IPython - shell = OrchestratorShell(client, initial_task) + shell = CoordinatorShell(client, initial_task) print("\n" + "=" * 60) - print(f"Orchestrator Client (IPython) - Task: {initial_task}") + print(f"Coordinator Client (IPython) - Task: {initial_task}") print("=" * 60) print(f"Joints: {', '.join(shell._joints())}") print("\nType help() for available commands") @@ -600,15 +600,15 @@ def interactive_mode(client: OrchestratorClient, initial_task: str) -> None: ) -def _run_client(client: OrchestratorClient, task: str, vel: float, accel: float) -> int: +def _run_client(client: CoordinatorClient, task: str, vel: float, accel: float) -> int: """Run the client with the given configuration.""" try: hardware = client.list_hardware() tasks = client.list_tasks() if not hardware: - print("\nWarning: No hardware found. Is the orchestrator running?") - print("Start with: dimos run orchestrator-mock") + print("\nWarning: No hardware found. Is the coordinator running?") + print("Start with: dimos run coordinator-mock") response = input("Continue anyway? [y/N]: ").strip().lower() if response != "y": return 0 @@ -618,7 +618,7 @@ def _run_client(client: OrchestratorClient, task: str, vel: float, accel: float) except Exception as e: print(f"\nConnection error: {e}") - print("Make sure orchestrator is running: dimos run orchestrator-mock") + print("Make sure coordinator is running: dimos run coordinator-mock") return 1 if task not in tasks and tasks: @@ -640,18 +640,18 @@ def main() -> int: import argparse parser = argparse.ArgumentParser( - description="Interactive client for ControlOrchestrator", + description="Interactive client for ControlCoordinator", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: - # Single arm (with orchestrator-mock running) - python -m dimos.manipulation.control.orchestrator_client + # Single arm (with coordinator-mock running) + python -m dimos.manipulation.control.coordinator_client # Dual arm - control left arm - python -m dimos.manipulation.control.orchestrator_client --task traj_left + python -m dimos.manipulation.control.coordinator_client --task traj_left # Dual arm - control right arm - python -m dimos.manipulation.control.orchestrator_client --task traj_right + python -m dimos.manipulation.control.coordinator_client --task traj_right """, ) parser.add_argument( @@ -675,11 +675,11 @@ def main() -> int: args = parser.parse_args() print("\n" + "=" * 70) - print("Orchestrator Client") + print("Coordinator Client") print("=" * 70) - print("\nConnecting to ControlOrchestrator via RPC...") + print("\nConnecting to ControlCoordinator via RPC...") - client = OrchestratorClient() + client = CoordinatorClient() try: return _run_client(client, args.task, args.vel, args.accel) finally: diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index f4524b8ff2..124ae9a8bb 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -37,14 +37,14 @@ "unitree-g1-joystick": "dimos.robot.unitree_webrtc.unitree_g1_blueprints:with_joystick", "unitree-g1-full": "dimos.robot.unitree_webrtc.unitree_g1_blueprints:full_featured", "unitree-g1-detection": "dimos.robot.unitree_webrtc.unitree_g1_blueprints:detection", - # Control orchestrator blueprints - "orchestrator-mock": "dimos.control.blueprints:orchestrator_mock", - "orchestrator-xarm7": "dimos.control.blueprints:orchestrator_xarm7", - "orchestrator-xarm6": "dimos.control.blueprints:orchestrator_xarm6", - "orchestrator-piper": "dimos.control.blueprints:orchestrator_piper", - "orchestrator-dual-mock": "dimos.control.blueprints:orchestrator_dual_mock", - "orchestrator-dual-xarm": "dimos.control.blueprints:orchestrator_dual_xarm", - "orchestrator-piper-xarm": "dimos.control.blueprints:orchestrator_piper_xarm", + # Control coordinator blueprints + "coordinator-mock": "dimos.control.blueprints:coordinator_mock", + "coordinator-xarm7": "dimos.control.blueprints:coordinator_xarm7", + "coordinator-xarm6": "dimos.control.blueprints:coordinator_xarm6", + "coordinator-piper": "dimos.control.blueprints:coordinator_piper", + "coordinator-dual-mock": "dimos.control.blueprints:coordinator_dual_mock", + "coordinator-dual-xarm": "dimos.control.blueprints:coordinator_dual_xarm", + "coordinator-piper-xarm": "dimos.control.blueprints:coordinator_piper_xarm", # Demo blueprints "demo-camera": "dimos.hardware.sensors.camera.module:demo_camera", "demo-osm": "dimos.mapping.osm.demo_osm:demo_osm", @@ -82,8 +82,8 @@ "wavefront_frontier_explorer": "dimos.navigation.frontier_exploration.wavefront_frontier_goal_selector", "websocket_vis": "dimos.web.websocket_vis.websocket_vis_module", "web_input": "dimos.agents.cli.web", - # Control orchestrator module - "control_orchestrator": "dimos.control.orchestrator", + # Control coordinator module + "control_coordinator": "dimos.control.coordinator", } From 46438c9412fd1799d85c0bd23fe0e7be3fddaeb8 Mon Sep 17 00:00:00 2001 From: Mustafa Bhadsorawala <39084056+mustafab0@users.noreply.github.com> Date: Thu, 15 Jan 2026 20:39:22 -0800 Subject: [PATCH 6/8] Fix typo in component schema docstring --- dimos/control/components.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dimos/control/components.py b/dimos/control/components.py index ddc81b83c1..0d48561f8a 100644 --- a/dimos/control/components.py +++ b/dimos/control/components.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Hardware component schema for the ControlOrchestrator.""" +"""Hardware component schema for the ControlCoordnator.""" from dataclasses import dataclass, field from enum import Enum From ec2bc1e6742653f29331f607938b77e49fd86671 Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Fri, 16 Jan 2026 15:03:13 -0800 Subject: [PATCH 7/8] added explicit type alias for JointName in ControlTask --- dimos/control/__init__.py | 4 ++++ dimos/control/task.py | 20 +++++++++++--------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/dimos/control/__init__.py b/dimos/control/__init__.py index b35f86c9e6..755cdc6f70 100644 --- a/dimos/control/__init__.py +++ b/dimos/control/__init__.py @@ -51,7 +51,9 @@ from dimos.control.components import ( HardwareComponent, + HardwareId, HardwareType, + JointName, JointState, make_joints, ) @@ -86,9 +88,11 @@ "ControlTask", "CoordinatorState", "HardwareComponent", + "HardwareId", "HardwareInterface", "HardwareType", "JointCommandOutput", + "JointName", "JointState", "JointStateSnapshot", "ResourceClaim", diff --git a/dimos/control/task.py b/dimos/control/task.py index 36e764453a..e9d9c4a584 100644 --- a/dimos/control/task.py +++ b/dimos/control/task.py @@ -30,6 +30,7 @@ from dataclasses import dataclass, field from typing import Protocol, runtime_checkable +from dimos.control.components import JointName from dimos.hardware.manipulators.spec import ControlMode # ============================================================================= @@ -52,7 +53,7 @@ class ResourceClaim: mode: Control mode (POSITION, VELOCITY, TORQUE) """ - joints: frozenset[str] + joints: frozenset[JointName] priority: int = 0 mode: ControlMode = ControlMode.POSITION @@ -75,20 +76,20 @@ class JointStateSnapshot: timestamp: Unix timestamp when state was read """ - joint_positions: dict[str, float] = field(default_factory=dict) - joint_velocities: dict[str, float] = field(default_factory=dict) - joint_efforts: dict[str, float] = field(default_factory=dict) + joint_positions: dict[JointName, float] = field(default_factory=dict) + joint_velocities: dict[JointName, float] = field(default_factory=dict) + joint_efforts: dict[JointName, float] = field(default_factory=dict) timestamp: float = 0.0 - def get_position(self, joint_name: str) -> float | None: + def get_position(self, joint_name: JointName) -> float | None: """Get position for a specific joint.""" return self.joint_positions.get(joint_name) - def get_velocity(self, joint_name: str) -> float | None: + def get_velocity(self, joint_name: JointName) -> float | None: """Get velocity for a specific joint.""" return self.joint_velocities.get(joint_name) - def get_effort(self, joint_name: str) -> float | None: + def get_effort(self, joint_name: JointName) -> float | None: """Get effort for a specific joint.""" return self.joint_efforts.get(joint_name) @@ -133,7 +134,7 @@ class JointCommandOutput: mode: Control mode - must match which field is populated """ - joint_names: list[str] + joint_names: list[JointName] positions: list[float] | None = None velocities: list[float] | None = None efforts: list[float] | None = None @@ -271,7 +272,7 @@ def compute(self, state: CoordinatorState) -> JointCommandOutput | None: """ ... - def on_preempted(self, by_task: str, joints: frozenset[str]) -> None: + def on_preempted(self, by_task: str, joints: frozenset[JointName]) -> None: """Called ONCE per tick with ALL preempted joints aggregated. Called when a higher-priority task takes control of some of this @@ -294,6 +295,7 @@ def on_preempted(self, by_task: str, joints: frozenset[str]) -> None: "ControlTask", "CoordinatorState", "JointCommandOutput", + "JointName", "JointStateSnapshot", "ResourceClaim", ] From a711af0c11eff8159137795945d443b21c56d661 Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Fri, 23 Jan 2026 21:45:01 -0800 Subject: [PATCH 8/8] added Type Checking import --- dimos/control/hardware_interface.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dimos/control/hardware_interface.py b/dimos/control/hardware_interface.py index a74d81f16c..ed320abec5 100644 --- a/dimos/control/hardware_interface.py +++ b/dimos/control/hardware_interface.py @@ -24,7 +24,7 @@ import logging import time -from typing import Protocol, runtime_checkable +from typing import TYPE_CHECKING, Protocol, runtime_checkable from dimos.hardware.manipulators.spec import ControlMode, ManipulatorBackend