Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ros2cli/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<exec_depend>python3-packaging</exec_depend>
<exec_depend>python3-psutil</exec_depend>
<exec_depend>rclpy</exec_depend>
<exec_depend>fzf</exec_depend>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
Expand Down
61 changes: 61 additions & 0 deletions ros2cli/ros2cli/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
import functools
import inspect
import os
import subprocess
import sys
import time

from typing import Dict
from typing import List
from typing import Optional


def get_ros_domain_id():
Expand Down Expand Up @@ -133,3 +136,61 @@ def get_rmw_additional_env(rmw_implementation: str) -> Dict[str, str]:
return {
'RMW_IMPLEMENTATION': rmw_implementation,
}


def interactive_select(
items: List[str],
prompt: str = 'Select an item:'
) -> Optional[str]:
Comment on lines +141 to +144
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it miss tty detection? no check for whether stdin/stdout is a tty? i think this will cause scripts and CI/CD pipelines to hang, if it meets this case. related to this issue, fzf reads keyboard input from /dev/tty, not stdin. however, when ros2cli commands are piped or redirected, fzf may not be able to access the tty properly?

the followings are pseudo code,

    # Check if we're in an interactive terminal
    if not sys.stdin.isatty() or not sys.stdout.isatty():
        print('Error: Interactive selection requires a TTY terminal.', 
              file=sys.stderr)
        return None

and

try:
    # fzf needs direct access to /dev/tty for keyboard input
    tty = open('/dev/tty', 'r')
    process = subprocess.Popen(
        ['fzf', '--prompt', prompt + ' ', '--height', '40%', '--reverse'],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=tty,  # fzf uses stderr for display
        text=True
    )
    # ...
finally:
    tty.close()

"""
Launch interactive fuzzy search using fzf to select from a list of items.

:param items: List of items to select from
:param prompt: Prompt message to display in fzf
:return: Selected item or None if user cancelled or fzf not available
"""
if not items:
print('No items available to select from.', file=sys.stderr)
return None

try:
# Check if fzf is available
result = subprocess.run(
['fzf', '--version'],
capture_output=True,
text=True,
timeout=1
)
if result.returncode != 0:
raise FileNotFoundError()
except (FileNotFoundError, subprocess.TimeoutExpired):
print(
'Error: fzf is not installed but is a dependency for this package. You can install it with rosdep',
file=sys.stderr
)
return None
Comment on lines +156 to +171
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i believe this is redundant. i would do the following instead to check fzf availability.

import shutil

if shutil.which('fzf') is None:
    print('Error: fzf is not installed...', file=sys.stderr)
    return None


