Skip to content
33 changes: 33 additions & 0 deletions dimos/msgs/geometry_msgs/Point.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from dimos_lcm.geometry_msgs import Point as LCMPoint


class Point(LCMPoint):
"""DimOS wrapper for geometry_msgs.Point (3D position).

Inherits x/y/z from LCMPoint. Wire-identical to Vector3 but
semantically represents a position, not a direction/displacement.
"""

msg_name = "geometry_msgs.Point"

def __init__(self, x: float = 0.0, y: float = 0.0, z: float = 0.0) -> None:
super().__init__(float(x), float(y), float(z))

def __repr__(self) -> str:
return f"Point(x={self.x}, y={self.y}, z={self.z})"
100 changes: 100 additions & 0 deletions dimos/msgs/geometry_msgs/PointStamped.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import time
from typing import TYPE_CHECKING, BinaryIO

if TYPE_CHECKING:
from rerun._baseclasses import Archetype

from dimos_lcm.geometry_msgs import PointStamped as LCMPointStamped

from dimos.msgs.geometry_msgs.Point import Point
from dimos.types.timestamped import Timestamped


class PointStamped(Point, Timestamped):
"""A 3D point with timestamp and frame_id.

Follows the same pattern as PoseStamped(Pose, Timestamped) and
TwistStamped(Twist, Timestamped). Inherits x/y/z from Point
(which inherits from LCMPoint).
"""

msg_name = "geometry_msgs.PointStamped"
ts: float
frame_id: str

def __init__(
self,
x: float = 0.0,
y: float = 0.0,
z: float = 0.0,
ts: float = 0.0,
frame_id: str = "",
) -> None:
self.frame_id = frame_id
self.ts = ts if ts != 0 else time.time()
super().__init__(float(x), float(y), float(z))

# -- LCM encode / decode --

def lcm_encode(self) -> bytes:
"""Encode to LCM binary format."""
lcm_msg = LCMPointStamped()
lcm_msg.point = self # Works because Point inherits from LCMPoint
[lcm_msg.header.stamp.sec, lcm_msg.header.stamp.nsec] = self.ros_timestamp()
lcm_msg.header.frame_id = self.frame_id
return lcm_msg.lcm_encode() # type: ignore[no-any-return]

@classmethod
def lcm_decode(cls, data: bytes | BinaryIO) -> PointStamped:
"""Decode from LCM binary format."""
lcm_msg = LCMPointStamped.lcm_decode(data)
return cls(
x=lcm_msg.point.x,
y=lcm_msg.point.y,
z=lcm_msg.point.z,
ts=lcm_msg.header.stamp.sec + (lcm_msg.header.stamp.nsec / 1_000_000_000),
frame_id=lcm_msg.header.frame_id,
)

# -- Conversion methods --

def to_rerun(self) -> Archetype:
"""Convert to rerun Points3D archetype for visualization."""
import rerun as rr

return rr.Points3D(positions=[[self.x, self.y, self.z]])

def to_pose_stamped(self) -> PoseStamped:
"""Convert to PoseStamped with identity quaternion orientation."""
from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped

return PoseStamped(
ts=self.ts,
frame_id=self.frame_id,
position=[self.x, self.y, self.z],
orientation=[0.0, 0.0, 0.0, 1.0],
)

# -- String representations --

def __str__(self) -> str:
return f"PointStamped(point=[{self.x:.3f}, {self.y:.3f}, {self.z:.3f}], frame_id={self.frame_id!r})"

def __repr__(self) -> str:
return f"PointStamped(x={self.x}, y={self.y}, z={self.z}, ts={self.ts}, frame_id={self.frame_id!r})"
4 changes: 4 additions & 0 deletions dimos/msgs/geometry_msgs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from dimos.msgs.geometry_msgs.Point import Point
from dimos.msgs.geometry_msgs.PointStamped import PointStamped
from dimos.msgs.geometry_msgs.Pose import Pose, PoseLike, to_pose
from dimos.msgs.geometry_msgs.PoseArray import PoseArray
from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped
Expand All @@ -14,6 +16,8 @@
from dimos.msgs.geometry_msgs.WrenchStamped import WrenchStamped

