From 26ae395ea647a8f75c8242085f4623445e22e20f Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Thu, 1 Oct 2015 19:41:33 +0000 Subject: [PATCH 1/4] KAFKA-2581: Run some existing ducktape tests with SSL-enabled clients and brokers --- .../kafka/tools/ConsumerPerformance.scala | 10 +- .../sanity_checks/test_console_consumer.py | 9 +- tests/kafkatest/services/console_consumer.py | 19 ++- tests/kafkatest/services/kafka.py | 20 ++- .../performance/consumer_performance.py | 10 +- .../performance/end_to_end_latency.py | 14 +- .../performance/producer_performance.py | 14 +- .../services/templates/kafka.properties | 12 ++ .../templates/verifiable_producer.properties | 16 +++ .../kafkatest/services/verifiable_producer.py | 21 ++- tests/kafkatest/tests/benchmark_test.py | 81 +++++++---- tests/kafkatest/tests/replication_test.py | 43 +++--- tests/kafkatest/utils/security_config.py | 132 ++++++++++++++++++ .../clients/tools/VerifiableProducer.java | 16 +++ 14 files changed, 353 insertions(+), 64 deletions(-) create mode 100644 tests/kafkatest/services/templates/verifiable_producer.properties create mode 100644 tests/kafkatest/utils/security_config.py diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index 826703072b921..c7f907238b152 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -27,6 +27,7 @@ import java.nio.channels.ClosedByInterruptException import org.apache.log4j.Logger import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer} import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.utils.Utils import kafka.utils.CommandLineUtils import java.util.{ Random, Properties } import kafka.consumer.Consumer @@ -203,6 +204,10 @@ object ConsumerPerformance { .ofType(classOf[java.lang.Integer]) .defaultsTo(1) val useNewConsumerOpt = 
parser.accepts("new-consumer", "Use the new consumer implementation.") + val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") + .withRequiredArg + .describedAs("config file") + .ofType(classOf[String]) val options = parser.parse(args: _*) @@ -210,7 +215,10 @@ object ConsumerPerformance { val useNewConsumer = options.has(useNewConsumerOpt) - val props = new Properties + val props = if (options.has(consumerConfigOpt)) + Utils.loadProps(options.valueOf(consumerConfigOpt)) + else + new Properties if(useNewConsumer) { import org.apache.kafka.clients.consumer.ConsumerConfig props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServersOpt)) diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py index 4544c0097db0e..b35834b783033 100644 --- a/tests/kafkatest/sanity_checks/test_console_consumer.py +++ b/tests/kafkatest/sanity_checks/test_console_consumer.py @@ -15,6 +15,8 @@ from ducktape.tests.test import Test from ducktape.utils.util import wait_until +from ducktape.mark import parametrize +from ducktape.mark import matrix from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService @@ -28,18 +30,21 @@ class ConsoleConsumerTest(Test): def __init__(self, test_context): super(ConsoleConsumerTest, self).__init__(test_context) + self.test_context = test_context self.topic = "topic" self.zk = ZookeeperService(test_context, num_nodes=1) self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {"partitions": 1, "replication-factor": 1}}) - self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic) def setUp(self): self.zk.start() self.kafka.start() - def test_lifecycle(self): + @parametrize(security_protocol='SSL', new_consumer=True) + @matrix(security_protocol=['PLAINTEXT'], new_consumer=[False, True]) + def test_lifecycle(self, 
security_protocol, new_consumer): t0 = time.time() + self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, security_protocol=security_protocol, new_consumer=new_consumer) self.consumer.start() node = self.consumer.nodes[0] diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 9286654007039..f4ea91ead67bb 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -15,6 +15,7 @@ from ducktape.services.background_thread import BackgroundThreadService from ducktape.utils.util import wait_until +from kafkatest.utils.security_config import SecurityConfig import os import subprocess @@ -93,13 +94,15 @@ class ConsoleConsumer(BackgroundThreadService): "collect_default": True} } - def __init__(self, context, num_nodes, kafka, topic, message_validator=None, from_beginning=True, consumer_timeout_ms=None): + def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, new_consumer=False, message_validator=None, from_beginning=True, consumer_timeout_ms=None): """ Args: context: standard context num_nodes: number of nodes to use (this should be 1) kafka: kafka service topic: consume from this topic + security_protocol: security protocol for Kafka connections + new_consumer: use new Kafka consumer if True message_validator: function which returns message or None from_beginning: consume from beginning if True, else from the end consumer_timeout_ms: corresponds to consumer.timeout.ms. 
consumer process ends if time between @@ -109,6 +112,8 @@ def __init__(self, context, num_nodes, kafka, topic, message_validator=None, fro """ super(ConsoleConsumer, self).__init__(context, num_nodes) self.kafka = kafka + self.security_protocol = security_protocol + self.new_consumer = new_consumer self.args = { 'topic': topic, } @@ -132,6 +137,8 @@ def start_cmd(self): cmd += " /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s --zookeeper %(zk_connect)s" \ " --consumer.config %(config_file)s" % args + if self.new_consumer: + cmd += " --new-consumer --bootstrap-server %s" % self.kafka.bootstrap_servers(self.security_protocol) if self.from_beginning: cmd += " --from-beginning" @@ -158,6 +165,14 @@ def _worker(self, idx, node): else: prop_file = self.render('console_consumer.properties') + # Add security properties to the config. If security protocol is not specified, + # use the default in the template properties. + security_config = SecurityConfig(node.account, self.security_protocol, prop_file) + prop_file += str(security_config) + if self.security_protocol is None: + self.security_protocol = security_config.security_protocol + if self.security_protocol == SecurityConfig.SSL: + self.new_consumer = True self.logger.info("console_consumer.properties:") self.logger.info(prop_file) node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file) @@ -189,5 +204,5 @@ def clean_node(self, node): self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." 
% (self.__class__.__name__, node.account)) node.account.kill_process("java", clean_shutdown=False, allow_fail=True) - node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False) + node.account.ssh("rm -rf %s /mnt/ssl" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False) diff --git a/tests/kafkatest/services/kafka.py b/tests/kafkatest/services/kafka.py index 5ff804766f1df..0f8e1e391270b 100644 --- a/tests/kafkatest/services/kafka.py +++ b/tests/kafkatest/services/kafka.py @@ -15,6 +15,7 @@ from ducktape.services.service import Service from ducktape.utils.util import wait_until +from kafkatest.utils.security_config import SecurityConfig import json import re @@ -33,7 +34,7 @@ class KafkaService(Service): "collect_default": False} } - def __init__(self, context, num_nodes, zk, topics=None): + def __init__(self, context, num_nodes, zk, interbroker_security_protocol=SecurityConfig.PLAINTEXT, topics=None): """ :type context :type zk: ZookeeperService @@ -41,6 +42,7 @@ def __init__(self, context, num_nodes, zk, topics=None): """ super(KafkaService, self).__init__(context, num_nodes) self.zk = zk + self.interbroker_security_protocol = interbroker_security_protocol self.topics = topics def start(self): @@ -56,10 +58,12 @@ def start(self): self.create_topic(topic_cfg) def start_node(self, node): - props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node)) + props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node), interbroker_security_protocol=self.interbroker_security_protocol) self.logger.info("kafka.properties:") self.logger.info(props_file) node.account.create_file("/mnt/kafka.properties", props_file) + # Create keystore and truststore for SSL endpoint + SecurityConfig(node.account, SecurityConfig.SSL) cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! 
> /mnt/kafka.pid" self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd)) @@ -96,7 +100,7 @@ def stop_node(self, node, clean_shutdown=True): def clean_node(self, node): node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True) - node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False) + node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid /mnt/ssl", allow_fail=False) def create_topic(self, topic_cfg): node = self.nodes[0] # any node is fine here @@ -226,5 +230,11 @@ def leader(self, topic, partition=0): self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx)) return self.get_node(leader_idx) - def bootstrap_servers(self): - return ','.join([node.account.hostname + ":9092" for node in self.nodes]) + def bootstrap_servers(self, security_protocol=SecurityConfig.PLAINTEXT): + """Get the broker list to connect to Kafka using the specified security protocol + """ + if security_protocol == SecurityConfig.SSL: + port = 9093 + else: + port = 9092 + return ','.join([node.account.hostname + ":" + str(port) for node in self.nodes]) diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py index ecaef43f14ba4..e756b6715ad38 100644 --- a/tests/kafkatest/services/performance/consumer_performance.py +++ b/tests/kafkatest/services/performance/consumer_performance.py @@ -14,6 +14,7 @@ # limitations under the License. from kafkatest.services.performance import PerformanceService +from kafkatest.utils.security_config import SecurityConfig import os @@ -43,6 +44,7 @@ class ConsumerPerformanceService(PerformanceService): "num-fetch-threads", "Number of fetcher threads. Defaults to 1" "new-consumer", "Use the new consumer implementation." + "consumer.config", "Consumer config properties file." 
""" # Root directory for persistent output @@ -51,6 +53,7 @@ class ConsumerPerformanceService(PerformanceService): STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "consumer_performance.stdout") LOG_FILE = os.path.join(LOG_DIR, "consumer_performance.log") LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties") + CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "consumer.properties") logs = { "consumer_performance_output": { @@ -62,9 +65,10 @@ class ConsumerPerformanceService(PerformanceService): "collect_default": True} } - def __init__(self, context, num_nodes, kafka, topic, messages, new_consumer=False, settings={}): + def __init__(self, context, num_nodes, kafka, security_protocol, topic, messages, new_consumer=False, settings={}): super(ConsumerPerformanceService, self).__init__(context, num_nodes) self.kafka = kafka + self.security_protocol = security_protocol self.topic = topic self.messages = messages self.new_consumer = new_consumer @@ -88,7 +92,7 @@ def args(self): if self.new_consumer: args['new-consumer'] = "" - args['broker-list'] = self.kafka.bootstrap_servers() + args['broker-list'] = self.kafka.bootstrap_servers(self.security_protocol) else: args['zookeeper'] = self.kafka.zk.connect_setting() @@ -119,6 +123,7 @@ def start_cmd(self): cmd += " /opt/kafka/bin/kafka-consumer-perf-test.sh" for key, value in self.args.items(): cmd += " --%s %s" % (key, value) + cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE for key, value in self.settings.items(): cmd += " %s=%s" % (str(key), str(value)) @@ -131,6 +136,7 @@ def _worker(self, idx, node): log_config = self.render('tools_log4j.properties', log_file=ConsumerPerformanceService.LOG_FILE) node.account.create_file(ConsumerPerformanceService.LOG4J_CONFIG, log_config) + SecurityConfig(node.account, self.security_protocol).write_to_file(ConsumerPerformanceService.CONFIG_FILE) cmd = self.start_cmd self.logger.debug("Consumer performance %d command: %s", idx, cmd) diff --git 
a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py index 4c61a93e8f259..3562563de64f2 100644 --- a/tests/kafkatest/services/performance/end_to_end_latency.py +++ b/tests/kafkatest/services/performance/end_to_end_latency.py @@ -14,6 +14,7 @@ # limitations under the License. from kafkatest.services.performance import PerformanceService +from kafkatest.utils.security_config import SecurityConfig class EndToEndLatencyService(PerformanceService): @@ -24,9 +25,10 @@ class EndToEndLatencyService(PerformanceService): "collect_default": True}, } - def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1): + def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, consumer_fetch_max_wait=100, acks=1): super(EndToEndLatencyService, self).__init__(context, num_nodes) self.kafka = kafka + self.security_protocol = security_protocol self.args = { 'topic': topic, 'num_records': num_records, @@ -36,14 +38,20 @@ def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch def _worker(self, idx, node): args = self.args.copy() + if self.security_protocol == SecurityConfig.SSL: + ssl_config = SecurityConfig(node.account, self.security_protocol) + ssl_config_file = ssl_config.write_to_file() + else: + ssl_config_file = "" args.update({ 'zk_connect': self.kafka.zk.connect_setting(), - 'bootstrap_servers': self.kafka.bootstrap_servers(), + 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_protocol), + 'ssl_config_file': ssl_config_file }) cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\ "%(bootstrap_servers)s %(topic)s %(num_records)d "\ - "%(acks)d 20" % args + "%(acks)d 20 %(ssl_config_file)s" % args cmd += " | tee /mnt/end-to-end-latency.log" diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py index 
c46a910f29323..1af35a52527f6 100644 --- a/tests/kafkatest/services/performance/producer_performance.py +++ b/tests/kafkatest/services/performance/producer_performance.py @@ -14,6 +14,7 @@ # limitations under the License. from kafkatest.services.performance import PerformanceService +from kafkatest.utils.security_config import SecurityConfig class ProducerPerformanceService(PerformanceService): @@ -24,9 +25,10 @@ class ProducerPerformanceService(PerformanceService): "collect_default": True}, } - def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False): + def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False): super(ProducerPerformanceService, self).__init__(context, num_nodes) self.kafka = kafka + self.security_protocol = security_protocol self.args = { 'topic': topic, 'num_records': num_records, @@ -38,13 +40,17 @@ def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, t def _worker(self, idx, node): args = self.args.copy() - args.update({'bootstrap_servers': self.kafka.bootstrap_servers()}) + args.update({'bootstrap_servers': self.kafka.bootstrap_servers(self.security_protocol)}) cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\ "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s"\ " | tee /mnt/producer-performance.log" % args + "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s" % args + if self.security_protocol == SecurityConfig.SSL: + security_config = SecurityConfig(node.account, self.security_protocol) + self.settings = dict(self.settings, **security_config.properties) for key, value in self.settings.items(): cmd += " %s=%s" % (str(key), str(value)) + cmd += " | tee /mnt/producer-performance.log" + self.logger.debug("Producer performance %d 
command: %s", idx, cmd) def parse_stats(line): diff --git a/tests/kafkatest/services/templates/kafka.properties b/tests/kafkatest/services/templates/kafka.properties index 6650d23784924..1e924babe1d74 100644 --- a/tests/kafkatest/services/templates/kafka.properties +++ b/tests/kafkatest/services/templates/kafka.properties @@ -20,6 +20,8 @@ port=9092 #host.name=localhost advertised.host.name={{ node.account.hostname }} #advertised.port= +listeners=PLAINTEXT://:9092,SSL://:9093 +advertised.listeners=PLAINTEXT://{{ node.account.hostname }}:9092,SSL://{{ node.account.hostname }}:9093 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 @@ -39,3 +41,13 @@ log.cleaner.enable=false zookeeper.connect={{ zk.connect_setting() }} zookeeper.connection.timeout.ms=2000 + +security.inter.broker.protocol={{ interbroker_security_protocol }} +ssl.keystore.location=/mnt/ssl/test.keystore.jks +ssl.keystore.password=test-ks-passwd +ssl.key.password=test-key-passwd +ssl.keystore.type=JKS +ssl.truststore.location=/mnt/ssl/test.truststore.jks +ssl.truststore.password=test-ts-passwd +ssl.truststore.type=JKS + diff --git a/tests/kafkatest/services/templates/verifiable_producer.properties b/tests/kafkatest/services/templates/verifiable_producer.properties new file mode 100644 index 0000000000000..e85cf131add84 --- /dev/null +++ b/tests/kafkatest/services/templates/verifiable_producer.properties @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see org.apache.kafka.clients.producer.ProducerConfig for more details + diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index 158db7ab3d1ac..a3660ed8b5888 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -14,22 +14,25 @@ # limitations under the License. from ducktape.services.background_thread import BackgroundThreadService +from kafkatest.utils.security_config import SecurityConfig import json class VerifiableProducer(BackgroundThreadService): + CONFIG_FILE = "/mnt/verifiable_producer.properties" logs = { "producer_log": { "path": "/mnt/producer.log", "collect_default": False} } - def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000): + def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, max_messages=-1, throughput=100000): super(VerifiableProducer, self).__init__(context, num_nodes) self.kafka = kafka + self.security_protocol = security_protocol self.topic = topic self.max_messages = max_messages self.throughput = throughput @@ -38,6 +41,17 @@ def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput self.not_acked_values = [] def _worker(self, idx, node): + # Create and upload config file + # If security protocol is not specified, use the default in the template properties. 
+ prop_file = self.render('verifiable_producer.properties') + security_config = SecurityConfig(node.account, self.security_protocol, prop_file) + prop_file += str(security_config) + if self.security_protocol is None: + self.security_protocol = security_config.security_protocol + self.logger.info("verifiable_producer.properties:") + self.logger.info(prop_file) + node.account.create_file(VerifiableProducer.CONFIG_FILE, prop_file) + cmd = self.start_cmd self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd)) @@ -58,12 +72,13 @@ def _worker(self, idx, node): @property def start_cmd(self): cmd = "/opt/kafka/bin/kafka-verifiable-producer.sh" \ - " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers()) + " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_protocol)) if self.max_messages > 0: cmd += " --max-messages %s" % str(self.max_messages) if self.throughput > 0: cmd += " --throughput %s" % str(self.throughput) + cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE cmd += " 2>> /mnt/producer.log | tee -a /mnt/producer.log &" return cmd @@ -99,7 +114,7 @@ def stop_node(self, node): def clean_node(self, node): node.account.kill_process("VerifiableProducer", clean_shutdown=False, allow_fail=False) - node.account.ssh("rm -rf /mnt/producer.log", allow_fail=False) + node.account.ssh("rm -rf /mnt/producer.log /mnt/verifiable_producer.properties /mnt/ssl", allow_fail=False) def try_parse_json(self, string): """Try to parse a string as json. Return None if not parseable.""" diff --git a/tests/kafkatest/tests/benchmark_test.py b/tests/kafkatest/tests/benchmark_test.py index 02503ec44b025..b489167952e08 100644 --- a/tests/kafkatest/tests/benchmark_test.py +++ b/tests/kafkatest/tests/benchmark_test.py @@ -14,10 +14,12 @@ # limitations under the License. 
from ducktape.services.service import Service +from ducktape.tests.test import Test from ducktape.mark import parametrize from ducktape.mark import matrix -from kafkatest.tests.kafka_test import KafkaTest +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService @@ -26,16 +28,22 @@ DEFAULT_RECORD_SIZE = 100 # bytes -class Benchmark(KafkaTest): +class Benchmark(Test): """A benchmark of Kafka producer/consumer performance. This replicates the test run here: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines """ def __init__(self, test_context): - super(Benchmark, self).__init__(test_context, num_zk=1, num_brokers=3, topics={ + super(Benchmark, self).__init__(test_context) + self.test_context = test_context + self.num_zk = 1 + self.num_brokers = 3 + self.topics = { TOPIC_REP_ONE: {'partitions': 6, 'replication-factor': 1}, TOPIC_REP_THREE: {'partitions': 6, 'replication-factor': 3} - }) + } + + self.zk = ZookeeperService(test_context, self.num_zk) self.msgs_large = 10000000 self.batch_size = 8*1024 @@ -44,25 +52,35 @@ def __init__(self, test_context): self.target_data_size = 128*1024*1024 self.target_data_size_gb = self.target_data_size/float(1024*1024*1024) - @parametrize(acks=1, topic=TOPIC_REP_ONE, num_producers=1, message_size=DEFAULT_RECORD_SIZE) - @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=1, message_size=DEFAULT_RECORD_SIZE) - @parametrize(acks=-1, topic=TOPIC_REP_THREE, num_producers=1, message_size=DEFAULT_RECORD_SIZE) - @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3, message_size=DEFAULT_RECORD_SIZE) - @matrix(acks=[1], topic=[TOPIC_REP_THREE], num_producers=[1], message_size=[10, 100, 1000, 10000, 100000]) - def test_producer_throughput(self, acks, topic, num_producers, message_size): + def 
setUp(self): + self.zk.start() + + def start_kafka(self, interbroker_security_protocol): + self.kafka = KafkaService( + self.test_context, self.num_brokers, + self.zk, interbroker_security_protocol=interbroker_security_protocol, topics=self.topics) + self.kafka.start() + + @parametrize(acks=1, topic=TOPIC_REP_ONE, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT') + @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT') + @parametrize(acks=-1, topic=TOPIC_REP_THREE, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT') + @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT') + @matrix(acks=[1], topic=[TOPIC_REP_THREE], num_producers=[1], message_size=[10, 100, 1000, 10000, 100000], security_protocol=['PLAINTEXT', 'SSL']) + def test_producer_throughput(self, acks, topic, num_producers, message_size, security_protocol): """ Setup: 1 node zk + 3 node kafka cluster Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor, - and message size are varied depending on arguments injected into this test. + security protocol and message size are varied depending on arguments injected into this test. Collect and return aggregate throughput statistics after all messages have been acknowledged. 
(This runs ProducerPerformance.java under the hood) """ + self.start_kafka(security_protocol) # Always generate the same total amount of data nrecords = int(self.target_data_size / message_size) self.producer = ProducerPerformanceService( - self.test_context, num_producers, self.kafka, topic=topic, + self.test_context, num_producers, self.kafka, security_protocol=security_protocol, topic=topic, num_records=nrecords, record_size=message_size, throughput=-1, settings={ 'acks': acks, @@ -71,7 +89,8 @@ def test_producer_throughput(self, acks, topic, num_producers, message_size): self.producer.run() return compute_aggregate_throughput(self.producer) - def test_long_term_producer_throughput(self): + @matrix(security_protocol=['PLAINTEXT', 'SSL']) + def test_long_term_producer_throughput(self, security_protocol): """ Setup: 1 node zk + 3 node kafka cluster Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1. @@ -80,8 +99,9 @@ def test_long_term_producer_throughput(self): (This runs ProducerPerformance.java under the hood) """ + self.start_kafka(security_protocol) self.producer = ProducerPerformanceService( - self.test_context, 1, self.kafka, + self.test_context, 1, self.kafka, security_protocol=security_protocol, topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE, throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}, intermediate_stats=True @@ -111,7 +131,11 @@ def test_long_term_producer_throughput(self): self.logger.info("\n".join(summary)) return data - def test_end_to_end_latency(self): + + @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='PLAINTEXT') + @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') + @parametrize(security_protocol='SSL', interbroker_security_protocol='SSL') + def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol): """ Setup: 1 node zk + 3 
node kafka cluster Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3, @@ -121,16 +145,18 @@ def test_end_to_end_latency(self): (Under the hood, this simply runs EndToEndLatency.scala) """ + self.start_kafka(interbroker_security_protocol) self.logger.info("BENCHMARK: End to end latency") self.perf = EndToEndLatencyService( self.test_context, 1, self.kafka, - topic=TOPIC_REP_THREE, num_records=10000 + topic=TOPIC_REP_THREE, security_protocol=security_protocol, num_records=10000 ) self.perf.run() return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms']) - @matrix(new_consumer=[True, False]) - def test_producer_and_consumer(self, new_consumer=False): + @parametrize(new_consumer=True, security_protocol='SSL') + @matrix(new_consumer=[True, False], security_protocol=['PLAINTEXT']) + def test_producer_and_consumer(self, new_consumer, security_protocol): """ Setup: 1 node zk + 3 node kafka cluster Concurrently produce and consume 10e6 messages with a single producer and a single consumer, @@ -140,15 +166,17 @@ def test_producer_and_consumer(self, new_consumer=False): (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala) """ + self.start_kafka(security_protocol) num_records = 10 * 1000 * 1000 # 10e6 self.producer = ProducerPerformanceService( - self.test_context, 1, self.kafka, - topic=TOPIC_REP_THREE, num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, + self.test_context, 1, self.kafka, + topic=TOPIC_REP_THREE, security_protocol=security_protocol, + num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory} ) self.consumer = ConsumerPerformanceService( - self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records) + self.test_context, 1, self.kafka, 
security_protocol, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records) Service.run_parallel(self.producer, self.consumer) data = { @@ -161,18 +189,21 @@ def test_producer_and_consumer(self, new_consumer=False): self.logger.info("\n".join(summary)) return data - @matrix(new_consumer=[True, False], num_consumers=[1]) - def test_consumer_throughput(self, new_consumer, num_consumers): + @parametrize(new_consumer=True, security_protocol='SSL', num_consumers=1) + @matrix(new_consumer=[True, False], security_protocol=['PLAINTEXT'], num_consumers=[1]) + def test_consumer_throughput(self, new_consumer, security_protocol, num_consumers): """ Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions (using new consumer iff new_consumer == True), and report throughput. """ + self.start_kafka(security_protocol) num_records = 10 * 1000 * 1000 # 10e6 # seed kafka w/messages self.producer = ProducerPerformanceService( self.test_context, 1, self.kafka, - topic=TOPIC_REP_THREE, num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, + topic=TOPIC_REP_THREE, security_protocol=security_protocol, + num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory} ) self.producer.run() @@ -180,7 +211,7 @@ def test_consumer_throughput(self, new_consumer, num_consumers): # consume self.consumer = ConsumerPerformanceService( self.test_context, num_consumers, self.kafka, - topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records) + topic=TOPIC_REP_THREE, security_protocol=security_protocol, new_consumer=new_consumer, messages=num_records) self.consumer.group = "test-consumer-group" self.consumer.run() return compute_aggregate_throughput(self.consumer) diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py index a83769b509439..52173bdfacece 100644 --- 
a/tests/kafkatest/tests/replication_test.py +++ b/tests/kafkatest/tests/replication_test.py @@ -15,6 +15,8 @@ from ducktape.tests.test import Test from ducktape.utils.util import wait_until +from ducktape.mark import parametrize +from ducktape.mark import matrix from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService @@ -36,24 +38,18 @@ def __init__(self, test_context): self.topic = "test_topic" self.zk = ZookeeperService(test_context, num_nodes=1) - self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: { - "partitions": 3, - "replication-factor": 3, - "min.insync.replicas": 2} - }) self.producer_throughput = 10000 self.num_producers = 1 self.num_consumers = 1 def setUp(self): self.zk.start() - self.kafka.start() def min_cluster_size(self): """Override this since we're adding services outside of the constructor""" return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers - def run_with_failure(self, failure): + def run_with_failure(self, failure, interbroker_security_protocol): """This is the top-level test template. The steps are: @@ -75,8 +71,16 @@ def run_with_failure(self, failure): indicator that nothing is left to consume. 
""" - self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput) - self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000, message_validator=is_int) + self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, + interbroker_security_protocol=interbroker_security_protocol, + topics={self.topic: { + "partitions": 3, + "replication-factor": 3, + "min.insync.replicas": 2} + }) + self.kafka.start() + self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, security_protocol='PLAINTEXT', throughput=self.producer_throughput) + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, security_protocol='PLAINTEXT', new_consumer=False, consumer_timeout_ms=3000, message_validator=is_int) # Produce in a background thread while driving broker failures self.producer.start() @@ -149,14 +153,19 @@ def validate(self): return success, msg - def test_clean_shutdown(self): - self.run_with_failure(self.clean_shutdown) + + @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL']) + def test_clean_shutdown(self, interbroker_security_protocol): + self.run_with_failure(self.clean_shutdown, interbroker_security_protocol) - def test_hard_shutdown(self): - self.run_with_failure(self.hard_shutdown) + @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL']) + def test_hard_shutdown(self, interbroker_security_protocol): + self.run_with_failure(self.hard_shutdown, interbroker_security_protocol) - def test_clean_bounce(self): - self.run_with_failure(self.clean_bounce) + @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL']) + def test_clean_bounce(self, interbroker_security_protocol): + self.run_with_failure(self.clean_bounce, interbroker_security_protocol) - def test_hard_bounce(self): - self.run_with_failure(self.hard_bounce) + 
@matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL']) + def test_hard_bounce(self, interbroker_security_protocol): + self.run_with_failure(self.hard_bounce, interbroker_security_protocol) diff --git a/tests/kafkatest/utils/security_config.py b/tests/kafkatest/utils/security_config.py new file mode 100644 index 0000000000000..c98477272a898 --- /dev/null +++ b/tests/kafkatest/utils/security_config.py @@ -0,0 +1,132 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import subprocess +import tempfile + + +class Keytool: + + @staticmethod + def generate_keystore_truststore(dir='.'): + """ + Generate JKS keystore and truststore and return + Kafka SSL properties with these stores. 
+ """ + ksPath = os.path.join(dir, 'test.keystore.jks') + ksPassword = 'test-ks-passwd' + keyPassword = 'test-key-passwd' + tsPath = os.path.join(dir, 'test.truststore.jks') + tsPassword = 'test-ts-passwd' + if os.path.exists(ksPath): + os.remove(ksPath) + if os.path.exists(tsPath): + os.remove(tsPath) + + Keytool.runcmd("keytool -genkeypair -alias test -keyalg RSA -keysize 2048 -keystore %s -storetype JKS -keypass %s -storepass %s -dname CN=systemtest" % (ksPath, keyPassword, ksPassword)) + Keytool.runcmd("keytool -export -alias test -keystore %s -storepass %s -storetype JKS -rfc -file test.crt" % (ksPath, ksPassword)) + Keytool.runcmd("keytool -import -alias test -file test.crt -keystore %s -storepass %s -storetype JKS -noprompt" % (tsPath, tsPassword)) + os.remove('test.crt') + + return { + 'ssl.keystore.location' : ksPath, + 'ssl.keystore.password' : ksPassword, + 'ssl.key.password' : keyPassword, + 'ssl.truststore.location' : tsPath, + 'ssl.truststore.password' : tsPassword + } + + @staticmethod + def runcmd(cmd): + proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + proc.communicate() + if proc.returncode != 0: + raise subprocess.CalledProcessError(proc.returncode, cmd) + + +class SecurityConfig: + + PLAINTEXT = 'PLAINTEXT' + SSL = 'SSL' + SSL_DIR = "/mnt/ssl" + KEYSTORE_PATH = "/mnt/ssl/test.keystore.jks" + TRUSTSTORE_PATH = "/mnt/ssl/test.truststore.jks" + + ssl_stores = Keytool.generate_keystore_truststore('.') + + def __init__(self, node_account, security_protocol, template_props=""): + """ + Initialize the security properties for the node and copy + keystore and truststore to the remote node if the transport protocol + is SSL. If security_protocol is None, the protocol specified in the + template properties file is used. If no protocol is specified in the + template properties either, PLAINTEXT is used as default. 
+ """ + + self.node_account = node_account + if security_protocol is None: + for line in template_props.split("\n"): + items = line.split("=") + if len(items) == 2 and items[0].strip() == 'security.protocol': + security_protocol = str(items[1].strip()) + if security_protocol not in [SecurityConfig.PLAINTEXT, SecurityConfig.SSL]: + raise Exception("Invalid security.protocol in template properties: " + security_protocol) + if security_protocol is None: + security_protocol = SecurityConfig.PLAINTEXT + if security_protocol == SecurityConfig.SSL: + node_account.ssh("mkdir -p %s" % SecurityConfig.SSL_DIR, allow_fail=False) + node_account.scp_to(SecurityConfig.ssl_stores['ssl.keystore.location'], SecurityConfig.KEYSTORE_PATH) + node_account.scp_to(SecurityConfig.ssl_stores['ssl.truststore.location'], SecurityConfig.TRUSTSTORE_PATH) + + self.properties = { + 'security.protocol' : security_protocol, + 'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH, + 'ssl.keystore.password' : SecurityConfig.ssl_stores['ssl.keystore.password'], + 'ssl.key.password' : SecurityConfig.ssl_stores['ssl.key.password'], + 'ssl.truststore.location' : SecurityConfig.TRUSTSTORE_PATH, + 'ssl.truststore.password' : SecurityConfig.ssl_stores['ssl.truststore.password'] + } + + def write_to_file(self, prop_file='/mnt/ssl/security.properties'): + """Write security properties to a remote file""" + tmpfile = tempfile.NamedTemporaryFile(delete=False) + localfile = tmpfile.name + with open(localfile, 'w') as fd: + for key, value in self.properties.items(): + fd.write(key + "=" + value + "\n") + self.node_account.scp_to(localfile, prop_file) + os.remove(localfile) + return prop_file + + @property + def security_protocol(self): + return self.properties['security.protocol'] + + def __str__(self): + """ + Return properties as string with line separators. + This is used to append security config properties to + a properties file. 
+ """ + + prop_str = "" + if self.security_protocol == SecurityConfig.SSL: + for key, value in self.properties.items(): + prop_str += ("\n" + key + "=" + value) + prop_str += "\n" + return prop_str + diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java index b0e19fc4f46eb..a79f78e305c53 100644 --- a/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java +++ b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.utils.Utils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -128,6 +129,13 @@ private static ArgumentParser argParser() { .metavar("ACKS") .help("Acks required on each produced message. 
See Kafka docs on request.required.acks for details."); + parser.addArgument("--producer.config") + .action(store()) + .required(false) + .type(String.class) + .metavar("CONFIG_FILE") + .help("Producer config properties file."); + return parser; } @@ -143,6 +151,7 @@ public static VerifiableProducer createFromArgs(String[] args) { int maxMessages = res.getInt("maxMessages"); String topic = res.getString("topic"); int throughput = res.getInt("throughput"); + String configFile = res.getString("producer.config"); Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList")); @@ -153,6 +162,13 @@ public static VerifiableProducer createFromArgs(String[] args) { producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks"))); // No producer retries producerProps.put("retries", "0"); + if (configFile != null) { + try { + producerProps.putAll(Utils.loadProps(configFile)); + } catch (IOException e) { + throw new ArgumentParserException(e.getMessage(), parser); + } + } producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages); } catch (ArgumentParserException e) { From d0385afaa2070444ccb5d3201484da264d8cbd31 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Wed, 7 Oct 2015 13:28:46 +0000 Subject: [PATCH 2/4] KAFKA-2581: Address review comments --- .../sanity_checks/test_console_consumer.py | 8 +- tests/kafkatest/services/console_consumer.py | 47 +++++----- tests/kafkatest/services/kafka.py | 26 +++--- .../performance/consumer_performance.py | 6 +- .../performance/end_to_end_latency.py | 8 +- .../performance/producer_performance.py | 7 +- .../services/templates/kafka.properties | 5 ++ .../kafkatest/services/verifiable_producer.py | 23 ++--- tests/kafkatest/tests/benchmark_test.py | 49 ++++++----- tests/kafkatest/tests/replication_test.py | 6 +- tests/kafkatest/utils/security_config.py | 87 ++++++++++--------- 11 files changed, 150 insertions(+), 122 
deletions(-) diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py index b35834b783033..d6c21e1a20bc9 100644 --- a/tests/kafkatest/sanity_checks/test_console_consumer.py +++ b/tests/kafkatest/sanity_checks/test_console_consumer.py @@ -33,16 +33,18 @@ def __init__(self, test_context): self.test_context = test_context self.topic = "topic" self.zk = ZookeeperService(test_context, num_nodes=1) - self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, - topics={self.topic: {"partitions": 1, "replication-factor": 1}}) def setUp(self): self.zk.start() - self.kafka.start() @parametrize(security_protocol='SSL', new_consumer=True) @matrix(security_protocol=['PLAINTEXT'], new_consumer=[False, True]) def test_lifecycle(self, security_protocol, new_consumer): + self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, + security_protocol=security_protocol, + topics={self.topic: {"partitions": 1, "replication-factor": 1}}) + self.kafka.start() + t0 = time.time() self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, security_protocol=security_protocol, new_consumer=new_consumer) self.consumer.start() diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index f4ea91ead67bb..5e5d6a9702532 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -94,7 +94,7 @@ class ConsoleConsumer(BackgroundThreadService): "collect_default": True} } - def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, new_consumer=False, message_validator=None, from_beginning=True, consumer_timeout_ms=None): + def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, new_consumer=None, message_validator=None, from_beginning=True, consumer_timeout_ms=None): """ Args: context: standard context @@ -112,7 +112,6 @@ def __init__(self, 
context, num_nodes, kafka, topic, security_protocol=None, new """ super(ConsoleConsumer, self).__init__(context, num_nodes) self.kafka = kafka - self.security_protocol = security_protocol self.new_consumer = new_consumer self.args = { 'topic': topic, @@ -124,6 +123,22 @@ def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, new self.message_validator = message_validator self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)} + # Process client configuration + if self.consumer_timeout_ms is not None: + self.prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms) + else: + self.prop_file = self.render('console_consumer.properties') + + # Add security properties to the config. If security protocol is not specified, + # use the default in the template properties. + self.security_config = SecurityConfig(security_protocol, self.prop_file) + self.security_protocol = self.security_config.security_protocol + if self.new_consumer is None: + self.new_consumer = self.security_protocol == SecurityConfig.SSL + if self.security_protocol == SecurityConfig.SSL and not self.new_consumer: + raise Exception("SSL protocol is supported only with the new consumer") + self.prop_file += str(self.security_config) + @property def start_cmd(self): args = self.args.copy() @@ -134,11 +149,13 @@ def start_cmd(self): cmd = "export LOG_DIR=%s;" % ConsoleConsumer.LOG_DIR cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsoleConsumer.LOG4J_CONFIG - cmd += " /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s --zookeeper %(zk_connect)s" \ + cmd += " /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s" \ " --consumer.config %(config_file)s" % args if self.new_consumer: - cmd += " --new-consumer --bootstrap-server %s" % self.kafka.bootstrap_servers(self.security_protocol) + cmd += " --new-consumer --bootstrap-server %s" % self.kafka.bootstrap_servers() + else: + cmd += " --zookeeper 
%(zk_connect)s" % args if self.from_beginning: cmd += " --from-beginning" @@ -159,23 +176,10 @@ def alive(self, node): def _worker(self, idx, node): node.account.ssh("mkdir -p %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False) - # Create and upload config file - if self.consumer_timeout_ms is not None: - prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms) - else: - prop_file = self.render('console_consumer.properties') - - # Add security properties to the config. If security protocol is not specified, - # use the default in the template properties. - security_config = SecurityConfig(node.account, self.security_protocol, prop_file) - prop_file += str(security_config) - if self.security_protocol is None: - self.security_protocol = security_config.security_protocol - if self.security_protocol == SecurityConfig.SSL: - self.new_consumer = True self.logger.info("console_consumer.properties:") - self.logger.info(prop_file) - node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file) + self.logger.info(self.prop_file) + node.account.create_file(ConsoleConsumer.CONFIG_FILE, self.prop_file) + self.security_config.setup_node(node) # Create and upload log properties log_config = self.render('tools_log4j.properties', log_file=ConsoleConsumer.LOG_FILE) @@ -204,5 +208,6 @@ def clean_node(self, node): self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." 
% (self.__class__.__name__, node.account)) node.account.kill_process("java", clean_shutdown=False, allow_fail=True) - node.account.ssh("rm -rf %s /mnt/ssl" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False) + node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False) + self.security_config.clean_node(node) diff --git a/tests/kafkatest/services/kafka.py b/tests/kafkatest/services/kafka.py index 0f8e1e391270b..2c200f37927ef 100644 --- a/tests/kafkatest/services/kafka.py +++ b/tests/kafkatest/services/kafka.py @@ -34,7 +34,7 @@ class KafkaService(Service): "collect_default": False} } - def __init__(self, context, num_nodes, zk, interbroker_security_protocol=SecurityConfig.PLAINTEXT, topics=None): + def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT, topics=None): """ :type context :type zk: ZookeeperService @@ -42,7 +42,13 @@ def __init__(self, context, num_nodes, zk, interbroker_security_protocol=Securit """ super(KafkaService, self).__init__(context, num_nodes) self.zk = zk + if security_protocol == SecurityConfig.SSL or interbroker_security_protocol == SecurityConfig.SSL: + self.security_config = SecurityConfig(SecurityConfig.SSL) + else: + self.security_config = SecurityConfig(SecurityConfig.PLAINTEXT) + self.security_protocol = security_protocol self.interbroker_security_protocol = interbroker_security_protocol + self.port = 9092 if security_protocol == SecurityConfig.PLAINTEXT else 9093 self.topics = topics def start(self): @@ -58,12 +64,13 @@ def start(self): self.create_topic(topic_cfg) def start_node(self, node): - props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node), interbroker_security_protocol=self.interbroker_security_protocol) + props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node), + port = self.port, security_protocol = self.security_protocol, + 
interbroker_security_protocol=self.interbroker_security_protocol) self.logger.info("kafka.properties:") self.logger.info(props_file) node.account.create_file("/mnt/kafka.properties", props_file) - # Create keystore and truststore for SSL endpoint - SecurityConfig(node.account, SecurityConfig.SSL) + self.security_config.setup_node(node) cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid" self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd)) @@ -100,7 +107,8 @@ def stop_node(self, node, clean_shutdown=True): def clean_node(self, node): node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True) - node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid /mnt/ssl", allow_fail=False) + node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False) + self.security_config.clean_node(node) def create_topic(self, topic_cfg): node = self.nodes[0] # any node is fine here @@ -230,11 +238,7 @@ def leader(self, topic, partition=0): self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx)) return self.get_node(leader_idx) - def bootstrap_servers(self, security_protocol=SecurityConfig.PLAINTEXT): + def bootstrap_servers(self): """Get the broker list to connect to Kafka using the specified security protocol """ - if security_protocol == SecurityConfig.SSL: - port = 9093 - else: - port = 9092 - return ','.join([node.account.hostname + ":" + `port` for node in self.nodes]) + return ','.join([node.account.hostname + ":" + `self.port` for node in self.nodes]) diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py index e756b6715ad38..b8eab22471749 100644 --- a/tests/kafkatest/services/performance/consumer_performance.py +++ 
b/tests/kafkatest/services/performance/consumer_performance.py @@ -68,6 +68,7 @@ class ConsumerPerformanceService(PerformanceService): def __init__(self, context, num_nodes, kafka, security_protocol, topic, messages, new_consumer=False, settings={}): super(ConsumerPerformanceService, self).__init__(context, num_nodes) self.kafka = kafka + self.security_config = SecurityConfig(security_protocol) self.security_protocol = security_protocol self.topic = topic self.messages = messages @@ -92,7 +93,7 @@ def args(self): if self.new_consumer: args['new-consumer'] = "" - args['broker-list'] = self.kafka.bootstrap_servers(self.security_protocol) + args['broker-list'] = self.kafka.bootstrap_servers() else: args['zookeeper'] = self.kafka.zk.connect_setting() @@ -136,7 +137,8 @@ def _worker(self, idx, node): log_config = self.render('tools_log4j.properties', log_file=ConsumerPerformanceService.LOG_FILE) node.account.create_file(ConsumerPerformanceService.LOG4J_CONFIG, log_config) - SecurityConfig(node.account, self.security_protocol).write_to_file(ConsumerPerformanceService.CONFIG_FILE) + node.account.create_file(ConsumerPerformanceService.CONFIG_FILE, str(self.security_config)) + self.security_config.setup_node(node) cmd = self.start_cmd self.logger.debug("Consumer performance %d command: %s", idx, cmd) diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py index 3562563de64f2..0559a43174e81 100644 --- a/tests/kafkatest/services/performance/end_to_end_latency.py +++ b/tests/kafkatest/services/performance/end_to_end_latency.py @@ -28,6 +28,7 @@ class EndToEndLatencyService(PerformanceService): def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, consumer_fetch_max_wait=100, acks=1): super(EndToEndLatencyService, self).__init__(context, num_nodes) self.kafka = kafka + self.security_config = SecurityConfig(security_protocol) self.security_protocol = security_protocol 
self.args = { 'topic': topic, @@ -38,14 +39,15 @@ def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_reco def _worker(self, idx, node): args = self.args.copy() + self.security_config.setup_node(node) if self.security_protocol == SecurityConfig.SSL: - ssl_config = SecurityConfig(node.account, self.security_protocol) - ssl_config_file = ssl_config.write_to_file() + ssl_config_file = SecurityConfig.SSL_DIR + "/security.properties" + node.account.create_file(ssl_config_file, str(self.security_config)) else: ssl_config_file = "" args.update({ 'zk_connect': self.kafka.zk.connect_setting(), - 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_protocol), + 'bootstrap_servers': self.kafka.bootstrap_servers(), 'ssl_config_file': ssl_config_file }) diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py index 1af35a52527f6..7a026fcfd7806 100644 --- a/tests/kafkatest/services/performance/producer_performance.py +++ b/tests/kafkatest/services/performance/producer_performance.py @@ -28,6 +28,7 @@ class ProducerPerformanceService(PerformanceService): def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False): super(ProducerPerformanceService, self).__init__(context, num_nodes) self.kafka = kafka + self.security_config = SecurityConfig(security_protocol) self.security_protocol = security_protocol self.args = { 'topic': topic, @@ -40,13 +41,13 @@ def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_reco def _worker(self, idx, node): args = self.args.copy() - args.update({'bootstrap_servers': self.kafka.bootstrap_servers(self.security_protocol)}) + args.update({'bootstrap_servers': self.kafka.bootstrap_servers()}) cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\ "%(topic)s %(num_records)d %(record_size)d 
%(throughput)d bootstrap.servers=%(bootstrap_servers)s" % args + self.security_config.setup_node(node) if self.security_protocol == SecurityConfig.SSL: - security_config = SecurityConfig(node.account, self.security_protocol) - self.settings.update(security_config.properties) + self.settings.update(self.security_config.properties) for key, value in self.settings.items(): cmd += " %s=%s" % (str(key), str(value)) cmd += " | tee /mnt/producer-performance.log" diff --git a/tests/kafkatest/services/templates/kafka.properties b/tests/kafkatest/services/templates/kafka.properties index 1e924babe1d74..036a8dba6a91e 100644 --- a/tests/kafkatest/services/templates/kafka.properties +++ b/tests/kafkatest/services/templates/kafka.properties @@ -20,8 +20,13 @@ port=9092 #host.name=localhost advertised.host.name={{ node.account.hostname }} #advertised.port= +{% if security_protocol == interbroker_security_protocol %} +listeners={{ security_protocol }}://:{{ port }} +advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:{{ port }} +{% else %} listeners=PLAINTEXT://:9092,SSL://:9093 advertised.listeners=PLAINTEXT://{{ node.account.hostname }}:9092,SSL://{{ node.account.hostname }}:9093 +{% endif %} num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index a3660ed8b5888..91189fde17266 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -32,7 +32,6 @@ def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, max super(VerifiableProducer, self).__init__(context, num_nodes) self.kafka = kafka - self.security_protocol = security_protocol self.topic = topic self.max_messages = max_messages self.throughput = throughput @@ -40,17 +39,18 @@ def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, max self.acked_values = [] 
self.not_acked_values = [] + # If security protocol is not specified, use the default in the template properties. + self.prop_file = self.render('verifiable_producer.properties') + self.security_config = SecurityConfig(security_protocol, self.prop_file) + self.security_protocol = self.security_config.security_protocol + self.prop_file += str(self.security_config) + def _worker(self, idx, node): # Create and upload config file - # If security protocol is not specified, use the default in the template properties. - prop_file = self.render('verifiable_producer.properties') - security_config = SecurityConfig(node.account, self.security_protocol, prop_file) - prop_file += str(security_config) - if self.security_protocol is None: - self.security_protocol = security_config.security_protocol self.logger.info("verifiable_producer.properties:") - self.logger.info(prop_file) - node.account.create_file(VerifiableProducer.CONFIG_FILE, prop_file) + self.logger.info(self.prop_file) + node.account.create_file(VerifiableProducer.CONFIG_FILE, self.prop_file) + self.security_config.setup_node(node) cmd = self.start_cmd self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd)) @@ -72,7 +72,7 @@ def _worker(self, idx, node): @property def start_cmd(self): cmd = "/opt/kafka/bin/kafka-verifiable-producer.sh" \ - " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_protocol)) + " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers()) if self.max_messages > 0: cmd += " --max-messages %s" % str(self.max_messages) if self.throughput > 0: @@ -114,7 +114,8 @@ def stop_node(self, node): def clean_node(self, node): node.account.kill_process("VerifiableProducer", clean_shutdown=False, allow_fail=False) - node.account.ssh("rm -rf /mnt/producer.log /mnt/verifiable_producer.properties /mnt/ssl", allow_fail=False) + node.account.ssh("rm -rf /mnt/producer.log /mnt/verifiable_producer.properties", allow_fail=False) + 
self.security_config.clean_node(node) def try_parse_json(self, string): """Try to parse a string as json. Return None if not parseable.""" diff --git a/tests/kafkatest/tests/benchmark_test.py b/tests/kafkatest/tests/benchmark_test.py index b489167952e08..0e936162b44dd 100644 --- a/tests/kafkatest/tests/benchmark_test.py +++ b/tests/kafkatest/tests/benchmark_test.py @@ -55,18 +55,19 @@ def __init__(self, test_context): def setUp(self): self.zk.start() - def start_kafka(self, interbroker_security_protocol): + def start_kafka(self, security_protocol, interbroker_security_protocol): self.kafka = KafkaService( - self.test_context, self.num_brokers, - self.zk, interbroker_security_protocol=interbroker_security_protocol, topics=self.topics) + self.test_context, self.num_brokers, + self.zk, security_protocol=security_protocol, + interbroker_security_protocol=interbroker_security_protocol, topics=self.topics) self.kafka.start() - @parametrize(acks=1, topic=TOPIC_REP_ONE, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT') - @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT') - @parametrize(acks=-1, topic=TOPIC_REP_THREE, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT') - @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT') - @matrix(acks=[1], topic=[TOPIC_REP_THREE], num_producers=[1], message_size=[10, 100, 1000, 10000, 100000], security_protocol=['PLAINTEXT', 'SSL']) - def test_producer_throughput(self, acks, topic, num_producers, message_size, security_protocol): + @parametrize(acks=1, topic=TOPIC_REP_ONE) + @parametrize(acks=1, topic=TOPIC_REP_THREE) + @parametrize(acks=-1, topic=TOPIC_REP_THREE) + @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3) + @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], 
security_protocol=['PLAINTEXT', 'SSL']) + def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT'): """ Setup: 1 node zk + 3 node kafka cluster Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor, @@ -75,7 +76,7 @@ def test_producer_throughput(self, acks, topic, num_producers, message_size, sec Collect and return aggregate throughput statistics after all messages have been acknowledged. (This runs ProducerPerformance.java under the hood) """ - self.start_kafka(security_protocol) + self.start_kafka(security_protocol, security_protocol) # Always generate the same total amount of data nrecords = int(self.target_data_size / message_size) @@ -89,8 +90,9 @@ def test_producer_throughput(self, acks, topic, num_producers, message_size, sec self.producer.run() return compute_aggregate_throughput(self.producer) - @matrix(security_protocol=['PLAINTEXT', 'SSL']) - def test_long_term_producer_throughput(self, security_protocol): + @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='PLAINTEXT') + @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT', 'SSL']) + def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol): """ Setup: 1 node zk + 3 node kafka cluster Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1. 
@@ -99,7 +101,7 @@ def test_long_term_producer_throughput(self, security_protocol): (This runs ProducerPerformance.java under the hood) """ - self.start_kafka(security_protocol) + self.start_kafka(security_protocol, security_protocol) self.producer = ProducerPerformanceService( self.test_context, 1, self.kafka, security_protocol=security_protocol, topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE, @@ -133,8 +135,7 @@ def test_long_term_producer_throughput(self, security_protocol): @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='PLAINTEXT') - @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') - @parametrize(security_protocol='SSL', interbroker_security_protocol='SSL') + @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT', 'SSL']) def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol): """ Setup: 1 node zk + 3 node kafka cluster @@ -145,7 +146,7 @@ def test_end_to_end_latency(self, security_protocol, interbroker_security_protoc (Under the hood, this simply runs EndToEndLatency.scala) """ - self.start_kafka(interbroker_security_protocol) + self.start_kafka(security_protocol, interbroker_security_protocol) self.logger.info("BENCHMARK: End to end latency") self.perf = EndToEndLatencyService( self.test_context, 1, self.kafka, @@ -154,9 +155,10 @@ def test_end_to_end_latency(self, security_protocol, interbroker_security_protoc self.perf.run() return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms']) - @parametrize(new_consumer=True, security_protocol='SSL') + @parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') + @parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='SSL') @matrix(new_consumer=[True, False], security_protocol=['PLAINTEXT']) - def 
test_producer_and_consumer(self, new_consumer, security_protocol): + def test_producer_and_consumer(self, new_consumer, security_protocol, interbroker_security_protocol='PLAINTEXT'): """ Setup: 1 node zk + 3 node kafka cluster Concurrently produce and consume 10e6 messages with a single producer and a single consumer, @@ -166,7 +168,7 @@ def test_producer_and_consumer(self, new_consumer, security_protocol): (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala) """ - self.start_kafka(security_protocol) + self.start_kafka(security_protocol, interbroker_security_protocol) num_records = 10 * 1000 * 1000 # 10e6 self.producer = ProducerPerformanceService( @@ -189,14 +191,15 @@ def test_producer_and_consumer(self, new_consumer, security_protocol): self.logger.info("\n".join(summary)) return data - @parametrize(new_consumer=True, security_protocol='SSL', num_consumers=1) - @matrix(new_consumer=[True, False], security_protocol=['PLAINTEXT'], num_consumers=[1]) - def test_consumer_throughput(self, new_consumer, security_protocol, num_consumers): + @parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') + @parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='SSL') + @matrix(new_consumer=[True, False], security_protocol=['PLAINTEXT']) + def test_consumer_throughput(self, new_consumer, security_protocol, interbroker_security_protocol='PLAINTEXT', num_consumers=1): """ Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions (using new consumer iff new_consumer == True), and report throughput. 
""" - self.start_kafka(security_protocol) + self.start_kafka(security_protocol, interbroker_security_protocol) num_records = 10 * 1000 * 1000 # 10e6 # seed kafka w/messages diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py index 52173bdfacece..d20cc220b4b32 100644 --- a/tests/kafkatest/tests/replication_test.py +++ b/tests/kafkatest/tests/replication_test.py @@ -71,7 +71,9 @@ def run_with_failure(self, failure, interbroker_security_protocol): indicator that nothing is left to consume. """ + security_protocol='PLAINTEXT' self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, + security_protocol=security_protocol, interbroker_security_protocol=interbroker_security_protocol, topics={self.topic: { "partitions": 3, @@ -79,8 +81,8 @@ def run_with_failure(self, failure, interbroker_security_protocol): "min.insync.replicas": 2} }) self.kafka.start() - self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, security_protocol='PLAINTEXT', throughput=self.producer_throughput) - self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, security_protocol='PLAINTEXT', new_consumer=False, consumer_timeout_ms=3000, message_validator=is_int) + self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, security_protocol=security_protocol, throughput=self.producer_throughput) + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, security_protocol=security_protocol, new_consumer=False, consumer_timeout_ms=3000, message_validator=is_int) # Produce in a background thread while driving broker failures self.producer.start() diff --git a/tests/kafkatest/utils/security_config.py b/tests/kafkatest/utils/security_config.py index c98477272a898..ae60e3154992e 100644 --- a/tests/kafkatest/utils/security_config.py +++ b/tests/kafkatest/utils/security_config.py @@ -14,12 
+14,10 @@ # limitations under the License. import os -import sys import subprocess -import tempfile -class Keytool: +class Keytool(object): @staticmethod def generate_keystore_truststore(dir='.'): @@ -27,27 +25,27 @@ def generate_keystore_truststore(dir='.'): Generate JKS keystore and truststore and return Kafka SSL properties with these stores. """ - ksPath = os.path.join(dir, 'test.keystore.jks') - ksPassword = 'test-ks-passwd' - keyPassword = 'test-key-passwd' - tsPath = os.path.join(dir, 'test.truststore.jks') - tsPassword = 'test-ts-passwd' - if os.path.exists(ksPath): - os.remove(ksPath) - if os.path.exists(tsPath): - os.remove(tsPath) + ks_path = os.path.join(dir, 'test.keystore.jks') + ks_password = 'test-ks-passwd' + key_password = 'test-key-passwd' + ts_path = os.path.join(dir, 'test.truststore.jks') + ts_password = 'test-ts-passwd' + if os.path.exists(ks_path): + os.remove(ks_path) + if os.path.exists(ts_path): + os.remove(ts_path) - Keytool.runcmd("keytool -genkeypair -alias test -keyalg RSA -keysize 2048 -keystore %s -storetype JKS -keypass %s -storepass %s -dname CN=systemtest" % (ksPath, keyPassword, ksPassword)) - Keytool.runcmd("keytool -export -alias test -keystore %s -storepass %s -storetype JKS -rfc -file test.crt" % (ksPath, ksPassword)) - Keytool.runcmd("keytool -import -alias test -file test.crt -keystore %s -storepass %s -storetype JKS -noprompt" % (tsPath, tsPassword)) + Keytool.runcmd("keytool -genkeypair -alias test -keyalg RSA -keysize 2048 -keystore %s -storetype JKS -keypass %s -storepass %s -dname CN=systemtest" % (ks_path, key_password, ks_password)) + Keytool.runcmd("keytool -export -alias test -keystore %s -storepass %s -storetype JKS -rfc -file test.crt" % (ks_path, ks_password)) + Keytool.runcmd("keytool -import -alias test -file test.crt -keystore %s -storepass %s -storetype JKS -noprompt" % (ts_path, ts_password)) os.remove('test.crt') return { - 'ssl.keystore.location' : ksPath, - 'ssl.keystore.password' : ksPassword, - 
'ssl.key.password' : keyPassword, - 'ssl.truststore.location' : tsPath, - 'ssl.truststore.password' : tsPassword + 'ssl.keystore.location' : ks_path, + 'ssl.keystore.password' : ks_password, + 'ssl.key.password' : key_password, + 'ssl.truststore.location' : ts_path, + 'ssl.truststore.password' : ts_password } @staticmethod @@ -58,7 +56,7 @@ def runcmd(cmd): raise subprocess.CalledProcessError(proc.returncode, cmd) -class SecurityConfig: +class SecurityConfig(object): PLAINTEXT = 'PLAINTEXT' SSL = 'SSL' @@ -68,7 +66,7 @@ class SecurityConfig: ssl_stores = Keytool.generate_keystore_truststore('.') - def __init__(self, node_account, security_protocol, template_props=""): + def __init__(self, security_protocol, template_props=""): """ Initialize the security properties for the node and copy keystore and truststore to the remote node if the transport protocol @@ -77,20 +75,12 @@ def __init__(self, node_account, security_protocol, template_props=""): template properties either, PLAINTEXT is used as default. 
""" - self.node_account = node_account if security_protocol is None: - for line in template_props.split("\n"): - items = line.split("=") - if len(items) == 2 and items[0].strip() == 'security.protocol': - security_protocol = str(items[1].strip()) - if security_protocol not in [SecurityConfig.PLAINTEXT, SecurityConfig.SSL]: - raise Exception("Invalid security.protocol in template properties: " + security_protocol) + security_protocol = self.get_property('security.protocol', template_props) if security_protocol is None: security_protocol = SecurityConfig.PLAINTEXT - if security_protocol == SecurityConfig.SSL: - node_account.ssh("mkdir -p %s" % SecurityConfig.SSL_DIR, allow_fail=False) - node_account.scp_to(SecurityConfig.ssl_stores['ssl.keystore.location'], SecurityConfig.KEYSTORE_PATH) - node_account.scp_to(SecurityConfig.ssl_stores['ssl.truststore.location'], SecurityConfig.TRUSTSTORE_PATH) + elif security_protocol not in [SecurityConfig.PLAINTEXT, SecurityConfig.SSL]: + raise Exception("Invalid security.protocol in template properties: " + security_protocol) self.properties = { 'security.protocol' : security_protocol, @@ -101,16 +91,27 @@ def __init__(self, node_account, security_protocol, template_props=""): 'ssl.truststore.password' : SecurityConfig.ssl_stores['ssl.truststore.password'] } - def write_to_file(self, prop_file='/mnt/ssl/security.properties'): - """Write security properties to a remote file""" - tmpfile = tempfile.NamedTemporaryFile(delete=False) - localfile = tmpfile.name - with open(localfile, 'w') as fd: - for key, value in self.properties.items(): - fd.write(key + "=" + value + "\n") - self.node_account.scp_to(localfile, prop_file) - os.remove(localfile) - return prop_file + def setup_node(self, node): + if self.security_protocol == SecurityConfig.SSL: + node.account.ssh("mkdir -p %s" % SecurityConfig.SSL_DIR, allow_fail=False) + node.account.scp_to(SecurityConfig.ssl_stores['ssl.keystore.location'], SecurityConfig.KEYSTORE_PATH) + 
node.account.scp_to(SecurityConfig.ssl_stores['ssl.truststore.location'], SecurityConfig.TRUSTSTORE_PATH) + + def clean_node(self, node): + if self.security_protocol == SecurityConfig.SSL: + node.account.ssh("rm -rf %s" % SecurityConfig.SSL_DIR, allow_fail=False) + + def get_property(self, prop_name, template_props=""): + """ + Get property value from the string representation of + a properties file. + """ + value = None + for line in template_props.split("\n"): + items = line.split("=") + if len(items) == 2 and items[0].strip() == prop_name: + value = str(items[1].strip()) + return value @property def security_protocol(self): From 8958dd3faadb4ebec5233a294bf246ef390070be Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Wed, 7 Oct 2015 21:32:09 +0000 Subject: [PATCH 3/4] KAFKA-2581: Remove unnecessary code --- tests/kafkatest/sanity_checks/test_console_consumer.py | 1 - tests/kafkatest/services/console_consumer.py | 5 +---- tests/kafkatest/tests/benchmark_test.py | 1 - tests/kafkatest/utils/security_config.py | 6 +++--- 4 files changed, 4 insertions(+), 9 deletions(-) diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py index d6c21e1a20bc9..a9c4d53f0ed40 100644 --- a/tests/kafkatest/sanity_checks/test_console_consumer.py +++ b/tests/kafkatest/sanity_checks/test_console_consumer.py @@ -30,7 +30,6 @@ class ConsoleConsumerTest(Test): def __init__(self, test_context): super(ConsoleConsumerTest, self).__init__(test_context) - self.test_context = test_context self.topic = "topic" self.zk = ZookeeperService(test_context, num_nodes=1) diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 5e5d6a9702532..2f1e70e199e22 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -124,10 +124,7 @@ def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, new 
self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)} # Process client configuration - if self.consumer_timeout_ms is not None: - self.prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms) - else: - self.prop_file = self.render('console_consumer.properties') + self.prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms) # Add security properties to the config. If security protocol is not specified, # use the default in the template properties. diff --git a/tests/kafkatest/tests/benchmark_test.py b/tests/kafkatest/tests/benchmark_test.py index 0e936162b44dd..7219c0a0ebf78 100644 --- a/tests/kafkatest/tests/benchmark_test.py +++ b/tests/kafkatest/tests/benchmark_test.py @@ -35,7 +35,6 @@ class Benchmark(Test): """ def __init__(self, test_context): super(Benchmark, self).__init__(test_context) - self.test_context = test_context self.num_zk = 1 self.num_brokers = 3 self.topics = { diff --git a/tests/kafkatest/utils/security_config.py b/tests/kafkatest/utils/security_config.py index ae60e3154992e..965f20922103b 100644 --- a/tests/kafkatest/utils/security_config.py +++ b/tests/kafkatest/utils/security_config.py @@ -20,15 +20,15 @@ class Keytool(object): @staticmethod - def generate_keystore_truststore(dir='.'): + def generate_keystore_truststore(ssl_dir='.'): """ Generate JKS keystore and truststore and return Kafka SSL properties with these stores. 
""" - ks_path = os.path.join(dir, 'test.keystore.jks') + ks_path = os.path.join(ssl_dir, 'test.keystore.jks') ks_password = 'test-ks-passwd' key_password = 'test-key-passwd' - ts_path = os.path.join(dir, 'test.truststore.jks') + ts_path = os.path.join(ssl_dir, 'test.truststore.jks') ts_password = 'test-ts-passwd' if os.path.exists(ks_path): os.remove(ks_path) From 235623097d1d94dcc6788449c71c9e569bf6bb1d Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Mon, 12 Oct 2015 21:20:22 +0000 Subject: [PATCH 4/4] KAFKA-2581: Remove properties file for verifiable_producer --- .../templates/verifiable_producer.properties | 16 ---------------- tests/kafkatest/services/verifiable_producer.py | 3 +-- 2 files changed, 1 insertion(+), 18 deletions(-) delete mode 100644 tests/kafkatest/services/templates/verifiable_producer.properties diff --git a/tests/kafkatest/services/templates/verifiable_producer.properties b/tests/kafkatest/services/templates/verifiable_producer.properties deleted file mode 100644 index e85cf131add84..0000000000000 --- a/tests/kafkatest/services/templates/verifiable_producer.properties +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# see org.apache.kafka.clients.producer.ProducerConfig for more details - diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index 91189fde17266..7ae7988b67393 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -39,8 +39,7 @@ def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, max self.acked_values = [] self.not_acked_values = [] - # If security protocol is not specified, use the default in the template properties. - self.prop_file = self.render('verifiable_producer.properties') + self.prop_file = "" self.security_config = SecurityConfig(security_protocol, self.prop_file) self.security_protocol = self.security_config.security_protocol self.prop_file += str(self.security_config)