diff --git a/.github/workflows/py.yaml b/.github/workflows/py.yaml index 7007f06e..35c52c9e 100644 --- a/.github/workflows/py.yaml +++ b/.github/workflows/py.yaml @@ -27,10 +27,13 @@ jobs: env: CC: clang-15 CXX: clang++-15 + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} steps: - uses: actions/checkout@v4 - name: Install CPP dependencies run: sudo apt install $(cat debian_deps.txt) + - name: Install virtual frame buffer tool + run: sudo apt install xvfb - name: Set up Python 3.11 uses: actions/setup-python@v5 with: @@ -38,13 +41,17 @@ jobs: cache: "pip" - name: Install building dependencies run: python -m pip install -r requirements_dev.txt + - name: Download the models using cmake + env: + MODEL_TOKEN: ${{ secrets.MODEL_TOKEN }} + run: cmake -G Ninja -B build -DINCLUDE_UTN_MODELS=ON -DGITLAB_MODELS_TOKEN="$MODEL_TOKEN" -DMUJOCO_VERSION=3.1.5 - name: Create python wheel run: python -m build --no-isolation --wheel --outdir dist/ - name: Install the wheel package - run: python -m pip install --no-build-isolation dist/*.whl + run: python -m pip install -v --no-build-isolation dist/*.whl - name: Code linting run: make pylint - name: Check that stub files are up-to-date run: make stubgen && git diff --exit-code - name: Ensure all unittests(pytest) are passing - run: make pytest + run: xvfb-run make pytest diff --git a/README.md b/README.md index b695e27f..e2af3ea7 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ sudo apt install $(cat debian_deps.txt) python3 -m venv .venv source .venv/bin/activate pip config --site set global.no-build-isolation false -pip install --requirements requirements_dev.txt +pip install -r requirements_dev.txt ``` 3. Build and install RCS: ```shell diff --git a/python/rcsss/envs/base.py b/python/rcsss/envs/base.py index d3c0f7a3..447b42eb 100644 --- a/python/rcsss/envs/base.py +++ b/python/rcsss/envs/base.py @@ -111,14 +111,26 @@ class CameraDictType(RCSpaceType): ] -# joining works with inhertiance but need to inherit from protocol again -class ArmObsType(TQuartDictType, JointsDictType): ... +# joining works with inheritance but need to inherit from protocol again +class ArmObsType(TQuartDictType, JointsDictType, TRPYDictType): ... CartOrJointContType: TypeAlias = TQuartDictType | JointsDictType | TRPYDictType LimitedCartOrJointContType: TypeAlias = LimitedTQuartRelDictType | LimitedJointsRelDictType | LimitedTRPYRelDictType +class ObsArmsGr(ArmObsType, GripperDictType): + pass + + +class ObsArmsGrCam(ArmObsType, GripperDictType, CameraDictType): + pass + + +class ObsArmsGrCamCG(ArmObsType, GripperDictType, CameraDictType): + pass + + class ControlMode(Enum): JOINTS = 1 CARTESIAN_TRPY = 2 @@ -131,7 +143,8 @@ class FR3Env(gym.Env): Top view of on the robot. Robot faces into x direction. z direction faces upwards. (Right handed coordinate axis) ^ x - <- RobotBase + | + <-- RobotBase y """ @@ -178,6 +191,7 @@ def get_obs(self) -> ArmObsType: [self.robot.get_cartesian_position().translation(), self.robot.get_cartesian_position().rotation_q()] ), joints=self.robot.get_joint_position(), + xyzrpy=self.robot.get_cartesian_position().xyzrpy(), ) def step(self, action: CartOrJointContType) -> tuple[ArmObsType, float, bool, bool, dict]: diff --git a/python/rcsss/envs/hw.py b/python/rcsss/envs/hw.py index 8724ebe6..db396c5e 100644 --- a/python/rcsss/envs/hw.py +++ b/python/rcsss/envs/hw.py @@ -19,7 +19,7 @@ def __init__(self, env): assert isinstance(self.unwrapped.robot, hw.FR3), "Robot must be a hw.FR3 instance." self.hw_robot = cast(hw.FR3, self.unwrapped.robot) - def step(self, action: Any) -> tuple[Any, SupportsFloat, bool, bool, dict]: + def step(self, action: Any) -> tuple[dict[str, Any], SupportsFloat, bool, bool, dict]: try: return super().step(action) except hw.exceptions.FrankaControlException as e: @@ -27,7 +27,7 @@ def step(self, action: Any) -> tuple[Any, SupportsFloat, bool, bool, dict]: self.hw_robot.automatic_error_recovery() # TODO: this does not work if some wrappers are in between # FR3HW and FR3Env - return self.unwrapped.get_obs(), 0, False, True, {} + return dict(self.unwrapped.get_obs()), 0, False, True, {} def reset( self, seed: int | None = None, options: dict[str, Any] | None = None diff --git a/python/rcsss/envs/sim.py b/python/rcsss/envs/sim.py index 8390644b..24b51d23 100644 --- a/python/rcsss/envs/sim.py +++ b/python/rcsss/envs/sim.py @@ -25,14 +25,15 @@ def __init__(self, env, simulation: sim.Sim): self.sim = simulation def step(self, action: dict[str, Any]) -> tuple[dict[str, Any], float, bool, bool, dict]: - obs, _, _, _, info = super().step(action) + _, _, _, _, info = super().step(action) self.sim.step_until_convergence() state = self.sim_robot.get_state() info["collision"] = state.collision info["ik_success"] = state.ik_success info["is_sim_converged"] = self.sim.is_converged() # truncate episode if collision - return obs, 0, False, state.collision or not state.ik_success, info + + return dict(self.unwrapped.get_obs()), 0, False, state.collision or not state.ik_success, info def reset( self, seed: int | None = None, options: dict[str, Any] | None = None diff --git a/python/rcsss/envs/typed_factory.py b/python/rcsss/envs/typed_factory.py new file mode 100644 index 00000000..6287c820 --- /dev/null +++ b/python/rcsss/envs/typed_factory.py @@ -0,0 +1,539 @@ +import typing + +import gymnasium as gym +from rcsss import hw, sim +from rcsss._core.hw import FHConfig +from rcsss._core.sim import CameraType +from rcsss.camera.realsense import RealSenseCameraSet, RealSenseSetConfig +from rcsss.camera.sim import SimCameraConfig, SimCameraSet, SimCameraSetConfig +from rcsss.config import read_config_yaml +from rcsss.desk import Desk +from rcsss.envs.base import ( + ArmObsType, + CameraSetWrapper, + CartOrJointContType, + ControlMode, + FR3Env, + GripperWrapper, + JointsDictType, + LimitedJointsRelDictType, + LimitedTQuartRelDictType, + LimitedTRPYRelDictType, + ObsArmsGr, + ObsArmsGrCam, + RelativeActionSpace, + TQuartDictType, + TRPYDictType, +) +from rcsss.envs.hw import FR3HW +from rcsss.envs.sim import CollisionGuard, FR3Sim + + +def produce_env_sim( + mjcf_path: str, urdf_path: str, control_mode: ControlMode, cfg: sim.FR3Config, robot_id: str = "0" +) -> tuple[gym.Env[ArmObsType, CartOrJointContType], dict[str, typing.Any]]: + simulation = sim.Sim(mjcf_path) + robot = sim.FR3(simulation, robot_id, urdf_path) + robot.set_parameters(cfg) + env = FR3Env(robot, control_mode) + env_sim = FR3Sim(env, simulation) + return typing.cast(gym.Env[ArmObsType, CartOrJointContType], FR3Sim(env_sim, simulation)), { + "sim": simulation, + "robot": robot, + } + + +def produce_env_sim_joints( + mjcf_path: str, urdf_path: str, cfg: sim.FR3Config, robot_id: str = "0" +) -> tuple[gym.Env[ArmObsType, JointsDictType], dict[str, typing.Any]]: + env, info = produce_env_sim(mjcf_path, urdf_path, control_mode=ControlMode.JOINTS, cfg=cfg, robot_id=robot_id) + return ( + typing.cast( + gym.Env[ArmObsType, JointsDictType], + env, + ), + info, + ) + + +def produce_env_sim_trpy( + mjcf_path: str, urdf_path: str, cfg: sim.FR3Config, robot_id: str = "0" +) -> tuple[gym.Env[ArmObsType, TRPYDictType], dict[str, typing.Any]]: + env, info = produce_env_sim( + mjcf_path, urdf_path, control_mode=ControlMode.CARTESIAN_TRPY, cfg=cfg, robot_id=robot_id + ) + return ( + typing.cast( + gym.Env[ArmObsType, TRPYDictType], + env, + ), + info, + ) + + +def produce_env_sim_tquart( + mjcf_path: str, urdf_path: str, cfg: sim.FR3Config, robot_id: str = "0" +) -> tuple[gym.Env[ArmObsType, TQuartDictType], dict[str, typing.Any]]: + env, info = produce_env_sim( + mjcf_path, urdf_path, control_mode=ControlMode.CARTESIAN_TQuart, cfg=cfg, robot_id=robot_id + ) + return typing.cast(gym.Env[ArmObsType, TQuartDictType], env), info + + +def produce_env_sim_tquart_gripper_camera( + mjcf_path: str, + urdf_path: str, + cfg: sim.FR3Config, + gripper_cfg: sim.FHConfig, + cam_cfg: SimCameraSetConfig, + robot_id: str = "0", +) -> gym.Env[ObsArmsGrCam, TQuartDictType]: + """Environment with tquart, gripper, camera, collision guard and relative limits""" + env_sim, info = produce_env_sim_tquart(mjcf_path, urdf_path, cfg, robot_id) + simulation = info["sim"] + gripper = sim.FrankaHand(simulation, robot_id, gripper_cfg) + env_sim_gripper = GripperWrapper(env_sim, gripper) + camera_set = SimCameraSet(simulation, cam_cfg) + env_sim_gripper_cam: gym.Env = CameraSetWrapper(env_sim_gripper, camera_set) + return typing.cast( + gym.Env[ObsArmsGrCam, TQuartDictType], + env_sim_gripper_cam, + ) + + +def produce_env_sim_tquart_gripper_camera_rel( + mjcf_path: str, + urdf_path: str, + cfg: sim.FR3Config, + gripper_cfg: sim.FHConfig, + cam_cfg: SimCameraSetConfig, + robot_id: str = "0", +) -> gym.Env[ObsArmsGrCam, LimitedTQuartRelDictType]: + """Environment with tquart, gripper, camera, collision guard and relative limits""" + env_cam = produce_env_sim_tquart_gripper_camera(mjcf_path, urdf_path, cfg, gripper_cfg, cam_cfg, robot_id) + env_rel = RelativeActionSpace(env_cam) + return typing.cast( + gym.Env[ObsArmsGrCam, LimitedTQuartRelDictType], + env_rel, + ) + + +def produce_env_sim_tquart_gripper_camera_cg( + mjcf_path: str, + urdf_path: str, + cfg: sim.FR3Config, + gripper_cfg: sim.FHConfig, + cam_cfg: SimCameraSetConfig, + robot_id: str = "0", +) -> gym.Env[ObsArmsGrCam, TQuartDictType]: + """Environment with tquart, gripper, camera, collision guard and relative limits""" + env_sim_tquart_gripper_cam = produce_env_sim_tquart_gripper_camera( + mjcf_path, urdf_path, cfg, gripper_cfg, cam_cfg, robot_id + ) + env_sim_tquart_gripper_cam_cg = CollisionGuard.env_from_xml_paths( + env_sim_tquart_gripper_cam, + mjcf_path, + urdf_path, + gripper=True, + check_home_collision=False, + ) + return typing.cast( + gym.Env[ObsArmsGrCam, TQuartDictType], + env_sim_tquart_gripper_cam_cg, + ) + + +def produce_env_sim_tquart_gripper_camera_cg_rel( + mjcf_path: str, + urdf_path: str, + cfg: sim.FR3Config, + gripper_cfg: sim.FHConfig, + cam_cfg: SimCameraSetConfig, + robot_id: str = "0", +) -> gym.Env[ObsArmsGrCam, LimitedTQuartRelDictType]: + """Environment with tquart, gripper, camera, collision guard and relative limits""" + env_sim_tquart_gripper_cam = produce_env_sim_tquart_gripper_camera_cg( + mjcf_path, urdf_path, cfg, gripper_cfg, cam_cfg, robot_id + ) + env_sim_tquart_gripper_cam_cg = RelativeActionSpace(env_sim_tquart_gripper_cam) + return typing.cast( + gym.Env[ObsArmsGrCam, LimitedTQuartRelDictType], + env_sim_tquart_gripper_cam_cg, + ) + + +def produce_env_sim_trpy_gripper_camera( + mjcf_path: str, + urdf_path: str, + cfg: sim.FR3Config, + gripper_cfg: sim.FHConfig, + cam_cfg: SimCameraSetConfig, + robot_id: str = "0", +) -> gym.Env[ObsArmsGrCam, TRPYDictType]: + """Environment with trpy, gripper, camera, collision guard and relative limits""" + env_sim_trpy, info = produce_env_sim_trpy(mjcf_path, urdf_path, cfg, robot_id) + simulation = info["sim"] + gripper = sim.FrankaHand(simulation, robot_id, gripper_cfg) + env_sim_trpy_gripper = GripperWrapper(env_sim_trpy, gripper) + camera_set = SimCameraSet(simulation, cam_cfg) + env_sim_trpy_gripper_cam: gym.Env = CameraSetWrapper(env_sim_trpy_gripper, camera_set) + return typing.cast( + gym.Env[ObsArmsGrCam, TRPYDictType], + env_sim_trpy_gripper_cam, + ) + + +def produce_env_sim_trpy_gripper_camera_rel( + mjcf_path: str, + urdf_path: str, + cfg: sim.FR3Config, + gripper_cfg: sim.FHConfig, + cam_cfg: SimCameraSetConfig, + robot_id: str = "0", +) -> gym.Env[ObsArmsGrCam, TRPYDictType]: + """Environment with trpy, gripper, camera, collision guard and relative limits""" + env_sim_trpy_gripper_cam = produce_env_sim_trpy_gripper_camera( + mjcf_path, urdf_path, cfg, gripper_cfg, cam_cfg, robot_id + ) + env_sim_trpy_gripper_cam_rel = RelativeActionSpace(env_sim_trpy_gripper_cam) + return typing.cast( + gym.Env[ObsArmsGrCam, LimitedTRPYRelDictType], + env_sim_trpy_gripper_cam_rel, + ) + + +def produce_env_sim_trpy_gripper_camera_cg( + mjcf_path: str, + urdf_path: str, + cfg: sim.FR3Config, + gripper_cfg: sim.FHConfig, + cam_cfg: SimCameraSetConfig, + robot_id: str = "0", +) -> gym.Env[ObsArmsGrCam, TRPYDictType]: + """Environment with trpy, gripper, camera, collision guard and relative limits""" + env_sim_trpy_gripper_cam = produce_env_sim_trpy_gripper_camera( + mjcf_path, urdf_path, cfg, gripper_cfg, cam_cfg, robot_id + ) + env_sim_trpy_gripper_cam_cg = CollisionGuard.env_from_xml_paths( + env_sim_trpy_gripper_cam, + mjcf_path, + urdf_path, + gripper=True, + check_home_collision=False, + ) + return typing.cast( + gym.Env[ObsArmsGrCam, TRPYDictType], + env_sim_trpy_gripper_cam_cg, + ) + + +def produce_env_sim_trpy_gripper_camera_cg_rel( + mjcf_path: str, + urdf_path: str, + cfg: sim.FR3Config, + gripper_cfg: sim.FHConfig, + cam_cfg: SimCameraSetConfig, + robot_id: str = "0", +) -> gym.Env[ObsArmsGrCam, LimitedTRPYRelDictType]: + """Environment with trpy, gripper, camera, collision guard and relative limits""" + env_sim_trpy_gripper_camera_cg = produce_env_sim_trpy_gripper_camera_cg( + mjcf_path, urdf_path, cfg, gripper_cfg, cam_cfg, robot_id + ) + env_sim_trpy_gripper_camera_cg_rel = RelativeActionSpace(env_sim_trpy_gripper_camera_cg) + return typing.cast( + gym.Env[ObsArmsGrCam, LimitedTRPYRelDictType], + env_sim_trpy_gripper_camera_cg_rel, + ) + + +def produce_env_sim_joints_gripper_camera( + mjcf_path: str, + urdf_path: str, + cfg: sim.FR3Config, + gripper_cfg: sim.FHConfig, + cam_cfg: SimCameraSetConfig, + robot_id: str = "0", +) -> gym.Env[ObsArmsGrCam, JointsDictType]: + """Environment with joints, gripper, camera""" + env_sim_joints, info = produce_env_sim_joints(mjcf_path, urdf_path, cfg, robot_id) + simulation = info["sim"] + gripper = sim.FrankaHand(simulation, robot_id, gripper_cfg) + env_sim_joints_gripper = GripperWrapper(env_sim_joints, gripper) + camera_set = SimCameraSet(simulation, cam_cfg) + env_sim_joints_gripper_cam: gym.Env = CameraSetWrapper(env_sim_joints_gripper, camera_set) + return typing.cast( + gym.Env[ObsArmsGrCam, JointsDictType], + env_sim_joints_gripper_cam, + ) + + +def produce_env_sim_joints_gripper_camera_rel( + mjcf_path: str, + urdf_path: str, + cfg: sim.FR3Config, + gripper_cfg: sim.FHConfig, + cam_cfg: SimCameraSetConfig, + robot_id: str = "0", +) -> gym.Env[ObsArmsGrCam, JointsDictType]: + """Environment with joints, gripper, camera, collision guard and relative limits""" + env_sim_joints_cam = produce_env_sim_joints_gripper_camera( + mjcf_path, urdf_path, cfg, gripper_cfg, cam_cfg, robot_id + ) + env_sim_joints_cam_rel = RelativeActionSpace(env_sim_joints_cam) + return typing.cast( + gym.Env[ObsArmsGrCam, LimitedJointsRelDictType], + env_sim_joints_cam_rel, + ) + + +def produce_env_sim_joints_gripper_camera_cg( + mjcf_path: str, + urdf_path: str, + cfg: sim.FR3Config, + gripper_cfg: sim.FHConfig, + cam_cfg: SimCameraSetConfig, + robot_id: str = "0", +) -> gym.Env[ObsArmsGrCam, JointsDictType]: + """Environment with joints, gripper, camera, collision guard and relative limits""" + env_sim_joints_gripper_cam = produce_env_sim_joints_gripper_camera( + mjcf_path, urdf_path, cfg, gripper_cfg, cam_cfg, robot_id + ) + env_sim_joints_gripper_cam_cg = CollisionGuard.env_from_xml_paths( + env_sim_joints_gripper_cam, + mjcf_path, + urdf_path, + gripper=True, + check_home_collision=False, + ) + return typing.cast( + gym.Env[ObsArmsGrCam, JointsDictType], + env_sim_joints_gripper_cam_cg, + ) + + +def produce_env_sim_joints_gripper_camera_cg_rel( + mjcf_path: str, + urdf_path: str, + cfg: sim.FR3Config, + gripper_cfg: sim.FHConfig, + cam_cfg: SimCameraSetConfig, + robot_id: str = "0", +) -> gym.Env[ObsArmsGrCam, LimitedJointsRelDictType]: + """Environment with joints, gripper, camera, collision guard and relative limits""" + env_sim_joints_gripper_cam_cg = produce_env_sim_joints_gripper_camera_cg( + mjcf_path, urdf_path, cfg, gripper_cfg, cam_cfg, robot_id + ) + env_sim_joints_gripper_cam_cg_rel = RelativeActionSpace(env_sim_joints_gripper_cam_cg) + return typing.cast( + gym.Env[ObsArmsGrCam, LimitedJointsRelDictType], + env_sim_joints_gripper_cam_cg_rel, + ) + + +def produce_env_hw( + ip: str, urdf_path: str, control_mode: ControlMode, cfg_path: str +) -> gym.Env[ArmObsType, CartOrJointContType]: + cfg = read_config_yaml(cfg_path) + with Desk(ip, cfg.hw.username, cfg.hw.password) as d: + d.unlock() + d.activate_fci() + robot = hw.FR3(ip, urdf_path) + env = FR3Env(robot, control_mode) + env_hw = FR3HW(env) + + return typing.cast(gym.Env[ArmObsType, CartOrJointContType], env_hw) + + +def produce_env_hw_joints(ip: str, urdf_path: str, cfg_path: str) -> gym.Env[ArmObsType, JointsDictType]: + env_hw_joints = produce_env_hw(ip, urdf_path, ControlMode.JOINTS, cfg_path) + return typing.cast(gym.Env[ArmObsType, JointsDictType], env_hw_joints) + + +def produce_env_hw_joints_gripper( + ip: str, urdf_path: str, gripper_cfg: FHConfig, cfg_path: str +) -> (gym.Env)[ObsArmsGr, JointsDictType]: + env_hw_joints = produce_env_hw_joints(ip, urdf_path, cfg_path) + gripper = hw.FrankaHand(ip, gripper_cfg) + env_hw_joints_gripper = GripperWrapper(env_hw_joints, gripper) + return typing.cast(gym.Env[ObsArmsGr, JointsDictType], env_hw_joints_gripper) + + +def produce_env_hw_joints_gripper_camera( + ip: str, urdf_path: str, gripper_cfg: FHConfig, cam_cfg: RealSenseSetConfig, cfg_path: str +) -> (gym.Env)[ObsArmsGrCam, JointsDictType]: + env_hw_joints_gripper = produce_env_hw_joints_gripper(ip, urdf_path, gripper_cfg, cfg_path) + camera_set = RealSenseCameraSet(cam_cfg) + env_hw_joints_gripper_cam: gym.Env = CameraSetWrapper(env_hw_joints_gripper, camera_set) + return typing.cast(gym.Env[ObsArmsGrCam, JointsDictType], env_hw_joints_gripper_cam) + + +def produce_env_hw_joints_gripper_camera_rel( + ip: str, urdf_path: str, gripper_cfg: FHConfig, cam_cfg: RealSenseSetConfig, cfg_path: str +) -> gym.Env[ObsArmsGrCam, LimitedJointsRelDictType]: + env_hw_joints_gripper_cam = produce_env_hw_joints_gripper_camera(ip, urdf_path, gripper_cfg, cam_cfg, cfg_path) + env_sim_joints_gripper_cam_rel = RelativeActionSpace(env_hw_joints_gripper_cam) + return typing.cast(gym.Env[ObsArmsGrCam, LimitedJointsRelDictType], env_sim_joints_gripper_cam_rel) + + +def produce_env_hw_joints_gripper_camera_cg( + ip: str, urdf_path: str, mjcf_path: str, gripper_cfg: FHConfig, cam_cfg: RealSenseSetConfig, cfg_path: str +) -> gym.Env[ObsArmsGrCam, JointsDictType]: + env_hw_joints_gripper_cam = produce_env_hw_joints_gripper_camera(ip, urdf_path, gripper_cfg, cam_cfg, cfg_path) + env_hw_joints_gripper_cam_cg = CollisionGuard.env_from_xml_paths( + env_hw_joints_gripper_cam, mjcf_path, urdf_path, gripper=True, check_home_collision=False + ) + return typing.cast(gym.Env[ObsArmsGrCam, JointsDictType], env_hw_joints_gripper_cam_cg) + + +def produce_env_hw_joints_gripper_camera_cg_rel( + ip: str, urdf_path: str, mjcf_path: str, gripper_cfg: FHConfig, cam_cfg: RealSenseSetConfig, cfg_path: str +) -> gym.Env[ObsArmsGrCam, LimitedJointsRelDictType]: + env_hw_joints_gripper_cam = produce_env_hw_joints_gripper_camera(ip, urdf_path, gripper_cfg, cam_cfg, cfg_path) + env_hw_joints_gripper_cam_cg = CollisionGuard.env_from_xml_paths( + env_hw_joints_gripper_cam, + mjcf_path, + urdf_path, + gripper=True, + check_home_collision=False, + ) + return typing.cast(gym.Env[ObsArmsGrCam, LimitedJointsRelDictType], env_hw_joints_gripper_cam_cg) + + +def produce_env_hw_trpy(ip: str, urdf_path: str, cfg_path: str) -> gym.Env[ArmObsType, TRPYDictType]: + env_hw_trpy = produce_env_hw(ip, urdf_path, ControlMode.CARTESIAN_TRPY, cfg_path) + return typing.cast(gym.Env[ArmObsType, TRPYDictType], env_hw_trpy) + + +def produce_env_hw_trpy_gripper( + ip: str, urdf_path: str, gripper_cfg: FHConfig, cfg_path: str +) -> (gym.Env)[ObsArmsGr, TRPYDictType]: + env_hw_joints = produce_env_hw_trpy(ip, urdf_path, cfg_path) + gripper = hw.FrankaHand(ip, gripper_cfg) + env_hw_trpy_gripper = GripperWrapper(env_hw_joints, gripper) + return typing.cast(gym.Env[ObsArmsGr, TRPYDictType], env_hw_trpy_gripper) + + +def produce_env_hw_trpy_gripper_camera( + ip: str, urdf_path: str, gripper_cfg: FHConfig, cam_cfg: RealSenseSetConfig, cfg_path: str +) -> gym.Env[ObsArmsGrCam, TRPYDictType]: + env_hw_trpy_gripper = produce_env_hw_trpy_gripper(ip, urdf_path, gripper_cfg, cfg_path) + camera_set = RealSenseCameraSet(cam_cfg) + env_hw_trpy_gripper_cam: gym.Env = CameraSetWrapper(env_hw_trpy_gripper, camera_set) + return typing.cast(gym.Env[ObsArmsGrCam, TRPYDictType], env_hw_trpy_gripper_cam) + + +def produce_env_hw_trpy_gripper_camera_rel( + ip: str, urdf_path: str, gripper_cfg: FHConfig, cam_cfg: RealSenseSetConfig, cfg_path: str +) -> gym.Env[ObsArmsGrCam, LimitedTRPYRelDictType]: + env_hw_trpy_gripper_cam = produce_env_hw_trpy_gripper_camera(ip, urdf_path, gripper_cfg, cam_cfg, cfg_path) + env_hw_trpy_gripper_cam_rel = RelativeActionSpace(env_hw_trpy_gripper_cam) + return typing.cast(gym.Env[ObsArmsGrCam, LimitedTRPYRelDictType], env_hw_trpy_gripper_cam_rel) + + +def produce_env_hw_trpy_gripper_camera_cg( + ip: str, urdf_path: str, mjcf_path: str, gripper_cfg: FHConfig, cam_cfg: RealSenseSetConfig, cfg_path: str +) -> gym.Env[ObsArmsGrCam, TRPYDictType]: + env_hw_trpy_gripper_cam = produce_env_hw_trpy_gripper_camera(ip, urdf_path, gripper_cfg, cam_cfg, cfg_path) + env_hw_trpy_gripper_cam_cg = CollisionGuard.env_from_xml_paths( + env_hw_trpy_gripper_cam, mjcf_path, urdf_path, gripper=True, check_home_collision=False + ) + return typing.cast(gym.Env[ObsArmsGrCam, TRPYDictType], env_hw_trpy_gripper_cam_cg) + + +def produce_env_hw_trpy_gripper_camera_cg_rel( + ip: str, urdf_path: str, mjcf_path: str, gripper_cfg: FHConfig, cam_cfg: RealSenseSetConfig, cfg_path: str +) -> gym.Env[ObsArmsGrCam, LimitedTRPYRelDictType]: + env_hw_trpy_gripper_cam = produce_env_hw_trpy_gripper_camera(ip, urdf_path, gripper_cfg, cam_cfg, cfg_path) + env_hw_joints_gripper_cam_cg = CollisionGuard.env_from_xml_paths( + env_hw_trpy_gripper_cam, + mjcf_path, + urdf_path, + gripper=True, + check_home_collision=False, + ) + return typing.cast(gym.Env[ObsArmsGrCam, LimitedTRPYRelDictType], env_hw_joints_gripper_cam_cg) + + +def produce_env_hw_tquart(ip: str, urdf_path: str, cfg_path: str) -> gym.Env[ArmObsType, TQuartDictType]: + env_hw_tquart = produce_env_hw(ip, urdf_path, ControlMode.CARTESIAN_TQuart, cfg_path) + return typing.cast(gym.Env[ArmObsType, TQuartDictType], env_hw_tquart) + + +def produce_env_hw_tquart_gripper( + ip: str, urdf_path: str, gripper_cfg: FHConfig, cfg_path: str +) -> (gym.Env)[ObsArmsGr, TQuartDictType]: + env_hw_tquart = produce_env_hw_tquart(ip, urdf_path, cfg_path) + gripper = hw.FrankaHand(ip, gripper_cfg) + env_hw_tquart_gripper = GripperWrapper(env_hw_tquart, gripper) + return typing.cast(gym.Env[ObsArmsGr, TQuartDictType], env_hw_tquart_gripper) + + +def produce_env_hw_tquart_gripper_camera( + ip: str, urdf_path: str, gripper_cfg: FHConfig, cam_cfg: RealSenseSetConfig, cfg_path: str +) -> gym.Env[ObsArmsGrCam, TQuartDictType]: + env_hw_tquart_gripper = produce_env_hw_tquart_gripper(ip, urdf_path, gripper_cfg, cfg_path) + camera_set = RealSenseCameraSet(cam_cfg) + env_hw_tquart_gripper_cam: gym.Env = CameraSetWrapper(env_hw_tquart_gripper, camera_set) + return typing.cast(gym.Env[ObsArmsGrCam, TQuartDictType], env_hw_tquart_gripper_cam) + + +def produce_env_hw_tquart_gripper_camera_rel( + ip: str, urdf_path: str, gripper_cfg: FHConfig, cam_cfg: RealSenseSetConfig, cfg_path: str +) -> gym.Env[ObsArmsGrCam, LimitedTQuartRelDictType]: + env_hw_tquart_gripper_cam = produce_env_hw_tquart_gripper_camera(ip, urdf_path, gripper_cfg, cam_cfg, cfg_path) + env_sim_tquart_gripper_cam_rel = RelativeActionSpace(env_hw_tquart_gripper_cam) + return typing.cast(gym.Env[ObsArmsGrCam, LimitedTQuartRelDictType], env_sim_tquart_gripper_cam_rel) + + +def produce_env_hw_tquart_gripper_camera_cg( + ip: str, urdf_path: str, mjcf_path: str, gripper_cfg: FHConfig, cam_cfg: RealSenseSetConfig, cfg_path: str +) -> gym.Env[ObsArmsGrCam, TQuartDictType]: + env_hw_tquart_gripper_cam = produce_env_hw_tquart_gripper_camera(ip, urdf_path, gripper_cfg, cam_cfg, cfg_path) + env_hw_tquart_gripper_cam_cg = CollisionGuard.env_from_xml_paths( + env_hw_tquart_gripper_cam, mjcf_path, urdf_path, gripper=True, check_home_collision=False + ) + return typing.cast(gym.Env[ObsArmsGrCam, TQuartDictType], env_hw_tquart_gripper_cam_cg) + + +def produce_env_hw_tquart_gripper_camera_cg_rel( + ip: str, urdf_path: str, mjcf_path: str, gripper_cfg: FHConfig, cam_cfg: RealSenseSetConfig, cfg_path: str +) -> gym.Env[ObsArmsGrCam, LimitedTQuartRelDictType]: + env_hw_tquart_gripper_cam = produce_env_hw_tquart_gripper_camera_cg( + ip, urdf_path, mjcf_path, gripper_cfg, cam_cfg, cfg_path + ) + env_hw_tquart_gripper_cam_cg = CollisionGuard.env_from_xml_paths( + env_hw_tquart_gripper_cam, + mjcf_path, + urdf_path, + gripper=True, + check_home_collision=False, + ) + return typing.cast(gym.Env[ObsArmsGrCam, LimitedTQuartRelDictType], env_hw_tquart_gripper_cam_cg) + + +if __name__ == "__main__": + mjcf = "models/mjcf/fr3_modular/scene.xml" + urdf = "models/fr3/urdf/fr3_from_panda.urdf" + robot_id_ = "0" + + cfg_ = sim.FR3Config() + cfg_.ik_duration_in_milliseconds = 300 + cfg_.realtime = False + gripper_cfg = sim.FHConfig() + + cameras = { + "wrist": SimCameraConfig(identifier="eye-in-hand_0", type=int(CameraType.fixed), on_screen_render=False), + "default_free": SimCameraConfig(identifier="", type=int(CameraType.default_free), on_screen_render=True), + } + cam_cfg_ = SimCameraSetConfig(cameras=cameras, resolution_width=640, resolution_height=480, frame_rate=50) + + env_ = produce_env_sim_tquart_gripper_camera_rel( + mjcf_path=mjcf, urdf_path=urdf, cfg=cfg_, gripper_cfg=gripper_cfg, cam_cfg=cam_cfg_, robot_id=robot_id_ + ) + obs_, info_ = env_.reset() + for _ in range(100): + act = env_.action_space.sample() + act["tquart"][3:] = [0, 0, 0, 1] + obs, reward, terminated, truncated, info = env_.step(act) + if truncated or terminated: + env_.reset() diff --git a/python/tests/test_common.py b/python/tests/test_common.py index c246a885..2a241278 100644 --- a/python/tests/test_common.py +++ b/python/tests/test_common.py @@ -1,8 +1,8 @@ import math +import numpy as np import pytest from rcsss import common -import numpy as np class TestPose: diff --git a/python/tests/test_envs.py b/python/tests/test_envs.py index 15fff55d..a65fb808 100644 --- a/python/tests/test_envs.py +++ b/python/tests/test_envs.py @@ -1,8 +1,7 @@ from typing import Annotated -import numpy as np import gymnasium as gym - +import numpy as np from rcsss.envs.space_utils import RCSpaceType, get_space, get_space_keys diff --git a/python/tests/test_sim_envs.py b/python/tests/test_sim_envs.py new file mode 100644 index 00000000..b9c757ba --- /dev/null +++ b/python/tests/test_sim_envs.py @@ -0,0 +1,360 @@ +from collections import OrderedDict + +import numpy as np +import pytest +import rcsss +from rcsss import sim +from rcsss._core import common +from rcsss.camera.sim import SimCameraSetConfig +from rcsss.envs import typed_factory as tf + + +@pytest.fixture +def cfg(): + fr3_config = sim.FR3Config() + fr3_config.ik_duration_in_milliseconds = 300 + fr3_config.realtime = False + return fr3_config + + +@pytest.fixture +def gripper_cfg(): + gripper_cfg = sim.FHConfig() + return gripper_cfg + + +@pytest.fixture +def cam_cfg(): + # no camera obs needed for this test + cameras = {} + cam_cfg_ = SimCameraSetConfig(cameras=cameras, resolution_width=640, resolution_height=480, frame_rate=50) + return cam_cfg_ + + +class TestSimEnvs: + """This class is for testing common sim env functionalities""" + + mjcf_path = str(rcsss.scenes["fr3_empty_world"]) + urdf_path = str(rcsss.scenes["lab"].parent / "fr3.urdf") + + def test_model_availability(self): + assert ( + "lab" in rcsss.scenes + ), "This pip package was not built with the UTN lab models which is needed for these tests, aborting." + + +class TestSimEnvsTRPY: + """This class is for testing TRPY sim env functionalities""" + + def test_zero_action_trpy(self, cfg): + """ + Test that a zero action does not change the state significantly + """ + env, _ = tf.produce_env_sim_trpy(TestSimEnvs.mjcf_path, TestSimEnvs.urdf_path, cfg, robot_id="0") + obs_initial, _ = env.reset() + + t = obs_initial["tquart"][:3] + q = obs_initial["tquart"][3:] + initial_pose = common.Pose(translation=np.array(t), quaternion=np.array(q)) + xyzrpy = t.tolist() + initial_pose.rotation_rpy().as_vector().tolist() + zero_action = OrderedDict([("xyzrpy", xyzrpy)]) + obs, _, _, _, info = env.step(zero_action) + assert info["ik_success"] + assert info["is_sim_converged"] + + out_pose = common.Pose(translation=np.array(obs["tquart"][:3]), quaternion=np.array(obs["tquart"][3:])) + expected_pose = initial_pose + assert out_pose.is_close(expected_pose, 1e-2, 1e-3) + + def test_non_zero_action_trpy(self, cfg): + """ + This is for testing that a certain action leads to the expected change in state + """ + # env creation + env, _ = tf.produce_env_sim_trpy(TestSimEnvs.mjcf_path, TestSimEnvs.urdf_path, cfg, robot_id="0") + obs_initial, _ = env.reset() + # action to be performed` + x_pos_change = 0.2 + initial_tquart = obs_initial["tquart"].copy() + t = initial_tquart[:3] + # shift the translation in x + t[0] += x_pos_change + q = initial_tquart[3:] + initial_pose = common.Pose(translation=np.array(t), quaternion=np.array(q)) + xyzrpy = t.tolist() + initial_pose.rotation_rpy().as_vector().tolist() + non_zero_action = OrderedDict([("xyzrpy", np.array(xyzrpy)), ("gripper", 0)]) + expected_obs = obs_initial["tquart"].copy() + expected_obs[0] += x_pos_change + obs, _, _, _, info = env.step(non_zero_action) + assert info["ik_success"] + assert info["is_sim_converged"] + + out_pose = common.Pose(translation=np.array(obs["tquart"][:3]), quaternion=np.array(obs["tquart"][3:])) + expected_pose = common.Pose(translation=np.array(expected_obs[:3]), quaternion=np.array(expected_obs[3:])) + assert out_pose.is_close(expected_pose, 1e-2, 1e-3) + + def test_relative_zero_action_trpy(self, cfg, gripper_cfg, cam_cfg): + # env creation + env = tf.produce_env_sim_trpy_gripper_camera_rel( + TestSimEnvs.mjcf_path, TestSimEnvs.urdf_path, cfg, gripper_cfg, cam_cfg, robot_id="0" + ) + obs_initial, _ = env.reset() + + # action to be performed + zero_action = OrderedDict([("xyzrpy", np.array([0, 0, 0, 0, 0, 0], dtype=np.float32)), ("gripper", 0)]) + obs, _, _, _, info = env.step(zero_action) + assert info["ik_success"] + assert info["is_sim_converged"] + + out_pose = common.Pose(translation=np.array(obs["tquart"][:3]), quaternion=np.array(obs["tquart"][3:])) + expected_pose = common.Pose( + translation=np.array(obs_initial["tquart"][:3]), quaternion=np.array(obs_initial["tquart"][3:]) + ) + assert out_pose.is_close(expected_pose, 1e-2, 1e-3) + + def test_relative_non_zero_action(self, cfg, gripper_cfg, cam_cfg): + # env creation + env = tf.produce_env_sim_trpy_gripper_camera_cg_rel( + TestSimEnvs.mjcf_path, TestSimEnvs.urdf_path, cfg, gripper_cfg, cam_cfg, robot_id="0" + ) + obs_initial, _ = env.reset() + + # action to be performed + x_pos_change = 0.2 + non_zero_action = OrderedDict( + [ + ("xyzrpy", np.array([x_pos_change, 0, 0, 0, 0, 0], dtype=np.float32)), + ("gripper", 0), + ] + ) + expected_obs = obs_initial["tquart"].copy() + + expected_obs[0] += x_pos_change + obs, _, _, _, info = env.step(non_zero_action) + assert info["ik_success"] + assert info["is_sim_converged"] + + out_pose = common.Pose(translation=np.array(obs["tquart"][:3]), quaternion=np.array(obs["tquart"][3:])) + expected_pose = common.Pose(translation=np.array(expected_obs[:3]), quaternion=np.array(expected_obs[3:])) + assert out_pose.is_close(expected_pose, 1e-2, 1e-3) + + # TODO: collision test still randomly fails + def test_collision_guard_trpy(self, cfg, gripper_cfg, cam_cfg): + """ + Check that an obvious collision is detected by the CollisionGuard + """ + + env_ = tf.produce_env_sim_trpy_gripper_camera_cg( + mjcf_path=TestSimEnvs.mjcf_path, + urdf_path=TestSimEnvs.urdf_path, + cfg=cfg, + gripper_cfg=gripper_cfg, + cam_cfg=cam_cfg, + robot_id="0", + ) + obs, _ = env_.reset() + # an obvious below ground collision action + obs["xyzrpy"][2] = -0.2 + + collision_action = OrderedDict( + [ + ("xyzrpy", obs["xyzrpy"]), + ("gripper", 0), + ] + ) + _, _, _, _, info = env_.step(collision_action) + # This is a scenario of the tcp below the ground which is an obvious collision, however the info.collision + # property is not populated correctly + + assert info["ik_success"] + assert info["is_sim_converged"] + assert info["collision"] + + +class TestSimEnvsTquart: + """This class is for testing Tquart sim env functionalities""" + + def test_non_zero_action_tquart(self, cfg): + """ + Test that a zero action does not change the state significantly in the tquart configuration + """ + env, _ = tf.produce_env_sim_tquart(TestSimEnvs.mjcf_path, TestSimEnvs.urdf_path, cfg, robot_id="0") + obs_initial, _ = env.reset() + + # action to be performed + t = obs_initial["tquart"][:3].tolist() + q = obs_initial["tquart"][3:].tolist() + x_pos_change = 0.3 + # updating the x action by 20cm + t[0] += x_pos_change + non_zero_action = OrderedDict([("tquart", np.array(t + q, dtype=np.float32))]) + + expected_obs = obs_initial["tquart"].copy() + expected_obs[0] += x_pos_change + + obs, _, _, _, info = env.step(non_zero_action) + out_pose = common.Pose(translation=np.array(obs["tquart"][:3]), quaternion=np.array(obs["tquart"][3:])) + expected_pose = common.Pose(translation=np.array(expected_obs[:3]), quaternion=np.array(expected_obs[3:])) + assert info["ik_success"] + assert out_pose.is_close(expected_pose, 1e-2, 1e-3) + + def test_zero_action_tquart(self, cfg): + """ + Test that a zero action does not change the state significantly in the tquart configuration + """ + env, _ = tf.produce_env_sim_tquart(TestSimEnvs.mjcf_path, TestSimEnvs.urdf_path, cfg, robot_id="0") + obs_initial, info_ = env.reset() + + home_action_vec = obs_initial["tquart"] + zero_action = OrderedDict([("tquart", home_action_vec)]) + + obs, _, _, _, info = env.step(zero_action) + assert info["ik_success"] + assert info["is_sim_converged"] + out_pose = common.Pose(translation=np.array(obs["tquart"][:3]), quaternion=np.array(obs["tquart"][3:])) + expected_pose = common.Pose( + translation=np.array(obs_initial["tquart"][:3]), quaternion=np.array(obs_initial["tquart"][3:]) + ) + assert out_pose.is_close(expected_pose, 1e-2, 1e-3) + + def test_relative_zero_action_tquart(self, cfg, gripper_cfg, cam_cfg): + # env creation + env = tf.produce_env_sim_tquart_gripper_camera_rel( + TestSimEnvs.mjcf_path, TestSimEnvs.urdf_path, cfg, gripper_cfg, cam_cfg, robot_id="0" + ) + obs_initial, _ = env.reset() + print(f"{obs_initial = }") + zero_rel_action = OrderedDict( + [ + ("tquart", np.array([0, 0, 0, 0, 0, 0, 1.0], dtype=np.float32)), + ("gripper", 0), + ] + ) + obs, _, _, _, info = env.step(zero_rel_action) + assert info["ik_success"] + assert info["is_sim_converged"] + out_pose = common.Pose(translation=np.array(obs["tquart"][:3]), quaternion=np.array(obs["tquart"][3:])) + expected_pose = common.Pose( + translation=np.array(obs_initial["tquart"][:3]), quaternion=np.array(obs_initial["tquart"][3:]) + ) + assert out_pose.is_close(expected_pose, 1e-2, 1e-3) + + # TODO: collision test still randomly fails + def test_collision_guard_tquart(self, cfg, gripper_cfg, cam_cfg): + """ + Check that an obvious collision is detected by the CollisionGuard + """ + + env_ = tf.produce_env_sim_tquart_gripper_camera_cg( + mjcf_path=TestSimEnvs.mjcf_path, + urdf_path=TestSimEnvs.urdf_path, + cfg=cfg, + gripper_cfg=gripper_cfg, + cam_cfg=cam_cfg, + robot_id="0", + ) + env_.reset() + # this is a pose with an obvious collision (below the floor) + act = OrderedDict( + [ + ( + "tquart", + np.array([0.3, 0, -0.01, 9.23879533e-01, -3.82683432e-01, -7.25288143e-17, -3.00424186e-17]), + ), + ("gripper", 0), + ] + ) + _, _, _, _, info = env_.step(act) + + assert info["ik_success"] + assert info["is_sim_converged"] + assert info["collision"] + + +class TestSimEnvsJoints: + """This class is for testing Joints sim env functionalities""" + + def test_zero_action_joints(self, cfg): + """ + This is for testing that a certain action leads to the expected change in state + """ + env_, _ = tf.produce_env_sim_joints(TestSimEnvs.mjcf_path, TestSimEnvs.urdf_path, cfg, robot_id="0") + obs_initial, _ = env_.reset() + + # action to be performed + zero_action = OrderedDict([("joints", np.array(obs_initial["joints"]))]) + obs, _, _, _, info = env_.step(zero_action) + assert info["ik_success"] + assert info["is_sim_converged"] + assert np.allclose(obs["joints"], obs_initial["joints"], atol=0.01, rtol=0) + + def test_non_zero_action_joints(self, cfg): + """ + This is for testing that a certain action leads to the expected change in state + """ + env_, _ = tf.produce_env_sim_joints(TestSimEnvs.mjcf_path, TestSimEnvs.urdf_path, cfg, robot_id="0") + obs_initial, _ = env_.reset() + # action to be performed + non_zero_action = OrderedDict( + [("joints", obs_initial["joints"] + np.array([0.1, 0.1, 0.1, 0.1, -0.1, -0.1, 0.1]))] + ) + obs, _, _, _, info = env_.step(non_zero_action) + assert info["ik_success"] + assert info["is_sim_converged"] + + assert np.allclose(obs["joints"], non_zero_action["joints"], atol=0.01, rtol=0) + + def test_collision_guard_joints(self, cfg, gripper_cfg, cam_cfg): + """ + Check that an obvious collision is detected by the CollisionGuard + """ + + env_ = tf.produce_env_sim_joints_gripper_camera_cg( + mjcf_path=TestSimEnvs.mjcf_path, + urdf_path=TestSimEnvs.urdf_path, + cfg=cfg, + gripper_cfg=gripper_cfg, + cam_cfg=cam_cfg, + robot_id="0", + ) + env_.reset() + # the below action is a test_case where there is an obvious collision regardless of the gripper action + act = OrderedDict( + [ + ("joints", np.array([0, 1.78, 0, -1.45, 0, 0, 0], dtype=np.float32)), + ("gripper", 1), + ] + ) + _, _, _, _, info = env_.step(act) + assert info["ik_success"] + assert info["is_sim_converged"] + assert info["collision"] + + def test_relative_zero_action_joints(self, cfg, gripper_cfg, cam_cfg): + """ + Check that an obvious collision is detected by the CollisionGuard + """ + + env_ = tf.produce_env_sim_joints_gripper_camera_rel( + mjcf_path=TestSimEnvs.mjcf_path, + urdf_path=TestSimEnvs.urdf_path, + cfg=cfg, + gripper_cfg=gripper_cfg, + cam_cfg=cam_cfg, + robot_id="0", + ) + obs_initial, _ = env_.reset() + act = OrderedDict( + [ + ("joints", np.array([0, 0, 0, 0, 0, 0, 0], dtype=np.float32)), + ("gripper", 0), + ] + ) + obs, _, _, _, info = env_.step(act) + assert info["ik_success"] + assert info["is_sim_converged"] + out_pose = common.Pose(translation=np.array(obs["tquart"][:3]), quaternion=np.array(obs["tquart"][3:])) + expected_pose = common.Pose( + translation=np.array(obs_initial["tquart"][:3]), quaternion=np.array(obs_initial["tquart"][3:]) + ) + assert out_pose.is_close(expected_pose, 1e-2, 1e-3)