__all__ = [
"Point",
"PointStamped",
"Pose",
"PoseArray",
"PoseLike",
Expand Down
63 changes: 63 additions & 0 deletions dimos/msgs/geometry_msgs/test_PointStamped.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for geometry_msgs.PointStamped — msg -> lcm bytes -> msg roundtrip."""

import time

from dimos_lcm.geometry_msgs import Point as LCMPoint

from dimos.msgs.geometry_msgs.PointStamped import Point, PointStamped


def test_point_inherits_lcm() -> None:
"""Point wrapper inherits from LCMPoint."""
assert isinstance(Point(1.0, 2.0, 3.0), LCMPoint)


def test_lcm_encode_decode() -> None:
"""Test encoding and decoding of PointStamped to/from binary LCM format."""
source = PointStamped(
x=1.5,
y=-2.5,
z=3.5,
ts=time.time(),
frame_id="/world/grid",
)
binary_msg = source.lcm_encode()
dest = PointStamped.lcm_decode(binary_msg)

assert isinstance(dest, PointStamped)
assert dest is not source
assert dest.x == source.x
assert dest.y == source.y
assert dest.z == source.z
assert abs(dest.ts - source.ts) < 1e-6
assert dest.frame_id == source.frame_id


def test_to_pose_stamped() -> None:
"""Test conversion to PoseStamped with identity orientation."""
from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped

pt = PointStamped(x=1.0, y=2.0, z=3.0, ts=500.0, frame_id="/map")
pose = pt.to_pose_stamped()

assert isinstance(pose, PoseStamped)
assert pose.x == 1.0
assert pose.y == 2.0
assert pose.z == 3.0
assert pose.orientation.w == 1.0
assert pose.ts == 500.0
assert pose.frame_id == "/map"
128 changes: 128 additions & 0 deletions dimos/robot/unitree/go2/blueprints/smart/_clicked_point_transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#!/usr/bin/env python3
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Transport that receives PointStamped clicks from Rerun and delivers PoseStamped.

Subscribes to an LCM topic carrying PointStamped (published by the Rerun
viewer fork) and converts each message to PoseStamped via
``PointStamped.to_pose_stamped()`` before delivering to stream subscribers.

No DimOS Module is needed -- the conversion lives entirely inside the
transport layer.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from dimos.core.transport import PubSubTransport
from dimos.msgs.geometry_msgs import PoseStamped
from dimos.msgs.geometry_msgs.PointStamped import PointStamped
from dimos.protocol.pubsub.impl.lcmpubsub import LCM, Topic as LCMTopic
from dimos.utils.logging_config import setup_logger

if TYPE_CHECKING:
from collections.abc import Callable

from dimos.core.stream import Out, Stream

logger = setup_logger()


class ClickedPointTransport(PubSubTransport[PoseStamped]):
"""PubSubTransport that bridges ``/clicked_point`` PointStamped -> PoseStamped.

Internally subscribes to an LCM topic carrying ``PointStamped`` messages
(e.g. published by the Rerun viewer) and converts each one to
``PoseStamped`` with identity quaternion via ``PointStamped.to_pose_stamped()``.

Also supports local ``broadcast()`` so that other in-process producers
(RPC ``set_goal``, agent planners) can still publish ``PoseStamped``
directly through the same transport.

Usage in a blueprint::

from dimos.msgs.geometry_msgs import PoseStamped

my_blueprint = autoconnect(...).transports({
("goal_request", PoseStamped): ClickedPointTransport(),
})
"""

_started: bool = False

def __init__(self, clicked_point_topic: str = "/clicked_point") -> None:
super().__init__(clicked_point_topic)
self._click_lcm = LCM()
self._click_topic = LCMTopic(clicked_point_topic, PointStamped)
self._subscribers: list[Callable[[PoseStamped], Any]] = []

# -- PubSubTransport interface -------------------------------------------

def broadcast(self, _: Out[PoseStamped] | None, msg: PoseStamped) -> None:
"""Deliver a PoseStamped directly to all local subscribers."""
for cb in self._subscribers:
cb(msg)

def subscribe(
self,
callback: Callable[[PoseStamped], Any],
selfstream: Stream[PoseStamped] | None = None,
) -> Callable[[], None]:
"""Subscribe and also start listening for PointStamped clicks on LCM."""
if not self._started:
self.start()

self._subscribers.append(callback)

# Subscribe to the external PointStamped LCM topic; convert on receive.
unsub_lcm = self._click_lcm.subscribe(
self._click_topic,
lambda msg, _topic: self._on_click(msg, callback),
)

def unsubscribe() -> None:
if callback in self._subscribers:
self._subscribers.remove(callback)
unsub_lcm()

return unsubscribe

# -- Lifecycle -----------------------------------------------------------

def start(self) -> None:
if not self._started:
self._click_lcm.start()
self._started = True

def stop(self) -> None:
if self._started:
self._click_lcm.stop()
self._started = False

# -- Internal ------------------------------------------------------------

@staticmethod
def _on_click(
point_stamped: PointStamped,
callback: Callable[[PoseStamped], Any],
) -> None:
pose = point_stamped.to_pose_stamped()
logger.info(
"ClickedPointTransport",
point=f"({point_stamped.x:.2f}, {point_stamped.y:.2f}, {point_stamped.z:.2f})",
pose=str(pose),
)
callback(pose)
Loading
Loading