diff --git a/assets/scenes/fr3_simple_pick_up/scene.xml b/assets/scenes/fr3_simple_pick_up/scene.xml
index 88b1f269..d077d3bb 100644
--- a/assets/scenes/fr3_simple_pick_up/scene.xml
+++ b/assets/scenes/fr3_simple_pick_up/scene.xml
@@ -28,7 +28,7 @@
-
+
diff --git a/examples/teleop/README.md b/examples/teleop/README.md
index d4a56f2c..4d6bd730 100644
--- a/examples/teleop/README.md
+++ b/examples/teleop/README.md
@@ -1,33 +1,35 @@
-# Teleoperation with Meta Quest
+# Franka Teleoperation
+
+## Teleoperation with Meta Quest 3
Teleoperate your robot (optinally dual arm) with the Meta Quest
-## How does it work?
-In the script [`quest_iris_dual_arm.py`](quest_iris_dual_arm.py) we use the [IRIS platform](https://intuitive-robots.github.io/iris-project-page/index.html) to get controller poses from the meta quest.
+### How does it work?
+In the script [`franka.py`](franka.py) we use the [IRIS platform](https://intuitive-robots.github.io/iris-project-page/index.html) to get controller poses from the meta quest.
With the relative space wrapper and the relative to configured origin setting theses poses are then apply to the robot in a delta fashion whenever the trigger button is pressed.
The buttons are used to start and stop data recording with the [`StorageWrapper`](robot-control-stack/python/rcs/envs/storage_wrapper.py).
-## Installation
+### Installation
[Install RCS](https://robotcontrolstack.org/getting_started/index.html) and the [FR3 extension](https://robotcontrolstack.org/extensions/rcs_fr3.html) (the script is writte for the FR3 as example but can be easily adapted for other robots).
-Install the IRIS APK on your quest following [these instructions](https://github.com/intuitive-robots/IRIS-Meta-Quest3).
-Finally, install [SimPub](https://github.com/intuitive-robots/SimPublisher) the IRIS python client by
+Install the IRIS APK on your quest following [these instructions](https://github.com/intuitive-robots/IRIS-Meta-Quest3) use the apk released [here](https://github.com/RobotControlStack/IRIS-Meta-Quest3/releases/tag/rcsv1) to ensure compatiblity.
+Finally, install [SimPub](https://github.com/intuitive-robots/SimPublisher) the IRIS python client by (make sure to install it via requirements file to have the correct version)
```shell
pip install -r requirements.txt
```
-## Configuration
+### Configuration
-### Teleoperating in sim
+#### Teleoperating in sim
-1. go to [`quest_iris_dual_arm.py`](quest_iris_dual_arm.py) and set `ROBOT_INSTANCE = RobotPlatform.SIMULATION`
+1. go to [`franka.py`](quest_iris_dual_arm.py) and set `ROBOT_INSTANCE = RobotPlatform.SIMULATION`
-### Teleoperating a real robot
+#### Teleoperating a real robot
Note that dual arm is only supported for a aloha like setup where the robot face each other (for more advanced setups you need to change the transformation between the robots yourself).
1. put your robots into FCI mode
-2. go to [`quest_iris_dual_arm.py`](quest_iris_dual_arm.py), set `ROBOT_INSTANCE = RobotPlatform.HARDWARE` and set your IP addresses of your robots. Remove the left robot if you only have one.
+2. go to [`franka.py`](quest_iris_dual_arm.py), set `ROBOT_INSTANCE = RobotPlatform.HARDWARE` and set your IP addresses of your robots. Remove the left robot if you only have one.
-## Running
+### Running
1. make sure your computer and quest is in the same subnetwork and they can ping each other.
2. start IRIS meta quest app on your quest (it should be located in the Library under "Unkown Sources" after installation)
3. run the [`quest_align_frame.py`](quest_align_frame.py) script once. Navigate to the link printed on the top likly [http://127.0.0.1:7000](http://127.0.0.1:7000).
@@ -39,5 +41,13 @@ Note that dual arm is only supported for a aloha like setup where the robot face
5. use the right controller to change the orientation of the coordinate axis to fit your right robot (for franka: x front, y left, z up)
6. click the "teleportation scene" button on the still open website
7. cancel the script
-8. start the teleoperation script [`quest_iris_dual_arm.py`](quest_iris_dual_arm.py) and enjoy.
+8. start the teleoperation script [`franka.py`](quest_iris_dual_arm.py) and enjoy.
+
+## Teleoperation with Franka GELLO Duo
+Teleoperate your Franka Duo using the [Franka GELLO Duo](https://franka.de/de-de/product-prototypes).
+Install dependencies via
+```shell
+pip install -r requirements.txt
+```
+and make sure the `GelloConfig` is commented in and the `QuestConfig` is commented out and adapt your USB IDs to it.
\ No newline at end of file
diff --git a/examples/teleop/franka.py b/examples/teleop/franka.py
new file mode 100644
index 00000000..042b60e9
--- /dev/null
+++ b/examples/teleop/franka.py
@@ -0,0 +1,158 @@
+import logging
+from typing import Any
+
+import numpy as np
+from rcs._core import common
+from rcs._core.common import RobotPlatform
+from rcs._core.sim import SimConfig
+from rcs.camera.hw import HardwareCameraSet
+from rcs.envs.base import ControlMode
+from rcs.envs.creators import SimMultiEnvCreator
+from rcs.envs.utils import default_digit, default_sim_gripper_cfg, default_sim_robot_cfg
+from rcs.operator.gello import GelloConfig, GelloOperator
+from rcs.operator.interface import TeleopLoop
+from rcs.operator.quest import QuestConfig, QuestOperator
+from rcs_fr3.creators import RCSFR3MultiEnvCreator
+from rcs_fr3.utils import default_fr3_hw_gripper_cfg, default_fr3_hw_robot_cfg
+from rcs_realsense.utils import default_realsense
+from simpub.sim.mj_publisher import MujocoPublisher
+
+import rcs
+
+logger = logging.getLogger(__name__)
+
+
+ROBOT2IP = {
+ "right": "192.168.102.1",
+ "left": "192.168.101.1",
+}
+ROBOT2ID = {
+ "left": "1",
+ "right": "0",
+}
+
+
+# ROBOT_INSTANCE = RobotPlatform.SIMULATION
+ROBOT_INSTANCE = RobotPlatform.HARDWARE
+
+RECORD_FPS = 30
+# set camera dict to none disable cameras
+# CAMERA_DICT = {
+# "left_wrist": "230422272017",
+# "right_wrist": "230422271040",
+# "side": "243522070385",
+# "bird_eye": "243522070364",
+# }
+CAMERA_DICT = None
+MQ3_ADDR = "10.42.0.1"
+
+# DIGIT_DICT = {
+# "digit_right_left": "D21182",
+# "digit_right_right": "D21193"
+# }
+DIGIT_DICT = None
+
+
+DATASET_PATH = "test_data_iris_dual_arm14"
+INSTRUCTION = "build a tower with the blocks in front of you"
+
+robot2world = {
+ "right": rcs.common.Pose(
+ translation=np.array([0, 0, 0]), rpy_vector=np.array([0.89360858, -0.17453293, 0.46425758])
+ ),
+ "left": rcs.common.Pose(
+ translation=np.array([0, 0, 0]), rpy_vector=np.array([-0.89360858, -0.17453293, -0.46425758])
+ ),
+}
+
+config: QuestConfig | GelloConfig
+config = QuestConfig(mq3_addr=MQ3_ADDR, simulation=ROBOT_INSTANCE == RobotPlatform.SIMULATION)
+# config = GelloConfig(
+# arms={
+# "right": GelloArmConfig(com_port="/dev/serial/by-id/usb-ROBOTIS_OpenRB-150_E505008B503059384C2E3120FF07332D-if00"),
+# "left": GelloArmConfig(com_port="/dev/serial/by-id/usb-ROBOTIS_OpenRB-150_ABA78B05503059384C2E3120FF062F26-if00"),
+# },
+# simulation=ROBOT_INSTANCE == RobotPlatform.SIMULATION,
+# )
+
+
+def get_env():
+ if ROBOT_INSTANCE == RobotPlatform.HARDWARE:
+
+ cams: list[Any] = []
+ if CAMERA_DICT is not None:
+ cams.append(default_realsense(CAMERA_DICT))
+ if DIGIT_DICT is not None:
+ cams.append(default_digit(DIGIT_DICT))
+
+ camera_set = HardwareCameraSet(cams) if cams else None
+
+ env_rel = RCSFR3MultiEnvCreator()(
+ name2ip=ROBOT2IP,
+ camera_set=camera_set,
+ robot_cfg=default_fr3_hw_robot_cfg(async_control=True),
+ control_mode=config.operator_class.control_mode[0],
+ gripper_cfg=default_fr3_hw_gripper_cfg(async_control=True),
+ max_relative_movement=(
+ 0.5 if config.operator_class.control_mode[0] == ControlMode.JOINTS else (0.5, np.deg2rad(90))
+ ),
+ relative_to=config.operator_class.control_mode[1],
+ robot2world=robot2world,
+ )
+ # env_rel = StorageWrapper(
+ # env_rel, DATASET_PATH, INSTRUCTION, batch_size=32, max_rows_per_group=100, max_rows_per_file=1000
+ # )
+ operator = GelloOperator(config) if isinstance(config, GelloConfig) else QuestOperator(config)
+ else:
+ # FR3
+ rcs.scenes["duo"] = rcs.Scene(
+ mjcf_scene="/ssd_data/juelg/rcs_modern/rcs_models/output/fr3_duo_flexible.xml",
+ mjcf_robot=rcs.scenes["fr3_simple_pick_up"].mjcf_robot,
+ robot_type=common.RobotType.FR3,
+ )
+
+ robot_cfg = default_sim_robot_cfg("duo", idx="")
+
+ # resolution = (256, 256)
+ # cameras = {
+ # cam: SimCameraConfig(
+ # identifier=cam,
+ # type=CameraType.fixed,
+ # resolution_height=resolution[1],
+ # resolution_width=resolution[0],
+ # frame_rate=0,
+ # )
+ # for cam in ["side", "wrist"]
+ # }
+
+ sim_cfg = SimConfig()
+ sim_cfg.async_control = True
+ env_rel = SimMultiEnvCreator()(
+ name2id=ROBOT2ID,
+ robot_cfg=robot_cfg,
+ control_mode=GelloOperator.control_mode[0],
+ gripper_cfg=default_sim_gripper_cfg(),
+ # cameras=default_mujoco_cameraset_cfg(),
+ max_relative_movement=0.5,
+ relative_to=GelloOperator.control_mode[1],
+ sim_cfg=sim_cfg,
+ robot2world=robot2world,
+ )
+ # sim = env_rel.unwrapped.envs[ROBOT2IP.keys().__iter__().__next__()].sim # type: ignore
+ sim = env_rel.get_wrapper_attr("sim")
+ operator = GelloOperator(config, sim) if isinstance(config, GelloConfig) else QuestOperator(config, sim)
+ sim.open_gui()
+ MujocoPublisher(sim.model, sim.data, MQ3_ADDR, visible_geoms_groups=list(range(1, 3)))
+ return env_rel, operator
+
+
+def main():
+ env_rel, operator = get_env()
+ env_rel.reset()
+ tele = TeleopLoop(env_rel, operator)
+ with env_rel, tele: # type: ignore
+ tele.environment_step_loop()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/teleop/quest_iris_dual_arm.py b/examples/teleop/quest_iris_dual_arm.py
deleted file mode 100644
index 6deeb5ae..00000000
--- a/examples/teleop/quest_iris_dual_arm.py
+++ /dev/null
@@ -1,354 +0,0 @@
-import logging
-import threading
-from time import sleep
-
-import numpy as np
-from rcs._core.common import RPY, Pose, RobotPlatform
-from rcs._core.sim import SimConfig
-from rcs.camera.hw import HardwareCameraSet
-from rcs.envs.base import (
- ControlMode,
- GripperDictType,
- LimitedTQuatRelDictType,
- RelativeActionSpace,
- RelativeTo,
-)
-from rcs.envs.creators import SimMultiEnvCreator
-from rcs.envs.storage_wrapper import StorageWrapper
-from rcs.envs.utils import default_digit, default_sim_gripper_cfg, default_sim_robot_cfg
-from rcs.utils import SimpleFrameRate
-from rcs_fr3.creators import RCSFR3MultiEnvCreator
-from rcs_fr3.utils import default_fr3_hw_gripper_cfg, default_fr3_hw_robot_cfg
-from rcs_realsense.utils import default_realsense
-from simpub.core.simpub_server import SimPublisher
-from simpub.parser.simdata import SimObject, SimScene
-from simpub.sim.mj_publisher import MujocoPublisher
-from simpub.xr_device.meta_quest3 import MetaQuest3
-
-logger = logging.getLogger(__name__)
-
-# download the iris apk from the following repo release: https://github.com/intuitive-robots/IRIS-Meta-Quest3
-# in order to use usb connection install adb install adb
-# sudo apt install android-tools-adb
-# install it on your quest with
-# adb install IRIS-Meta-Quest3.apk
-
-
-INCLUDE_ROTATION = True
-ROBOT2IP = {
- # "left": "192.168.102.1",
- "right": "192.168.101.1",
-}
-
-
-# ROBOT_INSTANCE = RobotPlatform.SIMULATION
-ROBOT_INSTANCE = RobotPlatform.HARDWARE
-RECORD_FPS = 30
-# set camera dict to none disable cameras
-# CAMERA_DICT = {
-# "left_wrist": "230422272017",
-# "right_wrist": "230422271040",
-# "side": "243522070385",
-# "bird_eye": "243522070364",
-# }
-CAMERA_DICT = None
-MQ3_ADDR = "10.42.0.1"
-# DIGIT_DICT = {
-# "digit_right_left": "D21182",
-# "digit_right_right": "D21193"
-# }
-DIGIT_DICT = None
-
-DATASET_PATH = "test_data_iris_dual_arm14"
-INSTRUCTION = "build a tower with the blocks in front of you"
-
-
-class MySimPublisher(SimPublisher):
- def get_update(self):
- return {}
-
-
-class MySimScene(SimScene):
- def __init__(self):
- super().__init__()
- self.root = SimObject(name="root")
-
-
-class QuestReader(threading.Thread):
-
- transform_to_robot = Pose()
-
- def __init__(self, env: RelativeActionSpace):
- super().__init__()
- self._reader = MetaQuest3("RCSNode")
-
- self._resource_lock = threading.Lock()
- self._env_lock = threading.Lock()
- self._reset_lock = threading.Lock()
- self._env = env
-
- self.controller_names = ROBOT2IP.keys() if ROBOT_INSTANCE == RobotPlatform.HARDWARE else ["right"]
- self._trg_btn = {"left": "index_trigger", "right": "index_trigger"}
- self._grp_btn = {"left": "hand_trigger", "right": "hand_trigger"}
- self._start_btn = "A"
- self._stop_btn = "B"
- self._unsuccessful_btn = "Y"
-
- self._prev_data = None
- self._exit_requested = False
- self._grp_pos = {key: 1.0 for key in self.controller_names} # start with opened gripper
- self._last_controller_pose = {key: Pose() for key in self.controller_names}
- self._offset_pose = {key: Pose() for key in self.controller_names}
-
- for robot in ROBOT2IP:
- self._env.envs[robot].set_origin_to_current()
-
- self._step_env = False
- self._set_frame = {key: Pose() for key in self.controller_names}
-
- def next_action(self) -> tuple[dict[str, Pose], dict[str, float]]:
- with self._resource_lock:
- transforms = {}
- for controller in self.controller_names:
- transform = Pose(
- translation=(
- self._last_controller_pose[controller].translation() # type: ignore
- - self._offset_pose[controller].translation()
- ),
- quaternion=(
- self._last_controller_pose[controller] * self._offset_pose[controller].inverse()
- ).rotation_q(),
- )
-
- set_axes = Pose(quaternion=self._set_frame[controller].rotation_q())
-
- transform = (
- QuestReader.transform_to_robot
- * set_axes.inverse()
- * transform
- * set_axes
- * QuestReader.transform_to_robot.inverse()
- )
- if not INCLUDE_ROTATION:
- transform = Pose(translation=transform.translation()) # identity rotation
- transforms[controller] = transform
- return transforms, {key: self._grp_pos[key] for key in self.controller_names}
-
- def run(self):
- rate_limiter = SimpleFrameRate(90, "teleop readout")
- warning_raised = False
- while not self._exit_requested:
- # pos, buttons = self._reader.get_transformations_and_buttons()
- input_data = self._reader.get_controller_data()
- if not warning_raised and input_data is None:
- logger.warning("[Quest Reader] packets empty")
- warning_raised = True
- sleep(0.5)
- continue
- if input_data is None:
- sleep(0.5)
- continue
- if warning_raised:
- logger.warning("[Quest Reader] packets arriving again")
- warning_raised = False
-
- # start recording
- if input_data[self._start_btn] and (self._prev_data is None or not self._prev_data[self._start_btn]):
- print("start button pressed")
- with self._env_lock:
- self._env.get_wrapper_attr("start_record")()
-
- if input_data[self._stop_btn] and (self._prev_data is None or not self._prev_data[self._stop_btn]):
- print("reset successful pressed: resetting env")
- with self._reset_lock:
- # set successful
- self._env.get_wrapper_attr("success")()
- # sleep to allow to let the robot reach the goal
- sleep(1)
- # this might also move the robot to the home position
- self._env.reset()
- for controller in self.controller_names:
- self._offset_pose[controller] = Pose()
- self._last_controller_pose[controller] = Pose()
- self._grp_pos[controller] = 1
- continue
-
- # reset unsuccessful
- if input_data[self._unsuccessful_btn] and (
- self._prev_data is None or not self._prev_data[self._unsuccessful_btn]
- ):
- print("reset unsuccessful pressed: resetting env")
- with self._env_lock:
- self._env.reset()
-
- for controller in self.controller_names:
- last_controller_pose = Pose(
- translation=np.array(input_data[controller]["pos"]),
- quaternion=np.array(input_data[controller]["rot"]),
- )
- if controller == "left":
- last_controller_pose = (
- Pose(translation=np.array([0, 0, 0]), rpy=RPY(roll=0, pitch=0, yaw=np.deg2rad(180))) # type: ignore
- * last_controller_pose
- )
-
- if input_data[controller][self._trg_btn[controller]] and (
- self._prev_data is None or not self._prev_data[controller][self._trg_btn[controller]]
- ):
- # trigger just pressed (first data sample with button pressed)
-
- with self._resource_lock:
- self._offset_pose[controller] = last_controller_pose
- self._last_controller_pose[controller] = last_controller_pose
-
- elif not input_data[controller][self._trg_btn[controller]] and (
- self._prev_data is None or self._prev_data[controller][self._trg_btn[controller]]
- ):
- # released
- transform = Pose(
- translation=(
- self._last_controller_pose[controller].translation() # type: ignore
- - self._offset_pose[controller].translation()
- ),
- quaternion=(
- self._last_controller_pose[controller] * self._offset_pose[controller].inverse()
- ).rotation_q(),
- )
- print(np.linalg.norm(transform.translation()))
- print(np.rad2deg(transform.total_angle()))
- with self._resource_lock:
- self._last_controller_pose[controller] = Pose()
- self._offset_pose[controller] = Pose()
- with self._env_lock:
- self._env.envs[controller].set_origin_to_current()
-
- elif input_data[controller][self._trg_btn[controller]]:
- # button is pressed
- with self._resource_lock:
- self._last_controller_pose[controller] = last_controller_pose
- else:
- # trg button is not pressed
- # TODO: deactivated for now
- # if data["buttons"][self._home_btn] and (
- # self._prev_data is None or not self._prev_data["buttons"][self._home_btn]
- # ):
- # # home button just pressed
- # with self._env_lock:
- # self._env.unwrapped.robot.move_home()
- # self._env.set_origin_to_current()
- pass
-
- if input_data[controller][self._grp_btn[controller]] and (
- self._prev_data is None or not self._prev_data[controller][self._grp_btn[controller]]
- ):
- # just pressed
- self._grp_pos[controller] = 0
- if not input_data[controller][self._grp_btn[controller]] and (
- self._prev_data is None or self._prev_data[controller][self._grp_btn[controller]]
- ):
- # just released
- self._grp_pos[controller] = 1
-
- self._prev_data = input_data
- rate_limiter()
-
- def stop(self):
- self._exit_requested = True
- self.join()
-
- def __enter__(self):
- self.start()
- return self
-
- def __exit__(self, *_):
- self.stop()
-
- def stop_env_loop(self):
- self._step_env = False
-
- def environment_step_loop(self):
- rate_limiter = SimpleFrameRate(RECORD_FPS, "env loop")
- self._step_env = True
- while self._step_env:
- if self._exit_requested:
- self._step_env = False
- break
- with self._reset_lock:
- transforms, grippers = self.next_action()
- actions = {}
- for robot, transform in transforms.items():
- action = dict(
- LimitedTQuatRelDictType(tquat=np.concatenate([transform.translation(), transform.rotation_q()])) # type: ignore
- )
-
- action.update(GripperDictType(gripper=grippers[robot]))
- actions[robot] = action
-
- self._env.step(actions)
- rate_limiter()
-
-
-def main():
- if ROBOT_INSTANCE == RobotPlatform.HARDWARE:
-
- camera_set = HardwareCameraSet([default_realsense(CAMERA_DICT), default_digit(DIGIT_DICT)]) if CAMERA_DICT is not None else None # type: ignore
- env_rel = RCSFR3MultiEnvCreator()(
- name2ip=ROBOT2IP,
- camera_set=camera_set,
- robot_cfg=default_fr3_hw_robot_cfg(async_control=True),
- control_mode=ControlMode.CARTESIAN_TQuat,
- gripper_cfg=default_fr3_hw_gripper_cfg(async_control=True),
- max_relative_movement=(0.5, np.deg2rad(90)),
- relative_to=RelativeTo.CONFIGURED_ORIGIN,
- )
- env_rel = StorageWrapper(
- env_rel, DATASET_PATH, INSTRUCTION, batch_size=32, max_rows_per_group=100, max_rows_per_file=1000
- )
- MySimPublisher(MySimScene(), MQ3_ADDR)
-
- # DIGITAL TWIN: comment out the line above and uncomment the lines below to use a digital twin
- # Note: only supported for one arm at the moment
-
- # robot_cfg = default_sim_robot_cfg("fr3_empty_world")
- # sim_cfg = SimConfig()
- # sim_cfg.async_control = True
- # twin_env = SimMultiEnvCreator()(
- # name2id=ROBOT2IP,
- # robot_cfg=robot_cfg,
- # control_mode=ControlMode.JOINTS,
- # gripper_cfg=default_sim_gripper_cfg(),
- # sim_cfg=sim_cfg,
- # )
- # sim = env_rel.unwrapped.envs[ROBOT2IP.keys().__iter__().__next__()].sim
- # sim.open_gui()
- # MujocoPublisher(sim.model, sim.data, MQ3_ADDR, visible_geoms_groups=list(range(1, 3)))
- # env_rel = DigitalTwin(env_rel, twin_env)
-
- else:
- # FR3
- robot_cfg = default_sim_robot_cfg("fr3_empty_world")
-
- sim_cfg = SimConfig()
- sim_cfg.async_control = True
- env_rel = SimMultiEnvCreator()(
- name2id=ROBOT2IP,
- robot_cfg=robot_cfg,
- control_mode=ControlMode.CARTESIAN_TQuat,
- gripper_cfg=default_sim_gripper_cfg(),
- # cameras=default_mujoco_cameraset_cfg(),
- max_relative_movement=0.5,
- relative_to=RelativeTo.CONFIGURED_ORIGIN,
- sim_cfg=sim_cfg,
- )
- sim = env_rel.unwrapped.envs[ROBOT2IP.keys().__iter__().__next__()].sim # type: ignore
- sim.open_gui()
- MujocoPublisher(sim.model, sim.data, MQ3_ADDR, visible_geoms_groups=list(range(1, 3)))
-
- env_rel.reset()
-
- with env_rel, QuestReader(env_rel) as action_server: # type: ignore
- action_server.environment_step_loop()
-
-
-if __name__ == "__main__":
- main()
diff --git a/examples/teleop/requirements.txt b/examples/teleop/requirements.txt
index db25a19a..b6deb24d 100644
--- a/examples/teleop/requirements.txt
+++ b/examples/teleop/requirements.txt
@@ -1 +1,3 @@
-simpub @ git+https://github.com/intuitive-robots/SimPublisher.git
\ No newline at end of file
+simpub @ git+https://github.com/intuitive-robots/SimPublisher.git@a0ebdbfba86f58aaa6360eb3b9da7b74a9c9b1dd
+dynamixel_sdk
+pynput
\ No newline at end of file
diff --git a/extensions/rcs_fr3/src/rcs_fr3/creators.py b/extensions/rcs_fr3/src/rcs_fr3/creators.py
index 0f61e2cc..488dbffa 100644
--- a/extensions/rcs_fr3/src/rcs_fr3/creators.py
+++ b/extensions/rcs_fr3/src/rcs_fr3/creators.py
@@ -121,7 +121,7 @@ def __call__( # type: ignore
# sim_gui=True,
# truncate_on_collision=False,
# )
- if max_relative_movement is not None:
+ if relative_to != RelativeTo.NONE:
env = RelativeActionSpace(env, max_mov=max_relative_movement, relative_to=relative_to)
return env
@@ -137,6 +137,7 @@ def __call__( # type: ignore
camera_set: HardwareCameraSet | None = None,
max_relative_movement: float | tuple[float, float] | None = None,
relative_to: RelativeTo = RelativeTo.LAST_STEP,
+ robot2world: dict[str, rcs.common.Pose] | None = None,
) -> gym.Env:
ik = rcs.common.Pin(
@@ -163,7 +164,7 @@ def __call__( # type: ignore
env = RelativeActionSpace(env, max_mov=max_relative_movement, relative_to=relative_to)
envs[key] = env
- env = MultiRobotWrapper(envs)
+ env = MultiRobotWrapper(envs, robot2world)
if camera_set is not None:
camera_set.start()
camera_set.wait_for_frames()
diff --git a/python/rcs/_core/sim.pyi b/python/rcs/_core/sim.pyi
index 26a187fc..c70d9209 100644
--- a/python/rcs/_core/sim.pyi
+++ b/python/rcs/_core/sim.pyi
@@ -136,6 +136,8 @@ class SimGripperConfig(rcs._core.common.GripperConfig):
min_actuator_width: float
min_joint_width: float
seconds_between_callbacks: float
+ def __copy__(self) -> SimGripperConfig: ...
+ def __deepcopy__(self, arg0: dict) -> SimGripperConfig: ...
def __init__(self) -> None: ...
def add_id(self, id: str) -> None: ...
@@ -169,6 +171,8 @@ class SimRobotConfig(rcs._core.common.RobotConfig):
mjcf_scene_path: str
seconds_between_callbacks: float
trajectory_trace: bool
+ def __copy__(self) -> SimRobotConfig: ...
+ def __deepcopy__(self, arg0: dict) -> SimRobotConfig: ...
def __init__(self) -> None: ...
def add_id(self, id: str) -> None: ...
diff --git a/python/rcs/envs/base.py b/python/rcs/envs/base.py
index 1518909d..9ae52a89 100644
--- a/python/rcs/envs/base.py
+++ b/python/rcs/envs/base.py
@@ -12,6 +12,7 @@
from rcs.envs.space_utils import (
ActObsInfoWrapper,
RCSpaceType,
+ Vec1Type,
Vec6Type,
Vec7Type,
Vec18Type,
@@ -99,12 +100,12 @@ class LimitedJointsRelDictType(RCSpaceType):
class GripperDictType(RCSpaceType):
# 0 for closed, 1 for open (>=0.5 for open)
- gripper: Annotated[float, gym.spaces.Box(low=0, high=1, dtype=np.float32)]
+ gripper: Annotated[Vec1Type, gym.spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)]
class HandBinDictType(RCSpaceType):
# 0 for closed, 1 for open (>=0.5 for open)
- gripper: Annotated[float, gym.spaces.Box(low=0, high=1, dtype=np.float32)]
+ gripper: Annotated[Vec1Type, gym.spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)]
class HandVecDictType(RCSpaceType):
@@ -168,6 +169,9 @@ class ArmObsType(TQuatDictType, JointsDictType, TRPYDictType): ...
LimitedCartOrJointContType: TypeAlias = LimitedTQuatRelDictType | LimitedJointsRelDictType | LimitedTRPYRelDictType
+class ArmWithGripper(TQuatDictType, GripperDictType): ...
+
+
class ControlMode(Enum):
JOINTS = auto()
CARTESIAN_TRPY = auto()
@@ -310,9 +314,28 @@ def close(self):
class MultiRobotWrapper(gym.Env):
"""Wraps a dictionary of environments to allow for multi robot control."""
- def __init__(self, envs: dict[str, gym.Env] | dict[str, gym.Wrapper]):
+ def __init__(
+ self, envs: dict[str, gym.Env] | dict[str, gym.Wrapper], robot2world: dict[str, common.Pose] | None = None
+ ):
self.envs = envs
self.unwrapped_multi = cast(dict[str, RobotEnv], {key: env.unwrapped for key, env in envs.items()})
+ if robot2world is None:
+ self.robot2world = {}
+ else:
+ self.robot2world = robot2world
+
+ def _translate_pose(self, key, dic, to_world=True):
+ r2w = self.robot2world.get(key, common.Pose())
+ if not to_world:
+ r2w = r2w.inverse()
+ if "tquat" in dic:
+ p = r2w * common.Pose(translation=dic["tquat"][:3], quaternion=dic["tquat"][3:]) * r2w.inverse()
+ dic["tquat"] = np.concatenate([p.translation(), p.rotation_q()])
+ if "xyzrpy" in dic:
+ p = r2w * common.Pose(translation=dic["xyzrpy"][:3], rpy_vector=dic["xyzrpy"][3:]) * r2w.inverse()
+ dic["xyzrpy"] = p.xyzrpy()
+
+ return dic
def step(self, action: dict[str, Any]) -> tuple[dict[str, Any], float, bool, bool, dict[str, Any]]:
# follows gym env by combinding a dict of envs into a single env
@@ -322,7 +345,11 @@ def step(self, action: dict[str, Any]) -> tuple[dict[str, Any], float, bool, boo
truncated = False
info = {}
for key, env in self.envs.items():
- obs[key], r, t, tr, info[key] = env.step(action[key])
+ act = self._translate_pose(key, action[key], to_world=False)
+ ob, r, t, tr, info[key] = env.step(act)
+ obs[key] = self._translate_pose(key, ob, to_world=True)
+ # old
+ # obs[key], r, t, tr, info[key] = env.step(action[key])
reward += float(r)
terminated = terminated or t
truncated = truncated or tr
@@ -358,6 +385,7 @@ def close(self):
class RelativeTo(Enum):
LAST_STEP = auto()
CONFIGURED_ORIGIN = auto()
+ NONE = auto()
class RelativeActionSpace(gym.ActionWrapper):
@@ -734,7 +762,7 @@ def action(self, action: dict[str, Any]) -> dict[str, Any]:
if self.binary:
self.gripper.grasp() if gripper_action == self.BINARY_GRIPPER_CLOSED else self.gripper.open()
else:
- self.gripper.set_normalized_width(next(gripper_action))
+ self.gripper.set_normalized_width(gripper_action[0])
self._last_gripper_cmd = gripper_action
del action[self.gripper_key]
return action
diff --git a/python/rcs/envs/creators.py b/python/rcs/envs/creators.py
index 64bcce19..4a693bad 100644
--- a/python/rcs/envs/creators.py
+++ b/python/rcs/envs/creators.py
@@ -1,3 +1,4 @@
+import copy
import logging
import typing
from functools import partial
@@ -146,33 +147,38 @@ def __call__( # type: ignore
max_relative_movement: float | tuple[float, float] | None = None,
relative_to: RelativeTo = RelativeTo.LAST_STEP,
sim_wrapper: Type[SimWrapper] | None = None,
+ robot2world: dict[str, rcs.common.Pose] | None = None,
) -> gym.Env:
simulation = sim.Sim(robot_cfg.mjcf_scene_path, sim_cfg)
ik = rcs.common.Pin(
robot_cfg.kinematic_model_path,
- robot_cfg.attachment_site,
+ robot_cfg.attachment_site + "_0",
urdf=robot_cfg.kinematic_model_path.endswith(".urdf"),
)
# ik = rcs_robotics_library._core.rl.RoboticsLibraryIK(robot_cfg.kinematic_model_path)
robots: dict[str, rcs.sim.SimRobot] = {}
- for key in name2id:
- robots[key] = rcs.sim.SimRobot(sim=simulation, ik=ik, cfg=robot_cfg)
+ for key, mid in name2id.items():
+ cfg = copy.copy(robot_cfg)
+ cfg.add_id(mid)
+ robots[key] = rcs.sim.SimRobot(sim=simulation, ik=ik, cfg=cfg)
envs = {}
- for key in name2id:
+ for key, mid in name2id.items():
env: gym.Env = RobotEnv(robots[key], control_mode)
- env = RobotSimWrapper(env, simulation, sim_wrapper)
if gripper_cfg is not None:
- gripper = rcs.sim.SimGripper(simulation, gripper_cfg)
+ gripper_cfg_copy = copy.copy(gripper_cfg)
+ gripper_cfg_copy.add_id(mid)
+ gripper = rcs.sim.SimGripper(simulation, gripper_cfg_copy)
env = GripperWrapper(env, gripper, binary=True)
- if max_relative_movement is not None:
+ if relative_to != RelativeTo.NONE:
env = RelativeActionSpace(env, max_mov=max_relative_movement, relative_to=relative_to)
envs[key] = env
- env = MultiRobotWrapper(envs)
+ env = MultiRobotWrapper(envs, robot2world)
+ env = RobotSimWrapper(env, simulation, sim_wrapper)
if cameras is not None:
camera_set = typing.cast(
BaseCameraSet, SimCameraSet(simulation, cameras, physical_units=True, render_on_demand=True)
diff --git a/python/rcs/envs/sim.py b/python/rcs/envs/sim.py
index 589b49fd..1f157d50 100644
--- a/python/rcs/envs/sim.py
+++ b/python/rcs/envs/sim.py
@@ -68,6 +68,7 @@ def step(self, action: dict[str, Any]) -> tuple[dict[str, Any], float, bool, boo
# truncate episode if collision
obs.update(self.unwrapped.get_obs())
return obs, 0, False, info["collision"] or not state.ik_success, info
+ # return obs, 0, False, False, info
def reset(
self, *, seed: int | None = None, options: dict[str, Any] | None = None
diff --git a/python/rcs/envs/space_utils.py b/python/rcs/envs/space_utils.py
index 40c316d1..6f25fabe 100644
--- a/python/rcs/envs/space_utils.py
+++ b/python/rcs/envs/space_utils.py
@@ -17,6 +17,7 @@
M = TypeVar("M", bound=int)
VecType: TypeAlias = np.ndarray[M, np.dtype[np.float64]]
+Vec1Type: TypeAlias = np.ndarray[tuple[Literal[1]], np.dtype[np.float64]]
Vec7Type: TypeAlias = np.ndarray[tuple[Literal[7]], np.dtype[np.float64]]
Vec3Type: TypeAlias = np.ndarray[tuple[Literal[3]], np.dtype[np.float64]]
Vec6Type: TypeAlias = np.ndarray[tuple[Literal[6]], np.dtype[np.float64]]
diff --git a/python/rcs/envs/utils.py b/python/rcs/envs/utils.py
index f756670f..18b18639 100644
--- a/python/rcs/envs/utils.py
+++ b/python/rcs/envs/utils.py
@@ -21,8 +21,8 @@ def default_sim_robot_cfg(scene: str = "fr3_empty_world", idx: str = "0") -> sim
robot_cfg.robot_type = scene_cfg.robot_type
robot_cfg.tcp_offset = common.Pose(common.FrankaHandTCPOffset())
robot_cfg.add_id(idx)
- if scene_cfg.mjb is not None:
- robot_cfg.mjcf_scene_path = scene_cfg.mjb
+ if (mjb := rcs.scenes[scene].mjb) is not None:
+ robot_cfg.mjcf_scene_path = mjb
else:
robot_cfg.mjcf_scene_path = scene_cfg.mjcf_scene
robot_cfg.kinematic_model_path = scene_cfg.mjcf_robot
diff --git a/python/rcs/operator/__init__.py b/python/rcs/operator/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/python/rcs/operator/gello.py b/python/rcs/operator/gello.py
new file mode 100644
index 00000000..418c736b
--- /dev/null
+++ b/python/rcs/operator/gello.py
@@ -0,0 +1,440 @@
+import contextlib
+import copy
+import logging
+import threading
+import time
+from dataclasses import dataclass, field
+from typing import Any, ClassVar, Dict, Iterator, List, Sequence, Tuple
+
+import numpy as np
+
+try:
+ from dynamixel_sdk.group_sync_read import GroupSyncRead
+ from dynamixel_sdk.group_sync_write import GroupSyncWrite
+ from dynamixel_sdk.packet_handler import PacketHandler
+ from dynamixel_sdk.port_handler import PortHandler
+ from dynamixel_sdk.robotis_def import COMM_SUCCESS
+
+ HAS_DYNAMIXEL_SDK = True
+except ImportError:
+ HAS_DYNAMIXEL_SDK = False
+
+try:
+ from pynput import keyboard
+
+ HAS_PYNPUT = True
+except ImportError:
+ HAS_PYNPUT = False
+
+from rcs.envs.base import ControlMode, RelativeTo
+from rcs.operator.interface import BaseOperator, BaseOperatorConfig, TeleopCommands
+from rcs.sim.sim import Sim
+from rcs.utils import SimpleFrameRate
+
+logger = logging.getLogger(__name__)
+
+# --- Dynamixel Driver Constants and Helpers ---
+
+XL330_CONTROL_TABLE = {
+ "model_number": {"addr": 0, "len": 2},
+ "operating_mode": {"addr": 11, "len": 1},
+ "torque_enable": {"addr": 64, "len": 1},
+ "kp_d": {"addr": 80, "len": 2},
+ "kp_i": {"addr": 82, "len": 2},
+ "kp_p": {"addr": 84, "len": 2},
+ "goal_current": {"addr": 102, "len": 2},
+ "goal_position": {"addr": 116, "len": 4},
+ "present_position": {"addr": 132, "len": 4},
+}
+
+
+class DynamixelDriver:
+ """Simplified Dynamixel driver adapted for RCS."""
+
+ def __init__(
+ self,
+ ids: Sequence[int],
+ port: str = "/dev/ttyUSB0",
+ baudrate: int = 57600,
+ pulses_per_revolution: int = 4095,
+ ):
+ if not HAS_DYNAMIXEL_SDK:
+ msg = "dynamixel_sdk is not installed. Please install it to use GelloOperator."
+ raise ImportError(msg)
+
+ self._ids = ids
+ self._port = port
+ self._baudrate = baudrate
+ self._pulses_per_revolution = pulses_per_revolution
+ self._lock = threading.Lock()
+ self._buffered_joint_positions: np.ndarray | None = None
+
+ self._portHandler = PortHandler(self._port)
+ self._packetHandler = PacketHandler(2.0)
+
+ self._groupSyncReadHandlers = {}
+ self._groupSyncWriteHandlers = {}
+
+ for key, entry in XL330_CONTROL_TABLE.items():
+ self._groupSyncReadHandlers[key] = GroupSyncRead(
+ self._portHandler, self._packetHandler, entry["addr"], entry["len"]
+ )
+ for dxl_id in self._ids:
+ self._groupSyncReadHandlers[key].addParam(dxl_id)
+
+ if key not in {"model_number", "present_position"}:
+ self._groupSyncWriteHandlers[key] = GroupSyncWrite(
+ self._portHandler, self._packetHandler, entry["addr"], entry["len"]
+ )
+
+ if not self._portHandler.openPort():
+ msg = f"Failed to open port {self._port}"
+ raise ConnectionError(msg)
+ if not self._portHandler.setBaudRate(self._baudrate):
+ msg = f"Failed to set baudrate {self._baudrate}"
+ raise ConnectionError(msg)
+
+ self._stop_thread = threading.Event()
+ self._polling_thread: threading.Thread | None = None
+ self._is_polling = False
+
+ def write_value_by_name(self, name: str, values: Sequence[int | None]):
+ if len(values) != len(self._ids):
+ msg = f"The length of {name} must match the number of servos"
+ raise ValueError(msg)
+
+ handler = self._groupSyncWriteHandlers[name]
+ value_len = XL330_CONTROL_TABLE[name]["len"]
+
+ with self._lock:
+ for dxl_id, value in zip(self._ids, values, strict=False):
+ if value is None:
+ continue
+ param = [(value >> (8 * i)) & 0xFF for i in range(value_len)]
+ handler.addParam(dxl_id, param)
+
+ comm_result = handler.txPacket()
+ if comm_result != COMM_SUCCESS:
+ handler.clearParam()
+ msg = f"Failed to syncwrite {name}: {self._packetHandler.getTxRxResult(comm_result)}"
+ raise RuntimeError(msg)
+ handler.clearParam()
+
+ def read_value_by_name(self, name: str) -> List[int]:
+ handler = self._groupSyncReadHandlers[name]
+ value_len = XL330_CONTROL_TABLE[name]["len"]
+ addr = XL330_CONTROL_TABLE[name]["addr"]
+
+ with self._lock:
+ comm_result = handler.txRxPacket()
+ if comm_result != COMM_SUCCESS:
+ msg = f"Failed to sync read {name}: {self._packetHandler.getTxRxResult(comm_result)}"
+ raise RuntimeError(msg)
+
+ values = []
+ for dxl_id in self._ids:
+ if handler.isAvailable(dxl_id, addr, value_len):
+ value = handler.getData(dxl_id, addr, value_len)
+ value = int(np.int32(np.uint32(value)))
+ values.append(value)
+ else:
+ msg = f"Failed to get {name} for ID {dxl_id}"
+ raise RuntimeError(msg)
+ return values
+
+ def start_joint_polling(self):
+ if self._is_polling:
+ return
+ self._stop_thread.clear()
+ self._polling_thread = threading.Thread(target=self._joint_polling_loop, daemon=True)
+ self._polling_thread.start()
+ self._is_polling = True
+
+ def stop_joint_polling(self):
+ if not self._is_polling:
+ return
+ self._stop_thread.set()
+ if self._polling_thread:
+ self._polling_thread.join()
+ self._is_polling = False
+
+ def _joint_polling_loop(self):
+ while not self._stop_thread.is_set():
+ time.sleep(0.001)
+ try:
+ self._buffered_joint_positions = np.array(self.read_value_by_name("present_position"), dtype=int)
+ except RuntimeError as e:
+ logger.warning(f"Polling error: {e}")
+
+ def get_joints(self) -> np.ndarray:
+ if self._is_polling:
+ while self._buffered_joint_positions is None:
+ time.sleep(0.01)
+ return self._pulses_to_rad(self._buffered_joint_positions.copy())
+ return self._pulses_to_rad(np.array(self.read_value_by_name("present_position"), dtype=int))
+
+ def _pulses_to_rad(self, pulses) -> np.ndarray:
+ return np.array(pulses) / self._pulses_per_revolution * 2 * np.pi
+
+ def _rad_to_pulses(self, rad: float) -> int:
+ return int(rad / (2 * np.pi) * self._pulses_per_revolution)
+
+ def close(self):
+ self.stop_joint_polling()
+ if self._portHandler:
+ self._portHandler.closePort()
+
+
+# --- Gello Hardware Interface Logic ---
+
+
+@dataclass
+class GelloArmConfig:
+ com_port: str = "/dev/ttyUSB0"
+ num_arm_joints: int = 7
+ joint_signs: List[int] = field(default_factory=lambda: [1, -1, 1, -1, 1, 1, 1])
+ gripper: bool = True
+ gripper_range_rad: List[float] = field(default_factory=lambda: [2.23, 3.22])
+ assembly_offsets: List[float] = field(default_factory=lambda: [0.000, 0.000, 3.142, 3.142, 3.142, 4.712, 0.000])
+ dynamixel_kp_p: List[int] = field(default_factory=lambda: [30, 60, 0, 30, 0, 0, 0, 50])
+ dynamixel_kp_i: List[int] = field(default_factory=lambda: [0, 0, 0, 0, 0, 0, 0, 0])
+ dynamixel_kp_d: List[int] = field(default_factory=lambda: [250, 100, 80, 60, 30, 10, 5, 0])
+ dynamixel_torque_enable: List[int] = field(default_factory=lambda: [0, 0, 0, 0, 0, 0, 0, 0])
+ dynamixel_goal_position: List[float] = field(
+ default_factory=lambda: [0.0, 0.0, 0.0, -1.571, 0.0, 1.571, 0.0, 3.509]
+ )
+
+
+@dataclass
+class DynamixelControlConfig:
+ kp_p: List[int] = field(default_factory=list)
+ kp_i: List[int] = field(default_factory=list)
+ kp_d: List[int] = field(default_factory=list)
+ torque_enable: List[int] = field(default_factory=list)
+ goal_position: List[int] = field(default_factory=list)
+ goal_current: List[int] = field(default_factory=list)
+ operating_mode: List[int] = field(default_factory=list)
+
+ _UPDATE_ORDER: ClassVar[list[str]] = [
+ "operating_mode",
+ "goal_current",
+ "kp_p",
+ "kp_i",
+ "kp_d",
+ "torque_enable",
+ "goal_position",
+ ]
+
+ def __iter__(self) -> Iterator[Tuple[str, List[int]]]:
+ for param_name in self._UPDATE_ORDER:
+ if hasattr(self, param_name):
+ yield param_name, getattr(self, param_name)
+
+ def __getitem__(self, param_name: str) -> List[int]:
+ return getattr(self, param_name)
+
+ def __setitem__(self, param_name: str, value: List[int]) -> None:
+ setattr(self, param_name, value)
+
+
+class GelloHardware:
+ JOINT_POSITION_LIMITS = np.array(
+ [
+ [-2.9007, 2.9007],
+ [-1.8361, 1.8361],
+ [-2.9007, 2.9007],
+ [-3.0770, -0.1169],
+ [-2.8763, 2.8763],
+ [0.4398, 4.6216],
+ [-3.0508, 3.0508],
+ ]
+ )
+ MID_JOINT_POSITIONS = JOINT_POSITION_LIMITS.mean(axis=1)
+ OPERATING_MODE = 5
+ CURRENT_LIMIT = 600
+
+ def __init__(self, config: GelloArmConfig):
+ self._com_port = config.com_port
+ self._num_arm_joints = config.num_arm_joints
+ self._joint_signs = np.array(config.joint_signs)
+ self._gripper = config.gripper
+ self._num_total_joints = self._num_arm_joints + (1 if self._gripper else 0)
+ self._gripper_range_rad = config.gripper_range_rad
+ self._assembly_offsets = np.array(config.assembly_offsets)
+
+ self._driver = DynamixelDriver(
+ ids=list(range(1, self._num_total_joints + 1)),
+ port=self._com_port,
+ )
+
+ self._initial_arm_joints_raw = self._driver.get_joints()[: self._num_arm_joints]
+ initial_arm_joints = self.normalize_joint_positions(
+ self._initial_arm_joints_raw, self._assembly_offsets, self._joint_signs
+ )
+ self._prev_arm_joints_raw = self._initial_arm_joints_raw.copy()
+ self._prev_arm_joints = initial_arm_joints.copy()
+
+ self._dynamixel_control_config = DynamixelControlConfig(
+ kp_p=config.dynamixel_kp_p.copy(),
+ kp_i=config.dynamixel_kp_i.copy(),
+ kp_d=config.dynamixel_kp_d.copy(),
+ torque_enable=config.dynamixel_torque_enable.copy(),
+ goal_position=self._goal_position_to_pulses(config.dynamixel_goal_position).copy(),
+ goal_current=[self.CURRENT_LIMIT] * self._num_total_joints,
+ operating_mode=[self.OPERATING_MODE] * self._num_total_joints,
+ )
+
+ self._initialize_parameters()
+ self._driver.start_joint_polling()
+
+ @staticmethod
+ def normalize_joint_positions(raw, offsets, signs):
+ return (
+ np.mod((raw - offsets) * signs - GelloHardware.MID_JOINT_POSITIONS, 2 * np.pi)
+ - np.pi
+ + GelloHardware.MID_JOINT_POSITIONS
+ )
+
+ def _initialize_parameters(self):
+ for name, value in self._dynamixel_control_config:
+ self._driver.write_value_by_name(name, value)
+ time.sleep(0.1)
+
+ def get_joint_and_gripper_positions(self) -> Tuple[np.ndarray, float]:
+ joints_raw = self._driver.get_joints()
+ arm_joints_raw = joints_raw[: self._num_arm_joints]
+
+ arm_joints_delta = (arm_joints_raw - self._prev_arm_joints_raw) * self._joint_signs
+ arm_joints = self._prev_arm_joints + arm_joints_delta
+ self._prev_arm_joints = arm_joints.copy()
+ self._prev_arm_joints_raw = arm_joints_raw.copy()
+
+ arm_joints_clipped = np.clip(arm_joints, self.JOINT_POSITION_LIMITS[:, 0], self.JOINT_POSITION_LIMITS[:, 1])
+
+ gripper_pos = 0.0
+ if self._gripper:
+ raw_grp = joints_raw[-1]
+ gripper_pos = (raw_grp - self._gripper_range_rad[0]) / (
+ self._gripper_range_rad[1] - self._gripper_range_rad[0]
+ )
+ gripper_pos = max(0.0, min(1.0, gripper_pos))
+
+ return arm_joints_clipped, gripper_pos
+
+ def _goal_position_to_pulses(self, goals):
+ arm_goals = np.array(goals[: self._num_arm_joints])
+ initial_rotations = np.floor_divide(
+ self._initial_arm_joints_raw - self._assembly_offsets - self.MID_JOINT_POSITIONS, 2 * np.pi
+ )
+ arm_goals_raw = (initial_rotations * 2 * np.pi + arm_goals + self._assembly_offsets) * self._joint_signs + np.pi
+ goals_raw = np.append(arm_goals_raw, goals[-1]) if self._gripper else arm_goals_raw
+ return [self._driver._rad_to_pulses(rad) for rad in goals_raw]
+
+ def close(self):
+ with contextlib.suppress(Exception):
+ self._driver.write_value_by_name("torque_enable", [0] * self._num_total_joints)
+ self._driver.close()
+
+
+# --- RCS Operator Implementation ---
+
+
+class GelloOperator(BaseOperator):
+ control_mode = (ControlMode.JOINTS, RelativeTo.NONE)
+
+ def __init__(self, config: "GelloConfig", sim: Sim | None = None):
+ super().__init__(config, sim)
+ self.config: GelloConfig
+ self._resource_lock = threading.Lock()
+ self._cmd_lock = threading.Lock()
+
+ self._exit_requested = False
+ self._commands = TeleopCommands()
+
+ self.controller_names = list(self.config.arms.keys())
+
+ self._last_joints: Dict[str, np.ndarray | None] = {name: None for name in self.controller_names}
+ self._last_gripper = {name: 1.0 for name in self.controller_names}
+ self._hws: Dict[str, GelloHardware] = {}
+
+ if HAS_PYNPUT:
+ self._listener = keyboard.Listener(on_press=self._on_press)
+ self._listener.start()
+ else:
+ logger.warning("pynput not found. Keyboard triggers disabled.")
+
+ def _on_press(self, key):
+ try:
+ if hasattr(key, "char"):
+ if key.char == "s":
+ with self._cmd_lock:
+ self._commands.sync_position = True
+ elif key.char == "r":
+ with self._cmd_lock:
+ self._commands.failure = True
+ except AttributeError:
+ pass
+
+ def consume_commands(self) -> TeleopCommands:
+ with self._cmd_lock:
+ cmds = copy.copy(self._commands)
+ self._commands = TeleopCommands()
+ return cmds
+
+ def reset_operator_state(self):
+ # GELLO is absolute, no internal state to reset typically
+ pass
+
+ def consume_action(self) -> Dict[str, Any]:
+ actions: Dict[str, Any] = {}
+ with self._resource_lock:
+ for name in self.controller_names:
+ joints = self._last_joints[name]
+ if joints is not None:
+ actions[name] = {
+ "joints": joints.copy(),
+ "gripper": np.array([self._last_gripper[name]]),
+ }
+ return actions
+
+ def run(self):
+ # Initialize all hardware instances
+ for name, arm_cfg in self.config.arms.items():
+ try:
+ self._hws[name] = GelloHardware(arm_cfg)
+ except Exception as e:
+ logger.error(f"Failed to initialize GELLO hardware for {name}: {e}")
+
+ if not self._hws:
+ logger.error("No GELLO hardware initialized. Exiting.")
+ return
+
+ rate_limiter = SimpleFrameRate(self.config.read_frequency, "gello readout")
+
+ while not self._exit_requested:
+ for name, hw in self._hws.items():
+ try:
+ joints, gripper = hw.get_joint_and_gripper_positions()
+ with self._resource_lock:
+ self._last_joints[name] = joints
+ self._last_gripper[name] = gripper
+ except Exception as e:
+ logger.warning(f"Error reading GELLO {name}: {e}")
+
+ rate_limiter()
+
+ def close(self):
+ self._exit_requested = True
+ if HAS_PYNPUT and hasattr(self, "_listener"):
+ self._listener.stop()
+ for hw in self._hws.values():
+ hw.close()
+ if self.is_alive() and threading.current_thread() != self:
+ self.join(timeout=1.0)
+
+
+@dataclass(kw_only=True)
+class GelloConfig(BaseOperatorConfig):
+ operator_class: type[BaseOperator] = field(default=GelloOperator)
+ # Dictionary for multi-arm setups: {"left": GelloArmConfig(...), "right": GelloArmConfig(...)}
+ arms: Dict[str, GelloArmConfig] = field(default_factory=lambda: {"right": GelloArmConfig()})
diff --git a/python/rcs/operator/interface.py b/python/rcs/operator/interface.py
new file mode 100644
index 00000000..e967d60e
--- /dev/null
+++ b/python/rcs/operator/interface.py
@@ -0,0 +1,228 @@
+import copy
+import logging
+import threading
+import time
+from abc import ABC
+from dataclasses import dataclass, field
+from time import sleep
+
+import gymnasium as gym
+from rcs.envs.base import ArmWithGripper, ControlMode, RelativeTo
+from rcs.sim.sim import Sim
+from rcs.utils import SimpleFrameRate
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class TeleopCommands:
+ """Semantic commands decoupled from specific hardware buttons."""
+
+ record: bool = False
+ success: bool = False
+ failure: bool = False
+ sync_position: bool = False
+ reset_origin_to_current: dict[str, bool] = field(default_factory=dict)
+
+
+class BaseOperator(ABC, threading.Thread):
+ control_mode: tuple[ControlMode, RelativeTo]
+ controller_names: list[str]
+
+ def __init__(self, config: "BaseOperatorConfig", sim: Sim | None = None):
+ threading.Thread.__init__(self)
+ self.config = config
+ self.sim = sim
+
+ def consume_commands(self) -> TeleopCommands:
+ """Returns the current commands and resets them (edge-triggered). Must be thread-safe."""
+ raise NotImplementedError()
+
+ def reset_operator_state(self):
+ """Hook for subclasses to reset their internal poses/offsets on env reset. Must be thread-safe."""
+
+ def run(self):
+ """Read out hardware, set states and process buttons."""
+ raise NotImplementedError()
+
+ def consume_action(self) -> dict[str, ArmWithGripper]:
+ """Returns the action dictionary to step the environment. Must be thread-safe."""
+ raise NotImplementedError()
+
+ def close(self):
+ pass
+
+
+@dataclass(kw_only=True)
+class BaseOperatorConfig:
+ operator_class: type[BaseOperator]
+ read_frequency: int = 30
+ simulation: bool = True
+
+
+class TeleopLoop:
+ """Interface for an operator device"""
+
+ # Define this as a class attribute so it can be accessed without instantiating
+ control_mode: tuple[ControlMode, RelativeTo]
+
+ def __init__(
+ self,
+ env: gym.Env,
+ operator: BaseOperator,
+ env_frequency: int = 30,
+ key_translation: dict[str, str] | None = None,
+ ):
+ super().__init__()
+ self.env = env
+ self.operator = operator
+ self._exit_requested = False
+ self.env_frequency = env_frequency
+ if key_translation is None:
+ # controller to robot translation
+ self.key_translation = {key: key for key in self.operator.controller_names}
+ else:
+ self.key_translation = key_translation
+
+ # Absolute operators (RelativeTo.NONE) need an initial sync
+ self._synced = self.operator.control_mode[1] != RelativeTo.NONE
+
+ def stop(self):
+ self.operator.close()
+ self._exit_requested = True
+ self.operator.join()
+
+ def __enter__(self):
+ self.operator.start()
+ # sleep(2)
+ return self
+
+ def __exit__(self, *_):
+ self.stop()
+
+ def _translate_keys(self, actions):
+ translated = {self.key_translation[key]: actions[key] for key in actions}
+ # Fill in missing robots with "hold" actions from last observation
+ # This is necessary because absolute environments (like MultiRobotWrapper)
+ # require actions for all configured robots in every step.
+ for robot_name in self.env.get_wrapper_attr("envs"):
+ if robot_name not in translated and robot_name in self._last_obs:
+ translated[robot_name] = {
+ "joints": self._last_obs[robot_name]["joints"].copy(),
+ "gripper": self._last_obs[robot_name].get("gripper", 1.0),
+ }
+ return translated
+
+ def environment_step_loop(self):
+ rate_limiter = SimpleFrameRate(self.env_frequency, "env loop")
+
+ # 0. Initial Reset to get current positions for untracked robots
+ self._last_obs, _ = self.env.reset()
+
+ while True:
+ if self._exit_requested:
+ break
+
+ # 1. Process Meta-Commands
+ cmds = self.operator.consume_commands()
+
+ if cmds.record:
+ print("Command: Start Recording")
+ self.env.get_wrapper_attr("start_record")()
+
+ if cmds.success:
+ print("Command: Success! Resetting env...")
+ self.env.get_wrapper_attr("success")()
+ sleep(1) # sleep to let the robot reach the goal
+ self._last_obs, _ = self.env.reset()
+ self.operator.reset_operator_state()
+ self._synced = self.operator.control_mode[1] != RelativeTo.NONE
+ # consume new commands because of potential origin reset
+ continue
+
+ if cmds.failure:
+ print("Command: Failure! Resetting env...")
+ self._last_obs, _ = self.env.reset()
+ self.operator.reset_operator_state()
+ self._synced = self.operator.control_mode[1] != RelativeTo.NONE
+ # consume new commands because of potential origin reset
+ continue
+
+ if cmds.sync_position:
+ self.sync_robot_to_operator()
+ self._synced = True
+ continue
+
+ if not self._synced:
+ # Still waiting for sync, step the env with "hold" actions
+ if int(time.time()) % 5 == 0 and int(time.time() * self.env_frequency) % self.env_frequency == 0:
+ print("Waiting for sync... (Press 's' on GELLO/Keyboard to sync)")
+
+ hold_actions = {}
+ for robot_name in self.env.get_wrapper_attr("envs"):
+ if robot_name in self._last_obs and "joints" in self._last_obs[robot_name]:
+ hold_actions[robot_name] = {
+ "joints": self._last_obs[robot_name]["joints"].copy(),
+ "gripper": self._last_obs[robot_name].get("gripper", 1.0),
+ }
+
+ self._last_obs, _, _, _, _ = self.env.step(hold_actions)
+ rate_limiter()
+ continue
+
+ for controller in cmds.reset_origin_to_current:
+ if cmds.reset_origin_to_current[controller]:
+ robot = self.key_translation[controller]
+ print(f"Command: Resetting origin for {robot}...")
+ assert (
+ self.operator.control_mode[1]
+ == RelativeTo.CONFIGURED_ORIGIN
+ # TODO the following is a dict and can thus not easily be used like this
+ # and self.env.get_wrapper_attr("relative_to") == RelativeTo.CONFIGURED_ORIGIN
+ ), "both robot env and operator must be configured to relative_to.CONFIGURED_ORIGIN"
+ self.env.get_wrapper_attr("envs")[robot].set_origin_to_current()
+
+ # 2. Step the Environment
+ actions = self.operator.consume_action()
+ actions = self._translate_keys(actions)
+ self._last_obs, _, _, _, _ = self.env.step(actions)
+
+ rate_limiter()
+
+ def sync_robot_to_operator(self, duration: float = 3.0):
+ print(f"Command: Syncing robot to operator (duration: {duration}s)...")
+ rate_limiter = SimpleFrameRate(self.env_frequency, "sync loop")
+ num_steps = int(duration * self.env_frequency)
+
+ # 1. Capture the initial state for interpolation
+ start_obs = copy.deepcopy(self._last_obs)
+
+ # 2. Interpolation Loop
+ for i in range(num_steps):
+ alpha = (i + 1) / num_steps
+ # Re-consume operator action to follow moving target!
+ target_actions = self._translate_keys(self.operator.consume_action())
+
+ interp_actions = {}
+ for robot_name, target in target_actions.items():
+ try:
+ # Interpolate from FIXED start towards MOVING target
+ s_joints = start_obs[robot_name]["joints"]
+ t_joints = target["joints"]
+ interp_joints = s_joints + alpha * (t_joints - s_joints)
+
+ s_gripper = start_obs[robot_name].get("gripper", 1.0)
+ t_gripper = target.get("gripper", 1.0)
+ interp_gripper = s_gripper + alpha * (t_gripper - s_gripper)
+
+ interp_actions[robot_name] = {
+ "joints": interp_joints,
+ "gripper": interp_gripper,
+ }
+ except (KeyError, TypeError):
+ continue
+
+ self._last_obs, _, _, _, _ = self.env.step(interp_actions)
+ rate_limiter()
+
+ print("Sync Complete.")
diff --git a/python/rcs/operator/pedals.py b/python/rcs/operator/pedals.py
new file mode 100644
index 00000000..2d529a92
--- /dev/null
+++ b/python/rcs/operator/pedals.py
@@ -0,0 +1,116 @@
+import threading
+import time
+
+import evdev
+from evdev import ecodes
+
+
+class FootPedal:
+ def __init__(self, device_name_substring="Foot Switch"):
+ """Initializes the foot pedal and starts the background reading thread."""
+ self.device_path = self._find_device(device_name_substring)
+
+ if not self.device_path:
+ msg = f"Could not find a device matching '{device_name_substring}'"
+ raise FileNotFoundError(msg)
+
+ self.device = evdev.InputDevice(self.device_path)
+ self.device.grab() # Prevent events from leaking into the OS/terminal
+
+ # Dictionary to hold the current state of each key.
+ # True = Pressed/Held, False = Released
+ self._key_states = {}
+ self._lock = threading.Lock()
+
+ # Start the background thread
+ self._running = True
+ self._thread = threading.Thread(target=self._read_events, daemon=True)
+ self._thread.start()
+ print(f"Connected to {self.device.name} at {self.device_path}")
+
+ def _find_device(self, substring):
+ """Finds the device path for the foot pedal."""
+ for path in evdev.list_devices():
+ dev = evdev.InputDevice(path)
+ if substring.lower() in dev.name.lower():
+ return path
+ return None
+
+ def _read_events(self):
+ """Background loop that updates the state dictionary."""
+ try:
+ for event in self.device.read_loop():
+ if not self._running:
+ break
+
+ if event.type == ecodes.EV_KEY:
+ key_event = evdev.categorize(event)
+
+ if isinstance(key_event, evdev.KeyEvent):
+ with self._lock:
+ # keystate: 1 is DOWN, 2 is HOLD, 0 is UP
+ is_pressed = key_event.keystate in [1, 2]
+
+ # Store state using the string name of the key (e.g., 'KEY_A')
+ # If a key resolves to a list (rare, but happens in evdev), take the first one
+ key_name = key_event.keycode
+ if isinstance(key_name, list):
+ key_name = key_name[0]
+
+ self._key_states[key_name] = is_pressed
+
+ except OSError:
+ pass # Device disconnected or closed
+
+ def get_states(self):
+ """
+ Returns a snapshot of the latest key states.
+ Example return: {'KEY_A': True, 'KEY_B': False, 'KEY_C': False}
+ """
+ with self._lock:
+ # Return a copy to ensure thread safety
+ return self._key_states.copy()
+
+ def get_key_state(self, key_name):
+ """Returns the state of a specific key, defaulting to False if never pressed."""
+ with self._lock:
+ return self._key_states.get(key_name, False)
+
+ def close(self):
+ """Cleans up the device and stops the thread."""
+ self._running = False
+ try:
+ self.device.ungrab()
+ self.device.close()
+ except OSError:
+ pass
+
+
+# ==========================================
+# Example Usage
+# ==========================================
+if __name__ == "__main__":
+ try:
+ # Initialize the pedal
+ pedal = FootPedal("Foot Switch")
+
+ # Simulate a typical robotics control loop running at 10Hz
+ print("Starting control loop... Press Ctrl+C to exit.")
+ while True:
+ # Grab the latest states instantly without blocking
+ states = pedal.get_states()
+
+ if states:
+ # Print only the keys that are currently pressed
+ pressed_keys = [key for key, is_pressed in states.items() if is_pressed]
+ print(f"Currently pressed: {pressed_keys}")
+
+ # Your teleoperation logic goes here...
+
+ time.sleep(0.1) # 10Hz loop
+
+ except KeyboardInterrupt:
+ print("\nShutting down...")
+ finally:
+ if "pedal" in locals():
+ pedal.close()
diff --git a/python/rcs/operator/quest.py b/python/rcs/operator/quest.py
new file mode 100644
index 00000000..bcde3f64
--- /dev/null
+++ b/python/rcs/operator/quest.py
@@ -0,0 +1,237 @@
+import copy
+import logging
+import threading
+from dataclasses import dataclass, field
+from time import sleep
+
+import numpy as np
+from rcs._core.common import Pose
+from rcs.envs.base import ArmWithGripper, ControlMode, RelativeTo
+from rcs.operator.interface import BaseOperator, BaseOperatorConfig, TeleopCommands
+from rcs.sim.sim import Sim
+from rcs.utils import SimpleFrameRate
+
+try:
+ from simpub.core.simpub_server import SimPublisher
+ from simpub.parser.simdata import SimObject, SimScene
+ from simpub.xr_device.meta_quest3 import MetaQuest3
+
+ HAS_SIMPUB = True
+except ImportError:
+ HAS_SIMPUB = False
+
+logger = logging.getLogger(__name__)
+
+# download the iris apk from the following repo release: https://github.com/intuitive-robots/IRIS-Meta-Quest3
+# in order to use usb connection install adb install adb
+# sudo apt install android-tools-adb
+# install it on your quest with
+# adb install IRIS-Meta-Quest3.apk
+
+if HAS_SIMPUB:
+
+ class FakeSimPublisher(SimPublisher):
+ def get_update(self):
+ return {}
+
+ class FakeSimScene(SimScene):
+ def __init__(self):
+ super().__init__()
+ self.root = SimObject(name="root")
+
+
+class QuestOperator(BaseOperator):
+
+ control_mode = (ControlMode.CARTESIAN_TQuat, RelativeTo.CONFIGURED_ORIGIN)
+ controller_names: list[str] = ["left", "right"] # noqa: RUF012
+
+ def __init__(self, config: "QuestConfig", sim: Sim | None = None):
+ super().__init__(config, sim)
+ if not HAS_SIMPUB:
+ msg = "simpub is not installed. Please install it to use QuestOperator."
+ raise ImportError(msg)
+
+ self.config: QuestConfig
+
+ self._resource_lock = threading.Lock()
+ self._cmd_lock = threading.Lock()
+
+ self._trg_btn = {"left": "index_trigger", "right": "index_trigger"}
+ self._grp_btn = {"left": "hand_trigger", "right": "hand_trigger"}
+ self._start_btn = "A"
+ self._stop_btn = "B"
+ self._unsuccessful_btn = "Y"
+
+ self._prev_data = None
+ self._exit_requested = False
+ self._grp_pos = {key: 1.0 for key in self.controller_names} # start with opened gripper
+ self._last_controller_pose = {key: Pose() for key in self.controller_names}
+ self._offset_pose = {key: Pose() for key in self.controller_names}
+
+ self._commands = TeleopCommands()
+ self._reset_origin_to_current()
+
+ self._step_env = False
+ self._set_frame = {key: Pose() for key in self.controller_names}
+ # if self.config.simulation:
+ # self._publisher = MujocoPublisher(self.sim.model, self.sim.data, self.config.mq3_addr, visible_geoms_groups=list(range(1, 3)))
+ if not self.config.simulation:
+ self._publisher = FakeSimPublisher(FakeSimScene(), self.config.mq3_addr)
+ # robot_cfg = default_sim_robot_cfg("fr3_empty_world")
+ # sim_cfg = SimConfig()
+ # sim_cfg.async_control = True
+ # twin_env = SimMultiEnvCreator()(
+ # name2id=ROBOT2IP,
+ # robot_cfg=robot_cfg,
+ # control_mode=ControlMode.JOINTS,
+ # gripper_cfg=default_sim_gripper_cfg(),
+ # sim_cfg=sim_cfg,
+ # )
+ # sim = env_rel.unwrapped.envs[ROBOT2IP.keys().__iter__().__next__()].sim
+ # sim.open_gui()
+ # MujocoPublisher(sim.model, sim.data, MQ3_ADDR, visible_geoms_groups=list(range(1, 3)))
+ # env_rel = DigitalTwin(env_rel, twin_env)
+ self._reader = MetaQuest3("RCSNode")
+
+ def _reset_origin_to_current(self, controller: str | None = None):
+ with self._cmd_lock:
+ if controller is None:
+ self._commands.reset_origin_to_current = {key: True for key in self.controller_names}
+ else:
+ self._commands.reset_origin_to_current[controller] = True
+
+ def _reset_state(self):
+ with self._resource_lock:
+ for controller in self.controller_names:
+ self._offset_pose[controller] = Pose()
+ self._last_controller_pose[controller] = Pose()
+ self._grp_pos[controller] = 1
+
+ def consume_commands(self) -> TeleopCommands:
+ # must be threadsafe
+ with self._cmd_lock:
+ cmds = copy.copy(self._commands)
+ self._commands = TeleopCommands()
+ return cmds
+
+ def reset_operator_state(self):
+ """Resets the hardware offsets when the environment resets."""
+ self._reset_state()
+ self._reset_origin_to_current()
+
+ def consume_action(self) -> dict[str, ArmWithGripper]:
+ transforms = {}
+ with self._resource_lock:
+ for controller in self.controller_names:
+ transform = Pose(
+ translation=(
+ self._last_controller_pose[controller].translation() # type: ignore
+ - self._offset_pose[controller].translation()
+ ),
+ quaternion=(
+ self._last_controller_pose[controller] * self._offset_pose[controller].inverse()
+ ).rotation_q(),
+ )
+
+ set_axes = Pose(quaternion=self._set_frame[controller].rotation_q())
+
+ transform = set_axes.inverse() * transform * set_axes
+ if not self.config.include_rotation:
+ transform = Pose(translation=transform.translation()) # identity rotation
+ transforms[controller] = ArmWithGripper(
+ tquat=np.concatenate([transform.translation(), transform.rotation_q()]),
+ gripper=np.array([self._grp_pos[controller]]),
+ )
+ return transforms
+
+ def close(self):
+ self._reader.disconnect()
+ self._publisher.shutdown()
+ self._exit_requested = True
+ self.join()
+
+ def run(self):
+ rate_limiter = SimpleFrameRate(self.config.read_frequency, "teleop readout")
+ warning_raised = False
+
+ while not self._exit_requested:
+ input_data = self._reader.get_controller_data()
+
+ if input_data is None:
+ if not warning_raised:
+ logger.warning("[Quest Reader] packets empty")
+ warning_raised = True
+ sleep(0.5)
+ continue
+
+ if warning_raised:
+ logger.warning("[Quest Reader] packets arriving again")
+ warning_raised = False
+
+ # === Update Semantic Commands ===
+ with self._cmd_lock:
+ if input_data[self._start_btn] and (self._prev_data is None or not self._prev_data[self._start_btn]):
+ self._commands.record = True
+
+ if input_data[self._stop_btn] and (self._prev_data is None or not self._prev_data[self._stop_btn]):
+ self._commands.success = True
+
+ if input_data[self._unsuccessful_btn] and (
+ self._prev_data is None or not self._prev_data[self._unsuccessful_btn]
+ ):
+ self._commands.failure = True
+
+ # === Update Poses & Grippers ===
+ for controller in self.controller_names:
+ last_controller_pose = Pose(
+ translation=np.array(input_data[controller]["pos"]),
+ quaternion=np.array(input_data[controller]["rot"]),
+ )
+ # if controller == "left":
+ # last_controller_pose = (
+ # Pose(translation=np.array([0, 0, 0]), rpy=RPY(roll=0, pitch=0, yaw=np.deg2rad(180))) # type: ignore
+ # * last_controller_pose
+ # )
+
+ if input_data[controller][self._trg_btn[controller]] and (
+ self._prev_data is None or not self._prev_data[controller][self._trg_btn[controller]]
+ ):
+ # trigger just pressed (first data sample with button pressed)
+
+ with self._resource_lock:
+ self._offset_pose[controller] = last_controller_pose
+ self._last_controller_pose[controller] = last_controller_pose
+
+ elif not input_data[controller][self._trg_btn[controller]] and (
+ self._prev_data is None or self._prev_data[controller][self._trg_btn[controller]]
+ ):
+ with self._resource_lock:
+ self._last_controller_pose[controller] = Pose()
+ self._offset_pose[controller] = Pose()
+ self._reset_origin_to_current(controller)
+
+ elif input_data[controller][self._trg_btn[controller]]:
+ # button is pressed
+ with self._resource_lock:
+ self._last_controller_pose[controller] = last_controller_pose
+
+ if input_data[controller][self._grp_btn[controller]] and (
+ self._prev_data is None or not self._prev_data[controller][self._grp_btn[controller]]
+ ):
+ # just pressed
+ self._grp_pos[controller] = 0
+ if not input_data[controller][self._grp_btn[controller]] and (
+ self._prev_data is None or self._prev_data[controller][self._grp_btn[controller]]
+ ):
+ # just released
+ self._grp_pos[controller] = 1
+
+ self._prev_data = input_data
+ rate_limiter()
+
+
+@dataclass(kw_only=True)
+class QuestConfig(BaseOperatorConfig):
+ operator_class: type[BaseOperator] = field(default=QuestOperator)
+ include_rotation: bool = True
+ mq3_addr: str = "10.42.0.1"
diff --git a/python/tests/test_sim_envs.py b/python/tests/test_sim_envs.py
index 364c1408..c997473d 100644
--- a/python/tests/test_sim_envs.py
+++ b/python/tests/test_sim_envs.py
@@ -98,7 +98,7 @@ def test_non_zero_action_trpy(self, cfg):
initial_pose = rcs.common.Pose(translation=np.array(t), quaternion=np.array(q))
xyzrpy = np.concatenate([t, initial_pose.rotation_rpy().as_vector()], axis=0)
non_zero_action = TRPYDictType(xyzrpy=np.array(xyzrpy))
- non_zero_action.update(GripperDictType(gripper=0))
+ non_zero_action.update(GripperDictType(gripper=np.array([0.0])))
expected_obs = obs_initial.copy()
expected_obs["tquat"][0] += x_pos_change
obs, _, _, _, info = env.step(non_zero_action)
@@ -113,7 +113,7 @@ def test_relative_zero_action_trpy(self, cfg, gripper_cfg):
obs_initial, _ = env.reset()
# action to be performed
zero_action = TRPYDictType(xyzrpy=np.array([0, 0, 0, 0, 0, 0], dtype=np.float32)) # type: ignore
- zero_action.update(GripperDictType(gripper=0))
+ zero_action.update(GripperDictType(gripper=np.array([0.0])))
obs, _, _, _, info = env.step(zero_action)
self.assert_no_pose_change(info, obs_initial, obs)
@@ -127,7 +127,7 @@ def test_relative_non_zero_action(self, cfg, gripper_cfg):
# action to be performed
x_pos_change = 0.2
non_zero_action = TRPYDictType(xyzrpy=np.array([x_pos_change, 0, 0, 0, 0, 0])) # type: ignore
- non_zero_action.update(GripperDictType(gripper=0))
+ non_zero_action.update(GripperDictType(gripper=np.array([0.0])))
expected_obs = obs_initial.copy()
expected_obs["tquat"][0] += x_pos_change
obs, _, _, _, info = env.step(non_zero_action)
@@ -146,7 +146,7 @@ def test_collision_trpy(self, cfg, gripper_cfg):
obs["xyzrpy"][0] = 0.4
obs["xyzrpy"][2] = -0.05
collision_action = TRPYDictType(xyzrpy=obs["xyzrpy"])
- collision_action.update(GripperDictType(gripper=0))
+ collision_action.update(GripperDictType(gripper=np.array([0.0])))
obs, _, _, _, info = env.step(collision_action)
self.assert_collision(info)
@@ -170,7 +170,7 @@ def test_collision_trpy(self, cfg, gripper_cfg):
# obs["xyzrpy"][0] = 0.4
# obs["xyzrpy"][2] = -0.05
# collision_action = TRPYDictType(xyzrpy=obs["xyzrpy"])
- # collision_action.update(GripperDictType(gripper=0))
+ # collision_action.update(GripperDictType(gripper=np.array([0.0])))
# _, _, _, _, info = env.step(collision_action)
# p2: np.ndarray = unwrapped.robot.get_joint_position()
# self.assert_collision(info)
@@ -245,7 +245,7 @@ def test_relative_zero_action_tquat(self, cfg, gripper_cfg):
)
obs_initial, _ = env_rel.reset()
zero_rel_action = TQuatDictType(tquat=np.array([0, 0, 0, 0, 0, 0, 1.0], dtype=np.float32)) # type: ignore
- zero_rel_action.update(GripperDictType(gripper=0))
+ zero_rel_action.update(GripperDictType(gripper=np.array([0.0])))
obs, _, _, _, info = env_rel.step(zero_rel_action)
self.assert_no_pose_change(info, obs_initial, obs)
@@ -266,7 +266,7 @@ def test_collision_tquat(self, cfg, gripper_cfg):
obs["tquat"][0] = 0.4
obs["tquat"][2] = -0.05
collision_action = TQuatDictType(tquat=obs["tquat"])
- collision_action.update(GripperDictType(gripper=0))
+ collision_action.update(GripperDictType(gripper=np.array([0.0])))
_, _, _, _, info = env.step(collision_action)
self.assert_collision(info)
@@ -290,7 +290,7 @@ def test_collision_tquat(self, cfg, gripper_cfg):
# obs["tquat"][0] = 0.4
# obs["tquat"][2] = -0.05
# collision_action = TQuatDictType(tquat=obs["tquat"])
- # collision_action.update(GripperDictType(gripper=0))
+ # collision_action.update(GripperDictType(gripper=np.array([0.0])))
# _, _, _, _, info = env.step(collision_action)
# p2: np.ndarray = unwrapped.robot.get_joint_position()
# self.assert_collision(info)
@@ -355,7 +355,7 @@ def test_collision_joints(self, cfg, gripper_cfg):
env.reset()
# the below action is a test_case where there is an obvious collision regardless of the gripper action
collision_act = JointsDictType(joints=np.array([0, 1.78, 0, -1.45, 0, 0, 0], dtype=np.float32))
- collision_act.update(GripperDictType(gripper=1))
+ collision_act.update(GripperDictType(gripper=np.array([1.0])))
_, _, _, _, info = env.step(collision_act)
self.assert_collision(info)
@@ -377,7 +377,7 @@ def test_collision_joints(self, cfg, gripper_cfg):
# p1: np.ndarray = unwrapped.robot.get_joint_position()
# # the below action is a test_case where there is an obvious collision regardless of the gripper action
# collision_act = JointsDictType(joints=np.array([0, 1.78, 0, -1.45, 0, 0, 0], dtype=np.float32))
- # collision_act.update(GripperDictType(gripper=1))
+ # collision_act.update(GripperDictType(gripper=np.array([1.0])))
# _, _, _, _, info = env.step(collision_act)
# p2: np.ndarray = unwrapped.robot.get_joint_position()
@@ -393,6 +393,6 @@ def test_relative_zero_action_joints(self, cfg, gripper_cfg):
env = SimEnvCreator()(ControlMode.JOINTS, cfg, gripper_cfg=gripper_cfg, cameras=None, max_relative_movement=0.5)
obs_initial, _ = env.reset()
act = JointsDictType(joints=np.array([0, 0, 0, 0, 0, 0, 0], dtype=np.float32))
- act.update(GripperDictType(gripper=1))
+ act.update(GripperDictType(gripper=np.array([1.0])))
obs, _, _, _, info = env.step(act)
self.assert_no_pose_change(info, obs_initial, obs)
diff --git a/src/pybind/rcs.cpp b/src/pybind/rcs.cpp
index f4491802..78726e2e 100644
--- a/src/pybind/rcs.cpp
+++ b/src/pybind/rcs.cpp
@@ -474,6 +474,14 @@ PYBIND11_MODULE(_core, m) {
.def_readwrite("joints", &rcs::sim::SimRobotConfig::joints)
.def_readwrite("actuators", &rcs::sim::SimRobotConfig::actuators)
.def_readwrite("base", &rcs::sim::SimRobotConfig::base)
+ .def("__copy__",
+ [](const rcs::sim::SimRobotConfig& self) {
+ return rcs::sim::SimRobotConfig(self);
+ })
+ .def("__deepcopy__",
+ [](const rcs::sim::SimRobotConfig& self, py::dict) {
+ return rcs::sim::SimRobotConfig(self);
+ })
.def("add_id", &rcs::sim::SimRobotConfig::add_id, py::arg("id"));
py::class_(sim,
"SimRobotState")
@@ -512,6 +520,14 @@ PYBIND11_MODULE(_core, m) {
&rcs::sim::SimGripperConfig::max_actuator_width)
.def_readwrite("min_actuator_width",
&rcs::sim::SimGripperConfig::min_actuator_width)
+ .def("__copy__",
+ [](const rcs::sim::SimGripperConfig& self) {
+ return rcs::sim::SimGripperConfig(self);
+ })
+ .def("__deepcopy__",
+ [](const rcs::sim::SimGripperConfig& self, py::dict) {
+ return rcs::sim::SimGripperConfig(self);
+ })
.def("add_id", &rcs::sim::SimGripperConfig::add_id, py::arg("id"));
py::class_(
sim, "SimGripperState")