diff --git a/ros2cli/package.xml b/ros2cli/package.xml
index 95a86dcd8..3b395b9ee 100644
--- a/ros2cli/package.xml
+++ b/ros2cli/package.xml
@@ -21,6 +21,7 @@
python3-packaging
python3-psutil
rclpy
+ fzf
ament_copyright
ament_flake8
diff --git a/ros2cli/ros2cli/helpers.py b/ros2cli/ros2cli/helpers.py
index abffcd489..d822ae655 100644
--- a/ros2cli/ros2cli/helpers.py
+++ b/ros2cli/ros2cli/helpers.py
@@ -16,10 +16,13 @@
import functools
import inspect
import os
+import subprocess
import sys
import time
from typing import Dict
+from typing import List
+from typing import Optional
def get_ros_domain_id():
@@ -133,3 +136,61 @@ def get_rmw_additional_env(rmw_implementation: str) -> Dict[str, str]:
return {
'RMW_IMPLEMENTATION': rmw_implementation,
}
+
+
+def interactive_select(
+ items: List[str],
+ prompt: str = 'Select an item:'
+) -> Optional[str]:
+ """
+ Launch interactive fuzzy search using fzf to select from a list of items.
+
+ :param items: List of items to select from
+ :param prompt: Prompt message to display in fzf
+ :return: Selected item or None if user cancelled or fzf not available
+ """
+ if not items:
+ print('No items available to select from.', file=sys.stderr)
+ return None
+
+ try:
+ # Check if fzf is available
+ result = subprocess.run(
+ ['fzf', '--version'],
+ capture_output=True,
+ text=True,
+ timeout=1
+ )
+ if result.returncode != 0:
+ raise FileNotFoundError()
+ except (FileNotFoundError, subprocess.TimeoutExpired):
+ print(
+ 'Error: fzf is not installed but is a dependency for this package. You can install it with rosdep',
+ file=sys.stderr
+ )
+ return None
+
+ try:
+ # Launch fzf with items as input - using direct TTY access
+ process = subprocess.Popen(
+ ['fzf', '--prompt', prompt + ' ', '--height', '40%', '--reverse'],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ text=True
+ )
+
+ # Send items to fzf
+ stdout, _ = process.communicate(input='\n'.join(items))
+
+ # Check if user cancelled (Ctrl-C or ESC)
+ if process.returncode != 0:
+ return None
+
+ # Return selected item (strip newline)
+ selected = stdout.strip()
+ return selected if selected else None
+
+ except Exception as e:
+ print(f'Error during interactive selection: {e}', file=sys.stderr)
+ return None
+
diff --git a/ros2interface/ros2interface/api/__init__.py b/ros2interface/ros2interface/api/__init__.py
index 362050d4c..6be3c6502 100644
--- a/ros2interface/ros2interface/api/__init__.py
+++ b/ros2interface/ros2interface/api/__init__.py
@@ -52,3 +52,20 @@ def interface_to_yaml(identifier):
instance = interface()
return message_to_yaml(instance)
+
+
+def get_all_interface_names():
+ """Get all available interface names (messages, services, and actions)."""
+ types = []
+ for package_name, service_names in get_service_interfaces().items():
+ for service_name in service_names:
+ types.append(f'{package_name}/srv/{service_name}')
+
+ for package_name, message_names in get_message_interfaces().items():
+ for message_name in message_names:
+ types.append(f'{package_name}/msg/{message_name}')
+
+ for package_name, action_names in get_action_interfaces().items():
+ for action_name in action_names:
+ types.append(f'{package_name}/action/{action_name}')
+ return sorted(types)
diff --git a/ros2interface/ros2interface/verb/show.py b/ros2interface/ros2interface/verb/show.py
index d97c713cd..0e95f65ba 100644
--- a/ros2interface/ros2interface/verb/show.py
+++ b/ros2interface/ros2interface/verb/show.py
@@ -16,6 +16,8 @@
import sys
import typing
+from ros2cli.helpers import interactive_select
+from ros2interface.api import get_all_interface_names
from ros2interface.api import type_completer
from ros2interface.verb import VerbExtension
from rosidl_adapter.parser import \
@@ -190,13 +192,28 @@ def add_arguments(self, parser, cli_name):
arg = parser.add_argument(
'type',
+ nargs='?',
action=ReadStdinPipe,
help="Show an interface definition (e.g. 'example_interfaces/msg/String'). "
+ "If not provided, an interactive selection will be shown. "
"Passing '-' reads the argument from stdin (e.g. "
"'ros2 topic type /chatter | ros2 interface show -').")
arg.completer = type_completer
def main(self, *, args):
+ # If no type provided, launch interactive selection
+ if args.type is None:
+ interface_types = get_all_interface_names()
+
+ selected_type = interactive_select(
+ interface_types,
+ prompt='Select interface to show:')
+
+ if selected_type is None:
+ return 'No interface selected'
+
+ args.type = selected_type
+
try:
_show_interface(
args.type,
diff --git a/ros2node/ros2node/verb/info.py b/ros2node/ros2node/verb/info.py
index 0eed17d26..273af6e9b 100644
--- a/ros2node/ros2node/verb/info.py
+++ b/ros2node/ros2node/verb/info.py
@@ -14,6 +14,7 @@
import sys
+from ros2cli.helpers import interactive_select
from ros2cli.node.strategy import add_arguments
from ros2cli.node.strategy import NodeStrategy
from ros2node.api import get_action_client_info
@@ -38,8 +39,9 @@ class InfoVerb(VerbExtension):
def add_arguments(self, parser, cli_name):
add_arguments(parser)
argument = parser.add_argument(
- 'node_name',
- help='Fully qualified node name to request information')
+ 'node_name', nargs='?',
+ help='Fully qualified node name to request information. '
+ 'If not provided, an interactive selection will be shown.')
argument.completer = NodeNameCompleter()
parser.add_argument(
'--include-hidden', action='store_true',
@@ -47,6 +49,20 @@ def add_arguments(self, parser, cli_name):
def main(self, *, args):
with NodeStrategy(args) as node:
+ # If no node name provided, launch interactive selection
+ if args.node_name is None:
+ node_names = get_node_names(node=node, include_hidden_nodes=args.include_hidden)
+ node_name_list = [n.full_name for n in node_names]
+
+ selected_node = interactive_select(
+ node_name_list,
+ prompt='Select node for info:')
+
+ if selected_node is None:
+ return 'No node selected'
+
+ args.node_name = selected_node
+
node_names = get_node_names(node=node, include_hidden_nodes=args.include_hidden)
count = [n.full_name for n in node_names].count(args.node_name)
if count > 1:
diff --git a/ros2param/ros2param/verb/get.py b/ros2param/ros2param/verb/get.py
index facfb7fc6..20ebd12f2 100644
--- a/ros2param/ros2param/verb/get.py
+++ b/ros2param/ros2param/verb/get.py
@@ -13,13 +13,16 @@
# limitations under the License.
from rcl_interfaces.msg import ParameterType
+from ros2cli.helpers import interactive_select
from ros2cli.node.direct import DirectNode
from ros2cli.node.strategy import add_arguments
from ros2cli.node.strategy import NodeStrategy
from ros2node.api import get_absolute_node_name
+from ros2node.api import get_node_names
from ros2node.api import NodeNameCompleter
from ros2node.api import wait_for_node
from ros2param.api import call_get_parameters
+from ros2param.api import call_list_parameters
from ros2param.api import ParameterNameCompleter
from ros2param.verb import VerbExtension
@@ -30,14 +33,16 @@ class GetVerb(VerbExtension):
def add_arguments(self, parser, cli_name): # noqa: D102
add_arguments(parser)
arg = parser.add_argument(
- 'node_name', help='Name of the ROS node')
+ 'node_name', nargs='?',
+ help='Name of the ROS node. If not provided, an interactive selection will be shown.')
arg.completer = NodeNameCompleter(
include_hidden_nodes_key='include_hidden_nodes')
parser.add_argument(
'--include-hidden-nodes', action='store_true',
help='Consider hidden nodes as well')
arg = parser.add_argument(
- 'parameter_name', help='Name of the parameter')
+ 'parameter_name', nargs='?',
+ help='Name of the parameter. If not provided, an interactive selection will be shown.')
arg.completer = ParameterNameCompleter()
parser.add_argument(
'--hide-type', action='store_true',
@@ -47,6 +52,36 @@ def add_arguments(self, parser, cli_name): # noqa: D102
help='Wait for N seconds until node becomes available (default %(default)s sec)')
def main(self, *, args): # noqa: D102
+ # If no node name provided, launch interactive selection
+ if args.node_name is None:
+ with NodeStrategy(args) as node:
+ node_names = get_node_names(node=node, include_hidden_nodes=args.include_hidden_nodes)
+ node_name_list = [n.full_name for n in node_names]
+
+ selected_node = interactive_select(
+ node_name_list,
+ prompt='Select node:')
+
+ if selected_node is None:
+ return 'No node selected'
+
+ args.node_name = selected_node
+
+ # If no parameter name provided, launch interactive selection
+ if args.parameter_name is None:
+ with DirectNode(args) as node:
+ parameter_names = call_list_parameters(
+ node=node, node_name=args.node_name)
+
+ selected_param = interactive_select(
+ parameter_names,
+ prompt='Select parameter:')
+
+ if selected_param is None:
+ return 'No parameter selected'
+
+ args.parameter_name = selected_param
+
node_name = get_absolute_node_name(args.node_name)
with NodeStrategy(args) as node:
if not wait_for_node(node, node_name, args.include_hidden_nodes, args.timeout):
diff --git a/ros2service/ros2service/verb/call.py b/ros2service/ros2service/verb/call.py
index ffc521aed..cf0326548 100644
--- a/ros2service/ros2service/verb/call.py
+++ b/ros2service/ros2service/verb/call.py
@@ -20,10 +20,13 @@
from rclpy.qos import QoSProfile
from ros2cli.helpers import collect_stdin
+from ros2cli.helpers import interactive_select
from ros2cli.node import NODE_NAME_PREFIX
+from ros2cli.node.strategy import NodeStrategy
from ros2cli.qos import add_qos_arguments
from ros2cli.qos import profile_configure_short_keys
+from ros2service.api import get_service_names
from ros2service.api import ServiceNameCompleter
from ros2service.api import ServicePrototypeCompleter
from ros2service.api import ServiceTypeCompleter
@@ -40,8 +43,9 @@ class CallVerb(VerbExtension):
def add_arguments(self, parser, cli_name):
arg = parser.add_argument(
- 'service_name',
- help="Name of the ROS service to call to (e.g. '/add_two_ints')")
+ 'service_name', nargs='?',
+ help="Name of the ROS service to call to (e.g. '/add_two_ints'). "
+ "If not provided, an interactive selection will be shown.")
arg.completer = ServiceNameCompleter(
include_hidden_services_key='include_hidden_services')
arg = parser.add_argument(
@@ -66,6 +70,22 @@ def add_arguments(self, parser, cli_name):
add_qos_arguments(parser, 'service client', 'services_default')
def main(self, *, args):
+ # If no service name provided, launch interactive selection
+ if args.service_name is None:
+ with NodeStrategy(args) as node:
+ service_names = get_service_names(
+ node=node,
+ include_hidden_services=args.include_hidden_services)
+
+ selected_service = interactive_select(
+ service_names,
+ prompt='Select service to call:')
+
+ if selected_service is None:
+ return 'No service selected'
+
+ args.service_name = selected_service
+
if args.rate is not None and args.rate <= 0:
raise RuntimeError('rate must be greater than zero')
period = 1. / args.rate if args.rate else None
diff --git a/ros2topic/ros2topic/verb/bw.py b/ros2topic/ros2topic/verb/bw.py
index 85158287a..45a08ff17 100644
--- a/ros2topic/ros2topic/verb/bw.py
+++ b/ros2topic/ros2topic/verb/bw.py
@@ -41,12 +41,14 @@
from rclpy.executors import ExternalShutdownException
+from ros2cli.helpers import interactive_select
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
from ros2cli.node.direct import DirectNode
from ros2cli.qos import add_qos_arguments
from ros2cli.qos import choose_qos
from ros2topic.api import get_msg_class
+from ros2topic.api import get_topic_names
from ros2topic.api import get_topic_names_and_types
from ros2topic.api import positive_int
from ros2topic.api import TopicNameCompleter
@@ -101,8 +103,22 @@ def main(self, *, args):
def main(args):
+ # Interactive selection if no topic name and not --all
if not args.all_topics and not args.topic_name:
- raise RuntimeError('Either specify topic names or use --all/-a option')
+ with DirectNode(args) as node:
+ topic_names = get_topic_names(
+ node=node.node,
+ include_hidden_topics=args.include_hidden_topics)
+
+ selected_topic = interactive_select(
+ topic_names,
+ prompt='Select topic for bw:')
+
+ if selected_topic is None:
+ return 'No topic selected'
+
+ args.topic_name = [selected_topic]
+
if args.all_topics and args.topic_name:
raise RuntimeError('Cannot specify both --all/-a and topic names')
diff --git a/ros2topic/ros2topic/verb/echo.py b/ros2topic/ros2topic/verb/echo.py
index 69657110f..c835fbe86 100644
--- a/ros2topic/ros2topic/verb/echo.py
+++ b/ros2topic/ros2topic/verb/echo.py
@@ -23,6 +23,7 @@
from rclpy.qos import QoSProfile
from rclpy.task import Future
+from ros2cli.helpers import interactive_select
from ros2cli.helpers import unsigned_int
from ros2cli.node.strategy import add_arguments as add_strategy_node_arguments
from ros2cli.node.strategy import NodeStrategy
@@ -30,6 +31,7 @@
from ros2cli.qos import choose_qos
from ros2topic.api import get_msg_class
+from ros2topic.api import get_topic_names
from ros2topic.api import positive_float
from ros2topic.api import TopicNameCompleter
from ros2topic.verb import VerbExtension
@@ -51,8 +53,9 @@ def add_arguments(self, parser, cli_name):
add_strategy_node_arguments(parser)
arg = parser.add_argument(
- 'topic_name',
- help="Name of the ROS topic to listen to (e.g. '/chatter')")
+ 'topic_name', nargs='?',
+ help="Name of the ROS topic to listen to (e.g. '/chatter'). "
+ "If not provided, an interactive selection will be shown.")
arg.completer = TopicNameCompleter(
include_hidden_topics_key='include_hidden_topics')
parser.add_argument(
@@ -121,6 +124,22 @@ def add_arguments(self, parser, cli_name):
def main(self, *, args):
+ # If no topic name provided, launch interactive selection
+ if args.topic_name is None:
+ with NodeStrategy(args) as node:
+ topic_names = get_topic_names(
+ node=node,
+ include_hidden_topics=args.include_hidden_topics)
+
+ selected_topic = interactive_select(
+ topic_names,
+ prompt='Select topic to echo:')
+
+ if selected_topic is None:
+ return 'No topic selected'
+
+ args.topic_name = selected_topic
+
self.csv = args.csv
# Validate field selection
diff --git a/ros2topic/ros2topic/verb/hz.py b/ros2topic/ros2topic/verb/hz.py
index 335dbd9a0..5d7fee0ac 100644
--- a/ros2topic/ros2topic/verb/hz.py
+++ b/ros2topic/ros2topic/verb/hz.py
@@ -42,12 +42,14 @@
from rclpy.clock import ClockType
from rclpy.executors import ExternalShutdownException
+from ros2cli.helpers import interactive_select
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
from ros2cli.node.direct import DirectNode
from ros2cli.qos import add_qos_arguments
from ros2cli.qos import choose_qos
from ros2topic.api import get_msg_class
+from ros2topic.api import get_topic_names
from ros2topic.api import get_topic_names_and_types
from ros2topic.api import positive_int
from ros2topic.api import TopicNameCompleter
@@ -100,8 +102,22 @@ def main(self, *, args):
def main(args):
+ # Interactive selection if no topic name and not --all
if not args.all_topics and not args.topic_name:
- raise RuntimeError('Either specify topic names or use --all/-a option')
+ with DirectNode(args) as node:
+ topic_names = get_topic_names(
+ node=node.node,
+ include_hidden_topics=args.include_hidden_topics)
+
+ selected_topic = interactive_select(
+ topic_names,
+ prompt='Select topic for hz:')
+
+ if selected_topic is None:
+ return 'No topic selected'
+
+ args.topic_name = [selected_topic]
+
if args.all_topics and args.topic_name:
raise RuntimeError('Cannot specify both --all/-a and topic names')
diff --git a/ros2topic/ros2topic/verb/info.py b/ros2topic/ros2topic/verb/info.py
index 39ffabceb..c1a311643 100644
--- a/ros2topic/ros2topic/verb/info.py
+++ b/ros2topic/ros2topic/verb/info.py
@@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from ros2cli.helpers import interactive_select
from ros2cli.node.strategy import add_arguments as add_strategy_node_arguments
from ros2cli.node.strategy import NodeStrategy
+from ros2topic.api import get_topic_names
from ros2topic.api import get_topic_names_and_types
from ros2topic.api import TopicNameCompleter
from ros2topic.verb import VerbExtension
@@ -25,8 +27,9 @@ class InfoVerb(VerbExtension):
def add_arguments(self, parser, cli_name):
add_strategy_node_arguments(parser)
arg = parser.add_argument(
- 'topic_name',
- help="Name of the ROS topic to get info (e.g. '/chatter')")
+ 'topic_name', nargs='?',
+ help="Name of the ROS topic to get info (e.g. '/chatter'). "
+ "If not provided, an interactive selection will be shown.")
parser.add_argument(
'--verbose',
'-v',
@@ -39,6 +42,21 @@ def add_arguments(self, parser, cli_name):
def main(self, *, args):
with NodeStrategy(args) as node:
+ # If no topic name provided, launch interactive selection
+ if args.topic_name is None:
+ topic_names = get_topic_names(
+ node=node,
+ include_hidden_topics=args.include_hidden_topics)
+
+ selected_topic = interactive_select(
+ topic_names,
+ prompt='Select topic for info:')
+
+ if selected_topic is None:
+ return 'No topic selected'
+
+ args.topic_name = selected_topic
+
topic_names_and_types = get_topic_names_and_types(
node=node, include_hidden_topics=True)
topic_name = args.topic_name
diff --git a/ros2topic/ros2topic/verb/pub.py b/ros2topic/ros2topic/verb/pub.py
index 9732ab1ee..9f3bd27ed 100644
--- a/ros2topic/ros2topic/verb/pub.py
+++ b/ros2topic/ros2topic/verb/pub.py
@@ -23,11 +23,13 @@
from rclpy.qos import QoSProfile
from ros2cli.helpers import collect_stdin
+from ros2cli.helpers import interactive_select
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
from ros2cli.node.direct import DirectNode
from ros2cli.qos import add_qos_arguments
from ros2cli.qos import profile_configure_short_keys
+from ros2topic.api import get_topic_names
from ros2topic.api import positive_float
from ros2topic.api import TopicMessagePrototypeCompleter, YamlCompletionFinder
from ros2topic.api import TopicNameCompleter
@@ -56,8 +58,9 @@ class PubVerb(VerbExtension):
def add_arguments(self, parser, cli_name):
arg = parser.add_argument(
- 'topic_name',
- help="Name of the ROS topic to publish to (e.g. '/chatter')")
+ 'topic_name', nargs='?',
+ help="Name of the ROS topic to publish to (e.g. '/chatter'). "
+ "If not provided, an interactive selection will be shown.")
arg.completer = TopicNameCompleter(
include_hidden_topics_key='include_hidden_topics')
arg = parser.add_argument(
@@ -126,6 +129,23 @@ def main(self, *, args):
def main(args):
+ # If no topic name provided, launch interactive selection
+ if args.topic_name is None:
+ from ros2cli.node.strategy import NodeStrategy
+ with NodeStrategy(args) as node:
+ topic_names = get_topic_names(
+ node=node,
+ include_hidden_topics=args.include_hidden_topics)
+
+ selected_topic = interactive_select(
+ topic_names,
+ prompt='Select topic to publish to:')
+
+ if selected_topic is None:
+ return 'No topic selected'
+
+ args.topic_name = selected_topic
+
qos_profile_name = args.qos_profile
qos_profile = rclpy.qos.QoSPresetProfiles.get_from_short_key(qos_profile_name)
profile_configure_short_keys(