try:
# Launch fzf with items as input - using direct TTY access
process = subprocess.Popen(
['fzf', '--prompt', prompt + ' ', '--height', '40%', '--reverse'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
text=True
)

# Send items to fzf
stdout, _ = process.communicate(input='\n'.join(items))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not need to have any signal control here when opening the child process? If the user presses Ctrl+C, this may leave the terminal in a bad state or not clean up properly... maybe we can add try/except statement to catch the KeyboardInterrupt to call process.terminate()?

import signal

try:
    stdout, _ = process.communicate(input='\n'.join(items))
except KeyboardInterrupt:
    process.terminate()
    process.wait()
    return None

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

besides that, i would add the timeout just in case if fzf hangs for some reason, the entire command hangs indefinitely.


# Check if user cancelled (Ctrl-C or ESC)
if process.returncode != 0:
return None

# Return selected item (strip newline)
selected = stdout.strip()
return selected if selected else None

except Exception as e:
print(f'Error during interactive selection: {e}', file=sys.stderr)
return None
Comment on lines +193 to +195
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching all exceptions hides bugs and makes debugging harder.

Suggested change
except Exception as e:
print(f'Error during interactive selection: {e}', file=sys.stderr)
return None
except (OSError, subprocess.SubprocessError) as e:
print(f'Error during interactive selection: {e}', file=sys.stderr)
return None


17 changes: 17 additions & 0 deletions ros2interface/ros2interface/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,20 @@ def interface_to_yaml(identifier):
instance = interface()

return message_to_yaml(instance)


def get_all_interface_names():
"""Get all available interface names (messages, services, and actions)."""
types = []
for package_name, service_names in get_service_interfaces().items():
for service_name in service_names:
types.append(f'{package_name}/srv/{service_name}')

for package_name, message_names in get_message_interfaces().items():
for message_name in message_names:
types.append(f'{package_name}/msg/{message_name}')

for package_name, action_names in get_action_interfaces().items():
for action_name in action_names:
types.append(f'{package_name}/action/{action_name}')
return sorted(types)
17 changes: 17 additions & 0 deletions ros2interface/ros2interface/verb/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import sys
import typing

from ros2cli.helpers import interactive_select
from ros2interface.api import get_all_interface_names
from ros2interface.api import type_completer
from ros2interface.verb import VerbExtension
from rosidl_adapter.parser import \
Expand Down Expand Up @@ -190,13 +192,28 @@ def add_arguments(self, parser, cli_name):

arg = parser.add_argument(
'type',
nargs='?',
action=ReadStdinPipe,
help="Show an interface definition (e.g. 'example_interfaces/msg/String'). "
"If not provided, an interactive selection will be shown. "
"Passing '-' reads the argument from stdin (e.g. "
"'ros2 topic type /chatter | ros2 interface show -').")
arg.completer = type_completer

def main(self, *, args):
# If no type provided, launch interactive selection
if args.type is None:
interface_types = get_all_interface_names()

selected_type = interactive_select(
interface_types,
prompt='Select interface to show:')

if selected_type is None:
return 'No interface selected'

args.type = selected_type

try:
_show_interface(
args.type,
Expand Down
20 changes: 18 additions & 2 deletions ros2node/ros2node/verb/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import sys

from ros2cli.helpers import interactive_select
from ros2cli.node.strategy import add_arguments
from ros2cli.node.strategy import NodeStrategy
from ros2node.api import get_action_client_info
Expand All @@ -38,15 +39,30 @@ class InfoVerb(VerbExtension):
def add_arguments(self, parser, cli_name):
add_arguments(parser)
argument = parser.add_argument(
'node_name',
help='Fully qualified node name to request information')
'node_name', nargs='?',
help='Fully qualified node name to request information. '
'If not provided, an interactive selection will be shown.')
argument.completer = NodeNameCompleter()
parser.add_argument(
'--include-hidden', action='store_true',
help='Display hidden topics, services, and actions as well')

def main(self, *, args):
with NodeStrategy(args) as node:
# If no node name provided, launch interactive selection
if args.node_name is None:
node_names = get_node_names(node=node, include_hidden_nodes=args.include_hidden)
node_name_list = [n.full_name for n in node_names]

selected_node = interactive_select(
node_name_list,
prompt='Select node for info:')

if selected_node is None:
return 'No node selected'

args.node_name = selected_node

node_names = get_node_names(node=node, include_hidden_nodes=args.include_hidden)
count = [n.full_name for n in node_names].count(args.node_name)
if count > 1:
Expand Down
39 changes: 37 additions & 2 deletions ros2param/ros2param/verb/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
# limitations under the License.

from rcl_interfaces.msg import ParameterType
from ros2cli.helpers import interactive_select
from ros2cli.node.direct import DirectNode
from ros2cli.node.strategy import add_arguments
from ros2cli.node.strategy import NodeStrategy
from ros2node.api import get_absolute_node_name
from ros2node.api import get_node_names
from ros2node.api import NodeNameCompleter
from ros2node.api import wait_for_node
from ros2param.api import call_get_parameters
from ros2param.api import call_list_parameters
from ros2param.api import ParameterNameCompleter
from ros2param.verb import VerbExtension

Expand All @@ -30,14 +33,16 @@ class GetVerb(VerbExtension):
def add_arguments(self, parser, cli_name): # noqa: D102
add_arguments(parser)
arg = parser.add_argument(
'node_name', help='Name of the ROS node')
'node_name', nargs='?',
help='Name of the ROS node. If not provided, an interactive selection will be shown.')
arg.completer = NodeNameCompleter(
include_hidden_nodes_key='include_hidden_nodes')
parser.add_argument(
'--include-hidden-nodes', action='store_true',
help='Consider hidden nodes as well')
arg = parser.add_argument(
'parameter_name', help='Name of the parameter')
'parameter_name', nargs='?',
help='Name of the parameter. If not provided, an interactive selection will be shown.')
arg.completer = ParameterNameCompleter()
parser.add_argument(
'--hide-type', action='store_true',
Expand All @@ -47,6 +52,36 @@ def add_arguments(self, parser, cli_name): # noqa: D102
help='Wait for N seconds until node becomes available (default %(default)s sec)')

def main(self, *, args): # noqa: D102
# If no node name provided, launch interactive selection
if args.node_name is None:
with NodeStrategy(args) as node:
node_names = get_node_names(node=node, include_hidden_nodes=args.include_hidden_nodes)
node_name_list = [n.full_name for n in node_names]

selected_node = interactive_select(
node_name_list,
prompt='Select node:')

if selected_node is None:
return 'No node selected'

args.node_name = selected_node

# If no parameter name provided, launch interactive selection
if args.parameter_name is None:
with DirectNode(args) as node:
parameter_names = call_list_parameters(
node=node, node_name=args.node_name)

selected_param = interactive_select(
parameter_names,
prompt='Select parameter:')

if selected_param is None:
return 'No parameter selected'

args.parameter_name = selected_param

