Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions tests/kafkatest/sanity_checks/test_bounce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until

from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService


class TestBounce(Test):
    """Sanity checks on verifiable producer service class with cluster roll.

    Brings up a single-broker cluster (ZooKeeper- or Raft-based depending on
    the quorum chosen for the test run), produces a fixed batch of messages,
    rolls the cluster once, and verifies that production succeeds again.
    """

    def __init__(self, test_context):
        super(TestBounce, self).__init__(test_context)

        quorum_for_test = quorum.for_test(test_context)
        self.topic = "topic"
        # A ZooKeeper service is only required when running with a ZK quorum.
        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum_for_test == quorum.zk else None
        # A remote Raft quorum gets a 3-node controller quorum; every other
        # configuration uses a single controller node.
        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}},
                                  controller_num_nodes_override=3 if quorum_for_test == quorum.remote_raft else 1)
        self.num_messages = 1000

    def create_producer(self):
        """Create a fresh VerifiableProducer that produces to the source Kafka cluster."""
        self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
                                           max_messages=self.num_messages, throughput=self.num_messages // 10)

    def setUp(self):
        if self.zk:
            self.zk.start()

    @cluster(num_nodes=6)
    @parametrize(metadata_quorum=quorum.remote_raft)
    @cluster(num_nodes=4)
    @parametrize(metadata_quorum=quorum.colocated_raft)
    @cluster(num_nodes=4)
    @parametrize(metadata_quorum=quorum.zk)
    def test_simple_run(self, metadata_quorum):
        """
        Test that we can start VerifiableProducer on the current branch snapshot version, and
        verify that we can produce a small number of messages both before and after a subsequent roll.
        """
        self.kafka.start()
        for is_first_pass in (True, False):
            # A new producer instance is needed for each pass of the loop.
            self.create_producer()
            self.producer.start()
            wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15,
                       err_msg="Producer failed to start in a reasonable amount of time.")

            self.producer.wait()
            acked = self.producer.num_acked
            assert acked == self.num_messages, "num_produced: %d, num_messages: %d" % (acked, self.num_messages)
            if is_first_pass:
                self.producer.stop()
                # When a remote (non-colocated) Raft controller quorum exists,
                # roll it before rolling the broker cluster itself.
                if self.kafka.quorum_info.using_raft and self.kafka.remote_controller_quorum:
                    self.kafka.remote_controller_quorum.restart_cluster()
                self.kafka.restart_cluster()
16 changes: 9 additions & 7 deletions tests/kafkatest/sanity_checks/test_console_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from ducktape.utils.util import wait_until

from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.utils.remote_account import line_count, file_exists
Expand All @@ -34,20 +34,22 @@ def __init__(self, test_context):
super(ConsoleConsumerTest, self).__init__(test_context)

self.topic = "topic"
self.zk = ZookeeperService(test_context, num_nodes=1)
self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, zk_chroot="/kafka",
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic)

def setUp(self):
self.zk.start()
if self.zk:
self.zk.start()

@cluster(num_nodes=3)
@matrix(security_protocol=['PLAINTEXT', 'SSL'])
@matrix(security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all_raft)
@cluster(num_nodes=4)
@matrix(security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'])
@matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'])
def test_lifecycle(self, security_protocol, sasl_mechanism='GSSAPI'):
@matrix(security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN'], metadata_quorum=quorum.all_raft)
@matrix(security_protocol=['SASL_SSL'], sasl_mechanism=['SCRAM-SHA-256', 'SCRAM-SHA-512']) # SCRAM not yet supported with Raft
@matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], metadata_quorum=quorum.all_raft)
def test_lifecycle(self, security_protocol, sasl_mechanism='GSSAPI', metadata_quorum=quorum.zk):
"""Check that console consumer starts/stops properly, and that we are capturing log output."""

self.kafka.security_protocol = security_protocol
Expand Down
16 changes: 10 additions & 6 deletions tests/kafkatest/sanity_checks/test_performance_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ducktape.mark import parametrize
from ducktape.mark import matrix, parametrize
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test

from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService
from kafkatest.services.performance import latency, compute_aggregate_throughput
from kafkatest.services.zookeeper import ZookeeperService
Expand All @@ -31,10 +31,11 @@ def __init__(self, test_context):
self.num_records = 10000
self.topic = "topic"

self.zk = ZookeeperService(test_context, 1)
self.zk = ZookeeperService(test_context, 1) if quorum.for_test(test_context) == quorum.zk else None

def setUp(self):
self.zk.start()
if self.zk:
self.zk.start()

