diff --git a/tests/kafkatest/sanity_checks/test_bounce.py b/tests/kafkatest/sanity_checks/test_bounce.py new file mode 100644 index 0000000000000..c01f23b0cbaa4 --- /dev/null +++ b/tests/kafkatest/sanity_checks/test_bounce.py @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ducktape.mark import parametrize +from ducktape.mark.resource import cluster +from ducktape.tests.test import Test +from ducktape.utils.util import wait_until + +from kafkatest.services.kafka import KafkaService, quorum +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.zookeeper import ZookeeperService + + +class TestBounce(Test): + """Sanity checks on verifiable producer service class with cluster roll.""" + def __init__(self, test_context): + super(TestBounce, self).__init__(test_context) + + self.topic = "topic" + self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None + self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, + topics={self.topic: {"partitions": 1, "replication-factor": 1}}, + controller_num_nodes_override=3 if quorum.for_test(test_context) == quorum.remote_raft else 1) + self.num_messages = 1000 + + def create_producer(self): + # This will produce to source kafka cluster + self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, + max_messages=self.num_messages, throughput=self.num_messages // 10) + def setUp(self): + if self.zk: + self.zk.start() + + @cluster(num_nodes=6) + @parametrize(metadata_quorum=quorum.remote_raft) + @cluster(num_nodes=4) + @parametrize(metadata_quorum=quorum.colocated_raft) + @cluster(num_nodes=4) + @parametrize(metadata_quorum=quorum.zk) + def test_simple_run(self, metadata_quorum): + """ + Test that we can start VerifiableProducer on the current branch snapshot version, and + verify that we can produce a small number of messages both before and after a subsequent roll. + """ + self.kafka.start() + for first_time in [True, False]: + self.create_producer() + self.producer.start() + wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15, + err_msg="Producer failed to start in a reasonable amount of time.") + + self.producer.wait() + num_produced = self.producer.num_acked + assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages) + if first_time: + self.producer.stop() + if self.kafka.quorum_info.using_raft and self.kafka.remote_controller_quorum: + self.kafka.remote_controller_quorum.restart_cluster() + self.kafka.restart_cluster() diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py index 686cd42e123e5..0847ce0cb41d6 100644 --- a/tests/kafkatest/sanity_checks/test_console_consumer.py +++ b/tests/kafkatest/sanity_checks/test_console_consumer.py @@ -21,7 +21,7 @@ from ducktape.utils.util import wait_until from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.utils.remote_account import line_count, file_exists @@ -34,20 +34,22 @@ def __init__(self, test_context): super(ConsoleConsumerTest, self).__init__(test_context) self.topic = "topic" - self.zk = ZookeeperService(test_context, num_nodes=1) + self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, zk_chroot="/kafka", topics={self.topic: {"partitions": 1, "replication-factor": 1}}) self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic) def setUp(self): - self.zk.start() + if self.zk: + self.zk.start() @cluster(num_nodes=3) - @matrix(security_protocol=['PLAINTEXT', 'SSL']) + @matrix(security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all_raft) @cluster(num_nodes=4) - @matrix(security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512']) - @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL']) - def test_lifecycle(self, security_protocol, sasl_mechanism='GSSAPI'): + @matrix(security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN'], metadata_quorum=quorum.all_raft) + @matrix(security_protocol=['SASL_SSL'], sasl_mechanism=['SCRAM-SHA-256', 'SCRAM-SHA-512']) # SCRAM not yet supported with Raft + @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], metadata_quorum=quorum.all_raft) + def test_lifecycle(self, security_protocol, sasl_mechanism='GSSAPI', metadata_quorum=quorum.zk): """Check that console consumer starts/stops properly, and that we are capturing log output.""" self.kafka.security_protocol = security_protocol diff --git a/tests/kafkatest/sanity_checks/test_performance_services.py b/tests/kafkatest/sanity_checks/test_performance_services.py index 280152c0f8621..f0d1a48bf04fe 100644 --- a/tests/kafkatest/sanity_checks/test_performance_services.py +++ b/tests/kafkatest/sanity_checks/test_performance_services.py @@ -13,11 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.mark import parametrize +from ducktape.mark import matrix, parametrize from ducktape.mark.resource import cluster from ducktape.tests.test import Test -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService from kafkatest.services.performance import latency, compute_aggregate_throughput from kafkatest.services.zookeeper import ZookeeperService @@ -31,10 +31,11 @@ def __init__(self, test_context): self.num_records = 10000 self.topic = "topic" - self.zk = ZookeeperService(test_context, 1) + self.zk = ZookeeperService(test_context, 1) if quorum.for_test(test_context) == quorum.zk else None def setUp(self): - self.zk.start() + if self.zk: + self.zk.start() @cluster(num_nodes=5) # We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check, @@ -43,8 +44,9 @@ def setUp(self): @parametrize(version=str(LATEST_0_9), new_consumer=False) @parametrize(version=str(LATEST_0_9)) @parametrize(version=str(LATEST_1_1), new_consumer=False) - @parametrize(version=str(DEV_BRANCH)) - def test_version(self, version=str(LATEST_0_9), new_consumer=True): + @cluster(num_nodes=5) + @matrix(version=[str(DEV_BRANCH)], metadata_quorum=quorum.all) + def test_version(self, version=str(LATEST_0_9), new_consumer=True, metadata_quorum=quorum.zk): """ Sanity check out producer performance service - verify that we can run the service with a small number of messages. The actual stats here are pretty meaningless since the number of messages is quite small. @@ -67,6 +69,7 @@ def test_version(self, version=str(LATEST_0_9), new_consumer=True): 'buffer.memory': 64*1024*1024}) self.producer_perf.run() producer_perf_data = compute_aggregate_throughput(self.producer_perf) + assert producer_perf_data['records_per_sec'] > 0 # check basic run of end to end latency self.end_to_end = EndToEndLatencyService( @@ -82,6 +85,7 @@ def test_version(self, version=str(LATEST_0_9), new_consumer=True): self.consumer_perf.group = "test-consumer-group" self.consumer_perf.run() consumer_perf_data = compute_aggregate_throughput(self.consumer_perf) + assert consumer_perf_data['records_per_sec'] > 0 return { "producer_performance": producer_perf_data, diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py index 5a95e48898949..32961f1995dff 100644 --- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py +++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py @@ -14,12 +14,12 @@ # limitations under the License. -from ducktape.mark import parametrize +from ducktape.mark import matrix, parametrize from ducktape.mark.resource import cluster from ducktape.tests.test import Test from ducktape.utils.util import wait_until -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.utils import is_version @@ -32,7 +32,7 @@ def __init__(self, test_context): super(TestVerifiableProducer, self).__init__(test_context) self.topic = "topic" - self.zk = ZookeeperService(test_context, num_nodes=1) + self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {"partitions": 1, "replication-factor": 1}}) @@ -41,24 +41,40 @@ def __init__(self, test_context): self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, max_messages=self.num_messages, throughput=self.num_messages // 10) def setUp(self): - self.zk.start() - self.kafka.start() + if self.zk: + self.zk.start() @cluster(num_nodes=3) @parametrize(producer_version=str(LATEST_0_8_2)) @parametrize(producer_version=str(LATEST_0_9)) @parametrize(producer_version=str(LATEST_0_10_0)) @parametrize(producer_version=str(LATEST_0_10_1)) - @parametrize(producer_version=str(DEV_BRANCH)) - def test_simple_run(self, producer_version=DEV_BRANCH): + @matrix(producer_version=[str(DEV_BRANCH)], security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all) + @cluster(num_nodes=4) + @matrix(producer_version=[str(DEV_BRANCH)], security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN', 'GSSAPI'], + metadata_quorum=quorum.all) + def test_simple_run(self, producer_version, security_protocol = 'PLAINTEXT', sasl_mechanism='PLAIN', + metadata_quorum=quorum.zk): """ Test that we can start VerifiableProducer on the current branch snapshot version or against the 0.8.2 jar, and verify that we can produce a small number of messages. """ + self.kafka.security_protocol = security_protocol + self.kafka.client_sasl_mechanism = sasl_mechanism + self.kafka.interbroker_security_protocol = security_protocol + self.kafka.interbroker_sasl_mechanism = sasl_mechanism + if self.kafka.quorum_info.using_raft: + controller_quorum = self.kafka.controller_quorum + controller_quorum.controller_security_protocol = security_protocol + controller_quorum.controller_sasl_mechanism = sasl_mechanism + controller_quorum.intercontroller_security_protocol = security_protocol + controller_quorum.intercontroller_sasl_mechanism = sasl_mechanism + self.kafka.start() + node = self.producer.nodes[0] node.version = KafkaVersion(producer_version) self.producer.start() - wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5, + wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15, err_msg="Producer failed to start in a reasonable amount of time.") # using version.vstring (distutils.version.LooseVersion) is a tricky way of ensuring diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 14b450c0871e2..32e714543cd0a 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -21,7 +21,7 @@ from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.monitor.jmx import JmxMixin, JmxTool -from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_9_0_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0 +from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0 from kafkatest.services.kafka.util import fix_opts_for_new_jvm """ @@ -151,7 +151,9 @@ def prop_file(self, node): def start_cmd(self, node): """Return the start command appropriate for the given node.""" args = self.args.copy() - args['zk_connect'] = self.kafka.zk_connect_setting() + args['broker_list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol) + if not self.new_consumer: + args['zk_connect'] = self.kafka.zk_connect_setting() args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE args['stderr'] = ConsoleConsumer.STDERR_CAPTURE args['log_dir'] = ConsoleConsumer.LOG_DIR @@ -160,7 +162,6 @@ def start_cmd(self, node): args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE args['jmx_port'] = self.jmx_port args['console_consumer'] = self.path.script("kafka-console-consumer.sh", node) - args['broker_list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol) if self.kafka_opts_override: args['kafka_opts'] = "\"%s\"" % self.kafka_opts_override @@ -177,7 +178,7 @@ def start_cmd(self, node): "--consumer.config %(config_file)s " % args if self.new_consumer: - assert node.version >= V_0_9_0_0, \ + assert node.version.consumer_supports_bootstrap_server(), \ "new_consumer is only supported if version >= 0.9.0.0, version %s" % str(node.version) if node.version <= LATEST_0_10_0: cmd += " --new-consumer" diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py index 930a68fb0e91d..6df8dfb6be86a 100644 --- a/tests/kafkatest/services/performance/consumer_performance.py +++ b/tests/kafkatest/services/performance/consumer_performance.py @@ -18,7 +18,7 @@ from kafkatest.services.performance import PerformanceService from kafkatest.services.security.security_config import SecurityConfig -from kafkatest.version import DEV_BRANCH, V_0_9_0_0, V_2_0_0, LATEST_0_10_0 +from kafkatest.version import DEV_BRANCH, V_2_0_0, LATEST_0_10_0 class ConsumerPerformanceService(PerformanceService): @@ -79,14 +79,14 @@ def __init__(self, context, num_nodes, kafka, topic, messages, version=DEV_BRANC self.new_consumer = new_consumer self.settings = settings - assert version >= V_0_9_0_0 or (not new_consumer), \ + assert version.consumer_supports_bootstrap_server() or (not new_consumer), \ "new_consumer is only supported if version >= 0.9.0.0, version %s" % str(version) assert version < V_2_0_0 or new_consumer, \ "new_consumer==false is only supported if version < 2.0.0, version %s" % str(version) security_protocol = self.security_config.security_protocol - assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \ + assert version.consumer_supports_bootstrap_server() or security_protocol == SecurityConfig.PLAINTEXT, \ "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version)) # These less-frequently used settings can be updated manually after instantiation @@ -142,7 +142,7 @@ def start_cmd(self, node): for key, value in self.args(node.version).items(): cmd += " --%s %s" % (key, value) - if node.version >= V_0_9_0_0: + if node.version.consumer_supports_bootstrap_server(): # This is only used for security settings cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE @@ -155,7 +155,7 @@ def start_cmd(self, node): def parse_results(self, line, version): parts = line.split(',') - if version >= V_0_9_0_0: + if version.consumer_supports_bootstrap_server(): result = { 'total_mb': float(parts[2]), 'mbps': float(parts[3]), diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py index 2c7f69a04d60d..3cde3ef1a5d40 100644 --- a/tests/kafkatest/services/performance/end_to_end_latency.py +++ b/tests/kafkatest/services/performance/end_to_end_latency.py @@ -17,7 +17,7 @@ from kafkatest.services.performance import PerformanceService from kafkatest.services.security.security_config import SecurityConfig -from kafkatest.version import DEV_BRANCH, V_0_9_0_0 +from kafkatest.version import DEV_BRANCH @@ -53,7 +53,7 @@ def __init__(self, context, num_nodes, kafka, topic, num_records, compression_ty security_protocol = self.security_config.security_protocol - if version < V_0_9_0_0: + if not version.consumer_supports_bootstrap_server(): assert security_protocol == SecurityConfig.PLAINTEXT, \ "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version)) assert compression_type == "none", \ @@ -74,15 +74,18 @@ def __init__(self, context, num_nodes, kafka, topic, num_records, compression_ty def start_cmd(self, node): args = self.args.copy() args.update({ - 'zk_connect': self.kafka.zk_connect_setting(), 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol), 'config_file': EndToEndLatencyService.CONFIG_FILE, 'kafka_run_class': self.path.script("kafka-run-class.sh", node), 'java_class_name': self.java_class_name() }) + if not node.version.consumer_supports_bootstrap_server(): + args.update({ + 'zk_connect': self.kafka.zk_connect_setting(), + }) cmd = "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % EndToEndLatencyService.LOG4J_CONFIG - if node.version >= V_0_9_0_0: + if node.version.consumer_supports_bootstrap_server(): cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s %(java_class_name)s " % args cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(config_file)s" % args else: @@ -102,7 +105,7 @@ def _worker(self, idx, node): node.account.create_file(EndToEndLatencyService.LOG4J_CONFIG, log_config) client_config = str(self.security_config) - if node.version >= V_0_9_0_0: + if node.version.consumer_supports_bootstrap_server(): client_config += "compression_type=%(compression_type)s" % self.args node.account.create_file(EndToEndLatencyService.CONFIG_FILE, client_config) diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py index 3c4369883040d..a990d4fe04527 100644 --- a/tests/kafkatest/services/performance/producer_performance.py +++ b/tests/kafkatest/services/performance/producer_performance.py @@ -22,7 +22,7 @@ from kafkatest.services.monitor.http import HttpMetricsCollector from kafkatest.services.performance import PerformanceService from kafkatest.services.security.security_config import SecurityConfig -from kafkatest.version import DEV_BRANCH, V_0_9_0_0 +from kafkatest.version import DEV_BRANCH class ProducerPerformanceService(HttpMetricsCollector, PerformanceService): @@ -55,7 +55,7 @@ def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, t self.security_config = kafka.security_config.client_config() security_protocol = self.security_config.security_protocol - assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \ + assert version.consumer_supports_bootstrap_server() or security_protocol == SecurityConfig.PLAINTEXT, \ "Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version)) self.args = { diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py index 4e7aeed6a832a..d98dffa3b8549 100644 --- a/tests/kafkatest/tests/client/client_compatibility_features_test.py +++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py @@ -19,11 +19,12 @@ import time from random import randint -from ducktape.mark import parametrize +from ducktape.mark import matrix, parametrize +from ducktape.mark.resource import cluster from ducktape.tests.test import TestContext from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from ducktape.tests.test import Test from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, V_0_11_0_0, V_0_10_1_0, KafkaVersion @@ -69,7 +70,7 @@ def __init__(self, test_context): """:type test_context: ducktape.tests.test.TestContext""" super(ClientCompatibilityFeaturesTest, self).__init__(test_context=test_context) - self.zk = ZookeeperService(test_context, num_nodes=3) + self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None # Generate a unique topic name topic_name = "client_compat_features_topic_%d%d" % (int(time.time()), randint(0, 2147483647)) @@ -81,11 +82,11 @@ def __init__(self, test_context): def invoke_compatibility_program(self, features): # Run the compatibility test on the first Kafka node. - node = self.zk.nodes[0] + node = self.kafka.nodes[0] cmd = ("%s org.apache.kafka.tools.ClientCompatibilityTest " "--bootstrap-server %s " "--num-cluster-nodes %d " - "--topic %s " % (self.zk.path.script("kafka-run-class.sh", node), + "--topic %s " % (self.kafka.path.script("kafka-run-class.sh", node), self.kafka.bootstrap_servers(), len(self.kafka.nodes), list(self.topics.keys())[0])) @@ -107,7 +108,8 @@ def invoke_compatibility_program(self, features): self.logger.info("** Command failed. See %s for log messages." % ssh_log_file) raise - @parametrize(broker_version=str(DEV_BRANCH)) + @cluster(num_nodes=7) + @matrix(broker_version=[str(DEV_BRANCH)], metadata_quorum=quorum.all_non_upgrade) @parametrize(broker_version=str(LATEST_0_10_0)) @parametrize(broker_version=str(LATEST_0_10_1)) @parametrize(broker_version=str(LATEST_0_10_2)) @@ -122,8 +124,9 @@ def invoke_compatibility_program(self, features): @parametrize(broker_version=str(LATEST_2_5)) @parametrize(broker_version=str(LATEST_2_6)) @parametrize(broker_version=str(LATEST_2_7)) - def run_compatibility_test(self, broker_version): - self.zk.start() + def run_compatibility_test(self, broker_version, metadata_quorum=quorum.zk): + if self.zk: + self.zk.start() self.kafka.set_version(KafkaVersion(broker_version)) self.kafka.start() features = get_broker_features(broker_version) diff --git a/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py b/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py index 52d41f5aa1a52..317d0dd40fd64 100644 --- a/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py +++ b/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py @@ -13,11 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.mark import parametrize +from ducktape.mark import matrix, parametrize +from ducktape.mark.resource import cluster from ducktape.utils.util import wait_until from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest @@ -34,7 +35,7 @@ def __init__(self, test_context): super(ClientCompatibilityProduceConsumeTest, self).__init__(test_context=test_context) self.topic = "test_topic" - self.zk = ZookeeperService(test_context, num_nodes=3) + self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic:{ "partitions": 10, "replication-factor": 2}}) @@ -46,13 +47,15 @@ def __init__(self, test_context): self.num_consumers = 1 def setUp(self): - self.zk.start() + if self.zk: + self.zk.start() def min_cluster_size(self): # Override this since we're adding services outside of the constructor return super(ClientCompatibilityProduceConsumeTest, self).min_cluster_size() + self.num_producers + self.num_consumers - @parametrize(broker_version=str(DEV_BRANCH)) + @cluster(num_nodes=9) + @matrix(broker_version=[str(DEV_BRANCH)], metadata_quorum=quorum.all_non_upgrade) @parametrize(broker_version=str(LATEST_0_10_0)) @parametrize(broker_version=str(LATEST_0_10_1)) @parametrize(broker_version=str(LATEST_0_10_2)) @@ -67,7 +70,7 @@ def min_cluster_size(self): @parametrize(broker_version=str(LATEST_2_5)) @parametrize(broker_version=str(LATEST_2_6)) @parametrize(broker_version=str(LATEST_2_7)) - def test_produce_consume(self, broker_version): + def test_produce_consume(self, broker_version, metadata_quorum=quorum.zk): print("running producer_consumer_compat with broker_version = %s" % broker_version, flush=True) self.kafka.set_version(KafkaVersion(broker_version)) self.kafka.security_protocol = "PLAINTEXT" diff --git a/tests/kafkatest/tests/client/compression_test.py b/tests/kafkatest/tests/client/compression_test.py index 23b30eac24c0e..37ce52d7efca0 100644 --- a/tests/kafkatest/tests/client/compression_test.py +++ b/tests/kafkatest/tests/client/compression_test.py @@ -13,12 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.mark import parametrize +from ducktape.mark import matrix from ducktape.utils.util import wait_until from ducktape.mark.resource import cluster from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest @@ -36,7 +36,7 @@ def __init__(self, test_context): super(CompressionTest, self).__init__(test_context=test_context) self.topic = "test_topic" - self.zk = ZookeeperService(test_context, num_nodes=1) + self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: { "partitions": 10, "replication-factor": 1}}) @@ -48,15 +48,16 @@ def __init__(self, test_context): self.num_consumers = 1 def setUp(self): - self.zk.start() + if self.zk: + self.zk.start() def min_cluster_size(self): # Override this since we're adding services outside of the constructor return super(CompressionTest, self).min_cluster_size() + self.num_producers + self.num_consumers @cluster(num_nodes=8) - @parametrize(compression_types=COMPRESSION_TYPES) - def test_compressed_topic(self, compression_types): + @matrix(compression_types=[COMPRESSION_TYPES], metadata_quorum=quorum.all_non_upgrade) + def test_compressed_topic(self, compression_types, metadata_quorum=quorum.zk): """Test produce => consume => validate for compressed topics Setup: 1 zk, 1 kafka node, 1 topic with partitions=10, replication-factor=1 diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py index 638a3fc068fbd..5beacf23c6376 100644 --- a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py +++ b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py @@ -13,11 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +from ducktape.mark import matrix from ducktape.mark.resource import cluster from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest -from kafkatest.services.kafka import TopicPartition +from kafkatest.services.kafka import TopicPartition, quorum class ConsumerRollingUpgradeTest(VerifiableConsumerTest): TOPIC = "test_topic" @@ -47,7 +48,8 @@ def _verify_roundrobin_assignment(self, consumer): "Mismatched assignment: %s" % assignment @cluster(num_nodes=4) - def rolling_update_test(self): + @matrix(metadata_quorum=quorum.all_non_upgrade) + def rolling_update_test(self, metadata_quorum=quorum.zk): """ Verify rolling updates of partition assignment strategies works correctly. In this test, we use a rolling restart to change the group's assignment strategy from "range" @@ -70,7 +72,7 @@ def rolling_update_test(self): consumer.start_node(consumer.nodes[0]) self.await_all_members(consumer) self._verify_range_assignment(consumer) - + # now restart the other node and verify that we have switched to round-robin consumer.stop_node(consumer.nodes[1]) consumer.start_node(consumer.nodes[1]) diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py index 4a9e89d8c9ae2..f41748078ca72 100644 --- a/tests/kafkatest/tests/client/consumer_test.py +++ b/tests/kafkatest/tests/client/consumer_test.py @@ -18,7 +18,7 @@ from ducktape.mark.resource import cluster from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest -from kafkatest.services.kafka import TopicPartition +from kafkatest.services.kafka import TopicPartition, quorum import signal @@ -75,7 +75,8 @@ def setup_consumer(self, topic, **kwargs): return consumer @cluster(num_nodes=7) - def test_broker_rolling_bounce(self): + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk): """ Verify correct consumer behavior when the brokers are consecutively restarted. @@ -117,8 +118,8 @@ def test_broker_rolling_bounce(self): (consumer.total_consumed(), consumer.current_position(partition)) @cluster(num_nodes=7) - @matrix(clean_shutdown=[True], bounce_mode=["all", "rolling"]) - def test_consumer_bounce(self, clean_shutdown, bounce_mode): + @matrix(clean_shutdown=[True], bounce_mode=["all", "rolling"], metadata_quorum=quorum.all_non_upgrade) + def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.zk): """ Verify correct consumer behavior when the consumers in the group are consecutively restarted. @@ -160,8 +161,8 @@ def test_consumer_bounce(self, clean_shutdown, bounce_mode): (consumer.current_position(partition), consumer.total_consumed()) @cluster(num_nodes=7) - @matrix(clean_shutdown=[True], static_membership=[True, False], bounce_mode=["all", "rolling"], num_bounces=[5]) - def test_static_consumer_bounce(self, clean_shutdown, static_membership, bounce_mode, num_bounces): + @matrix(clean_shutdown=[True], static_membership=[True, False], bounce_mode=["all", "rolling"], num_bounces=[5], metadata_quorum=quorum.all_non_upgrade) + def test_static_consumer_bounce(self, clean_shutdown, static_membership, bounce_mode, num_bounces, metadata_quorum=quorum.zk): """ Verify correct static consumer behavior when the consumers in the group are restarted. In order to make sure the behavior of static members are different from dynamic ones, we take both static and dynamic @@ -222,8 +223,8 @@ def test_static_consumer_bounce(self, clean_shutdown, static_membership, bounce_ (consumer.current_position(partition), consumer.total_consumed()) @cluster(num_nodes=7) - @matrix(bounce_mode=["all", "rolling"]) - def test_static_consumer_persisted_after_rejoin(self, bounce_mode): + @matrix(bounce_mode=["all", "rolling"], metadata_quorum=quorum.all_non_upgrade) + def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quorum=quorum.zk): """ Verify that the updated member.id(updated_member_id) caused by static member rejoin would be persisted. If not, after the brokers rolling bounce, the migrated group coordinator would load the stale persisted member.id and @@ -253,8 +254,8 @@ def test_static_consumer_persisted_after_rejoin(self, bounce_mode): self.rolling_bounce_brokers(consumer, num_bounces=1) @cluster(num_nodes=10) - @matrix(num_conflict_consumers=[1, 2], fencing_stage=["stable", "all"]) - def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage): + @matrix(num_conflict_consumers=[1, 2], fencing_stage=["stable", "all"], metadata_quorum=quorum.all_non_upgrade) + def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, metadata_quorum=quorum.zk): """ Verify correct static consumer behavior when there are conflicting consumers with same group.instance.id. @@ -306,8 +307,8 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage): ) @cluster(num_nodes=7) - @matrix(clean_shutdown=[True], enable_autocommit=[True, False]) - def test_consumer_failure(self, clean_shutdown, enable_autocommit): + @matrix(clean_shutdown=[True], enable_autocommit=[True, False], metadata_quorum=quorum.all_non_upgrade) + def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk): partition = TopicPartition(self.TOPIC, 0) consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit) @@ -353,8 +354,8 @@ def test_consumer_failure(self, clean_shutdown, enable_autocommit): (consumer.last_commit(partition), consumer.current_position(partition)) @cluster(num_nodes=7) - @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False]) - def test_broker_failure(self, clean_shutdown, enable_autocommit): + @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False], metadata_quorum=quorum.all_non_upgrade) + def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk): partition = TopicPartition(self.TOPIC, 0) consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit) @@ -390,7 +391,8 @@ def test_broker_failure(self, clean_shutdown, enable_autocommit): (consumer.last_commit(partition), consumer.current_position(partition)) @cluster(num_nodes=7) - def test_group_consumption(self): + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_group_consumption(self, metadata_quorum=quorum.zk): """ Verifies correct group rebalance behavior as consumers are started and stopped. In particular, this test verifies that the partition is readable after every @@ -442,8 +444,8 @@ def __init__(self, test_context): @cluster(num_nodes=6) @matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor", "org.apache.kafka.clients.consumer.RoundRobinAssignor", - "org.apache.kafka.clients.consumer.StickyAssignor"]) - def test_valid_assignment(self, assignment_strategy): + "org.apache.kafka.clients.consumer.StickyAssignor"], metadata_quorum=quorum.all_non_upgrade) + def test_valid_assignment(self, assignment_strategy, metadata_quorum=quorum.zk): """ Verify assignment strategy correctness: each partition is assigned to exactly one consumer instance. diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py index 1388330c6a00d..41e0f95fe8a66 100644 --- a/tests/kafkatest/tests/client/message_format_change_test.py +++ b/tests/kafkatest/tests/client/message_format_change_test.py @@ -12,12 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.mark import parametrize +from ducktape.mark import matrix from ducktape.utils.util import wait_until from ducktape.mark.resource import cluster from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest @@ -32,9 +32,10 @@ def __init__(self, test_context): def setUp(self): self.topic = "test_topic" - self.zk = ZookeeperService(self.test_context, num_nodes=1) - - self.zk.start() + self.zk = ZookeeperService(self.test_context, num_nodes=1) if quorum.for_test(self.test_context) == quorum.zk else None + + if self.zk: + self.zk.start() # Producer and consumer self.producer_throughput = 10000 @@ -58,10 +59,10 @@ def produce_and_consume(self, producer_version, consumer_version, group): err_msg="Producer did not produce all messages in reasonable amount of time")) @cluster(num_nodes=12) - @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH)) - @parametrize(producer_version=str(LATEST_0_10), consumer_version=str(LATEST_0_10)) - @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9)) - def test_compatibility(self, producer_version, consumer_version): + @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_0_10)], consumer_version=[str(LATEST_0_10)], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(LATEST_0_9)], metadata_quorum=quorum.all_non_upgrade) + def test_compatibility(self, producer_version, consumer_version, metadata_quorum=quorum.zk): """ This tests performs the following checks: The workload is a mix of 0.9.x, 0.10.x and 0.11.x producers and consumers that produce to and consume from a DEV_BRANCH cluster @@ -81,8 +82,9 @@ def test_compatibility(self, producer_version, consumer_version): self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: { "partitions": 3, "replication-factor": 3, - 'configs': {"min.insync.replicas": 2}}}) - + 'configs': {"min.insync.replicas": 2}}}, + controller_num_nodes_override=1) + self.kafka.start() self.logger.info("First format change to 0.9.0") self.kafka.alter_message_format(self.topic, str(LATEST_0_9)) diff --git a/tests/kafkatest/tests/client/pluggable_test.py b/tests/kafkatest/tests/client/pluggable_test.py index a2599d8b5572a..36b9172f18351 100644 --- a/tests/kafkatest/tests/client/pluggable_test.py +++ b/tests/kafkatest/tests/client/pluggable_test.py @@ -13,8 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +from ducktape.mark import matrix +from ducktape.mark.resource import cluster from ducktape.utils.util import wait_until +from kafkatest.services.kafka import quorum from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest class PluggableConsumerTest(VerifiableConsumerTest): @@ -29,7 +32,9 @@ def __init__(self, test_context): self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }, }) - def test_start_stop(self): + @cluster(num_nodes=4) + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_start_stop(self, metadata_quorum=quorum.zk): """ Test that a pluggable VerifiableConsumer module load works """ diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py index 580d9f376c627..1a7f6abfeb8b7 100644 --- a/tests/kafkatest/tests/connect/connect_test.py +++ b/tests/kafkatest/tests/connect/connect_test.py @@ -16,12 +16,12 @@ from ducktape.tests.test import Test from ducktape.mark.resource import cluster from ducktape.utils.util import wait_until -from ducktape.mark import parametrize +from ducktape.mark import matrix, parametrize from ducktape.cluster.remoteaccount import RemoteCommandError from ducktape.errors import TimeoutError from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.connect import ConnectServiceBase, ConnectStandaloneService, ErrorTolerance from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.security.security_config import SecurityConfig @@ -63,7 +63,7 @@ def __init__(self, test_context): 'test' : { 'partitions': 1, 'replication-factor': 1 } } - self.zk = ZookeeperService(test_context, self.num_zk) + self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None @cluster(num_nodes=5) @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=True) @@ -71,8 +71,9 @@ def __init__(self, test_context): @parametrize(converter="org.apache.kafka.connect.storage.StringConverter", schemas=None) @parametrize(security_protocol=SecurityConfig.PLAINTEXT) @cluster(num_nodes=6) - @parametrize(security_protocol=SecurityConfig.SASL_SSL) - def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True, security_protocol='PLAINTEXT'): + @matrix(security_protocol=[SecurityConfig.SASL_SSL], metadata_quorum=quorum.all_non_upgrade) + def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True, security_protocol='PLAINTEXT', + metadata_quorum=quorum.zk): """ Validates basic end-to-end functionality of Connect standalone using the file source and sink converters. Includes parameterizations to test different converters (which also test per-connector converter overrides), schema/schemaless @@ -88,14 +89,15 @@ def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.Jso self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk, security_protocol=security_protocol, interbroker_security_protocol=security_protocol, - topics=self.topics) + topics=self.topics, controller_num_nodes_override=self.num_zk) self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE]) self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE]) self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC_TEST, consumer_timeout_ms=10000) - self.zk.start() + if self.zk: + self.zk.start() self.kafka.start() self.source.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-source.properties")]) diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py index 5474112cae3da..db8aa1d0743ca 100644 --- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py +++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py @@ -12,12 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.mark import parametrize +from ducktape.mark import matrix, parametrize from ducktape.utils.util import wait_until from ducktape.mark.resource import cluster from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.kafka import config_property from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService @@ -33,9 +33,10 @@ def __init__(self, test_context): def setUp(self): self.topic = "test_topic" - self.zk = ZookeeperService(self.test_context, num_nodes=1) - - self.zk.start() + self.zk = ZookeeperService(self.test_context, num_nodes=1) if quorum.for_test(self.test_context) == quorum.zk else None + + if self.zk: + self.zk.start() # Producer and consumer self.producer_throughput = 10000 @@ -44,39 +45,41 @@ def setUp(self): self.messages_per_producer = 1000 @cluster(num_nodes=6) - @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=str("LogAppendTime")) - @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["none"], timestamp_type=str("LogAppendTime")) + @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], compression_types=[["none"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None) - @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("CreateTime")) - @parametrize(producer_version=str(LATEST_2_2), consumer_version=str(LATEST_2_2), compression_types=["none"], timestamp_type=str("CreateTime")) - @parametrize(producer_version=str(LATEST_2_3), consumer_version=str(LATEST_2_3), compression_types=["none"], timestamp_type=str("CreateTime")) - @parametrize(producer_version=str(LATEST_2_4), consumer_version=str(LATEST_2_4), compression_types=["none"], timestamp_type=str("CreateTime")) - @parametrize(producer_version=str(LATEST_2_5), consumer_version=str(LATEST_2_5), compression_types=["none"], timestamp_type=str("CreateTime")) - @parametrize(producer_version=str(LATEST_2_6), consumer_version=str(LATEST_2_6), compression_types=["none"], timestamp_type=str("CreateTime")) - @parametrize(producer_version=str(LATEST_2_7), consumer_version=str(LATEST_2_7), compression_types=["none"], timestamp_type=str("CreateTime")) - @parametrize(producer_version=str(LATEST_2_1), consumer_version=str(LATEST_2_1), compression_types=["zstd"], timestamp_type=str("CreateTime")) - @parametrize(producer_version=str(LATEST_2_0), consumer_version=str(LATEST_2_0), compression_types=["snappy"], timestamp_type=str("CreateTime")) - @parametrize(producer_version=str(LATEST_1_1), consumer_version=str(LATEST_1_1), compression_types=["lz4"], timestamp_type=str("CreateTime")) - @parametrize(producer_version=str(LATEST_1_0), consumer_version=str(LATEST_1_0), compression_types=["none"], timestamp_type=str("CreateTime")) - @parametrize(producer_version=str(LATEST_0_11_0), consumer_version=str(LATEST_0_11_0), compression_types=["gzip"], timestamp_type=str("CreateTime")) - @parametrize(producer_version=str(LATEST_0_10_2), consumer_version=str(LATEST_0_10_2), compression_types=["lz4"], timestamp_type=str("CreateTime")) - @parametrize(producer_version=str(LATEST_0_10_1), consumer_version=str(LATEST_0_10_1), compression_types=["snappy"], timestamp_type=str("LogAppendTime")) - @parametrize(producer_version=str(LATEST_0_10_0), consumer_version=str(LATEST_0_10_0), compression_types=["snappy"], timestamp_type=str("LogAppendTime")) - @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["none"], timestamp_type=None) - @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=None) - @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime")) + @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(LATEST_0_9)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_2_2)], consumer_version=[str(LATEST_2_2)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_2_3)], consumer_version=[str(LATEST_2_3)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_2_4)], consumer_version=[str(LATEST_2_4)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_2_5)], consumer_version=[str(LATEST_2_5)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_2_6)], consumer_version=[str(LATEST_2_6)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_2_7)], consumer_version=[str(LATEST_2_7)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_2_1)], consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_2_0)], consumer_version=[str(LATEST_2_0)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_1_1)], consumer_version=[str(LATEST_1_1)], compression_types=[["lz4"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_1_0)], consumer_version=[str(LATEST_1_0)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_0_11_0)], consumer_version=[str(LATEST_0_11_0)], compression_types=[["gzip"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_0_10_2)], consumer_version=[str(LATEST_0_10_2)], compression_types=[["lz4"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_0_10_1)], consumer_version=[str(LATEST_0_10_1)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_0_10_0)], consumer_version=[str(LATEST_0_10_0)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(DEV_BRANCH)], compression_types=[["none"]], timestamp_type=[None], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(DEV_BRANCH)], compression_types=[["snappy"]], timestamp_type=[None], metadata_quorum=quorum.all_non_upgrade) + @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(LATEST_0_9)], compression_types=[["snappy"]], timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade) @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None) - def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None): - + def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None, metadata_quorum=quorum.zk): + if not new_consumer and metadata_quorum != quorum.zk: + raise Exception("ZooKeeper-based consumers are not supported when using a Raft-based metadata quorum") self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: { "partitions": 3, "replication-factor": 3, - 'configs': {"min.insync.replicas": 2}}}) + 'configs': {"min.insync.replicas": 2}}}, + controller_num_nodes_override=1) for node in self.kafka.nodes: if timestamp_type is not None: node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type self.kafka.start() - + self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput, message_validator=is_int, diff --git a/tests/kafkatest/tests/core/consume_bench_test.py b/tests/kafkatest/tests/core/consume_bench_test.py index e731270646d4a..ce08d80832ab8 100644 --- a/tests/kafkatest/tests/core/consume_bench_test.py +++ b/tests/kafkatest/tests/core/consume_bench_test.py @@ -14,9 +14,10 @@ # limitations under the License. import json -from ducktape.mark import parametrize +from ducktape.mark import matrix +from ducktape.mark.resource import cluster from ducktape.tests.test import Test -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec from kafkatest.services.trogdor.task_spec import TaskSpec @@ -28,7 +29,7 @@ class ConsumeBenchTest(Test): def __init__(self, test_context): """:type test_context: ducktape.tests.test.TestContext""" super(ConsumeBenchTest, self).__init__(test_context) - self.zk = ZookeeperService(test_context, num_nodes=3) + self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk) self.producer_workload_service = ProduceBenchWorkloadService(test_context, self.kafka) self.consumer_workload_service = ConsumeBenchWorkloadService(test_context, self.kafka) @@ -41,13 +42,15 @@ def __init__(self, test_context): def setUp(self): self.trogdor.start() - self.zk.start() + if self.zk: + self.zk.start() self.kafka.start() def teardown(self): self.trogdor.stop() self.kafka.stop() - self.zk.stop() + if self.zk: + self.zk.stop() def produce_messages(self, topics, max_messages=10000): produce_spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS, @@ -64,9 +67,10 @@ def produce_messages(self, topics, max_messages=10000): produce_workload.wait_for_done(timeout_sec=180) self.logger.debug("Produce workload finished") - @parametrize(topics=["consume_bench_topic[0-5]"]) # topic subscription - @parametrize(topics=["consume_bench_topic[0-5]:[0-4]"]) # manual topic assignment - def test_consume_bench(self, topics): + @cluster(num_nodes=10) + @matrix(topics=[["consume_bench_topic[0-5]"]], metadata_quorum=quorum.all_non_upgrade) # topic subscription + @matrix(topics=[["consume_bench_topic[0-5]:[0-4]"]], metadata_quorum=quorum.all_non_upgrade) # manual topic assignment + def test_consume_bench(self, topics, metadata_quorum=quorum.zk): """ Runs a ConsumeBench workload to consume messages """ @@ -86,7 +90,9 @@ def test_consume_bench(self, topics): tasks = self.trogdor.tasks() self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) - def test_single_partition(self): + @cluster(num_nodes=10) + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_single_partition(self, metadata_quorum=quorum.zk): """ Run a ConsumeBench against a single partition """ @@ -107,7 +113,9 @@ def test_single_partition(self): tasks = self.trogdor.tasks() self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) - def test_multiple_consumers_random_group_topics(self): + @cluster(num_nodes=10) + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_multiple_consumers_random_group_topics(self, metadata_quorum=quorum.zk): """ Runs multiple consumers group to read messages from topics. Since a consumerGroup isn't specified, each consumer should read from all topics independently @@ -129,7 +137,9 @@ def test_multiple_consumers_random_group_topics(self): tasks = self.trogdor.tasks() self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) - def test_two_consumers_specified_group_topics(self): + @cluster(num_nodes=10) + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_two_consumers_specified_group_topics(self, metadata_quorum=quorum.zk): """ Runs two consumers in the same consumer group to read messages from topics. Since a consumerGroup is specified, each consumer should dynamically get assigned a partition from group @@ -152,7 +162,9 @@ def test_two_consumers_specified_group_topics(self): tasks = self.trogdor.tasks() self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) - def test_multiple_consumers_random_group_partitions(self): + @cluster(num_nodes=10) + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_multiple_consumers_random_group_partitions(self, metadata_quorum=quorum.zk): """ Runs multiple consumers in to read messages from specific partitions. Since a consumerGroup isn't specified, each consumer will get assigned a random group @@ -175,7 +187,9 @@ def test_multiple_consumers_random_group_partitions(self): tasks = self.trogdor.tasks() self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) - def test_multiple_consumers_specified_group_partitions_should_raise(self): + @cluster(num_nodes=10) + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum=quorum.zk): """ Runs multiple consumers in the same group to read messages from specific partitions. It is an invalid configuration to provide a consumer group and specific partitions. diff --git a/tests/kafkatest/tests/core/consumer_group_command_test.py b/tests/kafkatest/tests/core/consumer_group_command_test.py index 871e2761ade25..f81eec8de8cf8 100644 --- a/tests/kafkatest/tests/core/consumer_group_command_test.py +++ b/tests/kafkatest/tests/core/consumer_group_command_test.py @@ -20,7 +20,7 @@ from ducktape.mark.resource import cluster from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.security.security_config import SecurityConfig @@ -45,16 +45,18 @@ def __init__(self, test_context): self.topics = { TOPIC: {'partitions': 1, 'replication-factor': 1} } - self.zk = ZookeeperService(test_context, self.num_zk) + self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None def setUp(self): - self.zk.start() + if self.zk: + self.zk.start() def start_kafka(self, security_protocol, interbroker_security_protocol): self.kafka = KafkaService( self.test_context, self.num_brokers, self.zk, security_protocol=security_protocol, - interbroker_security_protocol=interbroker_security_protocol, topics=self.topics) + interbroker_security_protocol=interbroker_security_protocol, topics=self.topics, + controller_num_nodes_override=self.num_zk) self.kafka.start() def start_consumer(self): @@ -88,8 +90,8 @@ def setup_and_verify(self, security_protocol, group=None): self.consumer.stop() @cluster(num_nodes=3) - @matrix(security_protocol=['PLAINTEXT', 'SSL']) - def test_list_consumer_groups(self, security_protocol='PLAINTEXT'): + @matrix(security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all_non_upgrade) + def test_list_consumer_groups(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk): """ Tests if ConsumerGroupCommand is listing correct consumer groups :return: None @@ -97,8 +99,8 @@ def test_list_consumer_groups(self, security_protocol='PLAINTEXT'): self.setup_and_verify(security_protocol) @cluster(num_nodes=3) - @matrix(security_protocol=['PLAINTEXT', 'SSL']) - def test_describe_consumer_group(self, security_protocol='PLAINTEXT'): + @matrix(security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all_non_upgrade) + def test_describe_consumer_group(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk): """ Tests if ConsumerGroupCommand is describing a consumer group correctly :return: None diff --git a/tests/kafkatest/tests/core/delegation_token_test.py b/tests/kafkatest/tests/core/delegation_token_test.py index feb593522e07c..5fe8d126210ba 100644 --- a/tests/kafkatest/tests/core/delegation_token_test.py +++ b/tests/kafkatest/tests/core/delegation_token_test.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from ducktape.mark.resource import cluster from ducktape.tests.test import Test from ducktape.utils.util import wait_until from kafkatest.services.kafka import config_property, KafkaService @@ -109,6 +110,7 @@ def renew_delegation_token(self): self.delegation_tokens.renew_delegation_token(dt["hmac"], new_expirydate_ms) + @cluster(num_nodes=5) def test_delegation_token_lifecycle(self): self.kafka.start() self.delegation_tokens = DelegationTokens(self.kafka, self.test_context) diff --git a/tests/kafkatest/tests/core/downgrade_test.py b/tests/kafkatest/tests/core/downgrade_test.py index beb103a7d82db..489ae7c2b6efa 100644 --- a/tests/kafkatest/tests/core/downgrade_test.py +++ b/tests/kafkatest/tests/core/downgrade_test.py @@ -53,7 +53,7 @@ def downgrade_to(self, kafka_version): self.wait_until_rejoin() def setup_services(self, kafka_version, compression_types, security_protocol, static_membership): - self.create_zookeeper() + self.create_zookeeper_if_necessary() self.zk.start() self.create_kafka(num_nodes=3, diff --git a/tests/kafkatest/tests/core/fetch_from_follower_test.py b/tests/kafkatest/tests/core/fetch_from_follower_test.py index ef3772880c82a..fab5cfa6269ae 100644 --- a/tests/kafkatest/tests/core/fetch_from_follower_test.py +++ b/tests/kafkatest/tests/core/fetch_from_follower_test.py @@ -16,10 +16,11 @@ import time from collections import defaultdict +from ducktape.mark import matrix from ducktape.mark.resource import cluster from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.monitor.jmx import JmxTool from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService @@ -36,7 +37,7 @@ def __init__(self, test_context): super(FetchFromFollowerTest, self).__init__(test_context=test_context) self.jmx_tool = JmxTool(test_context, jmx_poll_ms=100) self.topic = "test_topic" - self.zk = ZookeeperService(test_context, num_nodes=1) + self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, @@ -53,7 +54,8 @@ def __init__(self, test_context): 1: [("broker.rack", "rack-a")], 2: [("broker.rack", "rack-b")], 3: [("broker.rack", "rack-c")] - }) + }, + controller_num_nodes_override=1) self.producer_throughput = 1000 self.num_producers = 1 @@ -63,11 +65,13 @@ def min_cluster_size(self): return super(FetchFromFollowerTest, self).min_cluster_size() + self.num_producers * 2 + self.num_consumers * 2 def setUp(self): - self.zk.start() + if self.zk: + self.zk.start() self.kafka.start() @cluster(num_nodes=9) - def test_consumer_preferred_read_replica(self): + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_consumer_preferred_read_replica(self, metadata_quorum=quorum.zk): """ This test starts up brokers with "broker.rack" and "replica.selector.class" configurations set. The replica selector is set to the rack-aware implementation. One of the brokers has a different rack than the other two. diff --git a/tests/kafkatest/tests/core/get_offset_shell_test.py b/tests/kafkatest/tests/core/get_offset_shell_test.py index 82291d7db48c6..b85a0a81325af 100644 --- a/tests/kafkatest/tests/core/get_offset_shell_test.py +++ b/tests/kafkatest/tests/core/get_offset_shell_test.py @@ -16,11 +16,12 @@ from ducktape.utils.util import wait_until from ducktape.tests.test import Test +from ducktape.mark import matrix from ducktape.mark.resource import cluster from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.console_consumer import ConsoleConsumer TOPIC = "topic-get-offset-shell" @@ -42,11 +43,11 @@ def __init__(self, test_context): TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR} } - self.zk = ZookeeperService(test_context, self.num_zk) - + self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None def setUp(self): - self.zk.start() + if self.zk: + self.zk.start() def start_kafka(self, security_protocol, interbroker_security_protocol): self.kafka = KafkaService( @@ -69,7 +70,8 @@ def start_consumer(self): self.consumer.start() @cluster(num_nodes=4) - def test_get_offset_shell(self, security_protocol='PLAINTEXT'): + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_get_offset_shell(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk): """ Tests if GetOffsetShell is getting offsets correctly :return: None diff --git a/tests/kafkatest/tests/core/group_mode_transactions_test.py b/tests/kafkatest/tests/core/group_mode_transactions_test.py index 141c613730d72..e9638d50ef4f1 100644 --- a/tests/kafkatest/tests/core/group_mode_transactions_test.py +++ b/tests/kafkatest/tests/core/group_mode_transactions_test.py @@ -14,7 +14,7 @@ # limitations under the License. from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.transactional_message_copier import TransactionalMessageCopier @@ -25,6 +25,7 @@ from ducktape.mark.resource import cluster from ducktape.utils.util import wait_until +import time class GroupModeTransactionsTest(Test): """Essentially testing the same functionality as TransactionsTest by transactionally copying data @@ -60,13 +61,14 @@ def __init__(self, test_context): self.progress_timeout_sec = 60 self.consumer_group = "grouped-transactions-test-consumer-group" - self.zk = ZookeeperService(test_context, num_nodes=1) + self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None self.kafka = KafkaService(test_context, num_nodes=self.num_brokers, - zk=self.zk) + zk=self.zk, controller_num_nodes_override=1) def setUp(self): - self.zk.start() + if self.zk: + self.zk.start() def seed_messages(self, topic, num_seed_messages): seed_timeout_sec = 10000 @@ -95,10 +97,17 @@ def bounce_brokers(self, clean_shutdown): self.kafka.restart_node(node, clean_shutdown = True) else: self.kafka.stop_node(node, clean_shutdown = False) - wait_until(lambda: len(self.kafka.pids(node)) == 0 and not self.kafka.is_registered(node), - timeout_sec=self.kafka.zk_session_timeout + 5, - err_msg="Failed to see timely deregistration of \ - hard-killed broker %s" % str(node.account)) + gracePeriodSecs = 5 + if self.zk: + wait_until(lambda: len(self.kafka.pids(node)) == 0 and not self.kafka.is_registered(node), + timeout_sec=self.kafka.zk_session_timeout + gracePeriodSecs, + err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(node.account)) + else: + brokerSessionTimeoutSecs = 18 + wait_until(lambda: len(self.kafka.pids(node)) == 0, + timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs, + err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account)) + time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs) self.kafka.start_node(node) def create_and_start_message_copier(self, input_topic, output_topic, transactional_id): @@ -260,8 +269,8 @@ def setup_topics(self): @cluster(num_nodes=10) @matrix(failure_mode=["hard_bounce", "clean_bounce"], - bounce_target=["brokers", "clients"]) - def test_transactions(self, failure_mode, bounce_target): + bounce_target=["brokers", "clients"], metadata_quorum=quorum.all_non_upgrade) + def test_transactions(self, failure_mode, bounce_target, metadata_quorum=quorum.zk): security_protocol = 'PLAINTEXT' self.kafka.security_protocol = security_protocol self.kafka.interbroker_security_protocol = security_protocol diff --git a/tests/kafkatest/tests/core/produce_bench_test.py b/tests/kafkatest/tests/core/produce_bench_test.py index a316520335baf..734dfb580b8df 100644 --- a/tests/kafkatest/tests/core/produce_bench_test.py +++ b/tests/kafkatest/tests/core/produce_bench_test.py @@ -14,19 +14,20 @@ # limitations under the License. import json +from ducktape.mark import matrix +from ducktape.mark.resource import cluster from ducktape.tests.test import Test -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec from kafkatest.services.trogdor.task_spec import TaskSpec from kafkatest.services.trogdor.trogdor import TrogdorService from kafkatest.services.zookeeper import ZookeeperService - class ProduceBenchTest(Test): def __init__(self, test_context): """:type test_context: ducktape.tests.test.TestContext""" super(ProduceBenchTest, self).__init__(test_context) - self.zk = ZookeeperService(test_context, num_nodes=3) + self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk) self.workload_service = ProduceBenchWorkloadService(test_context, self.kafka) self.trogdor = TrogdorService(context=self.test_context, @@ -36,15 +37,19 @@ def __init__(self, test_context): def setUp(self): self.trogdor.start() - self.zk.start() + if self.zk: + self.zk.start() self.kafka.start() def teardown(self): self.trogdor.stop() self.kafka.stop() - self.zk.stop() + if self.zk: + self.zk.stop() - def test_produce_bench(self): + @cluster(num_nodes=8) + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_produce_bench(self, metadata_quorum=quorum.zk): spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS, self.workload_service.producer_node, self.workload_service.bootstrap_servers, @@ -60,7 +65,9 @@ def test_produce_bench(self): tasks = self.trogdor.tasks() self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) - def test_produce_bench_transactions(self): + @cluster(num_nodes=8) + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_produce_bench_transactions(self, metadata_quorum=quorum.zk): spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS, self.workload_service.producer_node, self.workload_service.bootstrap_servers, diff --git a/tests/kafkatest/tests/core/replica_scale_test.py b/tests/kafkatest/tests/core/replica_scale_test.py index 8541d398d390d..f8d0c8379bdf7 100644 --- a/tests/kafkatest/tests/core/replica_scale_test.py +++ b/tests/kafkatest/tests/core/replica_scale_test.py @@ -14,13 +14,13 @@ # limitations under the License. from ducktape.mark.resource import cluster -from ducktape.mark import parametrize +from ducktape.mark import matrix from ducktape.tests.test import Test from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec from kafkatest.services.trogdor.task_spec import TaskSpec -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.trogdor.trogdor import TrogdorService from kafkatest.services.zookeeper import ZookeeperService @@ -31,11 +31,12 @@ class ReplicaScaleTest(Test): def __init__(self, test_context): super(ReplicaScaleTest, self).__init__(test_context=test_context) self.test_context = test_context - self.zk = ZookeeperService(test_context, num_nodes=1) - self.kafka = KafkaService(self.test_context, num_nodes=8, zk=self.zk) + self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None + self.kafka = KafkaService(self.test_context, num_nodes=8, zk=self.zk, controller_num_nodes_override=1) def setUp(self): - self.zk.start() + if self.zk: + self.zk.start() self.kafka.start() def teardown(self): @@ -43,11 +44,12 @@ def teardown(self): for node in self.kafka.nodes: self.kafka.stop_node(node, clean_shutdown=False, timeout_sec=60) self.kafka.stop() - self.zk.stop() + if self.zk: + self.zk.stop() @cluster(num_nodes=12) - @parametrize(topic_count=50, partition_count=34, replication_factor=3) - def test_produce_consume(self, topic_count, partition_count, replication_factor): + @matrix(topic_count=[50], partition_count=[34], replication_factor=[3], metadata_quorum=quorum.all_non_upgrade) + def test_produce_consume(self, topic_count, partition_count, replication_factor, metadata_quorum=quorum.zk): topics_create_start_time = time.time() for i in range(topic_count): topic = "replicas_produce_consume_%d" % i @@ -101,8 +103,8 @@ def test_produce_consume(self, topic_count, partition_count, replication_factor) trogdor.stop() @cluster(num_nodes=12) - @parametrize(topic_count=50, partition_count=34, replication_factor=3) - def test_clean_bounce(self, topic_count, partition_count, replication_factor): + @matrix(topic_count=[50], partition_count=[34], replication_factor=[3], metadata_quorum=quorum.all_non_upgrade) + def test_clean_bounce(self, topic_count, partition_count, replication_factor, metadata_quorum=quorum.zk): topics_create_start_time = time.time() for i in range(topic_count): topic = "topic-%04d" % i diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py index 01ef34f318390..a0c01567d4e9b 100644 --- a/tests/kafkatest/tests/core/replication_test.py +++ b/tests/kafkatest/tests/core/replication_test.py @@ -19,9 +19,11 @@ from ducktape.mark import parametrize from ducktape.mark.resource import cluster +from kafkatest.services.kafka import quorum from kafkatest.tests.end_to_end import EndToEndTest import signal +import time def broker_node(test, broker_type): """ Discover node of requested type. For leader type, discovers leader for our topic and partition 0 @@ -63,10 +65,19 @@ def hard_bounce(test, broker_type): # Since this is a hard kill, we need to make sure the process is down and that # zookeeper has registered the loss by expiring the broker's session timeout. - - wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node), - timeout_sec=test.kafka.zk_session_timeout + 5, - err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account)) + # Or, for a Raft-based quorum, we simply wait at least 18 seconds (the default for broker.session.timeout.ms) + + gracePeriodSecs = 5 + if test.zk: + wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node), + timeout_sec=test.kafka.zk_session_timeout + gracePeriodSecs, + err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account)) + else: + brokerSessionTimeoutSecs = 18 + wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0, + timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs, + err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(prev_broker_node.account)) + time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs) test.kafka.start_node(prev_broker_node) @@ -98,11 +109,11 @@ class ReplicationTest(EndToEndTest): "replication-factor": 3, "configs": {"min.insync.replicas": 2} } - + def __init__(self, test_context): """:type test_context: ducktape.tests.test.TestContext""" super(ReplicationTest, self).__init__(test_context=test_context, topic_config=self.TOPIC_CONFIG) - + def min_cluster_size(self): """Override this since we're adding services outside of the constructor""" return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers @@ -111,29 +122,34 @@ def min_cluster_size(self): @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], broker_type=["leader"], security_protocol=["PLAINTEXT"], - enable_idempotence=[True]) + enable_idempotence=[True], + metadata_quorum=quorum.all_non_upgrade) @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], broker_type=["leader"], - security_protocol=["PLAINTEXT", "SASL_SSL"]) + security_protocol=["PLAINTEXT", "SASL_SSL"], + metadata_quorum=quorum.all_non_upgrade) @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], broker_type=["controller"], security_protocol=["PLAINTEXT", "SASL_SSL"]) @matrix(failure_mode=["hard_bounce"], broker_type=["leader"], - security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"]) + security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"], + metadata_quorum=quorum.all_non_upgrade) @parametrize(failure_mode="hard_bounce", broker_type="leader", security_protocol="SASL_SSL", client_sasl_mechanism="SCRAM-SHA-256", interbroker_sasl_mechanism="SCRAM-SHA-512") @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], - security_protocol=["PLAINTEXT"], broker_type=["leader"], compression_type=["gzip"], tls_version=["TLSv1.2", "TLSv1.3"]) + security_protocol=["PLAINTEXT"], broker_type=["leader"], compression_type=["gzip"], tls_version=["TLSv1.2", "TLSv1.3"], + metadata_quorum=quorum.all_non_upgrade) def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type, client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI", - compression_type=None, enable_idempotence=False, tls_version=None): + compression_type=None, enable_idempotence=False, tls_version=None, + metadata_quorum=quorum.zk): """Replication tests. These tests verify that replication provides simple durability guarantees by checking that data acked by brokers is still available for consumption in the face of various failure scenarios. - Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2 + Setup: 1 zk/Raft-based controller, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2 - Produce messages in the background - Consume messages in the background @@ -142,15 +158,19 @@ def test_replication_with_broker_failure(self, failure_mode, security_protocol, - Validate that every acked message was consumed """ - self.create_zookeeper() - self.zk.start() + if failure_mode == "controller" and metadata_quorum != quorum.zk: + raise Exception("There is no controller broker when using a Raft-based metadata quorum") + self.create_zookeeper_if_necessary() + if self.zk: + self.zk.start() self.create_kafka(num_nodes=3, security_protocol=security_protocol, interbroker_security_protocol=security_protocol, client_sasl_mechanism=client_sasl_mechanism, interbroker_sasl_mechanism=interbroker_sasl_mechanism, - tls_version=tls_version) + tls_version=tls_version, + controller_num_nodes_override = 1) self.kafka.start() compression_types = None if not compression_type else [compression_type] diff --git a/tests/kafkatest/tests/core/round_trip_fault_test.py b/tests/kafkatest/tests/core/round_trip_fault_test.py index a0ce5aef54a3c..b9085cb8b5b71 100644 --- a/tests/kafkatest/tests/core/round_trip_fault_test.py +++ b/tests/kafkatest/tests/core/round_trip_fault_test.py @@ -14,10 +14,12 @@ # limitations under the License. import time +from ducktape.mark import matrix +from ducktape.mark.resource import cluster from ducktape.tests.test import Test from kafkatest.services.trogdor.network_partition_fault_spec import NetworkPartitionFaultSpec from kafkatest.services.trogdor.degraded_network_fault_spec import DegradedNetworkFaultSpec -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.trogdor.process_stop_fault_spec import ProcessStopFaultSpec from kafkatest.services.trogdor.round_trip_workload import RoundTripWorkloadService, RoundTripWorkloadSpec from kafkatest.services.trogdor.task_spec import TaskSpec @@ -31,11 +33,17 @@ class RoundTripFaultTest(Test): def __init__(self, test_context): """:type test_context: ducktape.tests.test.TestContext""" super(RoundTripFaultTest, self).__init__(test_context) - self.zk = ZookeeperService(test_context, num_nodes=3) + self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk) self.workload_service = RoundTripWorkloadService(test_context, self.kafka) + if quorum.for_test(test_context) == quorum.zk: + trogdor_client_services = [self.zk, self.kafka, self.workload_service] + elif quorum.for_test(test_context) == quorum.remote_raft: + trogdor_client_services = [self.kafka.controller_quorum, self.kafka, self.workload_service] + else: #co-located case, which we currently don't test but handle here for completeness in case we do test it + trogdor_client_services = [self.kafka, self.workload_service] self.trogdor = TrogdorService(context=self.test_context, - client_services=[self.zk, self.kafka, self.workload_service]) + client_services=trogdor_client_services) topic_name = "round_trip_topic%d" % RoundTripFaultTest.topic_name_index RoundTripFaultTest.topic_name_index = RoundTripFaultTest.topic_name_index + 1 active_topics={topic_name : {"partitionAssignments":{"0": [0,1,2]}}} @@ -47,24 +55,38 @@ def __init__(self, test_context): active_topics=active_topics) def setUp(self): - self.zk.start() + if self.zk: + self.zk.start() self.kafka.start() self.trogdor.start() def teardown(self): self.trogdor.stop() self.kafka.stop() - self.zk.stop() + if self.zk: + self.zk.stop() - def test_round_trip_workload(self): + def remote_quorum_nodes(self): + if quorum.for_test(self.test_context) == quorum.zk: + return self.zk.nodes + elif quorum.for_test(self.test_context) == quorum.remote_raft: + return self.kafka.controller_quorum.nodes + else: # co-located case, which we currently don't test but handle here for completeness in case we do test it + return [] + + @cluster(num_nodes=9) + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_round_trip_workload(self, metadata_quorum=quorum.zk): workload1 = self.trogdor.create_task("workload1", self.round_trip_spec) workload1.wait_for_done(timeout_sec=600) - def test_round_trip_workload_with_broker_partition(self): + @cluster(num_nodes=9) + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_round_trip_workload_with_broker_partition(self, metadata_quorum=quorum.zk): workload1 = self.trogdor.create_task("workload1", self.round_trip_spec) time.sleep(2) part1 = [self.kafka.nodes[0]] - part2 = self.kafka.nodes[1:] + [self.workload_service.nodes[0]] + self.zk.nodes + part2 = self.kafka.nodes[1:] + [self.workload_service.nodes[0]] + self.remote_quorum_nodes() partition1_spec = NetworkPartitionFaultSpec(0, TaskSpec.MAX_DURATION_MS, [part1, part2]) partition1 = self.trogdor.create_task("partition1", partition1_spec) @@ -72,7 +94,9 @@ def test_round_trip_workload_with_broker_partition(self): partition1.stop() partition1.wait_for_done() - def test_produce_consume_with_broker_pause(self): + @cluster(num_nodes=9) + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_produce_consume_with_broker_pause(self, metadata_quorum=quorum.zk): workload1 = self.trogdor.create_task("workload1", self.round_trip_spec) time.sleep(2) stop1_spec = ProcessStopFaultSpec(0, TaskSpec.MAX_DURATION_MS, [self.kafka.nodes[0]], @@ -83,22 +107,26 @@ def test_produce_consume_with_broker_pause(self): stop1.wait_for_done() self.kafka.stop_node(self.kafka.nodes[0], False) - def test_produce_consume_with_client_partition(self): + @cluster(num_nodes=9) + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_produce_consume_with_client_partition(self, metadata_quorum=quorum.zk): workload1 = self.trogdor.create_task("workload1", self.round_trip_spec) time.sleep(2) part1 = [self.workload_service.nodes[0]] - part2 = self.kafka.nodes + self.zk.nodes + part2 = self.kafka.nodes + self.remote_quorum_nodes() partition1_spec = NetworkPartitionFaultSpec(0, 60000, [part1, part2]) stop1 = self.trogdor.create_task("stop1", partition1_spec) workload1.wait_for_done(timeout_sec=600) stop1.stop() stop1.wait_for_done() - def test_produce_consume_with_latency(self): + @cluster(num_nodes=9) + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_produce_consume_with_latency(self, metadata_quorum=quorum.zk): workload1 = self.trogdor.create_task("workload1", self.round_trip_spec) time.sleep(2) spec = DegradedNetworkFaultSpec(0, 60000) - for node in self.kafka.nodes + self.zk.nodes: + for node in self.kafka.nodes + self.remote_quorum_nodes(): spec.add_node_spec(node.name, "eth0", latencyMs=100, rateLimitKbit=3000) slow1 = self.trogdor.create_task("slow1", spec) workload1.wait_for_done(timeout_sec=600) diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py index 7339873ed9ffe..5d1d88651fe73 100644 --- a/tests/kafkatest/tests/core/security_test.py +++ b/tests/kafkatest/tests/core/security_test.py @@ -14,11 +14,12 @@ # limitations under the License. from ducktape.cluster.remoteaccount import RemoteCommandError -from ducktape.mark import parametrize +from ducktape.mark import matrix from ducktape.mark.resource import cluster from ducktape.utils.util import wait_until from ducktape.errors import TimeoutError +from kafkatest.services.kafka import quorum from kafkatest.services.security.security_config import SecurityConfig from kafkatest.services.security.security_config import SslStores from kafkatest.tests.end_to_end import EndToEndTest @@ -57,9 +58,9 @@ def producer_consumer_have_expected_error(self, error): return True @cluster(num_nodes=7) - @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='SSL') - @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') - def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol): + @matrix(security_protocol='PLAINTEXT', interbroker_security_protocol='SSL', metadata_quorum=quorum.all_non_upgrade) + @matrix(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT', metadata_quorum=quorum.all_non_upgrade) + def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum=quorum.zk): """ Test that invalid hostname in certificate results in connection failures. When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure. @@ -71,8 +72,9 @@ def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbr SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir, valid_hostname=True) - self.create_zookeeper() - self.zk.start() + self.create_zookeeper_if_necessary() + if self.zk: + self.zk.start() self.create_kafka(security_protocol=security_protocol, interbroker_security_protocol=interbroker_security_protocol) diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index ad8d0a7dd01f5..2891e70ff19a0 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -14,7 +14,7 @@ # limitations under the License. from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.transactional_message_copier import TransactionalMessageCopier @@ -25,6 +25,7 @@ from ducktape.mark.resource import cluster from ducktape.utils.util import wait_until +import time class TransactionsTest(Test): """Tests transactions by transactionally copying data from a source topic to @@ -58,13 +59,15 @@ def __init__(self, test_context): self.progress_timeout_sec = 60 self.consumer_group = "transactions-test-consumer-group" - self.zk = ZookeeperService(test_context, num_nodes=1) + self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None self.kafka = KafkaService(test_context, num_nodes=self.num_brokers, - zk=self.zk) + zk=self.zk, + controller_num_nodes_override=1) def setUp(self): - self.zk.start() + if self.zk: + self.zk.start() def seed_messages(self, topic, num_seed_messages): seed_timeout_sec = 10000 @@ -92,10 +95,17 @@ def bounce_brokers(self, clean_shutdown): self.kafka.restart_node(node, clean_shutdown = True) else: self.kafka.stop_node(node, clean_shutdown = False) - wait_until(lambda: len(self.kafka.pids(node)) == 0 and not self.kafka.is_registered(node), - timeout_sec=self.kafka.zk_session_timeout + 5, - err_msg="Failed to see timely deregistration of \ - hard-killed broker %s" % str(node.account)) + gracePeriodSecs = 5 + if self.zk: + wait_until(lambda: len(self.kafka.pids(node)) == 0 and not self.kafka.is_registered(node), + timeout_sec=self.kafka.zk_session_timeout + gracePeriodSecs, + err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(node.account)) + else: + brokerSessionTimeoutSecs = 18 + wait_until(lambda: len(self.kafka.pids(node)) == 0, + timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs, + err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account)) + time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs) self.kafka.start_node(node) def create_and_start_message_copier(self, input_topic, input_partition, output_topic, transactional_id, use_group_metadata): @@ -234,8 +244,9 @@ def setup_topics(self): @matrix(failure_mode=["hard_bounce", "clean_bounce"], bounce_target=["brokers", "clients"], check_order=[True, False], - use_group_metadata=[True, False]) - def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata): + use_group_metadata=[True, False], + metadata_quorum=quorum.all_non_upgrade) + def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.zk): security_protocol = 'PLAINTEXT' self.kafka.security_protocol = security_protocol self.kafka.interbroker_security_protocol = security_protocol diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py index ae0b3e7d276ff..183e4900e8c1d 100644 --- a/tests/kafkatest/tests/core/upgrade_test.py +++ b/tests/kafkatest/tests/core/upgrade_test.py @@ -24,7 +24,7 @@ from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int from kafkatest.utils.remote_account import java_version -from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, V_0_9_0_0, V_0_11_0_0, V_2_8_0, DEV_BRANCH, KafkaVersion +from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, V_0_11_0_0, V_2_8_0, DEV_BRANCH, KafkaVersion from kafkatest.services.kafka.util import new_jdk_not_supported class TestUpgrade(ProduceConsumeValidateTest): @@ -171,7 +171,7 @@ def test_upgrade(self, from_kafka_version, to_message_format_version, compressio # after leader change. Tolerate limited data loss for this case to avoid transient test failures. self.may_truncate_acked_records = False if from_kafka_version >= V_0_11_0_0 else True - new_consumer = from_kafka_version >= V_0_9_0_0 + new_consumer = from_kafka_version.consumer_supports_bootstrap_server() # TODO - reduce the timeout self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=30000, diff --git a/tests/kafkatest/tests/end_to_end.py b/tests/kafkatest/tests/end_to_end.py index 7ef6b974f6905..bfc316eeea17b 100644 --- a/tests/kafkatest/tests/end_to_end.py +++ b/tests/kafkatest/tests/end_to_end.py @@ -16,7 +16,7 @@ from ducktape.tests.test import Test from ducktape.utils.util import wait_until -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.kafka import TopicPartition from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.verifiable_consumer import VerifiableConsumer @@ -41,8 +41,8 @@ def __init__(self, test_context, topic="test_topic", topic_config=DEFAULT_TOPIC_ self.records_consumed = [] self.last_consumed_offsets = {} - def create_zookeeper(self, num_nodes=1, **kwargs): - self.zk = ZookeeperService(self.test_context, num_nodes=num_nodes, **kwargs) + def create_zookeeper_if_necessary(self, num_nodes=1, **kwargs): + self.zk = ZookeeperService(self.test_context, num_nodes=num_nodes, **kwargs) if quorum.for_test(self.test_context) == quorum.zk else None def create_kafka(self, num_nodes=1, **kwargs): group_metadata_config = { diff --git a/tests/kafkatest/tests/kafka_test.py b/tests/kafkatest/tests/kafka_test.py index 7118721b5ded2..7852768a5f651 100644 --- a/tests/kafkatest/tests/kafka_test.py +++ b/tests/kafkatest/tests/kafka_test.py @@ -17,7 +17,7 @@ from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum class KafkaTest(Test): @@ -34,12 +34,14 @@ def __init__(self, test_context, num_zk, num_brokers, topics=None): self.num_brokers = num_brokers self.topics = topics - self.zk = ZookeeperService(test_context, self.num_zk) + self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None self.kafka = KafkaService( test_context, self.num_brokers, - self.zk, topics=self.topics) + self.zk, topics=self.topics, + controller_num_nodes_override=self.num_zk) def setUp(self): - self.zk.start() + if self.zk: + self.zk.start() self.kafka.start() \ No newline at end of file diff --git a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py index 4ac2795875882..ca07828442bc7 100644 --- a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py @@ -15,6 +15,7 @@ import random from ducktape.mark import matrix +from ducktape.mark.resource import cluster from ducktape.tests.test import Test from ducktape.utils.util import wait_until from kafkatest.services.kafka import KafkaService @@ -50,6 +51,7 @@ def perform_broker_upgrade(self, to_version): node.version = KafkaVersion(to_version) self.kafka.start_node(node) + @cluster(num_nodes=6) @matrix(from_version=smoke_test_versions, to_version=dev_version, bounce_type=["full"]) def test_app_upgrade(self, from_version, to_version, bounce_type): """ diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py index 6ebd1a5e3557e..69fc3500ef380 100644 --- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py @@ -14,6 +14,7 @@ # limitations under the License. from ducktape.mark import parametrize +from ducktape.mark.resource import cluster from ducktape.tests.test import Test from ducktape.utils.util import wait_until from kafkatest.services.kafka import KafkaService @@ -61,6 +62,7 @@ def setUp(self): self.zk.start() + @cluster(num_nodes=4) @parametrize(broker_version=str(LATEST_2_4)) @parametrize(broker_version=str(LATEST_2_3)) @parametrize(broker_version=str(LATEST_2_2)) @@ -85,6 +87,7 @@ def test_compatible_brokers_eos_disabled(self, broker_version): self.consumer.stop() self.kafka.stop() + @cluster(num_nodes=4) @parametrize(broker_version=str(LATEST_2_6)) @parametrize(broker_version=str(LATEST_2_5)) @parametrize(broker_version=str(LATEST_2_4)) @@ -129,6 +132,7 @@ def test_compatible_brokers_eos_alpha_enabled(self, broker_version): # self.consumer.stop() # self.kafka.stop() + @cluster(num_nodes=4) @parametrize(broker_version=str(LATEST_0_10_2)) @parametrize(broker_version=str(LATEST_0_10_1)) @parametrize(broker_version=str(LATEST_0_10_0)) @@ -146,6 +150,7 @@ def test_fail_fast_on_incompatible_brokers(self, broker_version): self.kafka.stop() + @cluster(num_nodes=4) @parametrize(broker_version=str(LATEST_2_4)) @parametrize(broker_version=str(LATEST_2_3)) @parametrize(broker_version=str(LATEST_2_2)) diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py index 8fcf14a3fcc6d..5026d7a23d6ea 100644 --- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py @@ -14,6 +14,7 @@ # limitations under the License. import time +from ducktape.mark.resource import cluster from kafkatest.services.streams import StreamsBrokerDownResilienceService from kafkatest.tests.streams.base_streams_test import BaseStreamsTest @@ -40,6 +41,7 @@ def __init__(self, test_context): def setUp(self): self.zk.start() + @cluster(num_nodes=5) def test_streams_resilient_to_broker_down(self): self.kafka.start() @@ -75,6 +77,7 @@ def test_streams_resilient_to_broker_down(self): self.kafka.stop() + @cluster(num_nodes=7) def test_streams_runs_with_broker_down_initially(self): self.kafka.start() node = self.kafka.leader(self.inputTopic) @@ -141,6 +144,7 @@ def test_streams_runs_with_broker_down_initially(self): self.kafka.stop() + @cluster(num_nodes=7) def test_streams_should_scale_in_while_brokers_down(self): self.kafka.start() @@ -218,6 +222,7 @@ def test_streams_should_scale_in_while_brokers_down(self): self.kafka.stop() + @cluster(num_nodes=7) def test_streams_should_failover_while_brokers_down(self): self.kafka.start() diff --git a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py index 461573a0a9e1a..4658a5326018a 100644 --- a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py @@ -15,6 +15,7 @@ import time from ducktape.mark import matrix +from ducktape.mark.resource import cluster from ducktape.tests.test import Test from kafkatest.services.kafka import KafkaService from kafkatest.services.verifiable_producer import VerifiableProducer @@ -66,6 +67,7 @@ def __init__(self, test_context): throughput=1000, acks=1) + @cluster(num_nodes=8) @matrix(upgrade_from_version=streams_eager_rebalance_upgrade_versions) def test_upgrade_to_cooperative_rebalance(self, upgrade_from_version): self.zookeeper.start() diff --git a/tests/kafkatest/tests/streams/streams_optimized_test.py b/tests/kafkatest/tests/streams/streams_optimized_test.py index 3209b2536963f..b96ec10d6ba45 100644 --- a/tests/kafkatest/tests/streams/streams_optimized_test.py +++ b/tests/kafkatest/tests/streams/streams_optimized_test.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import time +from ducktape.mark.resource import cluster from ducktape.tests.test import Test from ducktape.utils.util import wait_until from kafkatest.services.kafka import KafkaService @@ -56,6 +56,7 @@ def __init__(self, test_context): throughput=1000, acks=1) + @cluster(num_nodes=9) def test_upgrade_optimized_topology(self): self.zookeeper.start() self.kafka.start() diff --git a/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py b/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py index 482da9c5d85f7..d190a1c311935 100644 --- a/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py +++ b/tests/kafkatest/tests/streams/streams_shutdown_deadlock_test.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from ducktape.mark.resource import cluster from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.streams import StreamsSmokeTestShutdownDeadlockService @@ -29,6 +30,7 @@ def __init__(self, test_context): self.driver = StreamsSmokeTestShutdownDeadlockService(test_context, self.kafka) + @cluster(num_nodes=3) def test_shutdown_wont_deadlock(self): """ Start ShutdownDeadLockTest, wait for upt to 1 minute, and check that the process exited. diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py index 1a4f296eb2a72..b1f908ddcf3b2 100644 --- a/tests/kafkatest/tests/streams/streams_smoke_test.py +++ b/tests/kafkatest/tests/streams/streams_smoke_test.py @@ -16,6 +16,7 @@ from ducktape.mark import matrix from ducktape.mark.resource import cluster +from kafkatest.services.kafka import quorum from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService @@ -46,8 +47,8 @@ def __init__(self, test_context): self.driver = StreamsSmokeTestDriverService(test_context, self.kafka) @cluster(num_nodes=8) - @matrix(processing_guarantee=['at_least_once', 'exactly_once', 'exactly_once_beta'], crash=[True, False]) - def test_streams(self, processing_guarantee, crash): + @matrix(processing_guarantee=['at_least_once', 'exactly_once', 'exactly_once_beta'], crash=[True, False], metadata_quorum=quorum.all_non_upgrade) + def test_streams(self, processing_guarantee, crash, metadata_quorum=quorum.zk): processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee) processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee) processor3 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee) diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py index e847c3ebf9d90..a8c07513c1c2e 100644 --- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py +++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from ducktape.mark.resource import cluster from ducktape.utils.util import wait_until from kafkatest.services.streams import StreamsStandbyTaskService from kafkatest.tests.streams.base_streams_test import BaseStreamsTest @@ -43,6 +44,7 @@ def __init__(self, test_context): 'replication-factor': 1} }) + @cluster(num_nodes=10) def test_standby_tasks_rebalance(self): # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor configs = self.get_configs( diff --git a/tests/kafkatest/tests/streams/streams_static_membership_test.py b/tests/kafkatest/tests/streams/streams_static_membership_test.py index e6072f4b3dc42..f31c38a75d488 100644 --- a/tests/kafkatest/tests/streams/streams_static_membership_test.py +++ b/tests/kafkatest/tests/streams/streams_static_membership_test.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from ducktape.mark.resource import cluster from ducktape.tests.test import Test from kafkatest.services.kafka import KafkaService from kafkatest.services.streams import StaticMemberTestService @@ -48,6 +49,7 @@ def __init__(self, test_context): throughput=1000, acks=1) + @cluster(num_nodes=8) def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self): self.zookeeper.start() self.kafka.start() diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 78f171aa83bdd..9aff673349c8d 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -186,6 +186,7 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version): processor.stop() processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False) + @cluster(num_nodes=6) @matrix(from_version=metadata_1_versions, to_version=[str(DEV_VERSION)]) @matrix(from_version=metadata_2_versions, to_version=[str(DEV_VERSION)]) def test_metadata_upgrade(self, from_version, to_version): @@ -238,6 +239,7 @@ def test_metadata_upgrade(self, from_version, to_version): timeout_sec=60, err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account)) + @cluster(num_nodes=6) def test_version_probing_upgrade(self): """ Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version" diff --git a/tests/kafkatest/tests/tools/log4j_appender_test.py b/tests/kafkatest/tests/tools/log4j_appender_test.py index 61a5d2a8932ba..0287f2f4d0e83 100644 --- a/tests/kafkatest/tests/tools/log4j_appender_test.py +++ b/tests/kafkatest/tests/tools/log4j_appender_test.py @@ -20,7 +20,7 @@ from ducktape.mark.resource import cluster from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.kafka_log4j_appender import KafkaLog4jAppender @@ -41,16 +41,18 @@ def __init__(self, test_context): TOPIC: {'partitions': 1, 'replication-factor': 1} } - self.zk = ZookeeperService(test_context, self.num_zk) + self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None def setUp(self): - self.zk.start() + if self.zk: + self.zk.start() def start_kafka(self, security_protocol, interbroker_security_protocol): self.kafka = KafkaService( self.test_context, self.num_brokers, self.zk, security_protocol=security_protocol, - interbroker_security_protocol=interbroker_security_protocol, topics=self.topics) + interbroker_security_protocol=interbroker_security_protocol, topics=self.topics, + controller_num_nodes_override=self.num_zk) self.kafka.start() def start_appender(self, security_protocol): @@ -70,10 +72,10 @@ def start_consumer(self): self.consumer.start() @cluster(num_nodes=4) - @matrix(security_protocol=['PLAINTEXT', 'SSL']) + @matrix(security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all_non_upgrade) @cluster(num_nodes=5) - @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL']) - def test_log4j_appender(self, security_protocol='PLAINTEXT'): + @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], metadata_quorum=quorum.all_non_upgrade) + def test_log4j_appender(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk): """ Tests if KafkaLog4jAppender is producing to Kafka topic :return: None diff --git a/tests/kafkatest/tests/tools/log_compaction_test.py b/tests/kafkatest/tests/tools/log_compaction_test.py index 338060f72175b..a91a976550a9f 100644 --- a/tests/kafkatest/tests/tools/log_compaction_test.py +++ b/tests/kafkatest/tests/tools/log_compaction_test.py @@ -14,13 +14,14 @@ # limitations under the License. +from ducktape.mark import matrix from ducktape.utils.util import wait_until from ducktape.tests.test import Test from ducktape.mark.resource import cluster from kafkatest.services.kafka import config_property from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.log_compaction_tester import LogCompactionTester class LogCompactionTest(Test): @@ -33,12 +34,13 @@ def __init__(self, test_context): self.num_zk = 1 self.num_brokers = 1 - self.zk = ZookeeperService(test_context, self.num_zk) + self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None self.kafka = None self.compaction_verifier = None def setUp(self): - self.zk.start() + if self.zk: + self.zk.start() def start_kafka(self, security_protocol, interbroker_security_protocol): self.kafka = KafkaService( @@ -49,7 +51,8 @@ def start_kafka(self, security_protocol, interbroker_security_protocol): interbroker_security_protocol=interbroker_security_protocol, server_prop_overides=[ [config_property.LOG_SEGMENT_BYTES, LogCompactionTest.LOG_SEGMENT_BYTES], - ]) + ], + controller_num_nodes_override=self.num_zk) self.kafka.start() def start_test_log_compaction_tool(self, security_protocol): @@ -57,7 +60,8 @@ def start_test_log_compaction_tool(self, security_protocol): self.compaction_verifier.start() @cluster(num_nodes=4) - def test_log_compaction(self, security_protocol='PLAINTEXT'): + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_log_compaction(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk): self.start_kafka(security_protocol, security_protocol) self.start_test_log_compaction_tool(security_protocol) diff --git a/tests/kafkatest/tests/tools/replica_verification_test.py b/tests/kafkatest/tests/tools/replica_verification_test.py index f296c73b76455..baa0536f218d0 100644 --- a/tests/kafkatest/tests/tools/replica_verification_test.py +++ b/tests/kafkatest/tests/tools/replica_verification_test.py @@ -14,13 +14,14 @@ # limitations under the License. +from ducktape.mark import matrix +from ducktape.mark.resource import cluster from ducktape.utils.util import wait_until from ducktape.tests.test import Test -from ducktape.mark.resource import cluster from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.replica_verification_tool import ReplicaVerificationTool TOPIC = "topic-replica-verification" @@ -39,19 +40,21 @@ def __init__(self, test_context): TOPIC: {'partitions': 1, 'replication-factor': 2} } - self.zk = ZookeeperService(test_context, self.num_zk) + self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None self.kafka = None self.producer = None self.replica_verifier = None def setUp(self): - self.zk.start() + if self.zk: + self.zk.start() def start_kafka(self, security_protocol, interbroker_security_protocol): self.kafka = KafkaService( self.test_context, self.num_brokers, self.zk, security_protocol=security_protocol, - interbroker_security_protocol=interbroker_security_protocol, topics=self.topics) + interbroker_security_protocol=interbroker_security_protocol, topics=self.topics, + controller_num_nodes_override=self.num_zk) self.kafka.start() def start_replica_verification_tool(self, security_protocol): @@ -70,7 +73,8 @@ def stop_producer(self): self.producer.stop() @cluster(num_nodes=6) - def test_replica_lags(self, security_protocol='PLAINTEXT'): + @matrix(metadata_quorum=quorum.all_non_upgrade) + def test_replica_lags(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk): """ Tests ReplicaVerificationTool :return: None diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index e2e7408b9fe7a..9c329cf486abb 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -62,6 +62,18 @@ def _cmp(self, other): return LooseVersion._cmp(self, other) + def consumer_supports_bootstrap_server(self): + """ + Kafka supported a new consumer beginning with v0.9.0 where + we can specify --bootstrap-server instead of --zookeeper. + + This version also allowed a --consumer-config file where we could specify + a security protocol other than PLAINTEXT. + + :return: true if the version of Kafka supports a new consumer with --bootstrap-server + """ + return self >= V_0_9_0_0 + def supports_named_listeners(self): return self >= V_0_10_2_0