Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,56 @@ def leader(self, topic, partition=0):
self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx))
return self.get_node(leader_idx)

def list_consumer_groups(self, node=None, new_consumer=False, command_config=None):
""" Get list of consumer groups.
"""
if node is None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there's a lot of redundant code between list_consumer_groups and describe_consumer_group since almost all of it is for setting up the kafka-consumer-groups.sh command. What about making that a common, fairly generic method and making each of these thin wrappers around it? For example, the generic common version might do all of this initial command setup, just accepting a list of extra argumets (i.e. empty in list_groups, just the group name in describe). Only thing I'm not sure about making generic is the way you're processing the output (which is the unfortunate result of not having something like JSON output for these commands).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure I can do that.

Only thing I'm not sure about making generic is the way you're processing the output (which is the unfortunate result of not having something like JSON output for these commands)

True, Kafka-313 would have addressed that, but it never got traction.

node = self.nodes[0]

if command_config is None:
command_config = ""
else:
command_config = "--command-config " + command_config

if new_consumer:
cmd = "/opt/%s/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server %s %s --list" % \
(kafka_dir(node), self.bootstrap_servers(self.security_protocol), command_config)
else:
cmd = "/opt/%s/bin/kafka-consumer-groups.sh --zookeeper %s %s --list" % \
(kafka_dir(node), self.zk.connect_setting(), command_config)
output = ""
self.logger.debug(cmd)
for line in node.account.ssh_capture(cmd):
if not line.startswith("SLF4J"):
output += line
self.logger.debug(output)
return output

def describe_consumer_group(self, group, node=None, new_consumer=False, command_config=None):
""" Describe a consumer group.
"""
if node is None:
node = self.nodes[0]

if command_config is None:
command_config = ""
else:
command_config = "--command-config " + command_config

if new_consumer:
cmd = "/opt/%s/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server %s %s --group %s --describe" % \
(kafka_dir(node), self.bootstrap_servers(self.security_protocol), command_config, group)
else:
cmd = "/opt/%s/bin/kafka-consumer-groups.sh --zookeeper %s %s --group %s --describe" % \
(kafka_dir(node), self.zk.connect_setting(), command_config, group)
output = ""
self.logger.debug(cmd)
for line in node.account.ssh_capture(cmd):
if not (line.startswith("SLF4J") or line.startswith("GROUP, TOPIC") or line.startswith("Could not fetch offset")):
output += line
self.logger.debug(output)
return output

def bootstrap_servers(self, protocol='PLAINTEXT'):
"""Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...

Expand Down
105 changes: 105 additions & 0 deletions tests/kafkatest/tests/consumer_group_command_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from ducktape.utils.util import wait_until
from ducktape.tests.test import Test
from ducktape.mark import matrix

from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig

import os

TOPIC = "topic-consumer-group-command"

class ConsumerGroupCommandTest(Test):
"""
Tests ConsumerGroupCommand
"""
# Root directory for persistent output
PERSISTENT_ROOT = "/mnt/consumer_group_command"
COMMAND_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "command.properties")

def __init__(self, test_context):
super(ConsumerGroupCommandTest, self).__init__(test_context)
self.num_zk = 1
self.num_brokers = 1
self.topics = {
TOPIC: {'partitions': 1, 'replication-factor': 1}
}
self.zk = ZookeeperService(test_context, self.num_zk)

def setUp(self):
self.zk.start()

def start_kafka(self, security_protocol, interbroker_security_protocol):
self.kafka = KafkaService(
self.test_context, self.num_brokers,
self.zk, security_protocol=security_protocol,
interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
self.kafka.start()

def start_consumer(self, security_protocol):
enable_new_consumer = security_protocol == SecurityConfig.SSL
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this the condition for enabling the new consumer? I know you require it for SSL, but why not always use new consumer? Or make the choice of old vs new consumer explicit in the different tests you're running?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reasoning is explained in my previous response. Pasting here for reference.

Below are the combinations for ConsumerGroupCommand.
OldConsumer
NewConsumer
NewConsumer + Security

I chose to test following combinations and I think it is reasonable.
OldConsumer
NewConsumer + Security

self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
consumer_timeout_ms=None, new_consumer=enable_new_consumer)
self.consumer.start()

def setup_and_verify(self, security_protocol, group=None):
self.start_kafka(security_protocol, security_protocol)
self.start_consumer(security_protocol)
consumer_node = self.consumer.nodes[0]
wait_until(lambda: self.consumer.alive(consumer_node),
timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
kafka_node = self.kafka.nodes[0]
if security_protocol is not SecurityConfig.PLAINTEXT:
prop_file = str(self.kafka.security_config.client_config())
self.logger.debug(prop_file)
kafka_node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
kafka_node.account.create_file(self.COMMAND_CONFIG_FILE, prop_file)

# Verify ConsumerGroupCommand lists expected consumer groups
enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
command_config_file = None
if enable_new_consumer:
command_config_file = self.COMMAND_CONFIG_FILE

if group:
wait_until(lambda: ("%s, topic-consumer-group-command, 0," % group) in self.kafka.describe_consumer_group(group=group, node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10,
err_msg="Timed out waiting to list expected consumer groups.")
else:
wait_until(lambda: "test-consumer-group" in self.kafka.list_consumer_groups(node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10,
err_msg="Timed out waiting to list expected consumer groups.")

self.consumer.stop()

@matrix(security_protocol=['PLAINTEXT', 'SSL'])
def test_list_consumer_groups(self, security_protocol='PLAINTEXT'):
"""
Tests if ConsumerGroupCommand is listing correct consumer groups
:return: None
"""
self.setup_and_verify(security_protocol)

@matrix(security_protocol=['PLAINTEXT', 'SSL'])
def test_describe_consumer_group(self, security_protocol='PLAINTEXT'):
"""
Tests if ConsumerGroupCommand is describing a consumer group correctly
:return: None
"""
self.setup_and_verify(security_protocol, group="test-consumer-group")