Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 41 additions & 20 deletions kafka/cli/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,65 @@
from pprint import pprint

from kafka.admin.client import KafkaAdminClient
from .acls import ACLsSubCommand
from .cluster import ClusterSubCommand
from .configs import ConfigsSubCommand
from .groups import GroupsSubCommand
from .partitions import PartitionsSubCommand
from .topics import TopicsSubCommand
from .users import UsersSubCommand
from .acls import ACLsCommandGroup
from .cluster import ClusterCommandGroup
from .configs import ConfigsCommandGroup
from .groups import GroupsCommandGroup
from .partitions import PartitionsCommandGroup
from .topics import TopicsCommandGroup
from .users import UsersCommandGroup
from ..common import add_common_cli_args, configure_logging, build_connect_kwargs
from kafka.errors import BrokerResponseError


def main_parser():
def build_parser(groups=()):
parser = argparse.ArgumentParser(
prog='python -m kafka.admin',
description='Kafka admin client',
description='Kafka Admin Client',
)
add_common_cli_args(parser)
parser.add_argument(
'-f', '--format', type=str, default='raw',
add_common_cli_args(parser, bootstrap_required=False)
parser.add_argument_group('output').add_argument(
'--format', type=str, default='raw',
help='output format: raw|json')
groups_sub = parser.add_subparsers(dest='group', metavar='GROUP', title='Available command groups')
for group in groups:
group_parser = groups_sub.add_parser(group.GROUP, help=group.HELP)
group_parser.set_defaults(group=group_parser) # refcycle..
commands_sub = group_parser.add_subparsers(dest='command', metavar='COMMAND', title='Available commands')
for cmd in group.COMMANDS:
command_parser = commands_sub.add_parser(cmd.COMMAND, help=cmd.HELP)
options = command_parser.add_argument_group('command options')
cmd.add_arguments(options)
command_parser.set_defaults(command=cmd.command)
return parser


def run_cli(args=None):
parser = main_parser()
subparsers = parser.add_subparsers(help='subcommands')
for cmd in [ACLsSubCommand, ClusterSubCommand, ConfigsSubCommand,
TopicsSubCommand, PartitionsSubCommand,
GroupsSubCommand, UsersSubCommand]:
cmd.add_subparser(subparsers)
parser = build_parser([
ACLsCommandGroup, ClusterCommandGroup, ConfigsCommandGroup,
TopicsCommandGroup, PartitionsCommandGroup, GroupsCommandGroup,
UsersCommandGroup,
])
config = parser.parse_args(args)
if not config.group:
parser.print_help()
return 1
elif not config.command:
config.group.print_help()
return 1
if config.format not in ('raw', 'json'):
raise ValueError('Unrecognized format: %s' % config.format)
print(f'Unrecognized format: {config.format}')
return 1

configure_logging(config)
logger = logging.getLogger(__name__)

kwargs = build_connect_kwargs(config)
try:
kwargs = build_connect_kwargs(config)
except ValueError as exc:
parser.print_usage()
print(f'{parser.prog}: {exc}')
return 1
client = KafkaAdminClient(**kwargs)

try:
Expand Down
15 changes: 4 additions & 11 deletions kafka/cli/admin/acls/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
import sys

from .create import CreateACLs
from .delete import DeleteACLs
from .describe import DescribeACLs


class ACLsSubCommand:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('acls', help='Manage Kafka ACLs')
commands = parser.add_subparsers()
for cmd in [DescribeACLs, CreateACLs, DeleteACLs]:
cmd.add_subparser(commands)
parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))
class ACLsCommandGroup:
GROUP = 'acls'
HELP = 'Manage Kafka ACLs'
COMMANDS = [DescribeACLs, CreateACLs, DeleteACLs]
6 changes: 3 additions & 3 deletions kafka/cli/admin/acls/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@


class CreateACLs:
COMMAND = 'create'
HELP = 'Create Kafka ACLs'

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('create', help='Create Kafka ACLs')
def add_arguments(cls, parser):
add_acl_args(parser, required=True)
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
Expand Down
6 changes: 3 additions & 3 deletions kafka/cli/admin/acls/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@


