From 915e87face3b69d9072d9e04ad1b5a54e2c28884 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 18 Nov 2015 16:28:53 -0800 Subject: [PATCH 1/3] KAFKA-2846: Add Ducktape test for kafka-consumer-groups --- tests/kafkatest/services/kafka/kafka.py | 50 +++++++ .../tests/consumer_group_command_test.py | 125 ++++++++++++++++++ 2 files changed, 175 insertions(+) create mode 100644 tests/kafkatest/tests/consumer_group_command_test.py diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 9a9feda55d4f2..ffcd27139929c 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -410,6 +410,56 @@ def leader(self, topic, partition=0): self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx)) return self.get_node(leader_idx) + def list_consumer_groups(self, node=None, new_consumer=False, command_config=None): + """ Get list of consumer groups. + """ + if node is None: + node = self.nodes[0] + + if command_config is None: + command_config = "" + else: + command_config = "--command-config " + command_config + + if new_consumer: + cmd = "/opt/%s/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server %s %s --list" % \ + (kafka_dir(node), self.bootstrap_servers(), command_config) + else: + cmd = "/opt/%s/bin/kafka-consumer-groups.sh --zookeeper %s %s --list" % \ + (kafka_dir(node), self.zk.connect_setting(), command_config) + output = "" + self.logger.debug(cmd) + for line in node.account.ssh_capture(cmd): + if not line.startswith("SLF4J"): + output += line + self.logger.debug(output) + return output + + def describe_consumer_group(self, group, node=None, new_consumer=False, command_config=None): + """ Describe a consumer group. + """ + if node is None: + node = self.nodes[0] + + if command_config is None: + command_config = "" + else: + command_config = "--command-config " + command_config + + if new_consumer: + cmd = "/opt/%s/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server %s %s --group %s --describe" % \ + (kafka_dir(node), self.bootstrap_servers(), command_config, group) + else: + cmd = "/opt/%s/bin/kafka-consumer-groups.sh --zookeeper %s %s --group %s --describe" % \ + (kafka_dir(node), self.zk.connect_setting(), command_config, group) + output = "" + self.logger.debug(cmd) + for line in node.account.ssh_capture(cmd): + if not (line.startswith("SLF4J") or line.startswith("GROUP, TOPIC") or line.startswith("Could not fetch offset")): + output += line + self.logger.debug(output) + return output + def bootstrap_servers(self, protocol='PLAINTEXT'): """Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,... diff --git a/tests/kafkatest/tests/consumer_group_command_test.py b/tests/kafkatest/tests/consumer_group_command_test.py new file mode 100644 index 0000000000000..8aa37a8c4821a --- /dev/null +++ b/tests/kafkatest/tests/consumer_group_command_test.py @@ -0,0 +1,125 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ducktape.utils.util import wait_until +from ducktape.tests.test import Test +from ducktape.mark import matrix + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.services.security.security_config import SecurityConfig + +import os + +TOPIC = "topic-consumer-group-command" + +class ConsumerGroupCommandTest(Test): + """ + Tests ConsumerGroupCommand + """ + # Root directory for persistent output + PERSISTENT_ROOT = "/mnt/consumer_group_command" + COMMAND_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "command.properties") + + def __init__(self, test_context): + super(ConsumerGroupCommandTest, self).__init__(test_context) + self.num_zk = 1 + self.num_brokers = 1 + self.topics = { + TOPIC: {'partitions': 1, 'replication-factor': 1} + } + self.test_context = test_context + + self.zk = ZookeeperService(test_context, self.num_zk) + + def setUp(self): + self.zk.start() + + def start_kafka(self, security_protocol, interbroker_security_protocol): + self.kafka = KafkaService( + self.test_context, self.num_brokers, + self.zk, security_protocol=security_protocol, + interbroker_security_protocol=interbroker_security_protocol, topics=self.topics) + self.kafka.start() + + def start_consumer(self, security_protocol): + enable_new_consumer = security_protocol == SecurityConfig.SSL + self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC, + consumer_timeout_ms=None, new_consumer=enable_new_consumer) + self.consumer.start() + + @matrix(security_protocol=['PLAINTEXT', 'SSL']) + def test_list_consumer_groups(self, security_protocol='PLAINTEXT'): + """ + Tests if ConsumerGroupCommand is listing correct consumer groups + :return: None + """ + self.start_kafka(security_protocol, security_protocol) + + self.start_consumer(security_protocol) + consumer_node = self.consumer.nodes[0] + + wait_until(lambda: self.consumer.alive(consumer_node), + timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start") + + kafka_node = self.kafka.nodes[0] + if security_protocol is not SecurityConfig.PLAINTEXT: + prop_file = str(self.kafka.security_config.client_config()) + self.logger.debug(prop_file) + kafka_node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) + kafka_node.account.create_file(self.COMMAND_CONFIG_FILE, prop_file) + + # Verify ConsumerGroupCommand lists expected consumer groups + enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT + command_config_file = None + if enable_new_consumer: + command_config_file = self.COMMAND_CONFIG_FILE + wait_until(lambda: "test-consumer-group" in self.kafka.list_consumer_groups(node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10, + err_msg="Timed out waiting to list expected consumer groups.") + + self.consumer.stop() + + @matrix(security_protocol=['PLAINTEXT', 'SSL']) + def test_describe_consumer_group(self, security_protocol='PLAINTEXT'): + """ + Tests if ConsumerGroupCommand is describing a consumer group correctly + :return: None + """ + self.start_kafka(security_protocol, security_protocol) + + self.start_consumer(security_protocol) + node = self.consumer.nodes[0] + + wait_until(lambda: self.consumer.alive(node), + timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start") + + kafka_node = self.kafka.nodes[0] + if security_protocol is not SecurityConfig.PLAINTEXT: + prop_file = str(self.kafka.security_config.client_config()) + self.logger.debug(prop_file) + kafka_node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) + kafka_node.account.create_file(self.COMMAND_CONFIG_FILE, prop_file) + + # Verify ConsumerGroupCommand lists expected consumer groups + enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT + command_config_file = None + if enable_new_consumer: + command_config_file = self.COMMAND_CONFIG_FILE + wait_until(lambda: "test-consumer-group, topic-consumer-group-command, 0," in self.kafka.describe_consumer_group(group="test-consumer-group", node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10, + err_msg="Timed out waiting to list expected consumer groups.") + + self.consumer.stop() From 96214869af15d9fc80adc9cbf07b61bc8ad78c50 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 7 Jan 2016 14:10:21 -0800 Subject: [PATCH 2/3] Rebase on latest trunk --- tests/kafkatest/services/kafka/kafka.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index ffcd27139929c..06e0e4d5aef5d 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -423,7 +423,7 @@ def list_consumer_groups(self, node=None, new_consumer=False, command_config=Non if new_consumer: cmd = "/opt/%s/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server %s %s --list" % \ - (kafka_dir(node), self.bootstrap_servers(), command_config) + (kafka_dir(node), self.bootstrap_servers(self.security_protocol), command_config) else: cmd = "/opt/%s/bin/kafka-consumer-groups.sh --zookeeper %s %s --list" % \ (kafka_dir(node), self.zk.connect_setting(), command_config) @@ -448,7 +448,7 @@ def describe_consumer_group(self, group, node=None, new_consumer=False, command_ if new_consumer: cmd = "/opt/%s/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server %s %s --group %s --describe" % \ - (kafka_dir(node), self.bootstrap_servers(), command_config, group) + (kafka_dir(node), self.bootstrap_servers(self.security_protocol), command_config, group) else: cmd = "/opt/%s/bin/kafka-consumer-groups.sh --zookeeper %s %s --group %s --describe" % \ (kafka_dir(node), self.zk.connect_setting(), command_config, group) From 2e35e67536daf7f0a69479b76cd85ec0b4f0dcc0 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 13 Jan 2016 11:31:47 -0800 Subject: [PATCH 3/3] Remove unnecessary assignement of test_context. Refactor out redundant code. --- .../tests/consumer_group_command_test.py | 56 ++++++------------- 1 file changed, 18 insertions(+), 38 deletions(-) diff --git a/tests/kafkatest/tests/consumer_group_command_test.py b/tests/kafkatest/tests/consumer_group_command_test.py index 8aa37a8c4821a..a7b43a15b9dfb 100644 --- a/tests/kafkatest/tests/consumer_group_command_test.py +++ b/tests/kafkatest/tests/consumer_group_command_test.py @@ -42,8 +42,6 @@ def __init__(self, test_context): self.topics = { TOPIC: {'partitions': 1, 'replication-factor': 1} } - self.test_context = test_context - self.zk = ZookeeperService(test_context, self.num_zk) def setUp(self): @@ -62,20 +60,12 @@ def start_consumer(self, security_protocol): consumer_timeout_ms=None, new_consumer=enable_new_consumer) self.consumer.start() - @matrix(security_protocol=['PLAINTEXT', 'SSL']) - def test_list_consumer_groups(self, security_protocol='PLAINTEXT'): - """ - Tests if ConsumerGroupCommand is listing correct consumer groups - :return: None - """ + def setup_and_verify(self, security_protocol, group=None): self.start_kafka(security_protocol, security_protocol) - self.start_consumer(security_protocol) consumer_node = self.consumer.nodes[0] - wait_until(lambda: self.consumer.alive(consumer_node), - timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start") - + timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start") kafka_node = self.kafka.nodes[0] if security_protocol is not SecurityConfig.PLAINTEXT: prop_file = str(self.kafka.security_config.client_config()) @@ -88,38 +78,28 @@ def test_list_consumer_groups(self, security_protocol='PLAINTEXT'): command_config_file = None if enable_new_consumer: command_config_file = self.COMMAND_CONFIG_FILE - wait_until(lambda: "test-consumer-group" in self.kafka.list_consumer_groups(node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10, - err_msg="Timed out waiting to list expected consumer groups.") + + if group: + wait_until(lambda: ("%s, topic-consumer-group-command, 0," % group) in self.kafka.describe_consumer_group(group=group, node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10, + err_msg="Timed out waiting to list expected consumer groups.") + else: + wait_until(lambda: "test-consumer-group" in self.kafka.list_consumer_groups(node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10, + err_msg="Timed out waiting to list expected consumer groups.") self.consumer.stop() + @matrix(security_protocol=['PLAINTEXT', 'SSL']) + def test_list_consumer_groups(self, security_protocol='PLAINTEXT'): + """ + Tests if ConsumerGroupCommand is listing correct consumer groups + :return: None + """ + self.setup_and_verify(security_protocol) + @matrix(security_protocol=['PLAINTEXT', 'SSL']) def test_describe_consumer_group(self, security_protocol='PLAINTEXT'): """ Tests if ConsumerGroupCommand is describing a consumer group correctly :return: None """ - self.start_kafka(security_protocol, security_protocol) - - self.start_consumer(security_protocol) - node = self.consumer.nodes[0] - - wait_until(lambda: self.consumer.alive(node), - timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start") - - kafka_node = self.kafka.nodes[0] - if security_protocol is not SecurityConfig.PLAINTEXT: - prop_file = str(self.kafka.security_config.client_config()) - self.logger.debug(prop_file) - kafka_node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) - kafka_node.account.create_file(self.COMMAND_CONFIG_FILE, prop_file) - - # Verify ConsumerGroupCommand lists expected consumer groups - enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT - command_config_file = None - if enable_new_consumer: - command_config_file = self.COMMAND_CONFIG_FILE - wait_until(lambda: "test-consumer-group, topic-consumer-group-command, 0," in self.kafka.describe_consumer_group(group="test-consumer-group", node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10, - err_msg="Timed out waiting to list expected consumer groups.") - - self.consumer.stop() + self.setup_and_verify(security_protocol, group="test-consumer-group")