Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 6 additions & 46 deletions kafka/cli/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from .partitions import PartitionsSubCommand
from .topics import TopicsSubCommand
from .users import UsersSubCommand
from ..common import add_common_cli_args
from ..common import add_common_cli_args, configure_logging, build_connect_kwargs
from kafka.errors import BrokerResponseError


Expand All @@ -27,27 +27,6 @@ def main_parser():
return parser


# Level-name -> numeric value map, mirroring the stdlib logging levels.
_LOGGING_LEVELS = {'NOTSET': 0, 'DEBUG': 10, 'INFO': 20, 'WARNING': 30, 'ERROR': 40, 'CRITICAL': 50}


def build_kwargs(props):
    """Parse "key=val" property strings into a kwargs dict.

    Values are coerced: integer-looking strings become ints, and the
    literals 'None'/'True'/'False' become their Python equivalents;
    anything else stays a string.

    Arguments:
        props: iterable of "key=val" strings, or None.

    Returns:
        dict mapping each key to its coerced value.
    """
    kwargs = {}
    for prop in props or []:
        # Split on the first '=' only, so values may themselves contain
        # '=' (e.g. base64-encoded secrets); a bare split() crashed here.
        k, v = prop.split('=', 1)
        try:
            v = int(v)
        except ValueError:
            pass
        if v == 'None':
            v = None
        elif v == 'False':
            v = False
        elif v == 'True':
            v = True
        kwargs[k] = v
    return kwargs


def run_cli(args=None):
parser = main_parser()
subparsers = parser.add_subparsers(help='subcommands')
Expand All @@ -56,34 +35,15 @@ def run_cli(args=None):
GroupsSubCommand, UsersSubCommand]:
cmd.add_subparser(subparsers)
config = parser.parse_args(args)

if config.enable_logger is not None:
log_level = _LOGGING_LEVELS[config.log_level.upper()]
handler = logging.StreamHandler()
handler.setLevel(log_level)
handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
for name in config.enable_logger:
logger = logging.getLogger(name)
logger.setLevel(log_level)
logger.addHandler(handler)
else:
logging.basicConfig(level=_LOGGING_LEVELS[config.log_level.upper()])
if config.disable_logger is not None:
for name in config.disable_logger:
logging.getLogger(name).setLevel(logging.CRITICAL + 1)

if config.format not in ('raw', 'json'):
raise ValueError('Unrecognized format: %s' % config.format)

configure_logging(config)
logger = logging.getLogger(__name__)

kwargs = build_kwargs(config.extra_config)
client = KafkaAdminClient(
bootstrap_servers=config.bootstrap_servers,
security_protocol=config.security_protocol,
sasl_mechanism=config.sasl_mechanism,
sasl_plain_username=config.sasl_user,
sasl_plain_password=config.sasl_password,
**kwargs)
kwargs = build_connect_kwargs(config)
client = KafkaAdminClient(**kwargs)

try:
result = config.command(client, config)
if config.format == 'raw':
Expand Down
68 changes: 67 additions & 1 deletion kafka/cli/common.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
def add_common_cli_args(parser):
import logging


def add_connect_cli_args(parser):
connect_group = parser.add_argument_group('connection')
connect_group.add_argument(
'-b', '--bootstrap-servers', type=str, action='append', required=True,
Expand All @@ -11,6 +14,39 @@ def add_common_cli_args(parser):
'-u', '--sasl-user', type=str)
connect_group.add_argument(
'-p', '--sasl-password', type=str)


def build_kwargs(props):
    """Parse "key=val" property strings into a kwargs dict.

    Values are coerced: integer-looking strings become ints, and the
    literals 'None'/'True'/'False' become their Python equivalents;
    anything else stays a string.

    Arguments:
        props: iterable of "key=val" strings, or None.

    Returns:
        dict mapping each key to its coerced value.
    """
    kwargs = {}
    for prop in props or []:
        # Split on the first '=' only, so values may themselves contain
        # '=' (e.g. base64-encoded secrets); a bare split() crashed here.
        k, v = prop.split('=', 1)
        try:
            v = int(v)
        except ValueError:
            pass
        if v == 'None':
            v = None
        elif v == 'False':
            v = False
        elif v == 'True':
            v = True
        kwargs[k] = v
    return kwargs


