From 7d5f9d9083b1ca2003483a73dae2ee61e6f78458 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 23 Apr 2026 08:33:47 -0700 Subject: [PATCH 1/6] admin.cli: refactor subcommand parser construction --- kafka/cli/admin/__init__.py | 39 ++++++++++++------- kafka/cli/admin/acls/__init__.py | 15 ++----- kafka/cli/admin/acls/create.py | 6 +-- kafka/cli/admin/acls/delete.py | 6 +-- kafka/cli/admin/acls/describe.py | 6 +-- kafka/cli/admin/cluster/__init__.py | 16 +++----- kafka/cli/admin/cluster/api_versions.py | 6 +-- kafka/cli/admin/cluster/broker_version.py | 8 ++-- kafka/cli/admin/cluster/describe.py | 11 ++++-- kafka/cli/admin/cluster/features.py | 12 +++--- kafka/cli/admin/cluster/log_dirs.py | 14 ++++--- kafka/cli/admin/configs/__init__.py | 15 ++----- kafka/cli/admin/configs/alter.py | 6 +-- kafka/cli/admin/configs/describe.py | 6 +-- kafka/cli/admin/configs/list.py | 9 ++--- kafka/cli/admin/configs/reset.py | 6 +-- kafka/cli/admin/groups/__init__.py | 19 +++------ kafka/cli/admin/groups/alter_offsets.py | 8 ++-- kafka/cli/admin/groups/delete.py | 10 +++-- kafka/cli/admin/groups/delete_offsets.py | 8 ++-- kafka/cli/admin/groups/describe.py | 10 +++-- kafka/cli/admin/groups/list.py | 11 ++++-- kafka/cli/admin/groups/list_offsets.py | 6 +-- kafka/cli/admin/groups/remove_members.py | 8 ++-- kafka/cli/admin/groups/reset_offsets.py | 8 ++-- kafka/cli/admin/partitions/__init__.py | 31 ++++++--------- .../admin/partitions/alter_reassignments.py | 8 ++-- kafka/cli/admin/partitions/create.py | 8 ++-- kafka/cli/admin/partitions/delete_records.py | 8 ++-- kafka/cli/admin/partitions/describe.py | 8 ++-- kafka/cli/admin/partitions/elect_leaders.py | 8 ++-- kafka/cli/admin/partitions/list_offsets.py | 8 ++-- .../admin/partitions/list_reassignments.py | 8 ++-- kafka/cli/admin/topics/__init__.py | 15 ++----- kafka/cli/admin/topics/create.py | 6 +-- kafka/cli/admin/topics/delete.py | 6 +-- kafka/cli/admin/topics/describe.py | 10 +++-- kafka/cli/admin/topics/list.py | 11 ++++-- kafka/cli/admin/users/__init__.py | 15 ++----- .../users/alter_user_scram_credentials.py | 8 ++-- .../users/describe_user_scram_credentials.py | 8 ++-- 41 files changed, 199 insertions(+), 236 deletions(-) diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 3cf55bb8e..a96d0c6ae 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -4,36 +4,45 @@ from pprint import pprint from kafka.admin.client import KafkaAdminClient -from .acls import ACLsSubCommand -from .cluster import ClusterSubCommand -from .configs import ConfigsSubCommand -from .groups import GroupsSubCommand -from .partitions import PartitionsSubCommand -from .topics import TopicsSubCommand -from .users import UsersSubCommand +from .acls import ACLsCommandGroup +from .cluster import ClusterCommandGroup +from .configs import ConfigsCommandGroup +from .groups import GroupsCommandGroup +from .partitions import PartitionsCommandGroup +from .topics import TopicsCommandGroup +from .users import UsersCommandGroup from ..common import add_common_cli_args, configure_logging, build_connect_kwargs from kafka.errors import BrokerResponseError -def main_parser(): +def build_parser(groups=()): parser = argparse.ArgumentParser( prog='python -m kafka.admin', - description='Kafka admin client', + description='Kafka Admin Client', ) add_common_cli_args(parser) parser.add_argument( '-f', '--format', type=str, default='raw', help='output format: raw|json') + groups_sub = parser.add_subparsers(dest='group', metavar='GROUP', title='Available command groups') + for group in groups: + group_parser = groups_sub.add_parser(group.GROUP, help=group.HELP) + group_parser.set_defaults(group=group_parser) # refcycle.. + commands_sub = group_parser.add_subparsers(dest='command', metavar='COMMAND', title='Available commands') + for cmd in group.COMMANDS: + command_parser = commands_sub.add_parser(cmd.COMMAND, help=cmd.HELP) + options = command_parser.add_argument_group('command options') + cmd.add_arguments(options) + command_parser.set_defaults(command=cmd.command) return parser def run_cli(args=None): - parser = main_parser() - subparsers = parser.add_subparsers(help='subcommands') - for cmd in [ACLsSubCommand, ClusterSubCommand, ConfigsSubCommand, - TopicsSubCommand, PartitionsSubCommand, - GroupsSubCommand, UsersSubCommand]: - cmd.add_subparser(subparsers) + parser = build_parser([ + ACLsCommandGroup, ClusterCommandGroup, ConfigsCommandGroup, + TopicsCommandGroup, PartitionsCommandGroup, GroupsCommandGroup, + UsersCommandGroup, + ]) config = parser.parse_args(args) if config.format not in ('raw', 'json'): raise ValueError('Unrecognized format: %s' % config.format) diff --git a/kafka/cli/admin/acls/__init__.py b/kafka/cli/admin/acls/__init__.py index f3c3a4566..13772b719 100644 --- a/kafka/cli/admin/acls/__init__.py +++ b/kafka/cli/admin/acls/__init__.py @@ -1,16 +1,9 @@ -import sys - from .create import CreateACLs from .delete import DeleteACLs from .describe import DescribeACLs -class ACLsSubCommand: - - @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('acls', help='Manage Kafka ACLs') - commands = parser.add_subparsers() - for cmd in [DescribeACLs, CreateACLs, DeleteACLs]: - cmd.add_subparser(commands) - parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) +class ACLsCommandGroup: + GROUP = 'acls' + HELP = 'Manage Kafka ACLs' + COMMANDS = [DescribeACLs, CreateACLs, DeleteACLs] diff --git a/kafka/cli/admin/acls/create.py b/kafka/cli/admin/acls/create.py index b5c7b0ca8..7df29de9f 100644 --- a/kafka/cli/admin/acls/create.py +++ b/kafka/cli/admin/acls/create.py @@ -2,12 +2,12 @@ class CreateACLs: + COMMAND = 'create' + HELP = 'Create Kafka ACLs' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('create', help='Create Kafka ACLs') + def add_arguments(cls, parser): add_acl_args(parser, required=True) - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/acls/delete.py b/kafka/cli/admin/acls/delete.py index ebe580039..1e1cd6f3d 100644 --- a/kafka/cli/admin/acls/delete.py +++ b/kafka/cli/admin/acls/delete.py @@ -2,12 +2,12 @@ class DeleteACLs: + COMMAND = 'delete' + HELP = 'Delete Kafka ACLs' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('delete', help='Delete Kafka ACLs') + def add_arguments(cls, parser): add_acl_filter_args(parser) - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/acls/describe.py b/kafka/cli/admin/acls/describe.py index a3dd548c2..77e117e71 100644 --- a/kafka/cli/admin/acls/describe.py +++ b/kafka/cli/admin/acls/describe.py @@ -2,12 +2,12 @@ class DescribeACLs: + COMMAND = 'describe' + HELP = 'Describe Kafka ACLs' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('describe', help='Describe Kafka ACLs') + def add_arguments(cls, parser): add_acl_filter_args(parser) - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/cluster/__init__.py b/kafka/cli/admin/cluster/__init__.py index b47b03674..ebfb034dc 100644 --- a/kafka/cli/admin/cluster/__init__.py +++ b/kafka/cli/admin/cluster/__init__.py @@ -1,5 +1,3 @@ -import sys - from .api_versions import GetApiVersions from .broker_version import GetBrokerVersion from .describe import DescribeCluster @@ -7,12 +5,8 @@ from .features import DescribeFeatures, UpdateFeatures -class ClusterSubCommand: - - @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('cluster', help='Manage Kafka Cluster') - commands = parser.add_subparsers() - for cmd in [DescribeCluster, DescribeFeatures, UpdateFeatures, GetApiVersions, GetBrokerVersion, DescribeLogDirs]: - cmd.add_subparser(commands) - parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) +class ClusterCommandGroup: + GROUP = 'cluster' + HELP = 'Manage Kafka Cluster' + COMMANDS = [DescribeCluster, DescribeFeatures, UpdateFeatures, + GetApiVersions, GetBrokerVersion, DescribeLogDirs] diff --git a/kafka/cli/admin/cluster/api_versions.py b/kafka/cli/admin/cluster/api_versions.py index 0d2ded02d..31572623e 100644 --- a/kafka/cli/admin/cluster/api_versions.py +++ b/kafka/cli/admin/cluster/api_versions.py @@ -2,13 +2,13 @@ class GetApiVersions: + COMMAND = 'api-versions' + HELP = 'Get Supported Api Versions' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('api-versions', help='Get Supported Api Versions') + def add_arguments(cls, parser): parser.add_argument('-k', '--api-key', type=str, action='append', dest='api_keys', default=None) parser.add_argument('--raw', action='store_true') - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/cluster/broker_version.py b/kafka/cli/admin/cluster/broker_version.py index bb9d19169..2ad24c78d 100644 --- a/kafka/cli/admin/cluster/broker_version.py +++ b/kafka/cli/admin/cluster/broker_version.py @@ -1,10 +1,10 @@ class GetBrokerVersion: + COMMAND = 'broker-version' + HELP = 'Get Version for Broker' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('broker-version', help='Get Version for Broker') - parser.add_argument('-b', '--broker', required=True) - parser.set_defaults(command=cls.command) + def add_arguments(cls, parser): + parser.add_argument('--broker', required=True) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/cluster/describe.py b/kafka/cli/admin/cluster/describe.py index 40cc4c0ac..4ca21a319 100644 --- a/kafka/cli/admin/cluster/describe.py +++ b/kafka/cli/admin/cluster/describe.py @@ -1,6 +1,11 @@ class DescribeCluster: + COMMAND = 'describe' + HELP = 'Describe Kafka Cluster' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('describe', help='Describe Kafka Cluster') - parser.set_defaults(command=lambda cli, _args: cli.describe_cluster()) + def add_arguments(cls, parser): + pass + + @classmethod + def command(cls, client, args): + return client.describe_cluster() diff --git a/kafka/cli/admin/cluster/features.py b/kafka/cli/admin/cluster/features.py index c16880d11..6fd289128 100644 --- a/kafka/cli/admin/cluster/features.py +++ b/kafka/cli/admin/cluster/features.py @@ -2,13 +2,13 @@ class DescribeFeatures: + COMMAND = 'describe-features' + HELP = 'Describe Features of Kafka Cluster' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('describe-features', help='Describe Features of Kafka Cluster') + def add_arguments(cls, parser): parser.add_argument('-f', '--feature', type=str, action='append', dest='features', default=[], help='Show one or more specific features. If not provided, returns all features.') - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): @@ -20,16 +20,16 @@ def command(cls, client, args): class UpdateFeatures: + COMMAND = 'update-features' + HELP = 'Update Features of Kafka Cluster' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('update-features', help='Update Features of Kafka Cluster') + def add_arguments(cls, parser): parser.add_argument('-f', '--feature', type=str, action='append', dest='features', default=[], help='set feature=value') parser.add_argument('--downgrade', action='store_true') parser.add_argument('--unsafe', action='store_true') parser.add_argument('--timeout', type=int, default=60) parser.add_argument('--validate-only', action='store_true') - parser.set_defaults(command=cls.command) @staticmethod def _feature_type(args): diff --git a/kafka/cli/admin/cluster/log_dirs.py b/kafka/cli/admin/cluster/log_dirs.py index f533bb11a..055d47b90 100644 --- a/kafka/cli/admin/cluster/log_dirs.py +++ b/kafka/cli/admin/cluster/log_dirs.py @@ -1,8 +1,12 @@ class DescribeLogDirs: + COMMAND = 'log-dirs' + HELP = 'Get topic log directories and stats' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('log-dirs', help='Get topic log directories and stats') - parser.add_argument('-b', '--broker', type=int, action='append', dest='brokers', help='Query specific broker(s)') - parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', help='Get data about specific topic(s)') - parser.set_defaults(command=lambda cli, args: cli.describe_log_dirs(topic_partitions=args.topics, brokers=args.brokers)) + def add_arguments(cls, parser): + parser.add_argument('--broker', type=int, action='append', dest='brokers', help='Query specific broker(s)') + parser.add_argument('--topic', type=str, action='append', dest='topics', help='Get data about specific topic(s)') + + @classmethod + def command(cls, client, args): + return client.describe_log_dirs(topic_partitions=args.topics, brokers=args.brokers) diff --git a/kafka/cli/admin/configs/__init__.py b/kafka/cli/admin/configs/__init__.py index d8639f57e..a909a6864 100644 --- a/kafka/cli/admin/configs/__init__.py +++ b/kafka/cli/admin/configs/__init__.py @@ -1,17 +1,10 @@ -import sys - from .alter import AlterConfigs from .describe import DescribeConfigs from .list import ListConfigResources from .reset import ResetConfigs -class ConfigsSubCommand: - - @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('configs', help='Manage Kafka Configuration') - commands = parser.add_subparsers() - for cmd in [DescribeConfigs, AlterConfigs, ListConfigResources, ResetConfigs]: - cmd.add_subparser(commands) - parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) +class ConfigsCommandGroup: + GROUP = 'configs' + HELP = 'Manage Kafka Configuration' + COMMANDS = [DescribeConfigs, AlterConfigs, ListConfigResources, ResetConfigs] diff --git a/kafka/cli/admin/configs/alter.py b/kafka/cli/admin/configs/alter.py index 33ea7b096..558ae446f 100644 --- a/kafka/cli/admin/configs/alter.py +++ b/kafka/cli/admin/configs/alter.py @@ -5,10 +5,11 @@ class AlterConfigs: + COMMAND = 'alter' + HELP = 'Alter Kafka Configs' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('alter', help='Alter Kafka Configs') + def add_arguments(cls, parser): add_resource_arguments(parser) parser.add_argument('-c', '--config', type=str, action='append', dest='configs', required=True, help='key=value to alter') parser.add_argument('-v', '--validate-only', action='store_true', default=False) @@ -16,7 +17,6 @@ def add_subparser(cls, subparsers): incremental = parser.add_mutually_exclusive_group() incremental.add_argument('--force-incremental', action='store_true', dest='incremental', default=None) incremental.add_argument('--force-alter', action='store_false', dest='incremental', default=None) - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/configs/describe.py b/kafka/cli/admin/configs/describe.py index 16d2a73d4..8c6b4a25a 100644 --- a/kafka/cli/admin/configs/describe.py +++ b/kafka/cli/admin/configs/describe.py @@ -2,17 +2,17 @@ class DescribeConfigs: + COMMAND = 'describe' + HELP = 'Describe Kafka Configs' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('describe', help='Describe Kafka Configs') + def add_arguments(cls, parser): add_resource_arguments(parser) parser.add_argument('-c', '--config', type=str, action='append', dest='configs', default=None) parser.add_argument('--dynamic', action='store_true', default=False) parser.add_argument('--modified', action='store_true', default=False) parser.add_argument('--static', action='store_true', default=False) parser.add_argument('--default', action='store_true', default=False) - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/configs/list.py b/kafka/cli/admin/configs/list.py index cff2c8159..0365010d1 100644 --- a/kafka/cli/admin/configs/list.py +++ b/kafka/cli/admin/configs/list.py @@ -1,17 +1,14 @@ class ListConfigResources: + COMMAND = 'list' + HELP = 'List config resources known to the cluster (requires broker >= 4.1 for non client_metrics types)' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser( - 'list', - help='List config resources known to the cluster (requires broker >= 4.1 ' - 'for non client_metrics types)') + def add_arguments(cls, parser): parser.add_argument( '-r', '--resource-type', type=str, action='append', dest='resource_types', default=[], help='Filter by resource type (repeatable): topic, broker, ' 'broker_logger, client_metrics, group. Omit to list all ' 'supported types.') - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/configs/reset.py b/kafka/cli/admin/configs/reset.py index f9d832121..0de13bd28 100644 --- a/kafka/cli/admin/configs/reset.py +++ b/kafka/cli/admin/configs/reset.py @@ -2,15 +2,15 @@ class ResetConfigs: + COMMAND = 'reset' + HELP = 'Reset Kafka Configs' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('reset', help='Reset Kafka Configs') + def add_arguments(cls, parser): add_resource_arguments(parser) parser.add_argument('-c', '--config', type=str, action='append', dest='configs', default=[], help='key to reset') parser.add_argument('-v', '--validate-only', action='store_true', default=False) parser.add_argument('--allow-unknown', action='store_false', dest='raise_on_unknown', default=True) - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/groups/__init__.py b/kafka/cli/admin/groups/__init__.py index 602eba5df..752ca84cc 100644 --- a/kafka/cli/admin/groups/__init__.py +++ b/kafka/cli/admin/groups/__init__.py @@ -1,5 +1,3 @@ -import sys - from .alter_offsets import AlterGroupOffsets from .delete import DeleteGroups from .delete_offsets import DeleteGroupOffsets @@ -10,14 +8,9 @@ from .reset_offsets import ResetGroupOffsets -class GroupsSubCommand: - - @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('groups', help='Manage Kafka Groups') - commands = parser.add_subparsers() - for cmd in [ListGroups, DescribeGroups, DeleteGroups, - ListGroupOffsets, AlterGroupOffsets, ResetGroupOffsets, DeleteGroupOffsets, - RemoveGroupMembers]: - cmd.add_subparser(commands) - parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) +class GroupsCommandGroup: + GROUP = 'groups' + HELP = 'Manage Kafka Groups' + COMMANDS = [ListGroups, DescribeGroups, DeleteGroups, + ListGroupOffsets, AlterGroupOffsets, ResetGroupOffsets, DeleteGroupOffsets, + RemoveGroupMembers] diff --git a/kafka/cli/admin/groups/alter_offsets.py b/kafka/cli/admin/groups/alter_offsets.py index c06cbfffe..52b704bca 100644 --- a/kafka/cli/admin/groups/alter_offsets.py +++ b/kafka/cli/admin/groups/alter_offsets.py @@ -2,12 +2,11 @@ class AlterGroupOffsets: + COMMAND = 'alter-offsets' + HELP = 'Alter committed offsets for a consumer group' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser( - 'alter-offsets', - help='Alter committed offsets for a consumer group') + def add_arguments(cls, parser): parser.add_argument('-g', '--group-id', type=str, required=True) parser.add_argument( '-o', '--offset', type=str, action='append', @@ -16,7 +15,6 @@ def add_subparser(cls, subparsers): parser.add_argument( '--group-coordinator-id', type=int, default=None, help='Send directly to this broker id, skipping coordinator lookup') - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/groups/delete.py b/kafka/cli/admin/groups/delete.py index ec122cd6f..c5cc28039 100644 --- a/kafka/cli/admin/groups/delete.py +++ b/kafka/cli/admin/groups/delete.py @@ -1,7 +1,11 @@ class DeleteGroups: + COMMAND = 'delete' + HELP = 'Delete Groups' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('delete', help='Delete Groups') + def add_arguments(cls, parser): parser.add_argument('-g', '--group-id', type=str, action='append', dest='groups', required=True) - parser.set_defaults(command=lambda cli, args: cli.delete_groups(args.groups)) + + @classmethod + def command(cls, client, args): + return client.delete_groups(args.groups) diff --git a/kafka/cli/admin/groups/delete_offsets.py b/kafka/cli/admin/groups/delete_offsets.py index e5117dcce..4a020ed1b 100644 --- a/kafka/cli/admin/groups/delete_offsets.py +++ b/kafka/cli/admin/groups/delete_offsets.py @@ -2,12 +2,11 @@ class DeleteGroupOffsets: + COMMAND = 'delete-offsets' + HELP = 'Delete committed offsets for a consumer group' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser( - 'delete-offsets', - help='Delete committed offsets for a consumer group') + def add_arguments(cls, parser): parser.add_argument('-g', '--group-id', type=str, required=True) parser.add_argument( '-p', '--partition', type=str, action='append', @@ -16,7 +15,6 @@ def add_subparser(cls, subparsers): parser.add_argument( '--group-coordinator-id', type=int, default=None, help='Send directly to this broker id, skipping coordinator lookup') - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/groups/describe.py b/kafka/cli/admin/groups/describe.py index 5898c891a..0950a0c07 100644 --- a/kafka/cli/admin/groups/describe.py +++ b/kafka/cli/admin/groups/describe.py @@ -1,7 +1,11 @@ class DescribeGroups: + COMMAND = 'describe' + HELP = 'Describe Groups' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('describe', help='Describe Groups') + def add_arguments(cls, parser): parser.add_argument('-g', '--group-id', type=str, action='append', dest='groups', required=True) - parser.set_defaults(command=lambda cli, args: cli.describe_groups(args.groups)) + + @classmethod + def command(cls, client, args): + return client.describe_groups(args.groups) diff --git a/kafka/cli/admin/groups/list.py b/kafka/cli/admin/groups/list.py index c0342718d..5acd1475a 100644 --- a/kafka/cli/admin/groups/list.py +++ b/kafka/cli/admin/groups/list.py @@ -1,6 +1,11 @@ class ListGroups: + COMMAND = 'list' + HELP = 'List Groups' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('list', help='List Groups') - parser.set_defaults(command=lambda cli, _args: cli.list_groups()) + def add_arguments(cls, parser): + pass + + @classmethod + def command(cls, client, args): + return client.list_groups() diff --git a/kafka/cli/admin/groups/list_offsets.py b/kafka/cli/admin/groups/list_offsets.py index e568958b0..afa720863 100644 --- a/kafka/cli/admin/groups/list_offsets.py +++ b/kafka/cli/admin/groups/list_offsets.py @@ -5,12 +5,12 @@ class ListGroupOffsets: + COMMAND = 'list-offsets' + HELP = 'List Offsets for Group' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('list-offsets', help='List Offsets for Group') + def add_arguments(cls, parser): parser.add_argument('-g', '--group-id', type=str, required=True) - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/groups/remove_members.py b/kafka/cli/admin/groups/remove_members.py index 196824e81..3a3308ff2 100644 --- a/kafka/cli/admin/groups/remove_members.py +++ b/kafka/cli/admin/groups/remove_members.py @@ -2,12 +2,11 @@ class RemoveGroupMembers: + COMMAND = 'remove-members' + HELP = 'Remove members from a consumer group' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser( - 'remove-members', - help='Remove members from a consumer group') + def add_arguments(cls, parser): parser.add_argument('-g', '--group-id', type=str, required=True) parser.add_argument( '-m', '--member-id', type=str, action='append', @@ -25,7 +24,6 @@ def add_subparser(cls, subparsers): parser.add_argument( '--group-coordinator-id', type=int, default=None, help='Send directly to this broker id, skipping coordinator lookup') - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/groups/reset_offsets.py b/kafka/cli/admin/groups/reset_offsets.py index 1cdb07c3d..66c19d34a 100644 --- a/kafka/cli/admin/groups/reset_offsets.py +++ b/kafka/cli/admin/groups/reset_offsets.py @@ -5,12 +5,11 @@ class ResetGroupOffsets: + COMMAND = 'reset-offsets' + HELP = 'Reset committed offsets for a consumer group' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser( - 'reset-offsets', - help='Reset committed offsets for a consumer group') + def add_arguments(cls, parser): parser.add_argument('-g', '--group-id', type=str, required=True) parser.add_argument( '-s', '--spec', type=str, @@ -26,7 +25,6 @@ def add_subparser(cls, subparsers): 'a single wildcard "*" for all partitions. SPEC may be one of ' 'earliest, latest, max-timestamp, earliest-local, latest-tiered, ' 'or a millisecond timestamp.') - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/partitions/__init__.py b/kafka/cli/admin/partitions/__init__.py index 428987089..9c3584815 100644 --- a/kafka/cli/admin/partitions/__init__.py +++ b/kafka/cli/admin/partitions/__init__.py @@ -1,5 +1,3 @@ -import sys - from .alter_reassignments import AlterPartitionReassignments from .create import CreatePartitions from .delete_records import DeleteRecords @@ -9,20 +7,15 @@ from .list_reassignments import ListPartitionReassignments -class PartitionsSubCommand: - - @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('partitions', help='Manage Kafka Partitions') - commands = parser.add_subparsers() - for cmd in [ - CreatePartitions, - DescribeTopicPartitions, - ListPartitionOffsets, - ListPartitionReassignments, - AlterPartitionReassignments, - DeleteRecords, - ElectLeaders, - ]: - cmd.add_subparser(commands) - parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) +class PartitionsCommandGroup: + GROUP = 'partitions' + HELP = 'Manage Kafka Partitions' + COMMANDS = [ + CreatePartitions, + DescribeTopicPartitions, + ListPartitionOffsets, + ListPartitionReassignments, + AlterPartitionReassignments, + DeleteRecords, + ElectLeaders, + ] diff --git a/kafka/cli/admin/partitions/alter_reassignments.py b/kafka/cli/admin/partitions/alter_reassignments.py index 59692cc52..e91b40c6c 100644 --- a/kafka/cli/admin/partitions/alter_reassignments.py +++ b/kafka/cli/admin/partitions/alter_reassignments.py @@ -2,12 +2,11 @@ class AlterPartitionReassignments: + COMMAND = 'alter-reassignments' + HELP = 'Alter replica assignments for partitions' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser( - 'alter-reassignments', - help='Alter replica assignments for partitions') + def add_arguments(cls, parser): parser.add_argument( '-r', '--reassign', type=str, action='append', dest='reassignments', default=[], required=True, @@ -20,7 +19,6 @@ def add_subparser(cls, subparsers): parser.add_argument( '--no-raise-errors', dest='raise_errors', action='store_false', help='Do not raise on partition-level errors; return the response instead') - parser.set_defaults(command=cls.command, raise_errors=True) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/partitions/create.py b/kafka/cli/admin/partitions/create.py index 43e02ae40..ba3181157 100644 --- a/kafka/cli/admin/partitions/create.py +++ b/kafka/cli/admin/partitions/create.py @@ -1,10 +1,9 @@ class CreatePartitions: + COMMAND = 'create' + HELP = 'Create additional partitions for existing topics' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser( - 'create', - help='Create additional partitions for existing topics') + def add_arguments(cls, parser): parser.add_argument( '-p', '--topic-partitions', type=str, action='append', dest='topic_partitions', default=[], required=True, @@ -15,7 +14,6 @@ def add_subparser(cls, subparsers): parser.add_argument( '--validate-only', action='store_true', help='Validate the request without actually creating partitions') - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/partitions/delete_records.py b/kafka/cli/admin/partitions/delete_records.py index 03d2da0cc..9a298fb8c 100644 --- a/kafka/cli/admin/partitions/delete_records.py +++ b/kafka/cli/admin/partitions/delete_records.py @@ -2,12 +2,11 @@ class DeleteRecords: + COMMAND = 'delete-records' + HELP = 'Delete records from partitions up to a given offset' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser( - 'delete-records', - help='Delete records from partitions up to a given offset') + def add_arguments(cls, parser): parser.add_argument( '-r', '--record', type=str, action='append', dest='records', default=[], required=True, @@ -19,7 +18,6 @@ def add_subparser(cls, subparsers): parser.add_argument( '--partition-leader-id', type=int, default=None, help='Send all delete requests to this broker id, skipping metadata lookup') - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/partitions/describe.py b/kafka/cli/admin/partitions/describe.py index 2319e067f..3f5a79a9d 100644 --- a/kafka/cli/admin/partitions/describe.py +++ b/kafka/cli/admin/partitions/describe.py @@ -1,10 +1,9 @@ class DescribeTopicPartitions: + COMMAND = 'describe' + HELP = 'Describe topic partitions with pagination (KIP-966, broker >=3.9)' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser( - 'describe', - help='Describe topic partitions with pagination (KIP-966, broker >=3.9)') + def add_arguments(cls, parser): parser.add_argument( '-t', '--topic', type=str, action='append', dest='topics', default=[], required=True, @@ -19,7 +18,6 @@ def add_subparser(cls, subparsers): parser.add_argument( '--cursor-partition', type=int, default=None, help='Partition index to start pagination from') - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/partitions/elect_leaders.py b/kafka/cli/admin/partitions/elect_leaders.py index f90789627..a12226a2a 100644 --- a/kafka/cli/admin/partitions/elect_leaders.py +++ b/kafka/cli/admin/partitions/elect_leaders.py @@ -5,12 +5,11 @@ class ElectLeaders: + COMMAND = 'elect-leaders' + HELP = 'Trigger leader election for partitions' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser( - 'elect-leaders', - help='Trigger leader election for partitions') + def add_arguments(cls, parser): parser.add_argument( '--election-type', type=str, default='preferred', choices=sorted(_ELECTION_TYPES), @@ -31,7 +30,6 @@ def add_subparser(cls, subparsers): parser.add_argument( '--no-raise-errors', dest='raise_errors', action='store_false', help='Do not raise on partition-level errors; return the response instead') - parser.set_defaults(command=cls.command, raise_errors=True) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/partitions/list_offsets.py b/kafka/cli/admin/partitions/list_offsets.py index 0175e7b02..9663f1494 100644 --- a/kafka/cli/admin/partitions/list_offsets.py +++ b/kafka/cli/admin/partitions/list_offsets.py @@ -5,12 +5,11 @@ class ListPartitionOffsets: + COMMAND = 'list-offsets' + HELP = 'List offsets for partitions by spec (earliest/latest/timestamp)' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser( - 'list-offsets', - help='List offsets for partitions by spec (earliest/latest/timestamp)') + def add_arguments(cls, parser): parser.add_argument( '-t', '--topic', type=str) parser.add_argument( @@ -25,7 +24,6 @@ def add_subparser(cls, subparsers): 'a single wildcard "*" for all partitions. SPEC may be one of ' 'earliest, latest, max-timestamp, earliest-local, latest-tiered, ' 'or a millisecond timestamp.') - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/partitions/list_reassignments.py b/kafka/cli/admin/partitions/list_reassignments.py index 6ef97187d..d7be2209a 100644 --- a/kafka/cli/admin/partitions/list_reassignments.py +++ b/kafka/cli/admin/partitions/list_reassignments.py @@ -4,12 +4,11 @@ class ListPartitionReassignments: + COMMAND = 'list-reassignments' + HELP = 'List the current ongoing partition reassignments' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser( - 'list-reassignments', - help='List the current ongoing partition reassignments') + def add_arguments(cls, parser): parser.add_argument( '-p', '--topic-partition', type=str, action='append', dest='topic_partitions', default=[], @@ -18,7 +17,6 @@ def add_subparser(cls, subparsers): parser.add_argument( '--timeout-ms', type=int, default=None, help='Request timeout in milliseconds') - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/topics/__init__.py b/kafka/cli/admin/topics/__init__.py index bcb1973af..bf1e19a6b 100644 --- a/kafka/cli/admin/topics/__init__.py +++ b/kafka/cli/admin/topics/__init__.py @@ -1,17 +1,10 @@ -import sys - from .create import CreateTopic from .delete import DeleteTopic from .describe import DescribeTopics from .list import ListTopics -class TopicsSubCommand: - - @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('topics', help='List/Describe/Create/Delete Kafka Topics') - commands = parser.add_subparsers() - for cmd in [ListTopics, DescribeTopics, CreateTopic, DeleteTopic]: - cmd.add_subparser(commands) - parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) +class TopicsCommandGroup: + GROUP = 'topics' + HELP = 'Manage Kafka Topics' + COMMANDS = [ListTopics, DescribeTopics, CreateTopic, DeleteTopic] diff --git a/kafka/cli/admin/topics/create.py b/kafka/cli/admin/topics/create.py index 7112e4e7f..eb6408e2a 100644 --- a/kafka/cli/admin/topics/create.py +++ b/kafka/cli/admin/topics/create.py @@ -1,12 +1,12 @@ class CreateTopic: + COMMAND = 'create' + HELP = 'Create a Kafka Topic' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('create', help='Create a Kafka Topic') + def add_arguments(cls, parser): parser.add_argument('-t', '--topic', type=str, required=True) parser.add_argument('--num-partitions', type=int, default=-1) parser.add_argument('--replication-factor', type=int, default=-1) - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/topics/delete.py b/kafka/cli/admin/topics/delete.py index ff2d93450..1be8c7d09 100644 --- a/kafka/cli/admin/topics/delete.py +++ b/kafka/cli/admin/topics/delete.py @@ -2,13 +2,13 @@ class DeleteTopic: + COMMAND = 'delete' + HELP = 'Delete Kafka Topic' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('delete', help='Delete Kafka Topic') + def add_arguments(cls, parser): parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', default=[], help='topic name') parser.add_argument('--id', type=str, action='append', dest='topic_ids', default=[], help='topic UUID') - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/topics/describe.py b/kafka/cli/admin/topics/describe.py index b053524b4..43b0f5fb9 100644 --- a/kafka/cli/admin/topics/describe.py +++ b/kafka/cli/admin/topics/describe.py @@ -1,7 +1,11 @@ class DescribeTopics: + COMMAND = 'describe' + HELP = 'Describe Kafka Topics' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('describe', help='Describe Kafka Topics') + def add_arguments(cls, parser): parser.add_argument('-t', '--topic', type=str, action='append', dest='topics') - parser.set_defaults(command=lambda cli, args: cli.describe_topics(args.topics or None)) + + @classmethod + def command(cls, client, args): + return client.describe_topics(args.topics or None) diff --git a/kafka/cli/admin/topics/list.py b/kafka/cli/admin/topics/list.py index 2c5d48ccc..b6714a8aa 100644 --- a/kafka/cli/admin/topics/list.py +++ b/kafka/cli/admin/topics/list.py @@ -1,6 +1,11 @@ class ListTopics: + COMMAND = 'list' + HELP = 'List Kafka Topics' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('list', help='List Kafka Topics') - parser.set_defaults(command=lambda cli, _args: cli.list_topics()) + def add_arguments(cls, parser): + pass + + @classmethod + def command(cls, client, args): + return client.list_topics() diff --git a/kafka/cli/admin/users/__init__.py b/kafka/cli/admin/users/__init__.py index d856e777b..272b0e5f2 100644 --- a/kafka/cli/admin/users/__init__.py +++ b/kafka/cli/admin/users/__init__.py @@ -1,15 +1,8 @@ -import sys - from .alter_user_scram_credentials import AlterUserScramCredentials from .describe_user_scram_credentials import DescribeUserScramCredentials -class UsersSubCommand: - - @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser('users', help='Manage Kafka Users') - commands = parser.add_subparsers() - for cmd in [DescribeUserScramCredentials, AlterUserScramCredentials]: - cmd.add_subparser(commands) - parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2)) +class UsersCommandGroup: + GROUP = 'users' + HELP = 'Manage Kafka Users' + COMMANDS = [DescribeUserScramCredentials, AlterUserScramCredentials] diff --git a/kafka/cli/admin/users/alter_user_scram_credentials.py b/kafka/cli/admin/users/alter_user_scram_credentials.py index 379c8c737..de240e1f1 100644 --- a/kafka/cli/admin/users/alter_user_scram_credentials.py +++ b/kafka/cli/admin/users/alter_user_scram_credentials.py @@ -6,12 +6,11 @@ class AlterUserScramCredentials: + COMMAND = 'alter-scram-credentials' + HELP = 'Alter SCRAM credentials for Kafka users' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser( - 'alter-scram-credentials', - help='Alter SCRAM credentials for Kafka users') + def add_arguments(cls, parser): parser.add_argument( '--delete', type=str, action='append', dest='deletions', default=[], help='USER:MECHANISM pair to delete (e.g. alice:SCRAM-SHA-256)') @@ -21,7 +20,6 @@ def add_subparser(cls, subparsers): parser.add_argument( '--iterations', type=int, default=None, help='PBKDF2 iteration count for upsertions (default: 4096)') - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): diff --git a/kafka/cli/admin/users/describe_user_scram_credentials.py b/kafka/cli/admin/users/describe_user_scram_credentials.py index 01a8eb537..935a93d44 100644 --- a/kafka/cli/admin/users/describe_user_scram_credentials.py +++ b/kafka/cli/admin/users/describe_user_scram_credentials.py @@ -1,15 +1,13 @@ class DescribeUserScramCredentials: + COMMAND = 'describe-scram-credentials' + HELP = 'Describe SCRAM credentials for Kafka users' @classmethod - def add_subparser(cls, subparsers): - parser = subparsers.add_parser( - 'describe-scram-credentials', - help='Describe SCRAM credentials for Kafka users') + def add_arguments(cls, parser): parser.add_argument( '--user', type=str, action='append', dest='users', default=[], help='User name to describe (repeatable). ' 'If omitted, describes all users.') - parser.set_defaults(command=cls.command) @classmethod def command(cls, client, args): From 92adaae68134b3ba5cc336d984e9e65967914e30 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 23 Apr 2026 08:35:48 -0700 Subject: [PATCH 2/6] group/command help --- kafka/cli/admin/__init__.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index a96d0c6ae..c8207a62f 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -44,6 +44,12 @@ def run_cli(args=None): UsersCommandGroup, ]) config = parser.parse_args(args) + if not config.group: + parser.print_help() + return 1 + elif not config.command: + config.group.print_help() + return 1 if config.format not in ('raw', 'json'): raise ValueError('Unrecognized format: %s' % config.format) From 247d323094bcc8719c3a0dde76a598338e1bfff5 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 23 Apr 2026 08:38:51 -0700 Subject: [PATCH 3/6] bootstrap_required=False --- kafka/cli/admin/__init__.py | 9 +++++++-- kafka/cli/common.py | 10 ++++++---- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index c8207a62f..1b406429f 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -20,7 +20,7 @@ def build_parser(groups=()): prog='python -m kafka.admin', description='Kafka Admin Client', ) - add_common_cli_args(parser) + add_common_cli_args(parser, bootstrap_required=False) parser.add_argument( '-f', '--format', type=str, default='raw', help='output format: raw|json') @@ -56,7 +56,12 @@ def run_cli(args=None): configure_logging(config) logger = logging.getLogger(__name__) - kwargs = build_connect_kwargs(config) + try: + kwargs = build_connect_kwargs(config) + except ValueError as exc: + parser.print_usage() + print(f'{parser.prog}: {exc}') + return 1 client = KafkaAdminClient(**kwargs) try: diff --git a/kafka/cli/common.py b/kafka/cli/common.py index 5d4e29aea..4ad3c7a6a 100644 --- a/kafka/cli/common.py +++ b/kafka/cli/common.py @@ -1,10 +1,10 @@ import logging -def add_connect_cli_args(parser): +def add_connect_cli_args(parser, bootstrap_required=True): connect_group = parser.add_argument_group('connection') connect_group.add_argument( - '-b', '--bootstrap-servers', type=str, action='append', required=True, + '-b', '--bootstrap-servers', type=str, action='append', required=bootstrap_required, help='host:port for cluster bootstrap server. Can be provided multiple times.') connect_group.add_argument( '-s', '--security-protocol', type=str, default='PLAINTEXT', help='PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL') @@ -35,6 +35,8 @@ def build_kwargs(props): def build_connect_kwargs(config): + if not config.bootstrap_servers: + raise ValueError('python -m kafka: error: the following arguments are required: -b/--bootstrap-servers') kwargs = build_kwargs(config.extra_config) kwargs.update({ 'bootstrap_servers': config.bootstrap_servers, @@ -88,6 +90,6 @@ def configure_logging(config): logging.getLogger(name).setLevel(logging.CRITICAL + 1) -def add_common_cli_args(parser): - add_connect_cli_args(parser) +def add_common_cli_args(parser, bootstrap_required=True): + add_connect_cli_args(parser, bootstrap_required) add_logging_cli_args(parser) From 37b97d094738edad3e61598377e796e12b817db3 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 23 Apr 2026 08:43:34 -0700 Subject: [PATCH 4/6] output groups; fixup format error processing --- kafka/cli/admin/__init__.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 1b406429f..5b62f9661 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -21,8 +21,8 @@ def build_parser(groups=()): description='Kafka Admin Client', ) add_common_cli_args(parser, bootstrap_required=False) - parser.add_argument( - '-f', '--format', type=str, default='raw', + parser.add_argument_group('output').add_argument( + '--format', type=str, default='raw', help='output format: raw|json') groups_sub = parser.add_subparsers(dest='group', metavar='GROUP', title='Available command groups') for group in groups: @@ -51,7 +51,8 @@ def run_cli(args=None): config.group.print_help() return 1 if config.format not in ('raw', 'json'): - raise ValueError('Unrecognized format: %s' % config.format) + print(f'Unrecognized format: {config.format}') + return 1 configure_logging(config) logger = logging.getLogger(__name__) From c4acfcdbf94dc0925b4efa80fc22b713dcabdf02 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 23 Apr 2026 08:49:49 -0700 Subject: [PATCH 5/6] cli consumer/producer options groups --- kafka/cli/consumer/__init__.py | 11 ++++++----- kafka/cli/producer/__init__.py | 5 +++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/kafka/cli/consumer/__init__.py b/kafka/cli/consumer/__init__.py index 56af3a14a..ada4ac96e 100644 --- a/kafka/cli/consumer/__init__.py +++ b/kafka/cli/consumer/__init__.py @@ -12,19 +12,20 @@ def main_parser(): description='Kafka console consumer', ) add_common_cli_args(parser) - parser.add_argument( + options = parser.add_argument_group('consumer options') + options.add_argument( '-t', '--topic', type=str, action='append', dest='topics', required=True, help='subscribe to topic') - parser.add_argument( + options.add_argument( '-g', '--group', type=str, required=True, help='consumer group') - parser.add_argument( + options.add_argument( '-i', '--group-instance-id', type=str, help='static group membership identifier') - parser.add_argument( + options.add_argument( '-f', '--format', type=str, default='str', help='output format: str|raw|full') - parser.add_argument( + options.add_argument( '--encoding', type=str, default='utf-8', help='encoding to use for str output decode()') return parser diff --git a/kafka/cli/producer/__init__.py b/kafka/cli/producer/__init__.py index 09c6bfc78..ebdfc7362 100644 --- a/kafka/cli/producer/__init__.py +++ b/kafka/cli/producer/__init__.py @@ -12,10 +12,11 @@ def main_parser(): description='Kafka console producer', ) add_common_cli_args(parser) - parser.add_argument( + options = parser.add_argument_group('producer options') + options.add_argument( '-t', '--topic', type=str, required=True, help='publish to topic') - parser.add_argument( + options.add_argument( '--encoding', type=str, default='utf-8', help='byte encoding for produced messages') return parser From ab131c67e6d93f5dd0c57c80c97672b09e6218ec Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 23 Apr 2026 08:50:23 -0700 Subject: [PATCH 6/6] use uppercase for common short flags --- kafka/cli/common.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/kafka/cli/common.py b/kafka/cli/common.py index 4ad3c7a6a..98e1aed96 100644 --- a/kafka/cli/common.py +++ b/kafka/cli/common.py @@ -7,13 +7,13 @@ def add_connect_cli_args(parser, bootstrap_required=True): '-b', '--bootstrap-servers', type=str, action='append', required=bootstrap_required, help='host:port for cluster bootstrap server. Can be provided multiple times.') connect_group.add_argument( - '-s', '--security-protocol', type=str, default='PLAINTEXT', help='PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL') + '-S', '--security-protocol', type=str, default='PLAINTEXT', help='PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL') connect_group.add_argument( - '-m', '--sasl-mechanism', type=str, default='PLAIN', help='PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512') + '-M', '--sasl-mechanism', type=str, default='PLAIN', help='PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512') connect_group.add_argument( - '-u', '--sasl-user', type=str) + '-U', '--sasl-user', type=str) connect_group.add_argument( - '-p', '--sasl-password', type=str) + '-P', '--sasl-password', type=str) def build_kwargs(props): @@ -57,11 +57,11 @@ def add_logging_cli_args(parser): '-L', '--enable-logger', type=str, action='append', help='enable a specific logger. Can be provided multiple times. If not provided, all loggers are enabled') logging_group.add_argument( - '-DL', '--disable-logger', type=str, action='append', + '-D', '--disable-logger', type=str, action='append', help='disable a specific logger. Can be provided multiple times.') extended_group = parser.add_argument_group('extended') extended_group.add_argument( - '-c', '--extra-config', type=str, action='append', + '-C', '--extra-config', type=str, action='append', help='additional configuration properties for client in "key=val" format. Can be provided multiple times.')