Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/source/overview/gym/observation_functors.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ This page lists all available observation functors that can be used with the Obs

* - Functor Name
- Description
* - ``get_object_uid``
- Get the user IDs of objects. Returns an int32 tensor of shape (num_envs,) for ``RigidObject``, or (num_envs, num_links) for ``Articulation``/``Robot``. Returns a zero tensor if the object doesn't exist.
* - ``get_object_body_scale``
- Get the body scale of objects. Returns tensor of shape (num_envs, 3). Only supports ``RigidObject``. Returns zero tensor if object doesn't exist.
* - ``get_rigid_object_velocity``
Expand Down Expand Up @@ -153,6 +155,15 @@ observations = {
"entity_cfg": SceneEntityCfg(uid="cube"),
},
),
# Example: Get object user ID
"object_uid": ObservationCfg(
func="get_object_uid",
mode="add",
name="object/cube/uid",
params={
"entity_cfg": SceneEntityCfg(uid="cube"),
},
),
# Example: Get articulation joint drive properties
"robot_joint_drive": ObservationCfg(
func="get_articulation_joint_drive",
Expand Down
53 changes: 39 additions & 14 deletions docs/source/overview/sim/sim_sensor.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ The {class}`ContactSensorCfg` class defines the configuration for contact sensor
| `rigid_uid_list` | `List[str]` | `[]` | List of rigid body UIDs to monitor for contacts. |
| `articulation_cfg_list` | `List[ArticulationContactFilterCfg]` | `[]` | List of articulation link contact filter configurations. |
| `filter_need_both_actor` | `bool` | `True` | Whether to filter contact only when both actors are in the filter list. If `False`, contact is reported if either actor is in the filter. |
| `max_contacts_per_env` | `int` | `64` | Maximum number of contacts per environment that the sensor can handle. |

### Articulation Contact Filter Configuration

Expand Down Expand Up @@ -170,6 +171,9 @@ contact_filter_cfg.articulation_cfg_list = [contact_filter_art_cfg]
# Only report contacts when both actors are in the filter list
contact_filter_cfg.filter_need_both_actor = True

# Set maximum contacts per environment
contact_filter_cfg.max_contacts_per_env = 128

# 2. Add Sensor to Simulation
contact_sensor: ContactSensor = sim.add_sensor(sensor_cfg=contact_filter_cfg)

Expand All @@ -178,17 +182,28 @@ sim.update(step=1)
contact_sensor.update()
contact_report = contact_sensor.get_data()

# Access contacts for a specific environment using is_valid mask
env_id = 0
env_valid_mask = contact_report["is_valid"][env_id]
env_contact_positions = contact_report["position"][env_id][env_valid_mask]

# Or get all valid contacts across all environments
valid_mask = contact_report["is_valid"]
all_valid_positions = contact_report["position"][valid_mask] # Shape: (total_valid_contacts, 3)

# 4. Filter contacts by specific user IDs
cube2_user_ids = sim.get_rigid_object("cube2").get_user_ids()
finger1_user_ids = sim.get_robot("UR10_PGI").get_user_ids("finger1_link").reshape(-1)
filter_user_ids = torch.cat([cube2_user_ids, finger1_user_ids])
filter_contact_report = contact_sensor.filter_by_user_ids(filter_user_ids)
# Filter for specific environments
filter_contact_report = contact_sensor.filter_by_user_ids(filter_user_ids, env_ids=[env_id])

# 5. Visualize Contact Points
contact_sensor.set_contact_point_visibility(
visible=True,
visible=True,
rgba=(0.0, 0.0, 1.0, 1.0), # Blue color
point_size=6.0
point_size=6.0,
env_ids=[env_id], # Optional: visualize only specific environments
)
```

Expand All @@ -198,17 +213,27 @@ Retrieve contact data using `contact_sensor.get_data()`. The data is returned as

| Key | Data Type | Shape | Description |
| :--- | :--- | :--- | :--- |
| `position` | `torch.float32` | `(n_contact, 3)` | Contact positions in arena frame (world coordinates minus arena offset). |
| `normal` | `torch.float32` | `(n_contact, 3)` | Contact normal vectors. |
| `friction` | `torch.float32` | `(n_contact, 3)` | Contact friction forces. *Note: Currently this value may not be accurate.* |
| `impulse` | `torch.float32` | `(n_contact,)` | Contact impulse magnitudes. |
| `distance` | `torch.float32` | `(n_contact,)` | Contact penetration distances. |
| `user_ids` | `torch.int32` | `(n_contact, 2)` | Pair of user IDs for the two actors in contact. Use with `rigid_object.get_user_ids()` to identify objects. |
| `env_ids` | `torch.int32` | `(n_contact,)` | Environment IDs indicating which parallel environment each contact belongs to. |

*Note: `N` represents the number of contacts detected.*
| `position` | `torch.float32` | `(num_envs, max_contacts_per_env, 3)` | Contact positions in arena frame (world coordinates minus arena offset). |
| `normal` | `torch.float32` | `(num_envs, max_contacts_per_env, 3)` | Contact normal vectors. |
| `friction` | `torch.float32` | `(num_envs, max_contacts_per_env, 3)` | Contact friction forces. *Note: Currently this value may not be accurate.* |
| `impulse` | `torch.float32` | `(num_envs, max_contacts_per_env)` | Contact impulse magnitudes. |
| `distance` | `torch.float32` | `(num_envs, max_contacts_per_env)` | Contact penetration distances. |
| `user_ids` | `torch.int32` | `(num_envs, max_contacts_per_env, 2)` | Pair of user IDs for the two actors in contact. Use with `rigid_object.get_user_ids()` to identify objects. |
| `is_valid` | `torch.bool` | `(num_envs, max_contacts_per_env)` | Boolean mask indicating which contact slots contain valid data. Use this mask to filter out unused slots. |

**Note**: Use the `is_valid` mask to access only valid contacts:
```python
# Get all valid contacts across all environments
valid_mask = contact_report["is_valid"]
valid_positions = contact_report["position"][valid_mask] # Shape: (total_valid_contacts, 3)

# Or access per-environment
env_id = 0
num_valid = contact_report["is_valid"][env_id].sum().item()
env_positions = contact_report["position"][env_id, :num_valid]
```

### Additional Methods

- **`filter_by_user_ids(item_user_ids)`**: Filter contact report to include only contacts involving specific user IDs.
- **`set_contact_point_visibility(visible, rgba, point_size)`**: Enable/disable visualization of contact points with customizable color and size.
- **`filter_by_user_ids(item_user_ids, env_ids=None)`**: Filter contact report to include only contacts involving specific user IDs. Optionally filter by specific environment IDs.
- **`set_contact_point_visibility(visible, rgba, point_size, env_ids=None)`**: Enable/disable visualization of contact points with customizable color and size. Optionally visualize only specific environments.
121 changes: 95 additions & 26 deletions embodichain/lab/gym/envs/managers/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
from embodichain.utils import logger
from embodichain.data.constants import EMBODICHAIN_DEFAULT_DATASET_ROOT
from embodichain.lab.gym.utils.misc import is_stereocam
from embodichain.utils.utility import get_right_name
from embodichain.data.enum import JointType
from embodichain.lab.sim.sensors import Camera, ContactSensor
from .manager_base import Functor
from .cfg import DatasetFunctorCfg

Expand Down Expand Up @@ -306,28 +305,37 @@ def _build_features(self) -> Dict:

for sensor_name, value in sensor_obs_space.items():
sensor = self._env.get_sensor(sensor_name)
is_stereo = is_stereocam(sensor)

for frame_name, space in value.items():
# TODO: Support depth (uint16) and mask (also uint16 or uint8)
if frame_name not in ["color", "color_right"]:
logger.log_error(
f"Only support 'color' frame for vision sensors, but got '{frame_name}' in sensor '{sensor_name}'"
)
if isinstance(sensor, Camera):
is_stereo = is_stereocam(sensor)

features[f"{sensor_name}.{frame_name}"] = {
"dtype": "video" if self.use_videos else "image",
"shape": (sensor.cfg.height, sensor.cfg.width, 3),
"names": ["height", "width", "channel"],
}
for frame_name, space in value.items():
# TODO: Support depth (uint16) and mask (also uint16 or uint8)
if frame_name not in ["color", "color_right"]:
logger.log_error(
f"Only support 'color' frame for vision sensors, but got '{frame_name}' in sensor '{sensor_name}'"
)

if is_stereo:
features[f"{sensor_name}.{frame_name}_right"] = {
features[f"{sensor_name}.{frame_name}"] = {
"dtype": "video" if self.use_videos else "image",
"shape": (sensor.cfg.height, sensor.cfg.width, 3),
"names": ["height", "width", "channel"],
}

if is_stereo:
features[f"{sensor_name}.{frame_name}_right"] = {
"dtype": "video" if self.use_videos else "image",
"shape": (sensor.cfg.height, sensor.cfg.width, 3),
"names": ["height", "width", "channel"],
}
elif isinstance(sensor, ContactSensor):
for frame_name, space in value.items():
features[f"{sensor_name}.{frame_name}"] = {
"dtype": str(space.dtype),
"shape": space.shape,
"names": frame_name,
}

# Add any extra features specified in observation space excluding 'robot' and 'sensor'
for key, space in self._env.single_observation_space.items():
if key in ["robot", "sensor"]:
Expand All @@ -338,12 +346,13 @@ def _build_features(self) -> Dict:
self._add_nested_features(features, key, space)
continue

features[f"observation.{key}"] = {
features[key] = {
"dtype": str(space.dtype),
"shape": space.shape,
"names": key,
}
Comment on lines +349 to 353
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra top-level observations are now stored in features using the raw observation-space key (e.g. features[key]), but nested Dict observations still use the observation.{key}.{sub_key} prefix (see _add_nested_features). This creates an inconsistent schema within the same dataset (some extra obs under observation.*, others not), which can break downstream consumers expecting a single convention. Consider either keeping the observation. prefix for top-level extras as well, or removing it for nested extras so the naming is consistent.

Copilot uses AI. Check for mistakes.

self._modify_feature_names(features)
return features

def _add_nested_features(
Expand Down Expand Up @@ -380,6 +389,51 @@ def _add_nested_features(
"names": sub_key,
}

def _modify_feature_names(self, features: dict[str, Any]) -> None:
    """Modify feature metadata in-place based on the functor configuration.

    This adjusts the feature definitions stored in ``features``:

    - Scalar features with an empty shape ``()`` are promoted to ``(1,)``,
      as required by LeRobot.
    - For observations generated by :func:`get_object_uid`, assigns
      meaningful ``names`` based on the corresponding asset:

      - ``RigidObject``: the asset UID is used as the feature name.
      - ``Articulation`` / ``Robot``: the link names are used as the
        feature names (they align with the per-link user-ID columns).

    Args:
        features: A mapping from feature keys to metadata dictionaries.
            The dictionary is modified in-place; nothing is returned.
    """
    # Imported lazily to avoid circular imports at module load time.
    from embodichain.lab.gym.envs.managers.observations import get_object_uid
    from embodichain.lab.sim.objects import RigidObject, Articulation, Robot

    # LeRobot requires at least 1-D feature shapes; promote scalars.
    for key, feature in features.items():
        if feature["shape"] == ():
            features[key]["shape"] = (1,)

    for functor_name in self._env.observation_manager.active_functors["add"]:
        functor_cfg = self._env.observation_manager.get_functor_cfg(
            functor_name=functor_name
        )
        if functor_cfg.func == get_object_uid:
            obs_key = functor_cfg.name
            asset_uid = functor_cfg.params["entity_cfg"].uid
            asset = self._env.sim.get_asset(asset_uid)
            if isinstance(asset, RigidObject):
                features[obs_key]["names"] = asset_uid
            elif isinstance(asset, (Articulation, Robot)):
                # One name per link, matching the user-ID tensor columns.
                features[obs_key]["names"] = asset.link_names
            else:
                logger.log_warning(
                    f"Asset with UID '{asset_uid}' is not RigidObject, Articulation or Robot. Cannot assign feature names based on asset properties."
                )

def _convert_frame_to_lerobot(
self, obs: TensorDict, action: TensorDict | torch.Tensor, task: str
) -> Dict:
Expand All @@ -401,16 +455,29 @@ def _convert_frame_to_lerobot(
# Add images
for sensor_name, value in sensor_obs_space.items():
sensor = self._env.get_sensor(sensor_name)
is_stereo = is_stereocam(sensor)

color_data = obs["sensor"][sensor_name]["color"]
color_img = color_data[:, :, :3].cpu()
frame[f"{sensor_name}.color"] = color_img
if isinstance(sensor, Camera):
is_stereo = is_stereocam(sensor)

color_data = obs["sensor"][sensor_name]["color"]
color_img = color_data[:, :, :3].cpu()
frame[f"{sensor_name}.color"] = color_img

if is_stereo:
color_right_data = obs["sensor"][sensor_name]["color_right"]
color_right_img = color_right_data[:, :, :3].cpu()
frame[f"{sensor_name}.color_right"] = color_right_img
if is_stereo:
color_right_data = obs["sensor"][sensor_name]["color_right"]
color_right_img = color_right_data[:, :, :3].cpu()
frame[f"{sensor_name}.color_right"] = color_right_img
elif isinstance(sensor, ContactSensor):
for frame_name in value.keys():
frame[f"{sensor_name}.{frame_name}"] = obs["sensor"][
sensor_name
][
frame_name
].cpu() # Debug here to inspect contact sensor data
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inline comment # Debug here to inspect contact sensor data looks like a leftover debugging note in production code. Consider removing it or converting it into a proper explanatory comment (e.g., describing why .cpu() is required) to avoid confusing future readers.

Suggested change
].cpu() # Debug here to inspect contact sensor data
].cpu() # Move contact sensor tensor to CPU for serialization

Copilot uses AI. Check for mistakes.
else:
logger.log_warning(
f"Unsupported sensor type for '{sensor_name}' when converting to LeRobot format. Currently only support Camera and ContactSensor."
)

# Add state
frame["observation.qpos"] = obs["robot"]["qpos"].cpu()
Expand All @@ -427,7 +494,9 @@ def _convert_frame_to_lerobot(
# Handle nested TensorDict (e.g., physics attributes)
self._add_nested_obs_to_frame(frame, key, value)
else:
frame[f"observation.{key}"] = value.cpu()
if value.shape == ():
value = value.unsqueeze(0)
frame[key] = value.cpu()

# Add action.
if isinstance(action, torch.Tensor):
Expand Down
35 changes: 35 additions & 0 deletions embodichain/lab/gym/envs/managers/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,41 @@ def get_object_body_scale(
return obj.get_body_scale()


def get_object_uid(
    env: EmbodiedEnv,
    obs: EnvObs,
    entity_cfg: SceneEntityCfg,
) -> torch.Tensor:
    """Get the user IDs of the objects in the environment.

    If the object with the specified UID does not exist in the environment,
    a zero tensor of shape (num_envs,) will be returned.

    Note:
        - If the asset is a RigidObject, the user IDs are shaped (num_envs,).
        - If the asset is an Articulation or Robot, the user IDs are shaped
          (num_envs, num_links) and ordered by link_names in the configuration.

    Args:
        env: The environment instance.
        obs: The observation dictionary (unused; kept for functor API consistency).
        entity_cfg: The configuration of the scene entity.

    Returns:
        An int32 tensor of user IDs: (num_envs,) for RigidObject,
        (num_envs, num_links) for Articulation/Robot, or a zero tensor of
        shape (num_envs,) if the asset does not exist.
    """
    if entity_cfg.uid not in env.sim.asset_uids:
        return torch.zeros((env.num_envs,), dtype=torch.int32, device=env.device)

    obj = env.sim.get_asset(entity_cfg.uid)
    # NOTE(review): `logger.log_error` is presumably fatal/raising — confirm;
    # otherwise execution falls through to `get_user_ids` on an unsupported type.
    if not isinstance(obj, (Articulation, Robot, RigidObject)):
        logger.log_error(
            f"Object with UID '{entity_cfg.uid}' is not an Articulation, Robot or RigidObject. Currently only support getting user IDs for Articulation, Robot and RigidObject, please check again."
        )

    return obj.get_user_ids()


def get_rigid_object_velocity(
env: EmbodiedEnv,
obs: EnvObs,
Expand Down
42 changes: 34 additions & 8 deletions embodichain/lab/sim/objects/articulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,24 @@ def link_names(self) -> List[str]:
"""
return self._data.link_names

@cached_property
def user_ids(self) -> torch.Tensor:
    """User-defined IDs of the articulation, one row per instance.

    Note:
        The returned tensor has shape (num_instances, num_links); column j
        corresponds to the link ``self.link_names[j]``.

    Returns:
        torch.Tensor: int32 tensor of shape (num_instances, num_links).
    """
    # Pre-allocate the full table, then fill one (instance, link) cell at a time.
    id_table = torch.zeros(
        (self.num_instances, self.num_links), dtype=torch.int32, device=self.device
    )
    for row, entity in enumerate(self._entities):
        for col, name in enumerate(self.link_names):
            id_table[row, col] = entity.get_user_ids(name)[0]
    return id_table

@cached_property
def root_link_name(self) -> str:
"""Get the name of the root link of the articulation.
Expand Down Expand Up @@ -1330,22 +1348,30 @@ def get_joint_drive(
)[local_joint_ids_tensor]
return stiffness, damping, max_effort, max_velocity, friction

def get_user_ids(
    self, link_name: str | None = None, env_ids: Sequence[int] | None = None
) -> torch.Tensor:
    """Get the user ids of the articulation.

    Args:
        link_name: (str | None): The name of the link. If None, returns user ids for all links.
        env_ids: (Sequence[int] | None): Environment indices. If None, then all indices are used.

    Returns:
        torch.Tensor: The user ids with shape (N, num_links) if link_name is
        None, or (N,) for a given link_name, where N is the number of
        selected environments.
    """
    # NOTE(review): `log_error` is presumably fatal/raising; otherwise an
    # unknown link_name would fall through to `.index()` and raise ValueError.
    if link_name is not None and link_name not in self.link_names:
        logger.log_error(
            f"Link name {link_name} not found in {self.__class__.__name__}. Available links: {self.link_names}"
        )

    local_env_ids = self._all_indices if env_ids is None else env_ids

    if link_name is None:
        return self.user_ids[local_env_ids]

    # Select the single column corresponding to the requested link.
    link_idx = self.link_names.index(link_name)
    return self.user_ids[local_env_ids, link_idx]

def clear_dynamics(self, env_ids: Sequence[int] | None = None) -> None:
"""Clear the dynamics of the articulation.
Expand Down
Loading
Loading