def build_connect_kwargs(config):
    """Assemble client constructor kwargs from the parsed CLI config.

    Starts from any -c/--extra-config properties, then overlays the
    explicit connection flags so the named CLI options always win over
    extra-config entries with the same key.
    """
    connect_kwargs = build_kwargs(config.extra_config)
    connect_kwargs['bootstrap_servers'] = config.bootstrap_servers
    connect_kwargs['security_protocol'] = config.security_protocol
    connect_kwargs['sasl_mechanism'] = config.sasl_mechanism
    connect_kwargs['sasl_plain_username'] = config.sasl_user
    connect_kwargs['sasl_plain_password'] = config.sasl_password
    return connect_kwargs


def add_logging_cli_args(parser):
logging_group = parser.add_argument_group('logging')
logging_group.add_argument(
'-l', '--log-level', type=str, default='CRITICAL',
Expand All @@ -25,3 +61,33 @@ def add_common_cli_args(parser):
extended_group.add_argument(
'-c', '--extra-config', type=str, action='append',
help='additional configuration properties for client in "key=val" format. Can be provided multiple times.')


def configure_logging(config):
    """Apply the CLI logging flags to the stdlib logging tree.

    With --enable-logger, only the named loggers get a stream handler at
    the requested --log-level; otherwise the root logger is configured
    via basicConfig. Loggers named by --disable-logger are silenced by
    raising their level above CRITICAL.
    """
    level_by_name = {
        'NOTSET': 0,
        'DEBUG': 10,
        'INFO': 20,
        'WARNING': 30,
        'ERROR': 40,
        'CRITICAL': 50,
    }
    level = level_by_name[config.log_level.upper()]
    if config.enable_logger is None:
        logging.basicConfig(level=level)
    else:
        # One shared handler serves every explicitly enabled logger.
        stream_handler = logging.StreamHandler()
        stream_handler.setLevel(level)
        stream_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
        for logger_name in config.enable_logger:
            enabled_logger = logging.getLogger(logger_name)
            enabled_logger.setLevel(level)
            enabled_logger.addHandler(stream_handler)
    if config.disable_logger is not None:
        for logger_name in config.disable_logger:
            # CRITICAL + 1 is above every standard level, i.e. fully muted.
            logging.getLogger(logger_name).setLevel(logging.CRITICAL + 1)


def add_common_cli_args(parser):
    """Attach the shared connection and logging argument groups to *parser*."""
    for add_group in (add_connect_cli_args, add_logging_cli_args):
        add_group(parser)
42 changes: 3 additions & 39 deletions kafka/cli/consumer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from kafka import KafkaConsumer
from kafka.errors import KafkaError
from ..common import add_common_cli_args
from ..common import add_common_cli_args, configure_logging, build_connect_kwargs


def main_parser():
Expand All @@ -29,53 +29,17 @@ def main_parser():
return parser


# Level-name -> numeric value map, mirroring the stdlib logging levels.
_LOGGING_LEVELS = {'NOTSET': 0, 'DEBUG': 10, 'INFO': 20, 'WARNING': 30, 'ERROR': 40, 'CRITICAL': 50}


def build_kwargs(props):
    """Parse "key=val" property strings into a kwargs dict.

    Values are coerced: integer-looking strings become ints, and the
    literals 'None'/'True'/'False' become their Python equivalents;
    anything else stays a string.

    Arguments:
        props: iterable of "key=val" strings, or None.

    Returns:
        dict mapping each key to its coerced value.
    """
    kwargs = {}
    for prop in props or []:
        # Split on the first '=' only, so values may themselves contain
        # '=' (e.g. base64-encoded secrets); a bare split() crashed here.
        k, v = prop.split('=', 1)
        try:
            v = int(v)
        except ValueError:
            pass
        if v == 'None':
            v = None
        elif v == 'False':
            v = False
        elif v == 'True':
            v = True
        kwargs[k] = v
    return kwargs


def run_cli(args=None):
parser = main_parser()
config = parser.parse_args(args)

if config.enable_logger is not None:
log_level = _LOGGING_LEVELS[config.log_level.upper()]
handler = logging.StreamHandler()
handler.setLevel(log_level)
handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
for name in config.enable_logger:
logger = logging.getLogger(name)
logger.setLevel(log_level)
logger.addHandler(handler)
else:
logging.basicConfig(level=_LOGGING_LEVELS[config.log_level.upper()])
if config.disable_logger is not None:
for name in config.disable_logger:
logging.getLogger(name).setLevel(logging.CRITICAL + 1)

if config.format not in ('str', 'raw', 'full'):
raise ValueError('Unrecognized format: %s' % config.format)
configure_logging(config)
logger = logging.getLogger(__name__)

kwargs = build_kwargs(config.extra_config)
kwargs = build_connect_kwargs(config)
consumer = KafkaConsumer(
bootstrap_servers=config.bootstrap_servers,
group_id=config.group,
group_instance_id=config.group_instance_id,
**kwargs)
Expand Down
43 changes: 4 additions & 39 deletions kafka/cli/producer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import sys

from kafka import KafkaProducer
from ..common import add_common_cli_args
from ..common import add_common_cli_args, configure_logging, build_connect_kwargs


def main_parser():
Expand All @@ -21,50 +21,15 @@ def main_parser():
return parser


# Level-name -> numeric value map, mirroring the stdlib logging levels.
_LOGGING_LEVELS = {'NOTSET': 0, 'DEBUG': 10, 'INFO': 20, 'WARNING': 30, 'ERROR': 40, 'CRITICAL': 50}


def build_kwargs(props):
    """Parse "key=val" property strings into a kwargs dict.

    Values are coerced: integer-looking strings become ints, and the
    literals 'None'/'True'/'False' become their Python equivalents;
    anything else stays a string.

    Arguments:
        props: iterable of "key=val" strings, or None.

    Returns:
        dict mapping each key to its coerced value.
    """
    kwargs = {}
    for prop in props or []:
        # Split on the first '=' only, so values may themselves contain
        # '=' (e.g. base64-encoded secrets); a bare split() crashed here.
        k, v = prop.split('=', 1)
        try:
            v = int(v)
        except ValueError:
            pass
        if v == 'None':
            v = None
        elif v == 'False':
            v = False
        elif v == 'True':
            v = True
        kwargs[k] = v
    return kwargs


def run_cli(args=None):
parser = main_parser()
config = parser.parse_args(args)

if config.enable_logger is not None:
log_level = _LOGGING_LEVELS[config.log_level.upper()]
handler = logging.StreamHandler()
handler.setLevel(log_level)
handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
for name in config.enable_logger:
logger = logging.getLogger(name)
logger.setLevel(log_level)
logger.addHandler(handler)
else:
logging.basicConfig(level=_LOGGING_LEVELS[config.log_level.upper()])
if config.disable_logger is not None:
for name in config.disable_logger:
logging.getLogger(name).setLevel(logging.CRITICAL + 1)

configure_logging(config)
logger = logging.getLogger(__name__)

kwargs = build_kwargs(config.extra_config)
producer = KafkaProducer(bootstrap_servers=config.bootstrap_servers, **kwargs)
kwargs = build_connect_kwargs(config)
producer = KafkaProducer(**kwargs)

def log_result(res_or_err):
if isinstance(res_or_err, Exception):
Expand Down
Loading