diff --git a/dimos/control/__init__.py b/dimos/control/__init__.py index 3d7d647cd4..755cdc6f70 100644 --- a/dimos/control/__init__.py +++ b/dimos/control/__init__.py @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""ControlOrchestrator - Centralized control for multi-arm coordination. +"""ControlCoordinator - Centralized control for multi-arm coordination. -This module provides a centralized control orchestrator that replaces +This module provides a centralized control coordinator that replaces per-driver/per-controller loops with a single deterministic tick-based system. Features: @@ -25,47 +25,54 @@ - Aggregated preemption notifications Example: - >>> from dimos.control import ControlOrchestrator + >>> from dimos.control import ControlCoordinator >>> from dimos.control.tasks import JointTrajectoryTask, JointTrajectoryTaskConfig >>> from dimos.hardware.manipulators.xarm import XArmBackend >>> - >>> # Create orchestrator - >>> orch = ControlOrchestrator(tick_rate=100.0) + >>> # Create coordinator + >>> coord = ControlCoordinator(tick_rate=100.0) >>> >>> # Add hardware >>> backend = XArmBackend(ip="192.168.1.185", dof=7) >>> backend.connect() - >>> orch.add_hardware("left_arm", backend, joint_prefix="left") + >>> coord.add_hardware("left_arm", backend) >>> >>> # Add task - >>> joints = [f"left_joint{i+1}" for i in range(7)] + >>> joints = [f"left_arm_joint{i+1}" for i in range(7)] >>> task = JointTrajectoryTask( ... "traj_left", ... JointTrajectoryTaskConfig(joint_names=joints, priority=10), ... ) - >>> orch.add_task(task) + >>> coord.add_task(task) >>> >>> # Start - >>> orch.start() + >>> coord.start() """ +from dimos.control.components import ( + HardwareComponent, + HardwareId, + HardwareType, + JointName, + JointState, + make_joints, +) +from dimos.control.coordinator import ( + ControlCoordinator, + ControlCoordinatorConfig, + TaskConfig, + control_coordinator, +) from dimos.control.hardware_interface import ( BackendHardwareInterface, HardwareInterface, ) -from dimos.control.orchestrator import ( - ControlOrchestrator, - ControlOrchestratorConfig, - HardwareConfig, - TaskConfig, - control_orchestrator, -) from dimos.control.task import ( ControlMode, ControlTask, + CoordinatorState, JointCommandOutput, JointStateSnapshot, - OrchestratorState, ResourceClaim, ) from dimos.control.tick_loop import TickLoop @@ -73,20 +80,25 @@ __all__ = [ # Hardware interface "BackendHardwareInterface", + # Coordinator + "ControlCoordinator", + "ControlCoordinatorConfig", "ControlMode", - # Orchestrator - "ControlOrchestrator", - "ControlOrchestratorConfig", # Task protocol and types "ControlTask", - "HardwareConfig", + "CoordinatorState", + "HardwareComponent", + "HardwareId", "HardwareInterface", + "HardwareType", "JointCommandOutput", + "JointName", + "JointState", "JointStateSnapshot", - "OrchestratorState", "ResourceClaim", "TaskConfig", # Tick loop "TickLoop", - "control_orchestrator", + "control_coordinator", + "make_joints", ] diff --git a/dimos/control/blueprints.py b/dimos/control/blueprints.py index d38ac1f81f..f2365fbef6 100644 --- a/dimos/control/blueprints.py +++ b/dimos/control/blueprints.py @@ -12,47 +12,44 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Pre-configured blueprints for the ControlOrchestrator. +"""Pre-configured blueprints for the ControlCoordinator. -This module provides ready-to-use orchestrator blueprints for common setups. +This module provides ready-to-use coordinator blueprints for common setups. Usage: # Run via CLI: - dimos run orchestrator-mock # Mock 7-DOF arm - dimos run orchestrator-xarm7 # XArm7 real hardware - dimos run orchestrator-dual-mock # Dual mock arms + dimos run coordinator-mock # Mock 7-DOF arm + dimos run coordinator-xarm7 # XArm7 real hardware + dimos run coordinator-dual-mock # Dual mock arms # Or programmatically: - from dimos.control.blueprints import orchestrator_mock - coordinator = orchestrator_mock.build() + from dimos.control.blueprints import coordinator_mock + coordinator = coordinator_mock.build() coordinator.loop() Example with trajectory setter: - # Terminal 1: Run the orchestrator - dimos run orchestrator-mock + # Terminal 1: Run the coordinator + dimos run coordinator-mock # Terminal 2: Send trajectories via RPC - python -m dimos.control.examples.orchestrator_trajectory_setter --task traj_arm + python -m dimos.control.examples.coordinator_trajectory_setter --task traj_arm """ from __future__ import annotations -from dimos.control.orchestrator import ( - HardwareConfig, +from dimos.control.components import HardwareComponent, HardwareType, make_joints +from dimos.control.coordinator import ( + ControlCoordinator, TaskConfig, - control_orchestrator, + control_coordinator, ) from dimos.core.transport import LCMTransport from dimos.msgs.sensor_msgs import JointState -# ============================================================================= -# Helper function to generate joint names -# ============================================================================= - -def _joint_names(prefix: str, dof: int) -> list[str]: - """Generate joint names with prefix.""" - return [f"{prefix}_joint{i + 1}" for i in range(dof)] +def _joint_names(hardware_id: str, dof: int) -> list[str]: + """Generate joint names for a hardware component.""" + return [f"{hardware_id}_joint{i + 1}" for i in range(dof)] # ============================================================================= @@ -60,16 +57,16 @@ def _joint_names(prefix: str, dof: int) -> list[str]: # ============================================================================= # Mock 7-DOF arm (for testing) -orchestrator_mock = control_orchestrator( +coordinator_mock = control_coordinator( tick_rate=100.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", hardware=[ - HardwareConfig( - id="arm", - type="mock", - dof=7, - joint_prefix="arm", + HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 7), + backend_type="mock", ), ], tasks=[ @@ -82,22 +79,22 @@ def _joint_names(prefix: str, dof: int) -> list[str]: ], ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) # XArm7 real hardware (requires IP configuration) -orchestrator_xarm7 = control_orchestrator( +coordinator_xarm7 = control_coordinator( tick_rate=100.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", hardware=[ - HardwareConfig( - id="arm", - type="xarm", - dof=7, - joint_prefix="arm", - ip="192.168.2.235", # Default IP, override via env or config + HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 7), + backend_type="xarm", + address="192.168.2.235", # Default IP, override via env or config auto_enable=True, ), ], @@ -111,22 +108,22 @@ def _joint_names(prefix: str, dof: int) -> list[str]: ], ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) # XArm6 real hardware -orchestrator_xarm6 = control_orchestrator( +coordinator_xarm6 = control_coordinator( tick_rate=100.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", hardware=[ - HardwareConfig( - id="arm", - type="xarm", - dof=6, - joint_prefix="arm", - ip="192.168.1.210", + HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 6), + backend_type="xarm", + address="192.168.1.210", auto_enable=True, ), ], @@ -140,22 +137,22 @@ def _joint_names(prefix: str, dof: int) -> list[str]: ], ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) # Piper arm (6-DOF, CAN bus) -orchestrator_piper = control_orchestrator( +coordinator_piper = control_coordinator( tick_rate=100.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", hardware=[ - HardwareConfig( - id="arm", - type="piper", - dof=6, - joint_prefix="arm", - can_port="can0", + HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 6), + backend_type="piper", + address="can0", auto_enable=True, ), ], @@ -169,7 +166,7 @@ def _joint_names(prefix: str, dof: int) -> list[str]: ], ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) @@ -178,64 +175,64 @@ def _joint_names(prefix: str, dof: int) -> list[str]: # ============================================================================= # Dual mock arms (7-DOF left, 6-DOF right) for testing -orchestrator_dual_mock = control_orchestrator( +coordinator_dual_mock = control_coordinator( tick_rate=100.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", hardware=[ - HardwareConfig( - id="left_arm", - type="mock", - dof=7, - joint_prefix="left", + HardwareComponent( + hardware_id="left_arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("left_arm", 7), + backend_type="mock", ), - HardwareConfig( - id="right_arm", - type="mock", - dof=6, - joint_prefix="right", + HardwareComponent( + hardware_id="right_arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("right_arm", 6), + backend_type="mock", ), ], tasks=[ TaskConfig( name="traj_left", type="trajectory", - joint_names=_joint_names("left", 7), + joint_names=_joint_names("left_arm", 7), priority=10, ), TaskConfig( name="traj_right", type="trajectory", - joint_names=_joint_names("right", 6), + joint_names=_joint_names("right_arm", 6), priority=10, ), ], ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) # Dual XArm setup (XArm7 left, XArm6 right) -orchestrator_dual_xarm = control_orchestrator( +coordinator_dual_xarm = control_coordinator( tick_rate=100.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", hardware=[ - HardwareConfig( - id="left_arm", - type="xarm", - dof=7, - joint_prefix="left", - ip="192.168.2.235", + HardwareComponent( + hardware_id="left_arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("left_arm", 7), + backend_type="xarm", + address="192.168.2.235", auto_enable=True, ), - HardwareConfig( - id="right_arm", - type="xarm", - dof=6, - joint_prefix="right", - ip="192.168.1.210", + HardwareComponent( + hardware_id="right_arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("right_arm", 6), + backend_type="xarm", + address="192.168.1.210", auto_enable=True, ), ], @@ -243,42 +240,42 @@ def _joint_names(prefix: str, dof: int) -> list[str]: TaskConfig( name="traj_left", type="trajectory", - joint_names=_joint_names("left", 7), + joint_names=_joint_names("left_arm", 7), priority=10, ), TaskConfig( name="traj_right", type="trajectory", - joint_names=_joint_names("right", 6), + joint_names=_joint_names("right_arm", 6), priority=10, ), ], ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) # Dual Arm setup (XArm6 , Piper ) -orchestrator_piper_xarm = control_orchestrator( +coordinator_piper_xarm = control_coordinator( tick_rate=100.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", hardware=[ - HardwareConfig( - id="xarm_arm", - type="xarm", - dof=6, - joint_prefix="xarm", - ip="192.168.1.210", + HardwareComponent( + hardware_id="xarm_arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("xarm_arm", 6), + backend_type="xarm", + address="192.168.1.210", auto_enable=True, ), - HardwareConfig( - id="piper_arm", - type="piper", - dof=6, - joint_prefix="piper", - can_port="can0", + HardwareComponent( + hardware_id="piper_arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("piper_arm", 6), + backend_type="piper", + address="can0", auto_enable=True, ), ], @@ -286,19 +283,19 @@ def _joint_names(prefix: str, dof: int) -> list[str]: TaskConfig( name="traj_xarm", type="trajectory", - joint_names=_joint_names("xarm", 6), + joint_names=_joint_names("xarm_arm", 6), priority=10, ), TaskConfig( name="traj_piper", type="trajectory", - joint_names=_joint_names("piper", 6), + joint_names=_joint_names("piper_arm", 6), priority=10, ), ], ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) @@ -307,16 +304,16 @@ def _joint_names(prefix: str, dof: int) -> list[str]: # ============================================================================= # High-frequency mock for demanding applications -orchestrator_highfreq_mock = control_orchestrator( +coordinator_highfreq_mock = control_coordinator( tick_rate=200.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", hardware=[ - HardwareConfig( - id="arm", - type="mock", - dof=7, - joint_prefix="arm", + HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 7), + backend_type="mock", ), ], tasks=[ @@ -329,7 +326,7 @@ def _joint_names(prefix: str, dof: int) -> list[str]: ], ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) @@ -337,30 +334,30 @@ def _joint_names(prefix: str, dof: int) -> list[str]: # Raw Blueprints (no hardware/tasks configured - for programmatic setup) # ============================================================================= -# Basic orchestrator with transport only (add hardware/tasks programmatically) -orchestrator_basic = control_orchestrator( +# Basic coordinator with transport only (add hardware/tasks programmatically) +coordinator_basic = control_coordinator( tick_rate=100.0, publish_joint_state=True, - joint_state_frame_id="orchestrator", + joint_state_frame_id="coordinator", ).transports( { - ("joint_state", JointState): LCMTransport("/orchestrator/joint_state", JointState), + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), } ) __all__ = [ # Raw blueprints (for programmatic setup) - "orchestrator_basic", + "coordinator_basic", # Dual arm blueprints - "orchestrator_dual_mock", - "orchestrator_dual_xarm", + "coordinator_dual_mock", + "coordinator_dual_xarm", # High-frequency blueprints - "orchestrator_highfreq_mock", + "coordinator_highfreq_mock", # Single arm blueprints - "orchestrator_mock", - "orchestrator_piper", - "orchestrator_piper_xarm", - "orchestrator_xarm6", - "orchestrator_xarm7", + "coordinator_mock", + "coordinator_piper", + "coordinator_piper_xarm", + "coordinator_xarm6", + "coordinator_xarm7", ] diff --git a/dimos/control/components.py b/dimos/control/components.py new file mode 100644 index 0000000000..0d48561f8a --- /dev/null +++ b/dimos/control/components.py @@ -0,0 +1,80 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Hardware component schema for the ControlCoordnator.""" + +from dataclasses import dataclass, field +from enum import Enum + +HardwareId = str +JointName = str + + +class HardwareType(Enum): + MANIPULATOR = "manipulator" + BASE = "base" + GRIPPER = "gripper" + + +@dataclass(frozen=True) +class JointState: + """State of a single joint.""" + + position: float + velocity: float + effort: float + + +@dataclass +class HardwareComponent: + """Configuration for a hardware component. + + Attributes: + hardware_id: Unique identifier, also used as joint name prefix + hardware_type: Type of hardware (MANIPULATOR, BASE, GRIPPER) + joints: List of joint names (e.g., ["arm_joint1", "arm_joint2", ...]) + backend_type: Backend type ("mock", "xarm", "piper") + address: Connection address - IP for TCP, port for CAN + auto_enable: Whether to auto-enable servos + """ + + hardware_id: HardwareId + hardware_type: HardwareType + joints: list[JointName] = field(default_factory=list) + backend_type: str = "mock" + address: str | None = None + auto_enable: bool = True + + +def make_joints(hardware_id: HardwareId, dof: int) -> list[JointName]: + """Create joint names for hardware. + + Args: + hardware_id: The hardware identifier (e.g., "left_arm") + dof: Degrees of freedom + + Returns: + List of joint names like ["left_arm_joint1", "left_arm_joint2", ...] + """ + return [f"{hardware_id}_joint{i + 1}" for i in range(dof)] + + +__all__ = [ + "HardwareComponent", + "HardwareId", + "HardwareType", + "JointName", + "JointState", + "make_joints", +] diff --git a/dimos/control/orchestrator.py b/dimos/control/coordinator.py similarity index 79% rename from dimos/control/orchestrator.py rename to dimos/control/coordinator.py index 2d64620b13..6e8788381d 100644 --- a/dimos/control/orchestrator.py +++ b/dimos/control/coordinator.py @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""ControlOrchestrator module. +"""ControlCoordinator module. -Centralized control orchestrator that replaces per-driver/per-controller +Centralized control coordinator that replaces per-driver/per-controller loops with a single deterministic tick-based system. Features: @@ -32,6 +32,7 @@ import time from typing import TYPE_CHECKING, Any +from dimos.control.components import HardwareComponent from dimos.control.hardware_interface import BackendHardwareInterface, HardwareInterface from dimos.control.task import ControlTask from dimos.control.tick_loop import TickLoop @@ -54,29 +55,6 @@ # ============================================================================= -@dataclass -class HardwareConfig: - """Configuration for a hardware backend. - - Attributes: - id: Unique hardware identifier (e.g., "arm", "left_arm") - type: Backend type ("mock", "xarm", "piper") - dof: Degrees of freedom - joint_prefix: Prefix for joint names (defaults to id) - ip: IP address (required for xarm) - can_port: CAN port (for piper, default "can0") - auto_enable: Whether to auto-enable servos (default True) - """ - - id: str - type: str = "mock" - dof: int = 7 - joint_prefix: str | None = None - ip: str | None = None - can_port: str | None = None - auto_enable: bool = True - - @dataclass class TaskConfig: """Configuration for a control task. @@ -110,8 +88,8 @@ class TaskStatus: @dataclass -class ControlOrchestratorConfig(ModuleConfig): - """Configuration for the ControlOrchestrator. +class ControlCoordinatorConfig(ModuleConfig): + """Configuration for the ControlCoordinator. Attributes: tick_rate: Control loop frequency in Hz (default: 100) @@ -124,19 +102,19 @@ class ControlOrchestratorConfig(ModuleConfig): tick_rate: float = 100.0 publish_joint_state: bool = True - joint_state_frame_id: str = "orchestrator" + joint_state_frame_id: str = "coordinator" log_ticks: bool = False - hardware: list[HardwareConfig] = field(default_factory=lambda: []) + hardware: list[HardwareComponent] = field(default_factory=lambda: []) tasks: list[TaskConfig] = field(default_factory=lambda: []) # ============================================================================= -# ControlOrchestrator Module +# ControlCoordinator Module # ============================================================================= -class ControlOrchestrator(Module[ControlOrchestratorConfig]): - """Centralized control orchestrator with per-joint arbitration. +class ControlCoordinator(Module[ControlCoordinatorConfig]): + """Centralized control coordinator with per-joint arbitration. Single tick loop that: 1. Reads state from all hardware @@ -153,10 +131,10 @@ class ControlOrchestrator(Module[ControlOrchestratorConfig]): - Aggregated preemption (one notification per task per tick) Example: - >>> from dimos.control import ControlOrchestrator + >>> from dimos.control import ControlCoordinator >>> from dimos.hardware.manipulators.xarm import XArmBackend >>> - >>> orch = ControlOrchestrator(tick_rate=100.0) + >>> orch = ControlCoordinator(tick_rate=100.0) >>> backend = XArmBackend(ip="192.168.1.185", dof=7) >>> backend.connect() >>> orch.add_hardware("left_arm", backend, joint_prefix="left") @@ -166,8 +144,8 @@ class ControlOrchestrator(Module[ControlOrchestratorConfig]): # Output: Aggregated joint state for external consumers joint_state: Out[JointState] - config: ControlOrchestratorConfig - default_config = ControlOrchestratorConfig + config: ControlCoordinatorConfig + default_config = ControlCoordinatorConfig def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) @@ -186,7 +164,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: # Tick loop (created on start) self._tick_loop: TickLoop | None = None - logger.info(f"ControlOrchestrator initialized at {self.config.tick_rate}Hz") + logger.info(f"ControlCoordinator initialized at {self.config.tick_rate}Hz") # ========================================================================= # Config-based Setup @@ -197,9 +175,9 @@ def _setup_from_config(self) -> None: hardware_added: list[str] = [] try: - for hw_cfg in self.config.hardware: - self._setup_hardware(hw_cfg) - hardware_added.append(hw_cfg.id) + for component in self.config.hardware: + self._setup_hardware(component) + hardware_added.append(component.hardware_id) for task_cfg in self.config.tasks: task = self._create_task_from_config(task_cfg) @@ -214,44 +192,42 @@ def _setup_from_config(self) -> None: pass raise - def _setup_hardware(self, hw_cfg: HardwareConfig) -> None: + def _setup_hardware(self, component: HardwareComponent) -> None: """Connect and add a single hardware backend.""" - backend = self._create_backend_from_config(hw_cfg) + backend = self._create_backend(component) if not backend.connect(): - raise RuntimeError(f"Failed to connect to {hw_cfg.type} backend") + raise RuntimeError(f"Failed to connect to {component.backend_type} backend") try: - if hw_cfg.auto_enable and hasattr(backend, "write_enable"): + if component.auto_enable and hasattr(backend, "write_enable"): backend.write_enable(True) - self.add_hardware( - hw_cfg.id, - backend, - joint_prefix=hw_cfg.joint_prefix or hw_cfg.id, - ) + + self.add_hardware(backend, component) except Exception: backend.disconnect() raise - def _create_backend_from_config(self, cfg: HardwareConfig) -> ManipulatorBackend: - """Create a manipulator backend from config.""" - match cfg.type.lower(): + def _create_backend(self, component: HardwareComponent) -> ManipulatorBackend: + """Create a manipulator backend from component config.""" + dof = len(component.joints) + match component.backend_type.lower(): case "mock": from dimos.hardware.manipulators.mock import MockBackend - return MockBackend(dof=cfg.dof) + return MockBackend(dof=dof) case "xarm": - if cfg.ip is None: - raise ValueError("ip is required for xarm backend") + if component.address is None: + raise ValueError("address (IP) is required for xarm backend") from dimos.hardware.manipulators.xarm import XArmBackend - return XArmBackend(ip=cfg.ip, dof=cfg.dof) + return XArmBackend(ip=component.address, dof=dof) case "piper": from dimos.hardware.manipulators.piper import PiperBackend - return PiperBackend(can_port=cfg.can_port or "can0", dof=cfg.dof) + return PiperBackend(can_port=component.address or "can0", dof=dof) case _: - raise ValueError(f"Unknown backend type: {cfg.type}") + raise ValueError(f"Unknown backend type: {component.backend_type}") def _create_task_from_config(self, cfg: TaskConfig) -> ControlTask: """Create a control task from config.""" @@ -278,27 +254,27 @@ def _create_task_from_config(self, cfg: TaskConfig) -> ControlTask: @rpc def add_hardware( self, - hardware_id: str, backend: ManipulatorBackend, - joint_prefix: str | None = None, + component: HardwareComponent, ) -> bool: - """Register a hardware backend with the orchestrator.""" + """Register a hardware backend with the coordinator.""" with self._hardware_lock: - if hardware_id in self._hardware: - logger.warning(f"Hardware {hardware_id} already registered") + if component.hardware_id in self._hardware: + logger.warning(f"Hardware {component.hardware_id} already registered") return False interface = BackendHardwareInterface( backend=backend, - hardware_id=hardware_id, - joint_prefix=joint_prefix, + component=component, ) - self._hardware[hardware_id] = interface + self._hardware[component.hardware_id] = interface for joint_name in interface.joint_names: - self._joint_to_hardware[joint_name] = hardware_id + self._joint_to_hardware[joint_name] = component.hardware_id - logger.info(f"Added hardware {hardware_id} with joints: {interface.joint_names}") + logger.info( + f"Added hardware {component.hardware_id} with joints: {interface.joint_names}" + ) return True @rpc @@ -306,7 +282,7 @@ def remove_hardware(self, hardware_id: str) -> bool: """Remove a hardware interface. Note: For safety, call this only when no tasks are actively using this - hardware. Consider stopping the orchestrator before removing hardware. + hardware. Consider stopping the coordinator before removing hardware. """ with self._hardware_lock: if hardware_id not in self._hardware: @@ -353,9 +329,9 @@ def get_joint_positions(self) -> dict[str, float]: with self._hardware_lock: positions: dict[str, float] = {} for hw in self._hardware.values(): - state = hw.read_state() # {joint_name: (pos, vel, effort)} - for joint_name, (pos, _vel, _effort) in state.items(): - positions[joint_name] = pos + state = hw.read_state() # {joint_name: JointState} + for joint_name, joint_state in state.items(): + positions[joint_name] = joint_state.position return positions # ========================================================================= @@ -364,7 +340,7 @@ def get_joint_positions(self) -> dict[str, float]: @rpc def add_task(self, task: ControlTask) -> bool: - """Register a task with the orchestrator.""" + """Register a task with the coordinator.""" if not isinstance(task, ControlTask): raise TypeError("task must implement ControlTask") @@ -471,9 +447,9 @@ def cancel_trajectory(self, task_name: str) -> bool: @rpc def start(self) -> None: - """Start the orchestrator control loop.""" + """Start the coordinator control loop.""" if self._tick_loop and self._tick_loop.is_running: - logger.warning("Orchestrator already running") + logger.warning("Coordinator already running") return super().start() @@ -497,12 +473,12 @@ def start(self) -> None: ) self._tick_loop.start() - logger.info(f"ControlOrchestrator started at {self.config.tick_rate}Hz") + logger.info(f"ControlCoordinator started at {self.config.tick_rate}Hz") @rpc def stop(self) -> None: - """Stop the orchestrator.""" - logger.info("Stopping ControlOrchestrator...") + """Stop the coordinator.""" + logger.info("Stopping ControlCoordinator...") if self._tick_loop: self._tick_loop.stop() @@ -517,7 +493,7 @@ def stop(self) -> None: logger.error(f"Error disconnecting hardware {hw_id}: {e}") super().stop() - logger.info("ControlOrchestrator stopped") + logger.info("ControlCoordinator stopped") @rpc def get_tick_count(self) -> int: @@ -526,13 +502,13 @@ def get_tick_count(self) -> int: # Blueprint export -control_orchestrator = ControlOrchestrator.blueprint +control_coordinator = ControlCoordinator.blueprint __all__ = [ - "ControlOrchestrator", - "ControlOrchestratorConfig", - "HardwareConfig", + "ControlCoordinator", + "ControlCoordinatorConfig", + "HardwareComponent", "TaskConfig", - "control_orchestrator", + "control_coordinator", ] diff --git a/dimos/control/hardware_interface.py b/dimos/control/hardware_interface.py index ef62f974c6..ed320abec5 100644 --- a/dimos/control/hardware_interface.py +++ b/dimos/control/hardware_interface.py @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Hardware interface for the ControlOrchestrator. +"""Hardware interface for the ControlCoordinator. -Wraps ManipulatorBackend with orchestrator-specific features: +Wraps ManipulatorBackend with coordinator-specific features: - Namespaced joint names (e.g., "left_joint1") - Unified read/write interface - Hold-last-value for partial commands @@ -24,42 +24,45 @@ import logging import time -from typing import Protocol, runtime_checkable +from typing import TYPE_CHECKING, Protocol, runtime_checkable from dimos.hardware.manipulators.spec import ControlMode, ManipulatorBackend +if TYPE_CHECKING: + from dimos.control.components import HardwareComponent, HardwareId, JointName, JointState + logger = logging.getLogger(__name__) @runtime_checkable class HardwareInterface(Protocol): - """Protocol for hardware that the orchestrator can control. + """Protocol for hardware that the coordinator can control. - This wraps ManipulatorBackend with orchestrator-specific features: + This wraps ManipulatorBackend with coordinator-specific features: - Namespaced joint names (e.g., "left_arm_joint1") - Unified read/write interface - State caching """ @property - def hardware_id(self) -> str: + def hardware_id(self) -> HardwareId: """Unique ID for this hardware (e.g., 'left_arm').""" ... @property - def joint_names(self) -> list[str]: + def joint_names(self) -> list[JointName]: """Ordered list of fully-qualified joint names this hardware controls.""" ... - def read_state(self) -> dict[str, tuple[float, float, float]]: + def read_state(self) -> dict[JointName, JointState]: """Read current state. Returns: - Dict of joint_name -> (position, velocity, effort) + Dict of joint_name -> JointState(position, velocity, effort) """ ... - def write_command(self, commands: dict[str, float], mode: ControlMode) -> bool: + def write_command(self, commands: dict[JointName, float], mode: ControlMode) -> bool: """Write commands to hardware. IMPORTANT: Accepts partial joint sets. Missing joints hold last value. @@ -82,7 +85,7 @@ class BackendHardwareInterface: """Concrete implementation wrapping a ManipulatorBackend. Features: - - Generates namespaced joint names (prefix_joint1, prefix_joint2, ...) + - Uses joint names from HardwareComponent - Holds last commanded value for partial commands - On first tick, reads current position from hardware for missing joints """ @@ -90,26 +93,20 @@ class BackendHardwareInterface: def __init__( self, backend: ManipulatorBackend, - hardware_id: str, - joint_prefix: str | None = None, + component: HardwareComponent, ) -> None: """Initialize hardware interface. Args: backend: ManipulatorBackend instance (XArmBackend, PiperBackend, etc.) - hardware_id: Unique identifier for this hardware - joint_prefix: Prefix for joint names (defaults to hardware_id) + component: Hardware component with joints config """ if not isinstance(backend, ManipulatorBackend): raise TypeError("backend must implement ManipulatorBackend") self._backend = backend - self._hardware_id = hardware_id - self._prefix = joint_prefix or hardware_id - self._dof = backend.get_dof() - - # Generate joint names: prefix_joint1, prefix_joint2, ... - self._joint_names = [f"{self._prefix}_joint{i + 1}" for i in range(self._dof)] + self._component = component + self._joint_names = component.joints # Track last commanded values for hold-last behavior self._last_commanded: dict[str, float] = {} @@ -118,36 +115,47 @@ def __init__( self._current_mode: ControlMode | None = None @property - def hardware_id(self) -> str: + def hardware_id(self) -> HardwareId: """Unique ID for this hardware.""" - return self._hardware_id + return self._component.hardware_id @property - def joint_names(self) -> list[str]: + def joint_names(self) -> list[JointName]: """Ordered list of joint names.""" return self._joint_names + @property + def component(self) -> HardwareComponent: + """The hardware component config.""" + return self._component + @property def dof(self) -> int: """Degrees of freedom.""" - return self._dof + return len(self._joint_names) def disconnect(self) -> None: """Disconnect the underlying backend.""" self._backend.disconnect() - def read_state(self) -> dict[str, tuple[float, float, float]]: - """Read state as {joint_name: (position, velocity, effort)}. + def read_state(self) -> dict[JointName, JointState]: + """Read state as {joint_name: JointState}. Returns: - Dict mapping joint name to (position, velocity, effort) tuple + Dict mapping joint name to JointState with position, velocity, effort """ + from dimos.control.components import JointState + positions = self._backend.read_joint_positions() velocities = self._backend.read_joint_velocities() efforts = self._backend.read_joint_efforts() return { - name: (positions[i], velocities[i], efforts[i]) + name: JointState( + position=positions[i], + velocity=velocities[i], + effort=efforts[i], + ) for i, name in enumerate(self._joint_names) } @@ -176,7 +184,7 @@ def write_command(self, commands: dict[str, float], mode: ControlMode) -> bool: self._last_commanded[joint_name] = value elif joint_name not in self._warned_unknown_joints: logger.warning( - f"Hardware {self._hardware_id} received command for unknown joint " + f"Hardware {self.hardware_id} received command for unknown joint " f"{joint_name}. Valid joints: {self._joint_names}" ) self._warned_unknown_joints.add(joint_name) @@ -187,7 +195,7 @@ def write_command(self, commands: dict[str, float], mode: ControlMode) -> bool: # Switch control mode if needed if mode != self._current_mode: if not self._backend.set_control_mode(mode): - logger.warning(f"Hardware {self._hardware_id} failed to switch to {mode.name}") + logger.warning(f"Hardware {self.hardware_id} failed to switch to {mode.name}") return False self._current_mode = mode @@ -198,7 +206,7 @@ def write_command(self, commands: dict[str, float], mode: ControlMode) -> bool: case ControlMode.VELOCITY: return self._backend.write_joint_velocities(ordered) case ControlMode.TORQUE: - logger.warning(f"Hardware {self._hardware_id} does not support torque mode") + logger.warning(f"Hardware {self.hardware_id} does not support torque mode") return False case _: return False @@ -216,7 +224,7 @@ def _initialize_last_commanded(self) -> None: time.sleep(0.01) raise RuntimeError( - f"Hardware {self._hardware_id} failed to read initial positions after retries" + f"Hardware {self.hardware_id} failed to read initial positions after retries" ) def _build_ordered_command(self) -> list[float]: diff --git a/dimos/control/task.py b/dimos/control/task.py index 49589188d9..e9d9c4a584 100644 --- a/dimos/control/task.py +++ b/dimos/control/task.py @@ -12,17 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""ControlTask protocol and types for the ControlOrchestrator. +"""ControlTask protocol and types for the ControlCoordinator. This module defines: -- Data types used by tasks and the orchestrator (ResourceClaim, JointStateSnapshot, etc.) +- Data types used by tasks and the coordinator (ResourceClaim, JointStateSnapshot, etc.) - ControlTask protocol that all tasks must implement -Tasks are "passive" - they don't own threads. The orchestrator calls +Tasks are "passive" - they don't own threads. The coordinator calls compute() at each tick, passing current state and time. CRITICAL: Tasks must NEVER call time.time() directly. -Use the t_now passed in OrchestratorState. +Use the t_now passed in CoordinatorState. """ from __future__ import annotations @@ -30,6 +30,7 @@ from dataclasses import dataclass, field from typing import Protocol, runtime_checkable +from dimos.control.components import JointName from dimos.hardware.manipulators.spec import ControlMode # ============================================================================= @@ -41,7 +42,7 @@ class ResourceClaim: """Declares which joints a task wants to control. - Used by the orchestrator to determine resource ownership and + Used by the coordinator to determine resource ownership and resolve conflicts between competing tasks. Attributes: @@ -52,7 +53,7 @@ class ResourceClaim: mode: Control mode (POSITION, VELOCITY, TORQUE) """ - joints: frozenset[str] + joints: frozenset[JointName] priority: int = 0 mode: ControlMode = ControlMode.POSITION @@ -75,26 +76,26 @@ class JointStateSnapshot: timestamp: Unix timestamp when state was read """ - joint_positions: dict[str, float] = field(default_factory=dict) - joint_velocities: dict[str, float] = field(default_factory=dict) - joint_efforts: dict[str, float] = field(default_factory=dict) + joint_positions: dict[JointName, float] = field(default_factory=dict) + joint_velocities: dict[JointName, float] = field(default_factory=dict) + joint_efforts: dict[JointName, float] = field(default_factory=dict) timestamp: float = 0.0 - def get_position(self, joint_name: str) -> float | None: + def get_position(self, joint_name: JointName) -> float | None: """Get position for a specific joint.""" return self.joint_positions.get(joint_name) - def get_velocity(self, joint_name: str) -> float | None: + def get_velocity(self, joint_name: JointName) -> float | None: """Get velocity for a specific joint.""" return self.joint_velocities.get(joint_name) - def get_effort(self, joint_name: str) -> float | None: + def get_effort(self, joint_name: JointName) -> float | None: """Get effort for a specific joint.""" return self.joint_efforts.get(joint_name) @dataclass -class OrchestratorState: +class CoordinatorState: """Complete state snapshot for tasks to read. Passed to each task's compute() method every tick. Contains @@ -109,7 +110,7 @@ class OrchestratorState: """ joints: JointStateSnapshot - t_now: float # Orchestrator time (perf_counter) - USE THIS, NOT time.time()! + t_now: float # Coordinator time (perf_counter) - USE THIS, NOT time.time()! dt: float # Time since last tick @@ -118,7 +119,7 @@ class JointCommandOutput: """Joint-centric command output from a task. Commands are addressed by joint name, NOT by hardware ID. - The orchestrator routes commands to the appropriate hardware. + The coordinator routes commands to the appropriate hardware. This design enables: - WBC spanning multiple hardware interfaces @@ -133,7 +134,7 @@ class JointCommandOutput: mode: Control mode - must match which field is populated """ - joint_names: list[str] + joint_names: list[JointName] positions: list[float] | None = None velocities: list[float] | None = None efforts: list[float] | None = None @@ -170,14 +171,14 @@ def get_values(self) -> list[float] | None: @runtime_checkable class ControlTask(Protocol): - """Protocol for passive tasks that run within the orchestrator. + """Protocol for passive tasks that run within the coordinator. - Tasks are "passive" - they don't own threads. The orchestrator + Tasks are "passive" - they don't own threads. The coordinator calls compute() at each tick, passing current state and time. Lifecycle: - 1. Task is added to orchestrator via add_task() - 2. Orchestrator calls claim() to understand resource needs + 1. Task is added to coordinator via add_task() + 2. Coordinator calls claim() to understand resource needs 3. Each tick: is_active() → compute() → output merged via arbitration 4. Task removed via remove_task() or transitions to inactive @@ -199,7 +200,7 @@ class ControlTask(Protocol): ... def is_active(self) -> bool: ... return self._executing ... - ... def compute(self, state: OrchestratorState) -> JointCommandOutput | None: + ... def compute(self, state: CoordinatorState) -> JointCommandOutput | None: ... # Use state.t_now, NOT time.time()! ... t_elapsed = state.t_now - self._start_time ... positions = self._trajectory.sample(t_elapsed) @@ -217,14 +218,14 @@ def name(self) -> str: """Unique identifier for this task instance. Used for logging, debugging, and task management. - Must be unique across all tasks in the orchestrator. + Must be unique across all tasks in the coordinator. """ ... def claim(self) -> ResourceClaim: """Declare resource requirements. - Called by orchestrator to determine: + Called by coordinator to determine: - Which joints this task wants to control - Priority for conflict resolution - Control mode (position/velocity/effort) @@ -233,7 +234,7 @@ def claim(self) -> ResourceClaim: ResourceClaim with joints (frozenset) and priority (int) Note: - The claim can change dynamically - orchestrator calls this + The claim can change dynamically - coordinator calls this every tick for active tasks. """ ... @@ -251,16 +252,16 @@ def is_active(self) -> bool: """ ... - def compute(self, state: OrchestratorState) -> JointCommandOutput | None: + def compute(self, state: CoordinatorState) -> JointCommandOutput | None: """Compute output command given current state. - Called by orchestrator for active tasks each tick. + Called by coordinator for active tasks each tick. CRITICAL: Use state.t_now for timing, NEVER time.time()! This ensures deterministic behavior and enables simulation. Args: - state: OrchestratorState containing: + state: CoordinatorState containing: - joints: JointStateSnapshot with all joint states - t_now: Current tick time (use this for all timing!) - dt: Time since last tick @@ -271,7 +272,7 @@ def compute(self, state: OrchestratorState) -> JointCommandOutput | None: """ ... - def on_preempted(self, by_task: str, joints: frozenset[str]) -> None: + def on_preempted(self, by_task: str, joints: frozenset[JointName]) -> None: """Called ONCE per tick with ALL preempted joints aggregated. Called when a higher-priority task takes control of some of this @@ -292,8 +293,9 @@ def on_preempted(self, by_task: str, joints: frozenset[str]) -> None: "ControlMode", # Protocol "ControlTask", + "CoordinatorState", "JointCommandOutput", + "JointName", "JointStateSnapshot", - "OrchestratorState", "ResourceClaim", ] diff --git a/dimos/control/tasks/trajectory_task.py b/dimos/control/tasks/trajectory_task.py index 08e3ae337e..676f1dac70 100644 --- a/dimos/control/tasks/trajectory_task.py +++ b/dimos/control/tasks/trajectory_task.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Joint trajectory task for the ControlOrchestrator. +"""Joint trajectory task for the ControlCoordinator. -Passive trajectory execution - called by orchestrator each tick. +Passive trajectory execution - called by coordinator each tick. Unlike JointTrajectoryController which owns a thread, this task -is compute-only and relies on the orchestrator for timing. +is compute-only and relies on the coordinator for timing. -CRITICAL: Uses t_now from OrchestratorState, never calls time.time() +CRITICAL: Uses t_now from CoordinatorState, never calls time.time() """ from __future__ import annotations @@ -28,8 +28,8 @@ from dimos.control.task import ( ControlMode, ControlTask, + CoordinatorState, JointCommandOutput, - OrchestratorState, ResourceClaim, ) from dimos.msgs.trajectory_msgs import JointTrajectory, TrajectoryState @@ -55,9 +55,9 @@ class JointTrajectoryTask(ControlTask): """Passive trajectory execution task. Unlike JointTrajectoryController which owns a thread, this task - is called by the orchestrator at each tick. + is called by the coordinator at each tick. - CRITICAL: Uses t_now from OrchestratorState, never calls time.time() + CRITICAL: Uses t_now from CoordinatorState, never calls time.time() State Machine: IDLE ──execute()──► EXECUTING ──done──► COMPLETED @@ -74,8 +74,8 @@ class JointTrajectoryTask(ControlTask): ... priority=10, ... ), ... ) - >>> orchestrator.add_task(task) - >>> task.execute(my_trajectory, t_now=orchestrator_t_now) + >>> coordinator.add_task(task) + >>> task.execute(my_trajectory, t_now=coordinator_t_now) """ def __init__(self, name: str, config: JointTrajectoryTaskConfig) -> None: @@ -117,13 +117,13 @@ def is_active(self) -> bool: """Check if task should run this tick.""" return self._state == TrajectoryState.EXECUTING - def compute(self, state: OrchestratorState) -> JointCommandOutput | None: + def compute(self, state: CoordinatorState) -> JointCommandOutput | None: """Compute trajectory output for this tick. CRITICAL: Uses state.t_now for timing, NOT time.time()! Args: - state: Current orchestrator state + state: Current coordinator state Returns: JointCommandOutput with positions, or None if not executing @@ -244,7 +244,7 @@ def get_progress(self, t_now: float) -> float: """Get execution progress (0.0 to 1.0). Args: - t_now: Current orchestrator time + t_now: Current coordinator time Returns: Progress as fraction, or 0.0 if not executing diff --git a/dimos/control/test_control.py b/dimos/control/test_control.py index 2522affa60..8d51537bb3 100644 --- a/dimos/control/test_control.py +++ b/dimos/control/test_control.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Tests for the Control Orchestrator module.""" +"""Tests for the Control Coordinator module.""" from __future__ import annotations @@ -22,12 +22,13 @@ import pytest +from dimos.control.components import HardwareComponent, HardwareType, make_joints from dimos.control.hardware_interface import BackendHardwareInterface from dimos.control.task import ( ControlMode, + CoordinatorState, JointCommandOutput, JointStateSnapshot, - OrchestratorState, ResourceClaim, ) from dimos.control.tasks.trajectory_task import ( @@ -61,11 +62,12 @@ def mock_backend(): @pytest.fixture def hardware_interface(mock_backend): """Create a BackendHardwareInterface with mock backend.""" - return BackendHardwareInterface( - backend=mock_backend, + component = HardwareComponent( hardware_id="test_arm", - joint_prefix="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 6), ) + return BackendHardwareInterface(backend=mock_backend, component=component) @pytest.fixture @@ -99,15 +101,15 @@ def simple_trajectory(): @pytest.fixture -def orchestrator_state(): - """Create a sample OrchestratorState.""" +def coordinator_state(): + """Create a sample CoordinatorState.""" joints = JointStateSnapshot( joint_positions={"arm_joint1": 0.0, "arm_joint2": 0.0, "arm_joint3": 0.0}, joint_velocities={"arm_joint1": 0.0, "arm_joint2": 0.0, "arm_joint3": 0.0}, joint_efforts={"arm_joint1": 0.0, "arm_joint2": 0.0, "arm_joint3": 0.0}, timestamp=time.perf_counter(), ) - return OrchestratorState(joints=joints, t_now=time.perf_counter(), dt=0.01) + return CoordinatorState(joints=joints, t_now=time.perf_counter(), dt=0.01) # ============================================================================= @@ -190,10 +192,10 @@ def test_read_state(self, hardware_interface): state = hardware_interface.read_state() assert "arm_joint1" in state assert len(state) == 6 - pos, vel, eff = state["arm_joint1"] - assert pos == 0.0 - assert vel == 0.0 - assert eff == 0.0 + joint_state = state["arm_joint1"] + assert joint_state.position == 0.0 + assert joint_state.velocity == 0.0 + assert joint_state.effort == 0.0 def test_write_command(self, hardware_interface, mock_backend): commands = { @@ -229,23 +231,21 @@ def test_execute_trajectory(self, trajectory_task, simple_trajectory): assert trajectory_task.is_active() assert trajectory_task.get_state() == TrajectoryState.EXECUTING - def test_compute_during_trajectory( - self, trajectory_task, simple_trajectory, orchestrator_state - ): + def test_compute_during_trajectory(self, trajectory_task, simple_trajectory, coordinator_state): t_start = time.perf_counter() trajectory_task.execute(simple_trajectory) # First compute sets start time (deferred start) - state0 = OrchestratorState( - joints=orchestrator_state.joints, + state0 = CoordinatorState( + joints=coordinator_state.joints, t_now=t_start, dt=0.01, ) trajectory_task.compute(state0) # Compute at 0.5s into trajectory - state = OrchestratorState( - joints=orchestrator_state.joints, + state = CoordinatorState( + joints=coordinator_state.joints, t_now=t_start + 0.5, dt=0.01, ) @@ -256,21 +256,21 @@ def test_compute_during_trajectory( assert len(output.positions) == 3 assert 0.4 < output.positions[0] < 0.6 - def test_trajectory_completes(self, trajectory_task, simple_trajectory, orchestrator_state): + def test_trajectory_completes(self, trajectory_task, simple_trajectory, coordinator_state): t_start = time.perf_counter() trajectory_task.execute(simple_trajectory) # First compute sets start time (deferred start) - state0 = OrchestratorState( - joints=orchestrator_state.joints, + state0 = CoordinatorState( + joints=coordinator_state.joints, t_now=t_start, dt=0.01, ) trajectory_task.compute(state0) # Compute past trajectory duration - state = OrchestratorState( - joints=orchestrator_state.joints, + state = CoordinatorState( + joints=coordinator_state.joints, t_now=t_start + 1.5, dt=0.01, ) @@ -297,13 +297,13 @@ def test_preemption(self, trajectory_task, simple_trajectory): assert trajectory_task.get_state() == TrajectoryState.ABORTED assert not trajectory_task.is_active() - def test_progress(self, trajectory_task, simple_trajectory, orchestrator_state): + def test_progress(self, trajectory_task, simple_trajectory, coordinator_state): t_start = time.perf_counter() trajectory_task.execute(simple_trajectory) # First compute sets start time (deferred start) - state0 = OrchestratorState( - joints=orchestrator_state.joints, + state0 = CoordinatorState( + joints=coordinator_state.joints, t_now=t_start, dt=0.01, ) @@ -429,7 +429,12 @@ def test_non_overlapping_joints(self): class TestTickLoop: def test_tick_loop_starts_and_stops(self, mock_backend): - hw = BackendHardwareInterface(mock_backend, "arm", "arm") + component = HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 6), + ) + hw = BackendHardwareInterface(mock_backend, component) hardware = {"arm": hw} tasks: dict = {} joint_to_hardware = {f"arm_joint{i + 1}": "arm" for i in range(6)} @@ -453,7 +458,12 @@ def test_tick_loop_starts_and_stops(self, mock_backend): assert tick_loop.tick_count == final_count def test_tick_loop_calls_compute(self, mock_backend): - hw = BackendHardwareInterface(mock_backend, "arm", "arm") + component = HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 6), + ) + hw = BackendHardwareInterface(mock_backend, component) hardware = {"arm": hw} mock_task = MagicMock() @@ -495,7 +505,12 @@ def test_tick_loop_calls_compute(self, mock_backend): class TestIntegration: def test_full_trajectory_execution(self, mock_backend): - hw = BackendHardwareInterface(mock_backend, "arm", "arm") + component = HardwareComponent( + hardware_id="arm", + hardware_type=HardwareType.MANIPULATOR, + joints=make_joints("arm", 6), + ) + hw = BackendHardwareInterface(mock_backend, component) hardware = {"arm": hw} config = JointTrajectoryTaskConfig( diff --git a/dimos/control/tick_loop.py b/dimos/control/tick_loop.py index 03e4e0ebd0..7812325ddf 100644 --- a/dimos/control/tick_loop.py +++ b/dimos/control/tick_loop.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Tick loop for the ControlOrchestrator. +"""Tick loop for the ControlCoordinator. This module contains the core control loop logic: - Read state from all hardware @@ -21,7 +21,7 @@ - Route commands to hardware - Publish aggregated joint state -Separated from orchestrator.py following the DimOS pattern of +Separated from coordinator.py following the DimOS pattern of splitting coordination logic from module wrapper. """ @@ -33,9 +33,9 @@ from dimos.control.task import ( ControlTask, + CoordinatorState, JointCommandOutput, JointStateSnapshot, - OrchestratorState, ResourceClaim, ) from dimos.msgs.sensor_msgs import JointState @@ -60,7 +60,7 @@ class JointWinner(NamedTuple): class TickLoop: - """Core tick loop for the control orchestrator. + """Core tick loop for the control coordinator. Runs the deterministic control cycle: 1. READ: Collect joint state from all hardware @@ -92,7 +92,7 @@ def __init__( task_lock: threading.Lock, joint_to_hardware: dict[str, str], publish_callback: Callable[[JointState], None] | None = None, - frame_id: str = "orchestrator", + frame_id: str = "coordinator", log_ticks: bool = False, ) -> None: self._tick_rate = tick_rate @@ -133,7 +133,7 @@ def start(self) -> None: self._tick_thread = threading.Thread( target=self._loop, - name="ControlOrchestrator-Tick", + name="ControlCoordinator-Tick", daemon=True, ) self._tick_thread.start() @@ -173,7 +173,7 @@ def _tick(self) -> None: # === PHASE 1: READ ALL HARDWARE === joint_states = self._read_all_hardware() - state = OrchestratorState(joints=joint_states, t_now=t_now, dt=dt) + state = CoordinatorState(joints=joint_states, t_now=t_now, dt=dt) # === PHASE 2: COMPUTE ALL ACTIVE TASKS === commands = self._compute_all_tasks(state) @@ -213,10 +213,10 @@ def _read_all_hardware(self) -> JointStateSnapshot: for hw in self._hardware.values(): try: state = hw.read_state() - for joint_name, (pos, vel, eff) in state.items(): - joint_positions[joint_name] = pos - joint_velocities[joint_name] = vel - joint_efforts[joint_name] = eff + for joint_name, joint_state in state.items(): + joint_positions[joint_name] = joint_state.position + joint_velocities[joint_name] = joint_state.velocity + joint_efforts[joint_name] = joint_state.effort except Exception as e: logger.error(f"Failed to read {hw.hardware_id}: {e}") @@ -228,7 +228,7 @@ def _read_all_hardware(self) -> JointStateSnapshot: ) def _compute_all_tasks( - self, state: OrchestratorState + self, state: CoordinatorState ) -> list[tuple[ControlTask, ResourceClaim, JointCommandOutput | None]]: """Compute outputs from all active tasks.""" results: list[tuple[ControlTask, ResourceClaim, JointCommandOutput | None]] = [] diff --git a/dimos/e2e_tests/test_control_orchestrator.py b/dimos/e2e_tests/test_control_coordinator.py similarity index 72% rename from dimos/e2e_tests/test_control_orchestrator.py rename to dimos/e2e_tests/test_control_coordinator.py index aa820d66ec..06f25f7a93 100644 --- a/dimos/e2e_tests/test_control_orchestrator.py +++ b/dimos/e2e_tests/test_control_coordinator.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""End-to-end tests for the ControlOrchestrator. +"""End-to-end tests for the ControlCoordinator. -These tests start a real orchestrator process and communicate via LCM/RPC. +These tests start a real coordinator process and communicate via LCM/RPC. Unlike unit tests, these verify the full system integration. Run with: - pytest dimos/e2e_tests/test_control_orchestrator.py -v -s + pytest dimos/e2e_tests/test_control_coordinator.py -v -s """ import os @@ -26,27 +26,26 @@ import pytest -from dimos.control.orchestrator import ControlOrchestrator +from dimos.control.coordinator import ControlCoordinator from dimos.core.rpc_client import RPCClient from dimos.msgs.sensor_msgs import JointState from dimos.msgs.trajectory_msgs import JointTrajectory, TrajectoryPoint, TrajectoryState @pytest.mark.skipif(bool(os.getenv("CI")), reason="LCM doesn't work in CI.") -@pytest.mark.e2e -class TestControlOrchestratorE2E: - """End-to-end tests for ControlOrchestrator.""" +class TestControlCoordinatorE2E: + """End-to-end tests for ControlCoordinator.""" - def test_orchestrator_starts_and_responds_to_rpc(self, lcm_spy, start_blueprint) -> None: - """Test that orchestrator starts and responds to RPC queries.""" + def test_coordinator_starts_and_responds_to_rpc(self, lcm_spy, start_blueprint) -> None: + """Test that coordinator starts and responds to RPC queries.""" # Save topics we care about (LCM topics include type suffix) - joint_state_topic = "/orchestrator/joint_state#sensor_msgs.JointState" + joint_state_topic = "/coordinator/joint_state#sensor_msgs.JointState" lcm_spy.save_topic(joint_state_topic) - lcm_spy.save_topic("/rpc/ControlOrchestrator/list_joints/res") - lcm_spy.save_topic("/rpc/ControlOrchestrator/list_tasks/res") + lcm_spy.save_topic("/rpc/ControlCoordinator/list_joints/res") + lcm_spy.save_topic("/rpc/ControlCoordinator/list_tasks/res") - # Start the mock orchestrator blueprint - start_blueprint("orchestrator-mock") + # Start the mock coordinator blueprint + start_blueprint("coordinator-mock") # Wait for joint state to be published (proves tick loop is running) lcm_spy.wait_for_saved_topic( @@ -55,7 +54,7 @@ def test_orchestrator_starts_and_responds_to_rpc(self, lcm_spy, start_blueprint) ) # Create RPC client and query - client = RPCClient(None, ControlOrchestrator) + client = RPCClient(None, ControlCoordinator) try: # Test list_joints RPC joints = client.list_joints() @@ -75,23 +74,23 @@ def test_orchestrator_starts_and_responds_to_rpc(self, lcm_spy, start_blueprint) finally: client.stop_rpc_client() - def test_orchestrator_executes_trajectory(self, lcm_spy, start_blueprint) -> None: - """Test that orchestrator executes a trajectory via RPC.""" + def test_coordinator_executes_trajectory(self, lcm_spy, start_blueprint) -> None: + """Test that coordinator executes a trajectory via RPC.""" # Save topics - lcm_spy.save_topic("/orchestrator/joint_state#sensor_msgs.JointState") - lcm_spy.save_topic("/rpc/ControlOrchestrator/execute_trajectory/res") - lcm_spy.save_topic("/rpc/ControlOrchestrator/get_trajectory_status/res") + lcm_spy.save_topic("/coordinator/joint_state#sensor_msgs.JointState") + lcm_spy.save_topic("/rpc/ControlCoordinator/execute_trajectory/res") + lcm_spy.save_topic("/rpc/ControlCoordinator/get_trajectory_status/res") - # Start orchestrator - start_blueprint("orchestrator-mock") + # Start coordinator + start_blueprint("coordinator-mock") # Wait for it to be ready lcm_spy.wait_for_saved_topic( - "/orchestrator/joint_state#sensor_msgs.JointState", timeout=10.0 + "/coordinator/joint_state#sensor_msgs.JointState", timeout=10.0 ) # Create RPC client - client = RPCClient(None, ControlOrchestrator) + client = RPCClient(None, ControlCoordinator) try: # Get initial joint positions initial_positions = client.get_joint_positions() @@ -134,13 +133,13 @@ def test_orchestrator_executes_trajectory(self, lcm_spy, start_blueprint) -> Non finally: client.stop_rpc_client() - def test_orchestrator_joint_state_published(self, lcm_spy, start_blueprint) -> None: + def test_coordinator_joint_state_published(self, lcm_spy, start_blueprint) -> None: """Test that joint state messages are published at expected rate.""" - joint_state_topic = "/orchestrator/joint_state#sensor_msgs.JointState" + joint_state_topic = "/coordinator/joint_state#sensor_msgs.JointState" lcm_spy.save_topic(joint_state_topic) - # Start orchestrator - start_blueprint("orchestrator-mock") + # Start coordinator + start_blueprint("coordinator-mock") # Wait for initial message lcm_spy.wait_for_saved_topic(joint_state_topic, timeout=10.0) @@ -164,17 +163,17 @@ def test_orchestrator_joint_state_published(self, lcm_spy, start_blueprint) -> N assert len(joint_state.position) == 7 assert "arm_joint1" in joint_state.name - def test_orchestrator_cancel_trajectory(self, lcm_spy, start_blueprint) -> None: + def test_coordinator_cancel_trajectory(self, lcm_spy, start_blueprint) -> None: """Test that a running trajectory can be cancelled.""" - lcm_spy.save_topic("/orchestrator/joint_state#sensor_msgs.JointState") + lcm_spy.save_topic("/coordinator/joint_state#sensor_msgs.JointState") - # Start orchestrator - start_blueprint("orchestrator-mock") + # Start coordinator + start_blueprint("coordinator-mock") lcm_spy.wait_for_saved_topic( - "/orchestrator/joint_state#sensor_msgs.JointState", timeout=10.0 + "/coordinator/joint_state#sensor_msgs.JointState", timeout=10.0 ) - client = RPCClient(None, ControlOrchestrator) + client = RPCClient(None, ControlCoordinator) try: # Create a long trajectory (5 seconds) trajectory = JointTrajectory( @@ -209,22 +208,22 @@ def test_orchestrator_cancel_trajectory(self, lcm_spy, start_blueprint) -> None: finally: client.stop_rpc_client() - def test_dual_arm_orchestrator(self, lcm_spy, start_blueprint) -> None: - """Test dual-arm orchestrator with independent trajectories.""" - lcm_spy.save_topic("/orchestrator/joint_state#sensor_msgs.JointState") + def test_dual_arm_coordinator(self, lcm_spy, start_blueprint) -> None: + """Test dual-arm coordinator with independent trajectories.""" + lcm_spy.save_topic("/coordinator/joint_state#sensor_msgs.JointState") - # Start dual-arm mock orchestrator - start_blueprint("orchestrator-dual-mock") + # Start dual-arm mock coordinator + start_blueprint("coordinator-dual-mock") lcm_spy.wait_for_saved_topic( - "/orchestrator/joint_state#sensor_msgs.JointState", timeout=10.0 + "/coordinator/joint_state#sensor_msgs.JointState", timeout=10.0 ) - client = RPCClient(None, ControlOrchestrator) + client = RPCClient(None, ControlCoordinator) try: # Verify both arms present joints = client.list_joints() - assert "left_joint1" in joints - assert "right_joint1" in joints + assert "left_arm_joint1" in joints + assert "right_arm_joint1" in joints tasks = client.list_tasks() assert "traj_left" in tasks @@ -232,7 +231,7 @@ def test_dual_arm_orchestrator(self, lcm_spy, start_blueprint) -> None: # Create trajectories for both arms left_trajectory = JointTrajectory( - joint_names=[f"left_joint{i + 1}" for i in range(7)], + joint_names=[f"left_arm_joint{i + 1}" for i in range(7)], points=[ TrajectoryPoint(time_from_start=0.0, positions=[0.0] * 7), TrajectoryPoint(time_from_start=0.5, positions=[0.2] * 7), @@ -240,7 +239,7 @@ def test_dual_arm_orchestrator(self, lcm_spy, start_blueprint) -> None: ) right_trajectory = JointTrajectory( - joint_names=[f"right_joint{i + 1}" for i in range(6)], + joint_names=[f"right_arm_joint{i + 1}" for i in range(6)], points=[ TrajectoryPoint(time_from_start=0.0, positions=[0.0] * 6), TrajectoryPoint(time_from_start=0.5, positions=[0.3] * 6), diff --git a/dimos/manipulation/control/orchestrator_client.py b/dimos/manipulation/control/coordinator_client.py similarity index 88% rename from dimos/manipulation/control/orchestrator_client.py rename to dimos/manipulation/control/coordinator_client.py index 84e85dfb3d..b30c1f46f7 100644 --- a/dimos/manipulation/control/orchestrator_client.py +++ b/dimos/manipulation/control/coordinator_client.py @@ -14,30 +14,30 @@ # limitations under the License. """ -Interactive client for the ControlOrchestrator. +Interactive client for the ControlCoordinator. -Interfaces with a running ControlOrchestrator via RPC to: +Interfaces with a running ControlCoordinator via RPC to: - Query hardware and task status - Plan and execute trajectories on single or multiple arms - Monitor execution progress Usage: - # Terminal 1: Start the orchestrator - dimos run orchestrator-mock # Single arm - dimos run orchestrator-dual-mock # Dual arm + # Terminal 1: Start the coordinator + dimos run coordinator-mock # Single arm + dimos run coordinator-dual-mock # Dual arm # Terminal 2: Run this client - python -m dimos.manipulation.control.orchestrator_client - python -m dimos.manipulation.control.orchestrator_client --task traj_left - python -m dimos.manipulation.control.orchestrator_client --task traj_right + python -m dimos.manipulation.control.coordinator_client + python -m dimos.manipulation.control.coordinator_client --task traj_left + python -m dimos.manipulation.control.coordinator_client --task traj_right How it works: - 1. Connects to ControlOrchestrator via LCM RPC + 1. Connects to ControlCoordinator via LCM RPC 2. Queries available hardware/tasks/joints 3. You add waypoints (joint positions) 4. Generates trajectory with trapezoidal velocity profile - 5. Sends trajectory to orchestrator via execute_trajectory() RPC - 6. Orchestrator's tick loop executes it at 100Hz + 5. Sends trajectory to coordinator via execute_trajectory() RPC + 6. Coordinator's tick loop executes it at 100Hz """ from __future__ import annotations @@ -47,7 +47,7 @@ import time from typing import TYPE_CHECKING, Any -from dimos.control.orchestrator import ControlOrchestrator +from dimos.control.coordinator import ControlCoordinator from dimos.core.rpc_client import RPCClient from dimos.manipulation.planning import JointTrajectoryGenerator @@ -55,17 +55,17 @@ from dimos.msgs.trajectory_msgs import JointTrajectory -class OrchestratorClient: +class CoordinatorClient: """ - RPC client for the ControlOrchestrator. + RPC client for the ControlCoordinator. - Connects to a running orchestrator and provides methods to: + Connects to a running coordinator and provides methods to: - Query state (joints, tasks, hardware) - Execute trajectories on any task - Monitor progress Example: - client = OrchestratorClient() + client = CoordinatorClient() # Query state print(client.list_hardware()) # ['left_arm', 'right_arm'] @@ -84,8 +84,8 @@ class OrchestratorClient: """ def __init__(self) -> None: - """Initialize connection to orchestrator via RPC.""" - self._rpc = RPCClient(None, ControlOrchestrator) + """Initialize connection to coordinator via RPC.""" + self._rpc = RPCClient(None, ControlCoordinator) # Per-task state self._current_task: str | None = None @@ -144,7 +144,7 @@ def select_task(self, task_name: str) -> bool: """ Select a task and setup its trajectory generator. - This queries the orchestrator to find which joints the task controls, + This queries the coordinator to find which joints the task controls, then creates a trajectory generator for those joints. """ tasks = self.list_tasks() @@ -155,14 +155,18 @@ def select_task(self, task_name: str) -> bool: self._current_task = task_name # Get joints for this task (infer from task name pattern) - # e.g., "traj_left" -> joints starting with "left_" + # e.g., "traj_left" -> joints starting with "left_arm_" (hardware_id based naming) # e.g., "traj_arm" -> joints starting with "arm_" all_joints = self.list_joints() - # Try to infer prefix from task name + # Try to infer hardware_id from task name if "_" in task_name: - prefix = task_name.split("_", 1)[1] # "traj_left" -> "left" - task_joints = [j for j in all_joints if j.startswith(prefix + "_")] + suffix = task_name.split("_", 1)[1] # "traj_left" -> "left" + # Try both patterns: exact suffix (e.g., "arm_") and with "_arm" suffix (e.g., "left_arm_") + task_joints = [j for j in all_joints if j.startswith(suffix + "_")] + if not task_joints: + # Try with "_arm" suffix for dual-arm setups (left -> left_arm) + task_joints = [j for j in all_joints if j.startswith(suffix + "_arm_")] else: task_joints = all_joints @@ -313,7 +317,7 @@ def preview_trajectory(trajectory: JointTrajectory, joint_names: list[str]) -> N print("=" * 70) -def wait_for_completion(client: OrchestratorClient, task_name: str, timeout: float = 60.0) -> bool: +def wait_for_completion(client: CoordinatorClient, task_name: str, timeout: float = 60.0) -> bool: """Wait for trajectory to complete with progress display.""" start = time.time() last_progress = -1.0 @@ -339,10 +343,10 @@ def wait_for_completion(client: OrchestratorClient, task_name: str, timeout: flo return False -class OrchestratorShell: - """IPython shell interface for orchestrator control.""" +class CoordinatorShell: + """IPython shell interface for coordinator control.""" - def __init__(self, client: OrchestratorClient, initial_task: str) -> None: + def __init__(self, client: CoordinatorClient, initial_task: str) -> None: self._client = client self._current_task = initial_task self._waypoints: list[list[float]] = [] @@ -359,7 +363,7 @@ def _num_joints(self) -> int: def help(self) -> None: """Show available commands.""" - print("\nOrchestrator Client Commands:") + print("\nCoordinator Client Commands:") print("=" * 60) print("Waypoint Commands:") print(" here() - Add current position as waypoint") @@ -557,14 +561,14 @@ def accel(self, value: float | None = None) -> None: print(f"Max acceleration: {value:.2f} rad/s^2") -def interactive_mode(client: OrchestratorClient, initial_task: str) -> None: +def interactive_mode(client: CoordinatorClient, initial_task: str) -> None: """Start IPython interactive mode.""" import IPython - shell = OrchestratorShell(client, initial_task) + shell = CoordinatorShell(client, initial_task) print("\n" + "=" * 60) - print(f"Orchestrator Client (IPython) - Task: {initial_task}") + print(f"Coordinator Client (IPython) - Task: {initial_task}") print("=" * 60) print(f"Joints: {', '.join(shell._joints())}") print("\nType help() for available commands") @@ -596,15 +600,15 @@ def interactive_mode(client: OrchestratorClient, initial_task: str) -> None: ) -def _run_client(client: OrchestratorClient, task: str, vel: float, accel: float) -> int: +def _run_client(client: CoordinatorClient, task: str, vel: float, accel: float) -> int: """Run the client with the given configuration.""" try: hardware = client.list_hardware() tasks = client.list_tasks() if not hardware: - print("\nWarning: No hardware found. Is the orchestrator running?") - print("Start with: dimos run orchestrator-mock") + print("\nWarning: No hardware found. Is the coordinator running?") + print("Start with: dimos run coordinator-mock") response = input("Continue anyway? [y/N]: ").strip().lower() if response != "y": return 0 @@ -614,7 +618,7 @@ def _run_client(client: OrchestratorClient, task: str, vel: float, accel: float) except Exception as e: print(f"\nConnection error: {e}") - print("Make sure orchestrator is running: dimos run orchestrator-mock") + print("Make sure coordinator is running: dimos run coordinator-mock") return 1 if task not in tasks and tasks: @@ -636,18 +640,18 @@ def main() -> int: import argparse parser = argparse.ArgumentParser( - description="Interactive client for ControlOrchestrator", + description="Interactive client for ControlCoordinator", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: - # Single arm (with orchestrator-mock running) - python -m dimos.manipulation.control.orchestrator_client + # Single arm (with coordinator-mock running) + python -m dimos.manipulation.control.coordinator_client # Dual arm - control left arm - python -m dimos.manipulation.control.orchestrator_client --task traj_left + python -m dimos.manipulation.control.coordinator_client --task traj_left # Dual arm - control right arm - python -m dimos.manipulation.control.orchestrator_client --task traj_right + python -m dimos.manipulation.control.coordinator_client --task traj_right """, ) parser.add_argument( @@ -671,11 +675,11 @@ def main() -> int: args = parser.parse_args() print("\n" + "=" * 70) - print("Orchestrator Client") + print("Coordinator Client") print("=" * 70) - print("\nConnecting to ControlOrchestrator via RPC...") + print("\nConnecting to ControlCoordinator via RPC...") - client = OrchestratorClient() + client = CoordinatorClient() try: return _run_client(client, args.task, args.vel, args.accel) finally: diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index c241485472..c6d45e8008 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -38,14 +38,14 @@ "unitree-g1-joystick": "dimos.robot.unitree_webrtc.unitree_g1_blueprints:with_joystick", "unitree-g1-full": "dimos.robot.unitree_webrtc.unitree_g1_blueprints:full_featured", "unitree-g1-detection": "dimos.robot.unitree_webrtc.unitree_g1_blueprints:detection", - # Control orchestrator blueprints - "orchestrator-mock": "dimos.control.blueprints:orchestrator_mock", - "orchestrator-xarm7": "dimos.control.blueprints:orchestrator_xarm7", - "orchestrator-xarm6": "dimos.control.blueprints:orchestrator_xarm6", - "orchestrator-piper": "dimos.control.blueprints:orchestrator_piper", - "orchestrator-dual-mock": "dimos.control.blueprints:orchestrator_dual_mock", - "orchestrator-dual-xarm": "dimos.control.blueprints:orchestrator_dual_xarm", - "orchestrator-piper-xarm": "dimos.control.blueprints:orchestrator_piper_xarm", + # Control coordinator blueprints + "coordinator-mock": "dimos.control.blueprints:coordinator_mock", + "coordinator-xarm7": "dimos.control.blueprints:coordinator_xarm7", + "coordinator-xarm6": "dimos.control.blueprints:coordinator_xarm6", + "coordinator-piper": "dimos.control.blueprints:coordinator_piper", + "coordinator-dual-mock": "dimos.control.blueprints:coordinator_dual_mock", + "coordinator-dual-xarm": "dimos.control.blueprints:coordinator_dual_xarm", + "coordinator-piper-xarm": "dimos.control.blueprints:coordinator_piper_xarm", # Demo blueprints "demo-camera": "dimos.hardware.sensors.camera.module:demo_camera", "demo-osm": "dimos.mapping.osm.demo_osm:demo_osm", @@ -83,8 +83,8 @@ "wavefront_frontier_explorer": "dimos.navigation.frontier_exploration.wavefront_frontier_goal_selector", "websocket_vis": "dimos.web.websocket_vis.websocket_vis_module", "web_input": "dimos.agents.cli.web", - # Control orchestrator module - "control_orchestrator": "dimos.control.orchestrator", + # Control coordinator module + "control_coordinator": "dimos.control.coordinator", }