class DeleteACLs:
COMMAND = 'delete'
HELP = 'Delete Kafka ACLs'

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('delete', help='Delete Kafka ACLs')
def add_arguments(cls, parser):
add_acl_filter_args(parser)
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
Expand Down
6 changes: 3 additions & 3 deletions kafka/cli/admin/acls/describe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@


class DescribeACLs:
COMMAND = 'describe'
HELP = 'Describe Kafka ACLs'

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('describe', help='Describe Kafka ACLs')
def add_arguments(cls, parser):
add_acl_filter_args(parser)
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
Expand Down
16 changes: 5 additions & 11 deletions kafka/cli/admin/cluster/__init__.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
import sys

from .api_versions import GetApiVersions
from .broker_version import GetBrokerVersion
from .describe import DescribeCluster
from .log_dirs import DescribeLogDirs
from .features import DescribeFeatures, UpdateFeatures


class ClusterSubCommand:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('cluster', help='Manage Kafka Cluster')
commands = parser.add_subparsers()
for cmd in [DescribeCluster, DescribeFeatures, UpdateFeatures, GetApiVersions, GetBrokerVersion, DescribeLogDirs]:
cmd.add_subparser(commands)
parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))
class ClusterCommandGroup:
GROUP = 'cluster'
HELP = 'Manage Kafka Cluster'
COMMANDS = [DescribeCluster, DescribeFeatures, UpdateFeatures,
GetApiVersions, GetBrokerVersion, DescribeLogDirs]
6 changes: 3 additions & 3 deletions kafka/cli/admin/cluster/api_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@


class GetApiVersions:
COMMAND = 'api-versions'
HELP = 'Get Supported Api Versions'

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('api-versions', help='Get Supported Api Versions')
def add_arguments(cls, parser):
parser.add_argument('-k', '--api-key', type=str, action='append', dest='api_keys', default=None)
parser.add_argument('--raw', action='store_true')
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
Expand Down
8 changes: 4 additions & 4 deletions kafka/cli/admin/cluster/broker_version.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
class GetBrokerVersion:
COMMAND = 'broker-version'
HELP = 'Get Version for Broker'

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('broker-version', help='Get Version for Broker')
parser.add_argument('-b', '--broker', required=True)
parser.set_defaults(command=cls.command)
def add_arguments(cls, parser):
parser.add_argument('--broker', required=True)

@classmethod
def command(cls, client, args):
Expand Down
11 changes: 8 additions & 3 deletions kafka/cli/admin/cluster/describe.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
class DescribeCluster:
COMMAND = 'describe'
HELP = 'Describe Kafka Cluster'

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('describe', help='Describe Kafka Cluster')
parser.set_defaults(command=lambda cli, _args: cli.describe_cluster())
def add_arguments(cls, parser):
pass

@classmethod
def command(cls, client, args):
return client.describe_cluster()
12 changes: 6 additions & 6 deletions kafka/cli/admin/cluster/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@


class DescribeFeatures:
COMMAND = 'describe-features'
HELP = 'Describe Features of Kafka Cluster'

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('describe-features', help='Describe Features of Kafka Cluster')
def add_arguments(cls, parser):
parser.add_argument('-f', '--feature', type=str, action='append', dest='features', default=[],
help='Show one or more specific features. If not provided, returns all features.')
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
Expand All @@ -20,16 +20,16 @@ def command(cls, client, args):


class UpdateFeatures:
COMMAND = 'update-features'
HELP = 'Update Features of Kafka Cluster'

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('update-features', help='Update Features of Kafka Cluster')
def add_arguments(cls, parser):
parser.add_argument('-f', '--feature', type=str, action='append', dest='features', default=[], help='set feature=value')
parser.add_argument('--downgrade', action='store_true')
parser.add_argument('--unsafe', action='store_true')
parser.add_argument('--timeout', type=int, default=60)
parser.add_argument('--validate-only', action='store_true')
parser.set_defaults(command=cls.command)