node_name = get_absolute_node_name(args.node_name)
with NodeStrategy(args) as node:
if not wait_for_node(node, node_name, args.include_hidden_nodes, args.timeout):
Expand Down
24 changes: 22 additions & 2 deletions ros2service/ros2service/verb/call.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
from rclpy.qos import QoSProfile

from ros2cli.helpers import collect_stdin
from ros2cli.helpers import interactive_select
from ros2cli.node import NODE_NAME_PREFIX
from ros2cli.node.strategy import NodeStrategy
from ros2cli.qos import add_qos_arguments
from ros2cli.qos import profile_configure_short_keys

from ros2service.api import get_service_names
from ros2service.api import ServiceNameCompleter
from ros2service.api import ServicePrototypeCompleter
from ros2service.api import ServiceTypeCompleter
Expand All @@ -40,8 +43,9 @@ class CallVerb(VerbExtension):

def add_arguments(self, parser, cli_name):
arg = parser.add_argument(
'service_name',
help="Name of the ROS service to call to (e.g. '/add_two_ints')")
'service_name', nargs='?',
help="Name of the ROS service to call to (e.g. '/add_two_ints'). "
"If not provided, an interactive selection will be shown.")
arg.completer = ServiceNameCompleter(
include_hidden_services_key='include_hidden_services')
arg = parser.add_argument(
Expand All @@ -66,6 +70,22 @@ def add_arguments(self, parser, cli_name):
add_qos_arguments(parser, 'service client', 'services_default')

def main(self, *, args):
# If no service name provided, launch interactive selection
if args.service_name is None:
with NodeStrategy(args) as node:
service_names = get_service_names(
node=node,
include_hidden_services=args.include_hidden_services)

selected_service = interactive_select(
service_names,
prompt='Select service to call:')

if selected_service is None:
return 'No service selected'

args.service_name = selected_service

if args.rate is not None and args.rate <= 0:
raise RuntimeError('rate must be greater than zero')
period = 1. / args.rate if args.rate else None
Expand Down
18 changes: 17 additions & 1 deletion ros2topic/ros2topic/verb/bw.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@

from rclpy.executors import ExternalShutdownException

from ros2cli.helpers import interactive_select
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
from ros2cli.node.direct import DirectNode
from ros2cli.qos import add_qos_arguments
from ros2cli.qos import choose_qos

from ros2topic.api import get_msg_class
from ros2topic.api import get_topic_names
from ros2topic.api import get_topic_names_and_types
from ros2topic.api import positive_int
from ros2topic.api import TopicNameCompleter
Expand Down Expand Up @@ -101,8 +103,22 @@ def main(self, *, args):


def main(args):
# Interactive selection if no topic name and not --all
if not args.all_topics and not args.topic_name:
raise RuntimeError('Either specify topic names or use --all/-a option')
with DirectNode(args) as node:
topic_names = get_topic_names(
node=node.node,
include_hidden_topics=args.include_hidden_topics)

selected_topic = interactive_select(
topic_names,
prompt='Select topic for bw:')

if selected_topic is None:
return 'No topic selected'

args.topic_name = [selected_topic]

if args.all_topics and args.topic_name:
raise RuntimeError('Cannot specify both --all/-a and topic names')

Expand Down
23 changes: 21 additions & 2 deletions ros2topic/ros2topic/verb/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
from rclpy.qos import QoSProfile
from rclpy.task import Future

from ros2cli.helpers import interactive_select
from ros2cli.helpers import unsigned_int
from ros2cli.node.strategy import add_arguments as add_strategy_node_arguments
from ros2cli.node.strategy import NodeStrategy
from ros2cli.qos import add_qos_arguments
from ros2cli.qos import choose_qos

from ros2topic.api import get_msg_class
from ros2topic.api import get_topic_names
from ros2topic.api import positive_float
from ros2topic.api import TopicNameCompleter
from ros2topic.verb import VerbExtension
Expand All @@ -51,8 +53,9 @@ def add_arguments(self, parser, cli_name):
add_strategy_node_arguments(parser)

arg = parser.add_argument(
'topic_name',
help="Name of the ROS topic to listen to (e.g. '/chatter')")
'topic_name', nargs='?',
help="Name of the ROS topic to listen to (e.g. '/chatter'). "
"If not provided, an interactive selection will be shown.")
arg.completer = TopicNameCompleter(
include_hidden_topics_key='include_hidden_topics')
parser.add_argument(
Expand Down Expand Up @@ -121,6 +124,22 @@ def add_arguments(self, parser, cli_name):

def main(self, *, args):

# If no topic name provided, launch interactive selection
if args.topic_name is None:
with NodeStrategy(args) as node:
topic_names = get_topic_names(
node=node,
include_hidden_topics=args.include_hidden_topics)

selected_topic = interactive_select(
topic_names,
prompt='Select topic to echo:')

if selected_topic is None:
return 'No topic selected'

args.topic_name = selected_topic

self.csv = args.csv

# Validate field selection
Expand Down
Loading