From 36866492e5b3bf05fb7c1a579eda674935beb7e4 Mon Sep 17 00:00:00 2001 From: Tony Najjar Date: Sun, 21 Dec 2025 21:08:21 +0100 Subject: [PATCH 1/4] Add fuzzy finding Signed-off-by: Tony Najjar --- ros2cli/ros2cli/helpers.py | 68 +++++++++++++++++++++ ros2interface/ros2interface/api/__init__.py | 17 ++++++ ros2interface/ros2interface/verb/show.py | 17 ++++++ ros2node/ros2node/verb/info.py | 20 +++++- ros2param/ros2param/verb/get.py | 39 +++++++++++- ros2service/ros2service/verb/call.py | 24 +++++++- ros2topic/ros2topic/verb/bw.py | 18 +++++- ros2topic/ros2topic/verb/echo.py | 23 ++++++- ros2topic/ros2topic/verb/hz.py | 18 +++++- ros2topic/ros2topic/verb/info.py | 22 ++++++- ros2topic/ros2topic/verb/pub.py | 24 +++++++- 11 files changed, 276 insertions(+), 14 deletions(-) diff --git a/ros2cli/ros2cli/helpers.py b/ros2cli/ros2cli/helpers.py index abffcd489..a4225d5c6 100644 --- a/ros2cli/ros2cli/helpers.py +++ b/ros2cli/ros2cli/helpers.py @@ -16,10 +16,13 @@ import functools import inspect import os +import subprocess import sys import time from typing import Dict +from typing import List +from typing import Optional def get_ros_domain_id(): @@ -133,3 +136,68 @@ def get_rmw_additional_env(rmw_implementation: str) -> Dict[str, str]: return { 'RMW_IMPLEMENTATION': rmw_implementation, } + + +def interactive_select( + items: List[str], + prompt: str = 'Select an item:', + multi: bool = False +) -> Optional[str]: + """ + Launch interactive fuzzy search using fzf to select from a list of items. + + :param items: List of items to select from + :param prompt: Prompt message to display in fzf + :param multi: Allow multiple selections (not yet implemented) + :return: Selected item or None if user cancelled or fzf not available + """ + if not items: + print('No items available to select from.', file=sys.stderr) + return None + + try: + # Check if fzf is available + result = subprocess.run( + ['fzf', '--version'], + capture_output=True, + text=True, + timeout=1 + ) + if result.returncode != 0: + raise FileNotFoundError() + except (FileNotFoundError, subprocess.TimeoutExpired): + print( + 'Error: fzf is not installed. Interactive selection requires fzf.', + file=sys.stderr + ) + print('Install fzf: https://github.com/junegunn/fzf#installation', file=sys.stderr) + print(' - Ubuntu/Debian: sudo apt install fzf', file=sys.stderr) + print(' - Fedora: sudo dnf install fzf', file=sys.stderr) + print(' - macOS: brew install fzf', file=sys.stderr) + return None + + try: + # Launch fzf with items as input + process = subprocess.Popen( + ['fzf', '--prompt', prompt + ' ', '--height', '40%', '--reverse'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + + # Send items to fzf + stdout, stderr = process.communicate(input='\n'.join(items)) + + # Check if user cancelled (Ctrl-C or ESC) + if process.returncode != 0: + return None + + # Return selected item (strip newline) + selected = stdout.strip() + return selected if selected else None + + except Exception as e: + print(f'Error during interactive selection: {e}', file=sys.stderr) + return None + diff --git a/ros2interface/ros2interface/api/__init__.py b/ros2interface/ros2interface/api/__init__.py index 362050d4c..6be3c6502 100644 --- a/ros2interface/ros2interface/api/__init__.py +++ b/ros2interface/ros2interface/api/__init__.py @@ -52,3 +52,20 @@ def interface_to_yaml(identifier): instance = interface() return message_to_yaml(instance) + + +def get_all_interface_names(): + """Get all available interface names (messages, services, and actions).""" + types = [] + for package_name, service_names in get_service_interfaces().items(): + for service_name in service_names: + types.append(f'{package_name}/srv/{service_name}') + + for package_name, message_names in get_message_interfaces().items(): + for message_name in message_names: + types.append(f'{package_name}/msg/{message_name}') + + for package_name, action_names in get_action_interfaces().items(): + for action_name in action_names: + types.append(f'{package_name}/action/{action_name}') + return sorted(types) diff --git a/ros2interface/ros2interface/verb/show.py b/ros2interface/ros2interface/verb/show.py index d97c713cd..0e95f65ba 100644 --- a/ros2interface/ros2interface/verb/show.py +++ b/ros2interface/ros2interface/verb/show.py @@ -16,6 +16,8 @@ import sys import typing +from ros2cli.helpers import interactive_select +from ros2interface.api import get_all_interface_names from ros2interface.api import type_completer from ros2interface.verb import VerbExtension from rosidl_adapter.parser import \ @@ -190,13 +192,28 @@ def add_arguments(self, parser, cli_name): arg = parser.add_argument( 'type', + nargs='?', action=ReadStdinPipe, help="Show an interface definition (e.g. 'example_interfaces/msg/String'). " + "If not provided, an interactive selection will be shown. " "Passing '-' reads the argument from stdin (e.g. " "'ros2 topic type /chatter | ros2 interface show -').") arg.completer = type_completer def main(self, *, args): + # If no type provided, launch interactive selection + if args.type is None: + interface_types = get_all_interface_names() + + selected_type = interactive_select( + interface_types, + prompt='Select interface to show:') + + if selected_type is None: + return 'No interface selected' + + args.type = selected_type + try: _show_interface( args.type, diff --git a/ros2node/ros2node/verb/info.py b/ros2node/ros2node/verb/info.py index 0eed17d26..273af6e9b 100644 --- a/ros2node/ros2node/verb/info.py +++ b/ros2node/ros2node/verb/info.py @@ -14,6 +14,7 @@ import sys +from ros2cli.helpers import interactive_select from ros2cli.node.strategy import add_arguments from ros2cli.node.strategy import NodeStrategy from ros2node.api import get_action_client_info @@ -38,8 +39,9 @@ class InfoVerb(VerbExtension): def add_arguments(self, parser, cli_name): add_arguments(parser) argument = parser.add_argument( - 'node_name', - help='Fully qualified node name to request information') + 'node_name', nargs='?', + help='Fully qualified node name to request information. ' + 'If not provided, an interactive selection will be shown.') argument.completer = NodeNameCompleter() parser.add_argument( '--include-hidden', action='store_true', @@ -47,6 +49,20 @@ def add_arguments(self, parser, cli_name): def main(self, *, args): with NodeStrategy(args) as node: + # If no node name provided, launch interactive selection + if args.node_name is None: + node_names = get_node_names(node=node, include_hidden_nodes=args.include_hidden) + node_name_list = [n.full_name for n in node_names] + + selected_node = interactive_select( + node_name_list, + prompt='Select node for info:') + + if selected_node is None: + return 'No node selected' + + args.node_name = selected_node + node_names = get_node_names(node=node, include_hidden_nodes=args.include_hidden) count = [n.full_name for n in node_names].count(args.node_name) if count > 1: diff --git a/ros2param/ros2param/verb/get.py b/ros2param/ros2param/verb/get.py index facfb7fc6..20ebd12f2 100644 --- a/ros2param/ros2param/verb/get.py +++ b/ros2param/ros2param/verb/get.py @@ -13,13 +13,16 @@ # limitations under the License. from rcl_interfaces.msg import ParameterType +from ros2cli.helpers import interactive_select from ros2cli.node.direct import DirectNode from ros2cli.node.strategy import add_arguments from ros2cli.node.strategy import NodeStrategy from ros2node.api import get_absolute_node_name +from ros2node.api import get_node_names from ros2node.api import NodeNameCompleter from ros2node.api import wait_for_node from ros2param.api import call_get_parameters +from ros2param.api import call_list_parameters from ros2param.api import ParameterNameCompleter from ros2param.verb import VerbExtension @@ -30,14 +33,16 @@ class GetVerb(VerbExtension): def add_arguments(self, parser, cli_name): # noqa: D102 add_arguments(parser) arg = parser.add_argument( - 'node_name', help='Name of the ROS node') + 'node_name', nargs='?', + help='Name of the ROS node. If not provided, an interactive selection will be shown.') arg.completer = NodeNameCompleter( include_hidden_nodes_key='include_hidden_nodes') parser.add_argument( '--include-hidden-nodes', action='store_true', help='Consider hidden nodes as well') arg = parser.add_argument( - 'parameter_name', help='Name of the parameter') + 'parameter_name', nargs='?', + help='Name of the parameter. If not provided, an interactive selection will be shown.') arg.completer = ParameterNameCompleter() parser.add_argument( '--hide-type', action='store_true', @@ -47,6 +52,36 @@ def add_arguments(self, parser, cli_name): # noqa: D102 help='Wait for N seconds until node becomes available (default %(default)s sec)') def main(self, *, args): # noqa: D102 + # If no node name provided, launch interactive selection + if args.node_name is None: + with NodeStrategy(args) as node: + node_names = get_node_names(node=node, include_hidden_nodes=args.include_hidden_nodes) + node_name_list = [n.full_name for n in node_names] + + selected_node = interactive_select( + node_name_list, + prompt='Select node:') + + if selected_node is None: + return 'No node selected' + + args.node_name = selected_node + + # If no parameter name provided, launch interactive selection + if args.parameter_name is None: + with DirectNode(args) as node: + parameter_names = call_list_parameters( + node=node, node_name=args.node_name) + + selected_param = interactive_select( + parameter_names, + prompt='Select parameter:') + + if selected_param is None: + return 'No parameter selected' + + args.parameter_name = selected_param + node_name = get_absolute_node_name(args.node_name) with NodeStrategy(args) as node: if not wait_for_node(node, node_name, args.include_hidden_nodes, args.timeout): diff --git a/ros2service/ros2service/verb/call.py b/ros2service/ros2service/verb/call.py index ffc521aed..cf0326548 100644 --- a/ros2service/ros2service/verb/call.py +++ b/ros2service/ros2service/verb/call.py @@ -20,10 +20,13 @@ from rclpy.qos import QoSProfile from ros2cli.helpers import collect_stdin +from ros2cli.helpers import interactive_select from ros2cli.node import NODE_NAME_PREFIX +from ros2cli.node.strategy import NodeStrategy from ros2cli.qos import add_qos_arguments from ros2cli.qos import profile_configure_short_keys +from ros2service.api import get_service_names from ros2service.api import ServiceNameCompleter from ros2service.api import ServicePrototypeCompleter from ros2service.api import ServiceTypeCompleter @@ -40,8 +43,9 @@ class CallVerb(VerbExtension): def add_arguments(self, parser, cli_name): arg = parser.add_argument( - 'service_name', - help="Name of the ROS service to call to (e.g. '/add_two_ints')") + 'service_name', nargs='?', + help="Name of the ROS service to call to (e.g. '/add_two_ints'). " + "If not provided, an interactive selection will be shown.") arg.completer = ServiceNameCompleter( include_hidden_services_key='include_hidden_services') arg = parser.add_argument( @@ -66,6 +70,22 @@ def add_arguments(self, parser, cli_name): add_qos_arguments(parser, 'service client', 'services_default') def main(self, *, args): + # If no service name provided, launch interactive selection + if args.service_name is None: + with NodeStrategy(args) as node: + service_names = get_service_names( + node=node, + include_hidden_services=args.include_hidden_services) + + selected_service = interactive_select( + service_names, + prompt='Select service to call:') + + if selected_service is None: + return 'No service selected' + + args.service_name = selected_service + if args.rate is not None and args.rate <= 0: raise RuntimeError('rate must be greater than zero') period = 1. / args.rate if args.rate else None diff --git a/ros2topic/ros2topic/verb/bw.py b/ros2topic/ros2topic/verb/bw.py index 85158287a..45a08ff17 100644 --- a/ros2topic/ros2topic/verb/bw.py +++ b/ros2topic/ros2topic/verb/bw.py @@ -41,12 +41,14 @@ from rclpy.executors import ExternalShutdownException +from ros2cli.helpers import interactive_select from ros2cli.node.direct import add_arguments as add_direct_node_arguments from ros2cli.node.direct import DirectNode from ros2cli.qos import add_qos_arguments from ros2cli.qos import choose_qos from ros2topic.api import get_msg_class +from ros2topic.api import get_topic_names from ros2topic.api import get_topic_names_and_types from ros2topic.api import positive_int from ros2topic.api import TopicNameCompleter @@ -101,8 +103,22 @@ def main(self, *, args): def main(args): + # Interactive selection if no topic name and not --all if not args.all_topics and not args.topic_name: - raise RuntimeError('Either specify topic names or use --all/-a option') + with DirectNode(args) as node: + topic_names = get_topic_names( + node=node.node, + include_hidden_topics=args.include_hidden_topics) + + selected_topic = interactive_select( + topic_names, + prompt='Select topic for bw:') + + if selected_topic is None: + return 'No topic selected' + + args.topic_name = [selected_topic] + if args.all_topics and args.topic_name: raise RuntimeError('Cannot specify both --all/-a and topic names') diff --git a/ros2topic/ros2topic/verb/echo.py b/ros2topic/ros2topic/verb/echo.py index 69657110f..c835fbe86 100644 --- a/ros2topic/ros2topic/verb/echo.py +++ b/ros2topic/ros2topic/verb/echo.py @@ -23,6 +23,7 @@ from rclpy.qos import QoSProfile from rclpy.task import Future +from ros2cli.helpers import interactive_select from ros2cli.helpers import unsigned_int from ros2cli.node.strategy import add_arguments as add_strategy_node_arguments from ros2cli.node.strategy import NodeStrategy @@ -30,6 +31,7 @@ from ros2cli.qos import choose_qos from ros2topic.api import get_msg_class +from ros2topic.api import get_topic_names from ros2topic.api import positive_float from ros2topic.api import TopicNameCompleter from ros2topic.verb import VerbExtension @@ -51,8 +53,9 @@ def add_arguments(self, parser, cli_name): add_strategy_node_arguments(parser) arg = parser.add_argument( - 'topic_name', - help="Name of the ROS topic to listen to (e.g. '/chatter')") + 'topic_name', nargs='?', + help="Name of the ROS topic to listen to (e.g. '/chatter'). " + "If not provided, an interactive selection will be shown.") arg.completer = TopicNameCompleter( include_hidden_topics_key='include_hidden_topics') parser.add_argument( @@ -121,6 +124,22 @@ def add_arguments(self, parser, cli_name): def main(self, *, args): + # If no topic name provided, launch interactive selection + if args.topic_name is None: + with NodeStrategy(args) as node: + topic_names = get_topic_names( + node=node, + include_hidden_topics=args.include_hidden_topics) + + selected_topic = interactive_select( + topic_names, + prompt='Select topic to echo:') + + if selected_topic is None: + return 'No topic selected' + + args.topic_name = selected_topic + self.csv = args.csv # Validate field selection diff --git a/ros2topic/ros2topic/verb/hz.py b/ros2topic/ros2topic/verb/hz.py index 335dbd9a0..5d7fee0ac 100644 --- a/ros2topic/ros2topic/verb/hz.py +++ b/ros2topic/ros2topic/verb/hz.py @@ -42,12 +42,14 @@ from rclpy.clock import ClockType from rclpy.executors import ExternalShutdownException +from ros2cli.helpers import interactive_select from ros2cli.node.direct import add_arguments as add_direct_node_arguments from ros2cli.node.direct import DirectNode from ros2cli.qos import add_qos_arguments from ros2cli.qos import choose_qos from ros2topic.api import get_msg_class +from ros2topic.api import get_topic_names from ros2topic.api import get_topic_names_and_types from ros2topic.api import positive_int from ros2topic.api import TopicNameCompleter @@ -100,8 +102,22 @@ def main(self, *, args): def main(args): + # Interactive selection if no topic name and not --all if not args.all_topics and not args.topic_name: - raise RuntimeError('Either specify topic names or use --all/-a option') + with DirectNode(args) as node: + topic_names = get_topic_names( + node=node.node, + include_hidden_topics=args.include_hidden_topics) + + selected_topic = interactive_select( + topic_names, + prompt='Select topic for hz:') + + if selected_topic is None: + return 'No topic selected' + + args.topic_name = [selected_topic] + if args.all_topics and args.topic_name: raise RuntimeError('Cannot specify both --all/-a and topic names') diff --git a/ros2topic/ros2topic/verb/info.py b/ros2topic/ros2topic/verb/info.py index 39ffabceb..c1a311643 100644 --- a/ros2topic/ros2topic/verb/info.py +++ b/ros2topic/ros2topic/verb/info.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +from ros2cli.helpers import interactive_select from ros2cli.node.strategy import add_arguments as add_strategy_node_arguments from ros2cli.node.strategy import NodeStrategy +from ros2topic.api import get_topic_names from ros2topic.api import get_topic_names_and_types from ros2topic.api import TopicNameCompleter from ros2topic.verb import VerbExtension @@ -25,8 +27,9 @@ class InfoVerb(VerbExtension): def add_arguments(self, parser, cli_name): add_strategy_node_arguments(parser) arg = parser.add_argument( - 'topic_name', - help="Name of the ROS topic to get info (e.g. '/chatter')") + 'topic_name', nargs='?', + help="Name of the ROS topic to get info (e.g. '/chatter'). " + "If not provided, an interactive selection will be shown.") parser.add_argument( '--verbose', '-v', @@ -39,6 +42,21 @@ def add_arguments(self, parser, cli_name): def main(self, *, args): with NodeStrategy(args) as node: + # If no topic name provided, launch interactive selection + if args.topic_name is None: + topic_names = get_topic_names( + node=node, + include_hidden_topics=args.include_hidden_topics) + + selected_topic = interactive_select( + topic_names, + prompt='Select topic for info:') + + if selected_topic is None: + return 'No topic selected' + + args.topic_name = selected_topic + topic_names_and_types = get_topic_names_and_types( node=node, include_hidden_topics=True) topic_name = args.topic_name diff --git a/ros2topic/ros2topic/verb/pub.py b/ros2topic/ros2topic/verb/pub.py index 9732ab1ee..9f3bd27ed 100644 --- a/ros2topic/ros2topic/verb/pub.py +++ b/ros2topic/ros2topic/verb/pub.py @@ -23,11 +23,13 @@ from rclpy.qos import QoSProfile from ros2cli.helpers import collect_stdin +from ros2cli.helpers import interactive_select from ros2cli.node.direct import add_arguments as add_direct_node_arguments from ros2cli.node.direct import DirectNode from ros2cli.qos import add_qos_arguments from ros2cli.qos import profile_configure_short_keys +from ros2topic.api import get_topic_names from ros2topic.api import positive_float from ros2topic.api import TopicMessagePrototypeCompleter, YamlCompletionFinder from ros2topic.api import TopicNameCompleter @@ -56,8 +58,9 @@ class PubVerb(VerbExtension): def add_arguments(self, parser, cli_name): arg = parser.add_argument( - 'topic_name', - help="Name of the ROS topic to publish to (e.g. '/chatter')") + 'topic_name', nargs='?', + help="Name of the ROS topic to publish to (e.g. '/chatter'). " + "If not provided, an interactive selection will be shown.") arg.completer = TopicNameCompleter( include_hidden_topics_key='include_hidden_topics') arg = parser.add_argument( @@ -126,6 +129,23 @@ def main(self, *, args): def main(args): + # If no topic name provided, launch interactive selection + if args.topic_name is None: + from ros2cli.node.strategy import NodeStrategy + with NodeStrategy(args) as node: + topic_names = get_topic_names( + node=node, + include_hidden_topics=args.include_hidden_topics) + + selected_topic = interactive_select( + topic_names, + prompt='Select topic to publish to:') + + if selected_topic is None: + return 'No topic selected' + + args.topic_name = selected_topic + qos_profile_name = args.qos_profile qos_profile = rclpy.qos.QoSPresetProfiles.get_from_short_key(qos_profile_name) profile_configure_short_keys( From a6337feea7ae1e1c85db4aa37c7db071609f2cda Mon Sep 17 00:00:00 2001 From: Tony Najjar Date: Sun, 21 Dec 2025 21:55:33 +0100 Subject: [PATCH 2/4] fix tty Signed-off-by: Tony Najjar --- ros2cli/ros2cli/helpers.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ros2cli/ros2cli/helpers.py b/ros2cli/ros2cli/helpers.py index a4225d5c6..30b4dc068 100644 --- a/ros2cli/ros2cli/helpers.py +++ b/ros2cli/ros2cli/helpers.py @@ -177,17 +177,16 @@ def interactive_select( return None try: - # Launch fzf with items as input + # Launch fzf with items as input - using direct TTY access process = subprocess.Popen( ['fzf', '--prompt', prompt + ' ', '--height', '40%', '--reverse'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, text=True ) # Send items to fzf - stdout, stderr = process.communicate(input='\n'.join(items)) + stdout, _ = process.communicate(input='\n'.join(items)) # Check if user cancelled (Ctrl-C or ESC) if process.returncode != 0: From 6c35ddc28dcba195bddc0a61a181faf67381fdd7 Mon Sep 17 00:00:00 2001 From: Tony Najjar Date: Sun, 21 Dec 2025 22:01:46 +0100 Subject: [PATCH 3/4] add fzf as dependency Signed-off-by: Tony Najjar --- ros2cli/package.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/ros2cli/package.xml b/ros2cli/package.xml index 95a86dcd8..3b395b9ee 100644 --- a/ros2cli/package.xml +++ b/ros2cli/package.xml @@ -21,6 +21,7 @@ python3-packaging python3-psutil rclpy + fzf ament_copyright ament_flake8 From e98a297dfaef468365cbdcbe8f794cf72b4dd703 Mon Sep 17 00:00:00 2001 From: Tony Najjar Date: Sun, 21 Dec 2025 22:11:33 +0100 Subject: [PATCH 4/4] small fixes Signed-off-by: Tony Najjar --- ros2cli/ros2cli/helpers.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/ros2cli/ros2cli/helpers.py b/ros2cli/ros2cli/helpers.py index 30b4dc068..d822ae655 100644 --- a/ros2cli/ros2cli/helpers.py +++ b/ros2cli/ros2cli/helpers.py @@ -140,15 +140,13 @@ def get_rmw_additional_env(rmw_implementation: str) -> Dict[str, str]: def interactive_select( items: List[str], - prompt: str = 'Select an item:', - multi: bool = False + prompt: str = 'Select an item:' ) -> Optional[str]: """ Launch interactive fuzzy search using fzf to select from a list of items. :param items: List of items to select from :param prompt: Prompt message to display in fzf - :param multi: Allow multiple selections (not yet implemented) :return: Selected item or None if user cancelled or fzf not available """ if not items: @@ -167,13 +165,9 @@ def interactive_select( raise FileNotFoundError() except (FileNotFoundError, subprocess.TimeoutExpired): print( - 'Error: fzf is not installed. Interactive selection requires fzf.', + 'Error: fzf is not installed but is a dependency for this package. You can install it with rosdep', file=sys.stderr ) - print('Install fzf: https://github.com/junegunn/fzf#installation', file=sys.stderr) - print(' - Ubuntu/Debian: sudo apt install fzf', file=sys.stderr) - print(' - Fedora: sudo dnf install fzf', file=sys.stderr) - print(' - macOS: brew install fzf', file=sys.stderr) return None try: