Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion dimos/robot/cli/dimos.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,10 @@ def humancli(ctx: typer.Context) -> None:
@topic_app.command()
def echo(
topic: str = typer.Argument(..., help="Topic name to listen on (e.g., /goal_request)"),
type_name: str = typer.Argument(..., help="Message type (e.g., PoseStamped)"),
type_name: str | None = typer.Argument(
None,
help="Optional message type (e.g., PoseStamped). If omitted, infer from '/topic#pkg.Msg'.",
),
) -> None:
topic_echo(topic, type_name)

Expand Down
55 changes: 44 additions & 11 deletions dimos/robot/cli/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import importlib
import re
import time

import typer

from dimos.core.transport import LCMTransport, pLCMTransport
from dimos.protocol.pubsub.lcmpubsub import LCMPubSubBase

_modules_to_try = [
"dimos.msgs.geometry_msgs",
Expand All @@ -42,24 +46,53 @@ def _resolve_type(type_name: str) -> type:
raise ValueError(f"Could not find type '{type_name}' in any known message modules")


def topic_echo(topic: str, type_name: str) -> None:
msg_type = _resolve_type(type_name)
use_pickled = getattr(msg_type, "lcm_encode", None) is None
transport: pLCMTransport[object] | LCMTransport[object] = (
pLCMTransport(topic) if use_pickled else LCMTransport(topic, msg_type)
)

def _on_message(msg: object) -> None:
print(msg)
def topic_echo(topic: str, type_name: str | None) -> None:
# Explicit mode (legacy): unchanged.
if type_name is not None:
msg_type = _resolve_type(type_name)
use_pickled = getattr(msg_type, "lcm_encode", None) is None
transport: pLCMTransport[object] | LCMTransport[object] = (
pLCMTransport(topic) if use_pickled else LCMTransport(topic, msg_type)
)

transport.subscribe(_on_message)
def _on_message(msg: object) -> None:
print(msg)

typer.echo(f"Listening on {topic} for {type_name} messages... (Ctrl+C to stop)")
transport.subscribe(_on_message)
typer.echo(f"Listening on {topic} for {type_name} messages... (Ctrl+C to stop)")
try:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
typer.echo("\nStopped.")
return

# Inferred typed mode: listen on /topic#pkg.Msg and decode from the msg_name suffix.
bus = LCMPubSubBase(autoconf=True)
bus.start() # starts threaded handle loop

typed_pattern = rf"^{re.escape(topic)}#.*"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P3] The regex pattern ^{topic}#.* will match channels that start with the topic but could match unintended channels if the topic is a prefix of another topic. For example, if topic is /odom, it will also match /odom_extended#pkg.Msg. Consider using ^{topic}#[^#]+$ to be more precise.


def on_msg(channel: str, data: bytes) -> None:
_, msg_name = channel.split("#", 1) # e.g. "nav_msgs.Odometry"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] If channel doesn't contain #, split will return only one element, causing ValueError: not enough values to unpack on the unpacking. Consider validating the channel format or using try-except.

pkg, cls_name = msg_name.split(".", 1) # "nav_msgs", "Odometry"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] If msg_name doesn't contain ., split will return only one element, causing ValueError: not enough values to unpack. This happens if someone publishes to a channel like /topic#BadFormat without the package prefix.

module = importlib.import_module(f"dimos.msgs.{pkg}")
cls = getattr(module, cls_name)
Comment on lines +79 to +80
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] No error handling for import failures or missing attributes. If the package doesn't exist in dimos.msgs or the class doesn't exist in the module, this will raise ModuleNotFoundError or AttributeError.

print(cls.lcm_decode(data))
Comment on lines +77 to +81
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P0] Missing error handling for channel parsing and message decoding. If a channel doesn't contain # or ., or if the package/class doesn't exist, this will crash with an unhandled exception. The callback should catch exceptions and print error messages instead of crashing the listening loop.

Triggering conditions: malformed channel names, missing message packages, or corrupted message data.


assert bus.l is not None
bus.l.subscribe(typed_pattern, on_msg)

typer.echo(
f"Listening on {topic} (inferring from typed LCM channels like '{topic}#pkg.Msg')... "
"(Ctrl+C to stop)"
)

try:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
bus.stop()
typer.echo("\nStopped.")


Expand Down
6 changes: 6 additions & 0 deletions docs/concepts/transports.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ lcm.stop()
Received velocity: x=1.0, y=0.0, z=0.5
```

### Inspecting LCM traffic (CLI)

- `dimos lcmspy` shows topic frequency/bandwidth stats.
- `dimos topic echo /topic` listens on typed channels like `/topic#pkg.Msg` and decodes automatically.
- `dimos topic echo /topic TypeName` is the explicit legacy form.

## Encoder Mixins

Transports can use encoder mixins to serialize messages. The `PubSubEncoderMixin` pattern wraps publish/subscribe to encode/decode automatically:
Expand Down
Loading