@cluster(num_nodes=5)
# We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check,
Expand All @@ -43,8 +44,9 @@ def setUp(self):
@parametrize(version=str(LATEST_0_9), new_consumer=False)
@parametrize(version=str(LATEST_0_9))
@parametrize(version=str(LATEST_1_1), new_consumer=False)
@parametrize(version=str(DEV_BRANCH))
def test_version(self, version=str(LATEST_0_9), new_consumer=True):
@cluster(num_nodes=5)
@matrix(version=[str(DEV_BRANCH)], metadata_quorum=quorum.all)
def test_version(self, version=str(LATEST_0_9), new_consumer=True, metadata_quorum=quorum.zk):
"""
Sanity check out producer performance service - verify that we can run the service with a small
number of messages. The actual stats here are pretty meaningless since the number of messages is quite small.
Expand All @@ -67,6 +69,7 @@ def test_version(self, version=str(LATEST_0_9), new_consumer=True):
'buffer.memory': 64*1024*1024})
self.producer_perf.run()
producer_perf_data = compute_aggregate_throughput(self.producer_perf)
assert producer_perf_data['records_per_sec'] > 0

# check basic run of end to end latency
self.end_to_end = EndToEndLatencyService(
Expand All @@ -82,6 +85,7 @@ def test_version(self, version=str(LATEST_0_9), new_consumer=True):
self.consumer_perf.group = "test-consumer-group"
self.consumer_perf.run()
consumer_perf_data = compute_aggregate_throughput(self.consumer_perf)
assert consumer_perf_data['records_per_sec'] > 0

return {
"producer_performance": producer_perf_data,
Expand Down
32 changes: 24 additions & 8 deletions tests/kafkatest/sanity_checks/test_verifiable_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
# limitations under the License.


from ducktape.mark import parametrize
from ducktape.mark import matrix, parametrize
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until

from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.utils import is_version
Expand All @@ -32,7 +32,7 @@ def __init__(self, test_context):
super(TestVerifiableProducer, self).__init__(test_context)

self.topic = "topic"
self.zk = ZookeeperService(test_context, num_nodes=1)
self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
topics={self.topic: {"partitions": 1, "replication-factor": 1}})

Expand All @@ -41,24 +41,40 @@ def __init__(self, test_context):
self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
max_messages=self.num_messages, throughput=self.num_messages // 10)
def setUp(self):
self.zk.start()
self.kafka.start()
if self.zk:
self.zk.start()

@cluster(num_nodes=3)
@parametrize(producer_version=str(LATEST_0_8_2))
@parametrize(producer_version=str(LATEST_0_9))
@parametrize(producer_version=str(LATEST_0_10_0))
@parametrize(producer_version=str(LATEST_0_10_1))
@parametrize(producer_version=str(DEV_BRANCH))
def test_simple_run(self, producer_version=DEV_BRANCH):
@matrix(producer_version=[str(DEV_BRANCH)], security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all)
@cluster(num_nodes=4)
@matrix(producer_version=[str(DEV_BRANCH)], security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN', 'GSSAPI'],
metadata_quorum=quorum.all)
def test_simple_run(self, producer_version, security_protocol = 'PLAINTEXT', sasl_mechanism='PLAIN',
metadata_quorum=quorum.zk):
"""
Test that we can start VerifiableProducer on the current branch snapshot version or against the 0.8.2 jar, and
verify that we can produce a small number of messages.
"""
self.kafka.security_protocol = security_protocol
self.kafka.client_sasl_mechanism = sasl_mechanism
self.kafka.interbroker_security_protocol = security_protocol
self.kafka.interbroker_sasl_mechanism = sasl_mechanism
if self.kafka.quorum_info.using_raft:
controller_quorum = self.kafka.controller_quorum
controller_quorum.controller_security_protocol = security_protocol
controller_quorum.controller_sasl_mechanism = sasl_mechanism
controller_quorum.intercontroller_security_protocol = security_protocol
controller_quorum.intercontroller_sasl_mechanism = sasl_mechanism
self.kafka.start()

node = self.producer.nodes[0]
node.version = KafkaVersion(producer_version)
self.producer.start()
wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5,
wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15,
err_msg="Producer failed to start in a reasonable amount of time.")

# using version.vstring (distutils.version.LooseVersion) is a tricky way of ensuring
Expand Down
9 changes: 5 additions & 4 deletions tests/kafkatest/services/console_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_9_0_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0
from kafkatest.services.kafka.util import fix_opts_for_new_jvm

"""
Expand Down Expand Up @@ -151,7 +151,9 @@ def prop_file(self, node):
def start_cmd(self, node):
"""Return the start command appropriate for the given node."""
args = self.args.copy()
args['zk_connect'] = self.kafka.zk_connect_setting()
args['broker_list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
if not self.new_consumer:
args['zk_connect'] = self.kafka.zk_connect_setting()
args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
args['stderr'] = ConsoleConsumer.STDERR_CAPTURE
args['log_dir'] = ConsoleConsumer.LOG_DIR
Expand All @@ -160,7 +162,6 @@ def start_cmd(self, node):
args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
args['jmx_port'] = self.jmx_port
args['console_consumer'] = self.path.script("kafka-console-consumer.sh", node)
args['broker_list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)

if self.kafka_opts_override:
args['kafka_opts'] = "\"%s\"" % self.kafka_opts_override
Expand All @@ -177,7 +178,7 @@ def start_cmd(self, node):
"--consumer.config %(config_file)s " % args

if self.new_consumer:
assert node.version >= V_0_9_0_0, \
assert node.version.consumer_supports_bootstrap_server(), \
"new_consumer is only supported if version >= 0.9.0.0, version %s" % str(node.version)
if node.version <= LATEST_0_10_0:
cmd += " --new-consumer"
Expand Down
10 changes: 5 additions & 5 deletions tests/kafkatest/services/performance/consumer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from kafkatest.services.performance import PerformanceService
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.version import DEV_BRANCH, V_0_9_0_0, V_2_0_0, LATEST_0_10_0
from kafkatest.version import DEV_BRANCH, V_2_0_0, LATEST_0_10_0


class ConsumerPerformanceService(PerformanceService):
Expand Down Expand Up @@ -79,14 +79,14 @@ def __init__(self, context, num_nodes, kafka, topic, messages, version=DEV_BRANC
self.new_consumer = new_consumer
self.settings = settings

assert version >= V_0_9_0_0 or (not new_consumer), \
assert version.consumer_supports_bootstrap_server() or (not new_consumer), \
"new_consumer is only supported if version >= 0.9.0.0, version %s" % str(version)

assert version < V_2_0_0 or new_consumer, \
"new_consumer==false is only supported if version < 2.0.0, version %s" % str(version)

security_protocol = self.security_config.security_protocol
assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \
assert version.consumer_supports_bootstrap_server() or security_protocol == SecurityConfig.PLAINTEXT, \
"Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))

# These less-frequently used settings can be updated manually after instantiation
Expand Down Expand Up @@ -142,7 +142,7 @@ def start_cmd(self, node):
for key, value in self.args(node.version).items():
cmd += " --%s %s" % (key, value)

if node.version >= V_0_9_0_0:
if node.version.consumer_supports_bootstrap_server():
# This is only used for security settings
cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE

Expand All @@ -155,7 +155,7 @@ def start_cmd(self, node):

def parse_results(self, line, version):
parts = line.split(',')
if version >= V_0_9_0_0:
if version.consumer_supports_bootstrap_server():
result = {
'total_mb': float(parts[2]),
'mbps': float(parts[3]),
Expand Down
13 changes: 8 additions & 5 deletions tests/kafkatest/services/performance/end_to_end_latency.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from kafkatest.services.performance import PerformanceService
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.version import DEV_BRANCH, V_0_9_0_0
from kafkatest.version import DEV_BRANCH



Expand Down Expand Up @@ -53,7 +53,7 @@ def __init__(self, context, num_nodes, kafka, topic, num_records, compression_ty

security_protocol = self.security_config.security_protocol

if version < V_0_9_0_0:
if not version.consumer_supports_bootstrap_server():
assert security_protocol == SecurityConfig.PLAINTEXT, \
"Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))
assert compression_type == "none", \
Expand All @@ -74,15 +74,18 @@ def __init__(self, context, num_nodes, kafka, topic, num_records, compression_ty
def start_cmd(self, node):
args = self.args.copy()
args.update({
'zk_connect': self.kafka.zk_connect_setting(),
'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
'config_file': EndToEndLatencyService.CONFIG_FILE,
'kafka_run_class': self.path.script("kafka-run-class.sh", node),
'java_class_name': self.java_class_name()
})
if not node.version.consumer_supports_bootstrap_server():
args.update({
'zk_connect': self.kafka.zk_connect_setting(),
})

cmd = "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % EndToEndLatencyService.LOG4J_CONFIG
if node.version >= V_0_9_0_0:
if node.version.consumer_supports_bootstrap_server():
cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s %(java_class_name)s " % args
cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d %(message_bytes)d %(config_file)s" % args
else:
Expand All @@ -102,7 +105,7 @@ def _worker(self, idx, node):

node.account.create_file(EndToEndLatencyService.LOG4J_CONFIG, log_config)
client_config = str(self.security_config)
if node.version >= V_0_9_0_0:
if node.version.consumer_supports_bootstrap_server():
client_config += "compression_type=%(compression_type)s" % self.args
node.account.create_file(EndToEndLatencyService.CONFIG_FILE, client_config)

Expand Down
4 changes: 2 additions & 2 deletions tests/kafkatest/services/performance/producer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from kafkatest.services.monitor.http import HttpMetricsCollector
from kafkatest.services.performance import PerformanceService
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.version import DEV_BRANCH, V_0_9_0_0
from kafkatest.version import DEV_BRANCH


class ProducerPerformanceService(HttpMetricsCollector, PerformanceService):
Expand Down Expand Up @@ -55,7 +55,7 @@ def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, t
self.security_config = kafka.security_config.client_config()

security_protocol = self.security_config.security_protocol
assert version >= V_0_9_0_0 or security_protocol == SecurityConfig.PLAINTEXT, \
assert version.consumer_supports_bootstrap_server() or security_protocol == SecurityConfig.PLAINTEXT, \
"Security protocol %s is only supported if version >= 0.9.0.0, version %s" % (self.security_config, str(version))

self.args = {
Expand Down
Loading