From 974479424a7e429e8c97acab49014b08689c44c6 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 22 Apr 2026 19:18:21 -0700 Subject: [PATCH] kafka.cli: common configuration for logging and connect kwargs --- kafka/cli/admin/__init__.py | 52 +++----------------------- kafka/cli/common.py | 68 +++++++++++++++++++++++++++++++++- kafka/cli/consumer/__init__.py | 42 ++------------------- kafka/cli/producer/__init__.py | 43 ++------------------- 4 files changed, 80 insertions(+), 125 deletions(-) diff --git a/kafka/cli/admin/__init__.py b/kafka/cli/admin/__init__.py index 7151ceae6..e8a3ede79 100644 --- a/kafka/cli/admin/__init__.py +++ b/kafka/cli/admin/__init__.py @@ -11,7 +11,7 @@ from .partitions import PartitionsSubCommand from .topics import TopicsSubCommand from .users import UsersSubCommand -from ..common import add_common_cli_args +from ..common import add_common_cli_args, configure_logging, build_connect_kwargs from kafka.errors import BrokerResponseError @@ -27,27 +27,6 @@ def main_parser(): return parser -_LOGGING_LEVELS = {'NOTSET': 0, 'DEBUG': 10, 'INFO': 20, 'WARNING': 30, 'ERROR': 40, 'CRITICAL': 50} - - -def build_kwargs(props): - kwargs = {} - for prop in props or []: - k, v = prop.split('=') - try: - v = int(v) - except ValueError: - pass - if v == 'None': - v = None - elif v == 'False': - v = False - elif v == 'True': - v = True - kwargs[k] = v - return kwargs - - def run_cli(args=None): parser = main_parser() subparsers = parser.add_subparsers(help='subcommands') @@ -56,34 +35,15 @@ def run_cli(args=None): GroupsSubCommand, UsersSubCommand]: cmd.add_subparser(subparsers) config = parser.parse_args(args) - - if config.enable_logger is not None: - log_level = _LOGGING_LEVELS[config.log_level.upper()] - handler = logging.StreamHandler() - handler.setLevel(log_level) - handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT)) - for name in config.enable_logger: - logger = logging.getLogger(name) - logger.setLevel(log_level) - logger.addHandler(handler) - else: - logging.basicConfig(level=_LOGGING_LEVELS[config.log_level.upper()]) - if config.disable_logger is not None: - for name in config.disable_logger: - logging.getLogger(name).setLevel(logging.CRITICAL + 1) - if config.format not in ('raw', 'json'): raise ValueError('Unrecognized format: %s' % config.format) + + configure_logging(config) logger = logging.getLogger(__name__) - kwargs = build_kwargs(config.extra_config) - client = KafkaAdminClient( - bootstrap_servers=config.bootstrap_servers, - security_protocol=config.security_protocol, - sasl_mechanism=config.sasl_mechanism, - sasl_plain_username=config.sasl_user, - sasl_plain_password=config.sasl_password, - **kwargs) + kwargs = build_connect_kwargs(config) + client = KafkaAdminClient(**kwargs) + try: result = config.command(client, config) if config.format == 'raw': diff --git a/kafka/cli/common.py b/kafka/cli/common.py index 88fdf543e..5d4e29aea 100644 --- a/kafka/cli/common.py +++ b/kafka/cli/common.py @@ -1,4 +1,7 @@ -def add_common_cli_args(parser): +import logging + + +def add_connect_cli_args(parser): connect_group = parser.add_argument_group('connection') connect_group.add_argument( '-b', '--bootstrap-servers', type=str, action='append', required=True, @@ -11,6 +14,39 @@ def add_common_cli_args(parser): '-u', '--sasl-user', type=str) connect_group.add_argument( '-p', '--sasl-password', type=str) + + +def build_kwargs(props): + kwargs = {} + for prop in props or []: + k, v = prop.split('=') + try: + v = int(v) + except ValueError: + pass + if v == 'None': + v = None + elif v == 'False': + v = False + elif v == 'True': + v = True + kwargs[k] = v + return kwargs + + +def build_connect_kwargs(config): + kwargs = build_kwargs(config.extra_config) + kwargs.update({ + 'bootstrap_servers': config.bootstrap_servers, + 'security_protocol': config.security_protocol, + 'sasl_mechanism': config.sasl_mechanism, + 'sasl_plain_username': config.sasl_user, + 'sasl_plain_password': config.sasl_password, + }) + return kwargs + + +def add_logging_cli_args(parser): logging_group = parser.add_argument_group('logging') logging_group.add_argument( '-l', '--log-level', type=str, default='CRITICAL', @@ -25,3 +61,33 @@ def add_common_cli_args(parser): extended_group.add_argument( '-c', '--extra-config', type=str, action='append', help='additional configuration properties for client in "key=val" format. Can be provided multiple times.') + + +def configure_logging(config): + _LOGGING_LEVELS = { + 'NOTSET': 0, + 'DEBUG': 10, + 'INFO': 20, + 'WARNING': 30, + 'ERROR': 40, + 'CRITICAL': 50, + } + if config.enable_logger is not None: + log_level = _LOGGING_LEVELS[config.log_level.upper()] + handler = logging.StreamHandler() + handler.setLevel(log_level) + handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT)) + for name in config.enable_logger: + logger = logging.getLogger(name) + logger.setLevel(log_level) + logger.addHandler(handler) + else: + logging.basicConfig(level=_LOGGING_LEVELS[config.log_level.upper()]) + if config.disable_logger is not None: + for name in config.disable_logger: + logging.getLogger(name).setLevel(logging.CRITICAL + 1) + + +def add_common_cli_args(parser): + add_connect_cli_args(parser) + add_logging_cli_args(parser) diff --git a/kafka/cli/consumer/__init__.py b/kafka/cli/consumer/__init__.py index 186b17360..56af3a14a 100644 --- a/kafka/cli/consumer/__init__.py +++ b/kafka/cli/consumer/__init__.py @@ -3,7 +3,7 @@ from kafka import KafkaConsumer from kafka.errors import KafkaError -from ..common import add_common_cli_args +from ..common import add_common_cli_args, configure_logging, build_connect_kwargs def main_parser(): @@ -29,53 +29,17 @@ def main_parser(): return parser -_LOGGING_LEVELS = {'NOTSET': 0, 'DEBUG': 10, 'INFO': 20, 'WARNING': 30, 'ERROR': 40, 'CRITICAL': 50} - - -def build_kwargs(props): - kwargs = {} - for prop in props or []: - k, v = prop.split('=') - try: - v = int(v) - except ValueError: - pass - if v == 'None': - v = None - elif v == 'False': - v = False - elif v == 'True': - v = True - kwargs[k] = v - return kwargs - - def run_cli(args=None): parser = main_parser() config = parser.parse_args(args) - if config.enable_logger is not None: - log_level = _LOGGING_LEVELS[config.log_level.upper()] - handler = logging.StreamHandler() - handler.setLevel(log_level) - handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT)) - for name in config.enable_logger: - logger = logging.getLogger(name) - logger.setLevel(log_level) - logger.addHandler(handler) - else: - logging.basicConfig(level=_LOGGING_LEVELS[config.log_level.upper()]) - if config.disable_logger is not None: - for name in config.disable_logger: - logging.getLogger(name).setLevel(logging.CRITICAL + 1) - if config.format not in ('str', 'raw', 'full'): raise ValueError('Unrecognized format: %s' % config.format) + configure_logging(config) logger = logging.getLogger(__name__) - kwargs = build_kwargs(config.extra_config) + kwargs = build_connect_kwargs(config) consumer = KafkaConsumer( - bootstrap_servers=config.bootstrap_servers, group_id=config.group, group_instance_id=config.group_instance_id, **kwargs) diff --git a/kafka/cli/producer/__init__.py b/kafka/cli/producer/__init__.py index cb2ea73da..09c6bfc78 100644 --- a/kafka/cli/producer/__init__.py +++ b/kafka/cli/producer/__init__.py @@ -3,7 +3,7 @@ import sys from kafka import KafkaProducer -from ..common import add_common_cli_args +from ..common import add_common_cli_args, configure_logging, build_connect_kwargs def main_parser(): @@ -21,50 +21,15 @@ def main_parser(): return parser -_LOGGING_LEVELS = {'NOTSET': 0, 'DEBUG': 10, 'INFO': 20, 'WARNING': 30, 'ERROR': 40, 'CRITICAL': 50} - - -def build_kwargs(props): - kwargs = {} - for prop in props or []: - k, v = prop.split('=') - try: - v = int(v) - except ValueError: - pass - if v == 'None': - v = None - elif v == 'False': - v = False - elif v == 'True': - v = True - kwargs[k] = v - return kwargs - - def run_cli(args=None): parser = main_parser() config = parser.parse_args(args) - if config.enable_logger is not None: - log_level = _LOGGING_LEVELS[config.log_level.upper()] - handler = logging.StreamHandler() - handler.setLevel(log_level) - handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT)) - for name in config.enable_logger: - logger = logging.getLogger(name) - logger.setLevel(log_level) - logger.addHandler(handler) - else: - logging.basicConfig(level=_LOGGING_LEVELS[config.log_level.upper()]) - if config.disable_logger is not None: - for name in config.disable_logger: - logging.getLogger(name).setLevel(logging.CRITICAL + 1) - + configure_logging(config) logger = logging.getLogger(__name__) - kwargs = build_kwargs(config.extra_config) - producer = KafkaProducer(bootstrap_servers=config.bootstrap_servers, **kwargs) + kwargs = build_connect_kwargs(config) + producer = KafkaProducer(**kwargs) def log_result(res_or_err): if isinstance(res_or_err, Exception):