Skip to content
58 changes: 35 additions & 23 deletions dimos/control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""ControlOrchestrator - Centralized control for multi-arm coordination.
"""ControlCoordinator - Centralized control for multi-arm coordination.

This module provides a centralized control orchestrator that replaces
This module provides a centralized control coordinator that replaces
per-driver/per-controller loops with a single deterministic tick-based system.

Features:
Expand All @@ -25,68 +25,80 @@
- Aggregated preemption notifications

Example:
>>> from dimos.control import ControlOrchestrator
>>> from dimos.control import ControlCoordinator
>>> from dimos.control.tasks import JointTrajectoryTask, JointTrajectoryTaskConfig
>>> from dimos.hardware.manipulators.xarm import XArmBackend
>>>
>>> # Create orchestrator
>>> orch = ControlOrchestrator(tick_rate=100.0)
>>> # Create coordinator
>>> coord = ControlCoordinator(tick_rate=100.0)
>>>
>>> # Add hardware
>>> backend = XArmBackend(ip="192.168.1.185", dof=7)
>>> backend.connect()
>>> orch.add_hardware("left_arm", backend, joint_prefix="left")
>>> coord.add_hardware("left_arm", backend)
>>>
>>> # Add task
>>> joints = [f"left_joint{i+1}" for i in range(7)]
>>> joints = [f"left_arm_joint{i+1}" for i in range(7)]
>>> task = JointTrajectoryTask(
... "traj_left",
... JointTrajectoryTaskConfig(joint_names=joints, priority=10),
... )
>>> orch.add_task(task)
>>> coord.add_task(task)
>>>
>>> # Start
>>> orch.start()
>>> coord.start()
"""

from dimos.control.components import (
HardwareComponent,
HardwareId,
HardwareType,
JointName,
JointState,
make_joints,
)
from dimos.control.coordinator import (
ControlCoordinator,
ControlCoordinatorConfig,
TaskConfig,
control_coordinator,
)
from dimos.control.hardware_interface import (
BackendHardwareInterface,
HardwareInterface,
)
from dimos.control.orchestrator import (
ControlOrchestrator,
ControlOrchestratorConfig,
HardwareConfig,
TaskConfig,
control_orchestrator,
)
from dimos.control.task import (
ControlMode,
ControlTask,
CoordinatorState,
JointCommandOutput,
JointStateSnapshot,
OrchestratorState,
ResourceClaim,
)
from dimos.control.tick_loop import TickLoop

__all__ = [
# Hardware interface
"BackendHardwareInterface",
# Coordinator
"ControlCoordinator",
"ControlCoordinatorConfig",
"ControlMode",
# Orchestrator
"ControlOrchestrator",
"ControlOrchestratorConfig",
# Task protocol and types
"ControlTask",
"HardwareConfig",
"CoordinatorState",
"HardwareComponent",
"HardwareId",
"HardwareInterface",
"HardwareType",
"JointCommandOutput",
"JointName",
"JointState",
"JointStateSnapshot",
"OrchestratorState",
"ResourceClaim",
"TaskConfig",
# Tick loop
"TickLoop",
"control_orchestrator",
"control_coordinator",
"make_joints",
]
Loading
Loading