@staticmethod
def _feature_type(args):
Expand Down
14 changes: 9 additions & 5 deletions kafka/cli/admin/cluster/log_dirs.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
class DescribeLogDirs:
COMMAND = 'log-dirs'
HELP = 'Get topic log directories and stats'

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('log-dirs', help='Get topic log directories and stats')
parser.add_argument('-b', '--broker', type=int, action='append', dest='brokers', help='Query specific broker(s)')
parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', help='Get data about specific topic(s)')
parser.set_defaults(command=lambda cli, args: cli.describe_log_dirs(topic_partitions=args.topics, brokers=args.brokers))
def add_arguments(cls, parser):
parser.add_argument('--broker', type=int, action='append', dest='brokers', help='Query specific broker(s)')
parser.add_argument('--topic', type=str, action='append', dest='topics', help='Get data about specific topic(s)')

@classmethod
def command(cls, client, args):
return client.describe_log_dirs(topic_partitions=args.topics, brokers=args.brokers)
15 changes: 4 additions & 11 deletions kafka/cli/admin/configs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
import sys

from .alter import AlterConfigs
from .describe import DescribeConfigs
from .list import ListConfigResources
from .reset import ResetConfigs


class ConfigsSubCommand:

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('configs', help='Manage Kafka Configuration')
commands = parser.add_subparsers()
for cmd in [DescribeConfigs, AlterConfigs, ListConfigResources, ResetConfigs]:
cmd.add_subparser(commands)
parser.set_defaults(command=lambda *_args: parser.print_help() or sys.exit(2))
class ConfigsCommandGroup:
GROUP = 'configs'
HELP = 'Manage Kafka Configuration'
COMMANDS = [DescribeConfigs, AlterConfigs, ListConfigResources, ResetConfigs]
6 changes: 3 additions & 3 deletions kafka/cli/admin/configs/alter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@


class AlterConfigs:
COMMAND = 'alter'
HELP = 'Alter Kafka Configs'

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('alter', help='Alter Kafka Configs')
def add_arguments(cls, parser):
add_resource_arguments(parser)
parser.add_argument('-c', '--config', type=str, action='append', dest='configs', required=True, help='key=value to alter')
parser.add_argument('-v', '--validate-only', action='store_true', default=False)
parser.add_argument('--allow-unknown', action='store_false', dest='raise_on_unknown', default=True)
incremental = parser.add_mutually_exclusive_group()
incremental.add_argument('--force-incremental', action='store_true', dest='incremental', default=None)
incremental.add_argument('--force-alter', action='store_false', dest='incremental', default=None)
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
Expand Down
6 changes: 3 additions & 3 deletions kafka/cli/admin/configs/describe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@


class DescribeConfigs:
COMMAND = 'describe'
HELP = 'Describe Kafka Configs'

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('describe', help='Describe Kafka Configs')
def add_arguments(cls, parser):
add_resource_arguments(parser)
parser.add_argument('-c', '--config', type=str, action='append', dest='configs', default=None)
parser.add_argument('--dynamic', action='store_true', default=False)
parser.add_argument('--modified', action='store_true', default=False)
parser.add_argument('--static', action='store_true', default=False)
parser.add_argument('--default', action='store_true', default=False)
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
Expand Down
9 changes: 3 additions & 6 deletions kafka/cli/admin/configs/list.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
class ListConfigResources:
COMMAND = 'list'
HELP = 'List config resources known to the cluster (requires broker >= 4.1 for non client_metrics types)'

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser(
'list',
help='List config resources known to the cluster (requires broker >= 4.1 '
'for non client_metrics types)')
def add_arguments(cls, parser):
parser.add_argument(
'-r', '--resource-type', type=str, action='append', dest='resource_types', default=[],
help='Filter by resource type (repeatable): topic, broker, '
'broker_logger, client_metrics, group. Omit to list all '
'supported types.')
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
Expand Down
6 changes: 3 additions & 3 deletions kafka/cli/admin/configs/reset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@


class ResetConfigs:
COMMAND = 'reset'
HELP = 'Reset Kafka Configs'

@classmethod
def add_subparser(cls, subparsers):
parser = subparsers.add_parser('reset', help='Reset Kafka Configs')
def add_arguments(cls, parser):
add_resource_arguments(parser)
parser.add_argument('-c', '--config', type=str, action='append', dest='configs', default=[], help='key to reset')
parser.add_argument('-v', '--validate-only', action='store_true', default=False)
parser.add_argument('--allow-unknown', action='store_false', dest='raise_on_unknown', default=True)
parser.set_defaults(command=cls.command)

@classmethod
def command(cls, client, args):
Expand Down
Loading
Loading