diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index 826703072b921..c7f907238b152 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -27,6 +27,7 @@ import java.nio.channels.ClosedByInterruptException import org.apache.log4j.Logger import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer} import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.utils.Utils import kafka.utils.CommandLineUtils import java.util.{ Random, Properties } import kafka.consumer.Consumer @@ -203,6 +204,10 @@ object ConsumerPerformance { .ofType(classOf[java.lang.Integer]) .defaultsTo(1) val useNewConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation.") + val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") + .withRequiredArg + .describedAs("config file") + .ofType(classOf[String]) val options = parser.parse(args: _*) @@ -210,7 +215,10 @@ object ConsumerPerformance { val useNewConsumer = options.has(useNewConsumerOpt) - val props = new Properties + val props = if (options.has(consumerConfigOpt)) + Utils.loadProps(options.valueOf(consumerConfigOpt)) + else + new Properties if(useNewConsumer) { import org.apache.kafka.clients.consumer.ConsumerConfig props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServersOpt)) diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py index 4544c0097db0e..a9c4d53f0ed40 100644 --- a/tests/kafkatest/sanity_checks/test_console_consumer.py +++ b/tests/kafkatest/sanity_checks/test_console_consumer.py @@ -15,6 +15,8 @@ from ducktape.tests.test import Test from ducktape.utils.util import wait_until +from ducktape.mark import parametrize +from ducktape.mark import matrix from 
kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService @@ -30,16 +32,20 @@ def __init__(self, test_context): self.topic = "topic" self.zk = ZookeeperService(test_context, num_nodes=1) - self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, - topics={self.topic: {"partitions": 1, "replication-factor": 1}}) - self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic) def setUp(self): self.zk.start() + + @parametrize(security_protocol='SSL', new_consumer=True) + @matrix(security_protocol=['PLAINTEXT'], new_consumer=[False, True]) + def test_lifecycle(self, security_protocol, new_consumer): + self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, + security_protocol=security_protocol, + topics={self.topic: {"partitions": 1, "replication-factor": 1}}) self.kafka.start() - def test_lifecycle(self): t0 = time.time() + self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, security_protocol=security_protocol, new_consumer=new_consumer) self.consumer.start() node = self.consumer.nodes[0] diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 9286654007039..2f1e70e199e22 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -15,6 +15,7 @@ from ducktape.services.background_thread import BackgroundThreadService from ducktape.utils.util import wait_until +from kafkatest.utils.security_config import SecurityConfig import os import subprocess @@ -93,13 +94,15 @@ class ConsoleConsumer(BackgroundThreadService): "collect_default": True} } - def __init__(self, context, num_nodes, kafka, topic, message_validator=None, from_beginning=True, consumer_timeout_ms=None): + def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, new_consumer=None, message_validator=None, from_beginning=True, 
consumer_timeout_ms=None): """ Args: context: standard context num_nodes: number of nodes to use (this should be 1) kafka: kafka service topic: consume from this topic + security_protocol: security protocol for Kafka connections + new_consumer: use new Kafka consumer if True message_validator: function which returns message or None from_beginning: consume from beginning if True, else from the end consumer_timeout_ms: corresponds to consumer.timeout.ms. consumer process ends if time between @@ -109,6 +112,7 @@ def __init__(self, context, num_nodes, kafka, topic, message_validator=None, fro """ super(ConsoleConsumer, self).__init__(context, num_nodes) self.kafka = kafka + self.new_consumer = new_consumer self.args = { 'topic': topic, } @@ -119,6 +123,19 @@ def __init__(self, context, num_nodes, kafka, topic, message_validator=None, fro self.message_validator = message_validator self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)} + # Process client configuration + self.prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms) + + # Add security properties to the config. If security protocol is not specified, + # use the default in the template properties. 
+ self.security_config = SecurityConfig(security_protocol, self.prop_file) + self.security_protocol = self.security_config.security_protocol + if self.new_consumer is None: + self.new_consumer = self.security_protocol == SecurityConfig.SSL + if self.security_protocol == SecurityConfig.SSL and not self.new_consumer: + raise Exception("SSL protocol is supported only with the new consumer") + self.prop_file += str(self.security_config) + @property def start_cmd(self): args = self.args.copy() @@ -129,9 +146,13 @@ def start_cmd(self): cmd = "export LOG_DIR=%s;" % ConsoleConsumer.LOG_DIR cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsoleConsumer.LOG4J_CONFIG - cmd += " /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s --zookeeper %(zk_connect)s" \ + cmd += " /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s" \ " --consumer.config %(config_file)s" % args + if self.new_consumer: + cmd += " --new-consumer --bootstrap-server %s" % self.kafka.bootstrap_servers() + else: + cmd += " --zookeeper %(zk_connect)s" % args if self.from_beginning: cmd += " --from-beginning" @@ -152,15 +173,10 @@ def alive(self, node): def _worker(self, idx, node): node.account.ssh("mkdir -p %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False) - # Create and upload config file - if self.consumer_timeout_ms is not None: - prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms) - else: - prop_file = self.render('console_consumer.properties') - self.logger.info("console_consumer.properties:") - self.logger.info(prop_file) - node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file) + self.logger.info(self.prop_file) + node.account.create_file(ConsoleConsumer.CONFIG_FILE, self.prop_file) + self.security_config.setup_node(node) # Create and upload log properties log_config = self.render('tools_log4j.properties', log_file=ConsoleConsumer.LOG_FILE) @@ -190,4 +206,5 @@ def clean_node(self, node): 
(self.__class__.__name__, node.account)) node.account.kill_process("java", clean_shutdown=False, allow_fail=True) node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False) + self.security_config.clean_node(node) diff --git a/tests/kafkatest/services/kafka.py b/tests/kafkatest/services/kafka.py index 5ff804766f1df..2c200f37927ef 100644 --- a/tests/kafkatest/services/kafka.py +++ b/tests/kafkatest/services/kafka.py @@ -15,6 +15,7 @@ from ducktape.services.service import Service from ducktape.utils.util import wait_until +from kafkatest.utils.security_config import SecurityConfig import json import re @@ -33,7 +34,7 @@ class KafkaService(Service): "collect_default": False} } - def __init__(self, context, num_nodes, zk, topics=None): + def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT, topics=None): """ :type context :type zk: ZookeeperService @@ -41,6 +42,13 @@ def __init__(self, context, num_nodes, zk, topics=None): """ super(KafkaService, self).__init__(context, num_nodes) self.zk = zk + if security_protocol == SecurityConfig.SSL or interbroker_security_protocol == SecurityConfig.SSL: + self.security_config = SecurityConfig(SecurityConfig.SSL) + else: + self.security_config = SecurityConfig(SecurityConfig.PLAINTEXT) + self.security_protocol = security_protocol + self.interbroker_security_protocol = interbroker_security_protocol + self.port = 9092 if security_protocol == SecurityConfig.PLAINTEXT else 9093 self.topics = topics def start(self): @@ -56,10 +64,13 @@ def start(self): self.create_topic(topic_cfg) def start_node(self, node): - props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node)) + props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node), + port = self.port, security_protocol = self.security_protocol, + interbroker_security_protocol=self.interbroker_security_protocol) 
self.logger.info("kafka.properties:") self.logger.info(props_file) node.account.create_file("/mnt/kafka.properties", props_file) + self.security_config.setup_node(node) cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid" self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd)) @@ -97,6 +108,7 @@ def stop_node(self, node, clean_shutdown=True): def clean_node(self, node): node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True) node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False) + self.security_config.clean_node(node) def create_topic(self, topic_cfg): node = self.nodes[0] # any node is fine here @@ -227,4 +239,6 @@ def leader(self, topic, partition=0): return self.get_node(leader_idx) def bootstrap_servers(self): - return ','.join([node.account.hostname + ":9092" for node in self.nodes]) + """Get the broker list to connect to Kafka using the specified security protocol + """ + return ','.join([node.account.hostname + ":" + str(self.port) for node in self.nodes]) diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py index ecaef43f14ba4..b8eab22471749 100644 --- a/tests/kafkatest/services/performance/consumer_performance.py +++ b/tests/kafkatest/services/performance/consumer_performance.py @@ -14,6 +14,7 @@ # limitations under the License. from kafkatest.services.performance import PerformanceService +from kafkatest.utils.security_config import SecurityConfig import os @@ -43,6 +44,7 @@ class ConsumerPerformanceService(PerformanceService): "num-fetch-threads", "Number of fetcher threads. Defaults to 1" "new-consumer", "Use the new consumer implementation." + "consumer.config", "Consumer config properties file." 
""" # Root directory for persistent output @@ -51,6 +53,7 @@ class ConsumerPerformanceService(PerformanceService): STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "consumer_performance.stdout") LOG_FILE = os.path.join(LOG_DIR, "consumer_performance.log") LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties") + CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "consumer.properties") logs = { "consumer_performance_output": { @@ -62,9 +65,11 @@ class ConsumerPerformanceService(PerformanceService): "collect_default": True} } - def __init__(self, context, num_nodes, kafka, topic, messages, new_consumer=False, settings={}): + def __init__(self, context, num_nodes, kafka, security_protocol, topic, messages, new_consumer=False, settings={}): super(ConsumerPerformanceService, self).__init__(context, num_nodes) self.kafka = kafka + self.security_config = SecurityConfig(security_protocol) + self.security_protocol = security_protocol self.topic = topic self.messages = messages self.new_consumer = new_consumer @@ -119,6 +124,7 @@ def start_cmd(self): cmd += " /opt/kafka/bin/kafka-consumer-perf-test.sh" for key, value in self.args.items(): cmd += " --%s %s" % (key, value) + cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE for key, value in self.settings.items(): cmd += " %s=%s" % (str(key), str(value)) @@ -131,6 +137,8 @@ def _worker(self, idx, node): log_config = self.render('tools_log4j.properties', log_file=ConsumerPerformanceService.LOG_FILE) node.account.create_file(ConsumerPerformanceService.LOG4J_CONFIG, log_config) + node.account.create_file(ConsumerPerformanceService.CONFIG_FILE, str(self.security_config)) + self.security_config.setup_node(node) cmd = self.start_cmd self.logger.debug("Consumer performance %d command: %s", idx, cmd) diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py index 4c61a93e8f259..0559a43174e81 100644 --- 
a/tests/kafkatest/services/performance/end_to_end_latency.py +++ b/tests/kafkatest/services/performance/end_to_end_latency.py @@ -14,6 +14,7 @@ # limitations under the License. from kafkatest.services.performance import PerformanceService +from kafkatest.utils.security_config import SecurityConfig class EndToEndLatencyService(PerformanceService): @@ -24,9 +25,11 @@ class EndToEndLatencyService(PerformanceService): "collect_default": True}, } - def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1): + def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, consumer_fetch_max_wait=100, acks=1): super(EndToEndLatencyService, self).__init__(context, num_nodes) self.kafka = kafka + self.security_config = SecurityConfig(security_protocol) + self.security_protocol = security_protocol self.args = { 'topic': topic, 'num_records': num_records, @@ -36,14 +39,21 @@ def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch def _worker(self, idx, node): args = self.args.copy() + self.security_config.setup_node(node) + if self.security_protocol == SecurityConfig.SSL: + ssl_config_file = SecurityConfig.SSL_DIR + "/security.properties" + node.account.create_file(ssl_config_file, str(self.security_config)) + else: + ssl_config_file = "" args.update({ 'zk_connect': self.kafka.zk.connect_setting(), 'bootstrap_servers': self.kafka.bootstrap_servers(), + 'ssl_config_file': ssl_config_file }) cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\ "%(bootstrap_servers)s %(topic)s %(num_records)d "\ - "%(acks)d 20" % args + "%(acks)d 20 %(ssl_config_file)s" % args cmd += " | tee /mnt/end-to-end-latency.log" diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py index c46a910f29323..7a026fcfd7806 100644 --- a/tests/kafkatest/services/performance/producer_performance.py +++ 
b/tests/kafkatest/services/performance/producer_performance.py @@ -14,6 +14,7 @@ # limitations under the License. from kafkatest.services.performance import PerformanceService +from kafkatest.utils.security_config import SecurityConfig class ProducerPerformanceService(PerformanceService): @@ -24,9 +25,11 @@ class ProducerPerformanceService(PerformanceService): "collect_default": True}, } - def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False): + def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False): super(ProducerPerformanceService, self).__init__(context, num_nodes) self.kafka = kafka + self.security_config = SecurityConfig(security_protocol) + self.security_protocol = security_protocol self.args = { 'topic': topic, 'num_records': num_records, @@ -40,11 +43,15 @@ def _worker(self, idx, node): args = self.args.copy() args.update({'bootstrap_servers': self.kafka.bootstrap_servers()}) cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\ - "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s"\ - " | tee /mnt/producer-performance.log" % args + "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s" % args + self.security_config.setup_node(node) + if self.security_protocol == SecurityConfig.SSL: + self.settings.update(self.security_config.properties) for key, value in self.settings.items(): cmd += " %s=%s" % (str(key), str(value)) + cmd += " | tee /mnt/producer-performance.log" + self.logger.debug("Producer performance %d command: %s", idx, cmd) def parse_stats(line): diff --git a/tests/kafkatest/services/templates/kafka.properties b/tests/kafkatest/services/templates/kafka.properties index 6650d23784924..036a8dba6a91e 100644 --- 
a/tests/kafkatest/services/templates/kafka.properties +++ b/tests/kafkatest/services/templates/kafka.properties @@ -20,6 +20,13 @@ port=9092 #host.name=localhost advertised.host.name={{ node.account.hostname }} #advertised.port= +{% if security_protocol == interbroker_security_protocol %} +listeners={{ security_protocol }}://:{{ port }} +advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:{{ port }} +{% else %} +listeners=PLAINTEXT://:9092,SSL://:9093 +advertised.listeners=PLAINTEXT://{{ node.account.hostname }}:9092,SSL://{{ node.account.hostname }}:9093 +{% endif %} num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 @@ -39,3 +46,13 @@ log.cleaner.enable=false zookeeper.connect={{ zk.connect_setting() }} zookeeper.connection.timeout.ms=2000 + +security.inter.broker.protocol={{ interbroker_security_protocol }} +ssl.keystore.location=/mnt/ssl/test.keystore.jks +ssl.keystore.password=test-ks-passwd +ssl.key.password=test-key-passwd +ssl.keystore.type=JKS +ssl.truststore.location=/mnt/ssl/test.truststore.jks +ssl.truststore.password=test-ts-passwd +ssl.truststore.type=JKS + diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index 158db7ab3d1ac..7ae7988b67393 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -14,19 +14,21 @@ # limitations under the License. 
from ducktape.services.background_thread import BackgroundThreadService +from kafkatest.utils.security_config import SecurityConfig import json class VerifiableProducer(BackgroundThreadService): + CONFIG_FILE = "/mnt/verifiable_producer.properties" logs = { "producer_log": { "path": "/mnt/producer.log", "collect_default": False} } - def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000): + def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, max_messages=-1, throughput=100000): super(VerifiableProducer, self).__init__(context, num_nodes) self.kafka = kafka @@ -37,7 +39,18 @@ def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput self.acked_values = [] self.not_acked_values = [] + self.prop_file = "" + self.security_config = SecurityConfig(security_protocol, self.prop_file) + self.security_protocol = self.security_config.security_protocol + self.prop_file += str(self.security_config) + def _worker(self, idx, node): + # Create and upload config file + self.logger.info("verifiable_producer.properties:") + self.logger.info(self.prop_file) + node.account.create_file(VerifiableProducer.CONFIG_FILE, self.prop_file) + self.security_config.setup_node(node) + cmd = self.start_cmd self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd)) @@ -64,6 +77,7 @@ def start_cmd(self): if self.throughput > 0: cmd += " --throughput %s" % str(self.throughput) + cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE cmd += " 2>> /mnt/producer.log | tee -a /mnt/producer.log &" return cmd @@ -99,7 +113,8 @@ def stop_node(self, node): def clean_node(self, node): node.account.kill_process("VerifiableProducer", clean_shutdown=False, allow_fail=False) - node.account.ssh("rm -rf /mnt/producer.log", allow_fail=False) + node.account.ssh("rm -rf /mnt/producer.log /mnt/verifiable_producer.properties", allow_fail=False) + self.security_config.clean_node(node) def try_parse_json(self, string): """Try 
to parse a string as json. Return None if not parseable.""" diff --git a/tests/kafkatest/tests/benchmark_test.py b/tests/kafkatest/tests/benchmark_test.py index 02503ec44b025..7219c0a0ebf78 100644 --- a/tests/kafkatest/tests/benchmark_test.py +++ b/tests/kafkatest/tests/benchmark_test.py @@ -14,10 +14,12 @@ # limitations under the License. from ducktape.services.service import Service +from ducktape.tests.test import Test from ducktape.mark import parametrize from ducktape.mark import matrix -from kafkatest.tests.kafka_test import KafkaTest +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService @@ -26,16 +28,21 @@ DEFAULT_RECORD_SIZE = 100 # bytes -class Benchmark(KafkaTest): +class Benchmark(Test): """A benchmark of Kafka producer/consumer performance. This replicates the test run here: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines """ def __init__(self, test_context): - super(Benchmark, self).__init__(test_context, num_zk=1, num_brokers=3, topics={ + super(Benchmark, self).__init__(test_context) + self.num_zk = 1 + self.num_brokers = 3 + self.topics = { TOPIC_REP_ONE: {'partitions': 6, 'replication-factor': 1}, TOPIC_REP_THREE: {'partitions': 6, 'replication-factor': 3} - }) + } + + self.zk = ZookeeperService(test_context, self.num_zk) self.msgs_large = 10000000 self.batch_size = 8*1024 @@ -44,25 +51,36 @@ def __init__(self, test_context): self.target_data_size = 128*1024*1024 self.target_data_size_gb = self.target_data_size/float(1024*1024*1024) - @parametrize(acks=1, topic=TOPIC_REP_ONE, num_producers=1, message_size=DEFAULT_RECORD_SIZE) - @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=1, message_size=DEFAULT_RECORD_SIZE) - @parametrize(acks=-1, topic=TOPIC_REP_THREE, num_producers=1, 
message_size=DEFAULT_RECORD_SIZE) - @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3, message_size=DEFAULT_RECORD_SIZE) - @matrix(acks=[1], topic=[TOPIC_REP_THREE], num_producers=[1], message_size=[10, 100, 1000, 10000, 100000]) - def test_producer_throughput(self, acks, topic, num_producers, message_size): + def setUp(self): + self.zk.start() + + def start_kafka(self, security_protocol, interbroker_security_protocol): + self.kafka = KafkaService( + self.test_context, self.num_brokers, + self.zk, security_protocol=security_protocol, + interbroker_security_protocol=interbroker_security_protocol, topics=self.topics) + self.kafka.start() + + @parametrize(acks=1, topic=TOPIC_REP_ONE) + @parametrize(acks=1, topic=TOPIC_REP_THREE) + @parametrize(acks=-1, topic=TOPIC_REP_THREE) + @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3) + @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], security_protocol=['PLAINTEXT', 'SSL']) + def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, security_protocol='PLAINTEXT'): """ Setup: 1 node zk + 3 node kafka cluster Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor, - and message size are varied depending on arguments injected into this test. + security protocol and message size are varied depending on arguments injected into this test. Collect and return aggregate throughput statistics after all messages have been acknowledged. 
(This runs ProducerPerformance.java under the hood) """ + self.start_kafka(security_protocol, security_protocol) # Always generate the same total amount of data nrecords = int(self.target_data_size / message_size) self.producer = ProducerPerformanceService( - self.test_context, num_producers, self.kafka, topic=topic, + self.test_context, num_producers, self.kafka, security_protocol=security_protocol, topic=topic, num_records=nrecords, record_size=message_size, throughput=-1, settings={ 'acks': acks, @@ -71,7 +89,9 @@ def test_producer_throughput(self, acks, topic, num_producers, message_size): self.producer.run() return compute_aggregate_throughput(self.producer) - def test_long_term_producer_throughput(self): + @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='PLAINTEXT') + @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT', 'SSL']) + def test_long_term_producer_throughput(self, security_protocol, interbroker_security_protocol): """ Setup: 1 node zk + 3 node kafka cluster Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1. 
@@ -80,8 +100,9 @@ def test_long_term_producer_throughput(self): (This runs ProducerPerformance.java under the hood) """ + self.start_kafka(security_protocol, security_protocol) self.producer = ProducerPerformanceService( - self.test_context, 1, self.kafka, + self.test_context, 1, self.kafka, security_protocol=security_protocol, topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE, throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}, intermediate_stats=True @@ -111,7 +132,10 @@ def test_long_term_producer_throughput(self): self.logger.info("\n".join(summary)) return data - def test_end_to_end_latency(self): + + @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='PLAINTEXT') + @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT', 'SSL']) + def test_end_to_end_latency(self, security_protocol, interbroker_security_protocol): """ Setup: 1 node zk + 3 node kafka cluster Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3, @@ -121,16 +145,19 @@ def test_end_to_end_latency(self): (Under the hood, this simply runs EndToEndLatency.scala) """ + self.start_kafka(security_protocol, interbroker_security_protocol) self.logger.info("BENCHMARK: End to end latency") self.perf = EndToEndLatencyService( self.test_context, 1, self.kafka, - topic=TOPIC_REP_THREE, num_records=10000 + topic=TOPIC_REP_THREE, security_protocol=security_protocol, num_records=10000 ) self.perf.run() return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms']) - @matrix(new_consumer=[True, False]) - def test_producer_and_consumer(self, new_consumer=False): + @parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') + @parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='SSL') + 
@matrix(new_consumer=[True, False], security_protocol=['PLAINTEXT']) + def test_producer_and_consumer(self, new_consumer, security_protocol, interbroker_security_protocol='PLAINTEXT'): """ Setup: 1 node zk + 3 node kafka cluster Concurrently produce and consume 10e6 messages with a single producer and a single consumer, @@ -140,15 +167,17 @@ def test_producer_and_consumer(self, new_consumer=False): (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala) """ + self.start_kafka(security_protocol, interbroker_security_protocol) num_records = 10 * 1000 * 1000 # 10e6 self.producer = ProducerPerformanceService( - self.test_context, 1, self.kafka, - topic=TOPIC_REP_THREE, num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, + self.test_context, 1, self.kafka, + topic=TOPIC_REP_THREE, security_protocol=security_protocol, + num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory} ) self.consumer = ConsumerPerformanceService( - self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records) + self.test_context, 1, self.kafka, security_protocol, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records) Service.run_parallel(self.producer, self.consumer) data = { @@ -161,18 +190,22 @@ def test_producer_and_consumer(self, new_consumer=False): self.logger.info("\n".join(summary)) return data - @matrix(new_consumer=[True, False], num_consumers=[1]) - def test_consumer_throughput(self, new_consumer, num_consumers): + @parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') + @parametrize(new_consumer=True, security_protocol='SSL', interbroker_security_protocol='SSL') + @matrix(new_consumer=[True, False], security_protocol=['PLAINTEXT']) + def test_consumer_throughput(self, new_consumer, security_protocol, 
interbroker_security_protocol='PLAINTEXT', num_consumers=1): """ Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions (using new consumer iff new_consumer == True), and report throughput. """ + self.start_kafka(security_protocol, interbroker_security_protocol) num_records = 10 * 1000 * 1000 # 10e6 # seed kafka w/messages self.producer = ProducerPerformanceService( self.test_context, 1, self.kafka, - topic=TOPIC_REP_THREE, num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, + topic=TOPIC_REP_THREE, security_protocol=security_protocol, + num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory} ) self.producer.run() @@ -180,7 +213,7 @@ def test_consumer_throughput(self, new_consumer, num_consumers): # consume self.consumer = ConsumerPerformanceService( self.test_context, num_consumers, self.kafka, - topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records) + topic=TOPIC_REP_THREE, security_protocol=security_protocol, new_consumer=new_consumer, messages=num_records) self.consumer.group = "test-consumer-group" self.consumer.run() return compute_aggregate_throughput(self.consumer) diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py index a83769b509439..d20cc220b4b32 100644 --- a/tests/kafkatest/tests/replication_test.py +++ b/tests/kafkatest/tests/replication_test.py @@ -15,6 +15,8 @@ from ducktape.tests.test import Test from ducktape.utils.util import wait_until +from ducktape.mark import parametrize +from ducktape.mark import matrix from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService @@ -36,24 +38,18 @@ def __init__(self, test_context): self.topic = "test_topic" self.zk = ZookeeperService(test_context, num_nodes=1) - self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, 
topics={self.topic: { - "partitions": 3, - "replication-factor": 3, - "min.insync.replicas": 2} - }) self.producer_throughput = 10000 self.num_producers = 1 self.num_consumers = 1 def setUp(self): self.zk.start() - self.kafka.start() def min_cluster_size(self): """Override this since we're adding services outside of the constructor""" return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers - def run_with_failure(self, failure): + def run_with_failure(self, failure, interbroker_security_protocol): """This is the top-level test template. The steps are: @@ -75,8 +71,18 @@ def run_with_failure(self, failure): indicator that nothing is left to consume. """ - self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput) - self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000, message_validator=is_int) + security_protocol='PLAINTEXT' + self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, + security_protocol=security_protocol, + interbroker_security_protocol=interbroker_security_protocol, + topics={self.topic: { + "partitions": 3, + "replication-factor": 3, + "min.insync.replicas": 2} + }) + self.kafka.start() + self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, security_protocol=security_protocol, throughput=self.producer_throughput) + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, security_protocol=security_protocol, new_consumer=False, consumer_timeout_ms=3000, message_validator=is_int) # Produce in a background thread while driving broker failures self.producer.start() @@ -149,14 +155,19 @@ def validate(self): return success, msg - def test_clean_shutdown(self): - self.run_with_failure(self.clean_shutdown) + + @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL']) + def 
test_clean_shutdown(self, interbroker_security_protocol): + self.run_with_failure(self.clean_shutdown, interbroker_security_protocol) - def test_hard_shutdown(self): - self.run_with_failure(self.hard_shutdown) + @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL']) + def test_hard_shutdown(self, interbroker_security_protocol): + self.run_with_failure(self.hard_shutdown, interbroker_security_protocol) - def test_clean_bounce(self): - self.run_with_failure(self.clean_bounce) + @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL']) + def test_clean_bounce(self, interbroker_security_protocol): + self.run_with_failure(self.clean_bounce, interbroker_security_protocol) - def test_hard_bounce(self): - self.run_with_failure(self.hard_bounce) + @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL']) + def test_hard_bounce(self, interbroker_security_protocol): + self.run_with_failure(self.hard_bounce, interbroker_security_protocol) diff --git a/tests/kafkatest/utils/security_config.py b/tests/kafkatest/utils/security_config.py new file mode 100644 index 0000000000000..965f20922103b --- /dev/null +++ b/tests/kafkatest/utils/security_config.py @@ -0,0 +1,133 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import os
import subprocess


class Keytool(object):
    """Helper that shells out to the JDK `keytool` binary to create the
    self-signed keystore/truststore pair used by SSL system tests."""

    @staticmethod
    def generate_keystore_truststore(ssl_dir='.'):
        """
        Generate JKS keystore and truststore and return
        Kafka SSL properties with these stores.

        The key is exported from the keystore into a temporary `test.crt`
        file and imported into the truststore, so a client trusting the
        truststore accepts the broker's (self-signed) certificate.

        :param ssl_dir: directory in which to create the store files
        :return: dict of Kafka SSL property names -> values for these stores
        :raises subprocess.CalledProcessError: if any keytool invocation fails
        """
        ks_path = os.path.join(ssl_dir, 'test.keystore.jks')
        ks_password = 'test-ks-passwd'
        key_password = 'test-key-passwd'
        ts_path = os.path.join(ssl_dir, 'test.truststore.jks')
        ts_password = 'test-ts-passwd'

        # Remove stale stores so keytool does not fail on an existing alias.
        for stale in (ks_path, ts_path):
            if os.path.exists(stale):
                os.remove(stale)

        Keytool.runcmd("keytool -genkeypair -alias test -keyalg RSA -keysize 2048 -keystore %s -storetype JKS -keypass %s -storepass %s -dname CN=systemtest" % (ks_path, key_password, ks_password))
        Keytool.runcmd("keytool -export -alias test -keystore %s -storepass %s -storetype JKS -rfc -file test.crt" % (ks_path, ks_password))
        Keytool.runcmd("keytool -import -alias test -file test.crt -keystore %s -storepass %s -storetype JKS -noprompt" % (ts_path, ts_password))
        os.remove('test.crt')

        return {
            'ssl.keystore.location': ks_path,
            'ssl.keystore.password': ks_password,
            'ssl.key.password': key_password,
            'ssl.truststore.location': ts_path,
            'ssl.truststore.password': ts_password
        }

    @staticmethod
    def runcmd(cmd):
        """Run *cmd* through the shell, discarding output; raise
        CalledProcessError on a non-zero exit status (Popen itself does not
        check the return code, so we must do it explicitly)."""
        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        proc.communicate()
        if proc.returncode != 0:
            raise subprocess.CalledProcessError(proc.returncode, cmd)


class SecurityConfig(object):
    """Per-service security settings (PLAINTEXT or SSL) plus helpers to
    install/remove the shared test keystore/truststore on remote nodes."""

    PLAINTEXT = 'PLAINTEXT'
    SSL = 'SSL'
    # Location of the stores on the *remote* node, shared by all services.
    SSL_DIR = "/mnt/ssl"
    KEYSTORE_PATH = "/mnt/ssl/test.keystore.jks"
    TRUSTSTORE_PATH = "/mnt/ssl/test.truststore.jks"

    # Generated once per test-driver process at import time and reused by
    # every SecurityConfig instance, so all services share one certificate.
    # NOTE(review): this shells out to `keytool` on module import — the JDK
    # must be on the PATH of the machine running the tests.
    ssl_stores = Keytool.generate_keystore_truststore('.')

    def __init__(self, security_protocol, template_props=""):
        """
        Initialize the security properties for the node and copy
        keystore and truststore to the remote node if the transport protocol
        is SSL. If security_protocol is None, the protocol specified in the
        template properties file is used. If no protocol is specified in the
        template properties either, PLAINTEXT is used as default.

        :param security_protocol: 'PLAINTEXT', 'SSL', or None (derive from template)
        :param template_props: string contents of a rendered properties file
        :raises Exception: if the template specifies an unsupported protocol
        """
        if security_protocol is None:
            security_protocol = self.get_property('security.protocol', template_props)
            if security_protocol is None:
                security_protocol = SecurityConfig.PLAINTEXT
            elif security_protocol not in [SecurityConfig.PLAINTEXT, SecurityConfig.SSL]:
                raise Exception("Invalid security.protocol in template properties: " + security_protocol)

        # SSL store properties are always populated; __str__ only emits them
        # when the protocol is actually SSL.
        self.properties = {
            'security.protocol': security_protocol,
            'ssl.keystore.location': SecurityConfig.KEYSTORE_PATH,
            'ssl.keystore.password': SecurityConfig.ssl_stores['ssl.keystore.password'],
            'ssl.key.password': SecurityConfig.ssl_stores['ssl.key.password'],
            'ssl.truststore.location': SecurityConfig.TRUSTSTORE_PATH,
            'ssl.truststore.password': SecurityConfig.ssl_stores['ssl.truststore.password']
        }

    def setup_node(self, node):
        """Copy the locally generated stores to *node* when using SSL.

        `node` is presumably a ducktape cluster node exposing
        `account.ssh` / `account.scp_to` — TODO confirm against callers.
        """
        if self.security_protocol == SecurityConfig.SSL:
            node.account.ssh("mkdir -p %s" % SecurityConfig.SSL_DIR, allow_fail=False)
            node.account.scp_to(SecurityConfig.ssl_stores['ssl.keystore.location'], SecurityConfig.KEYSTORE_PATH)
            node.account.scp_to(SecurityConfig.ssl_stores['ssl.truststore.location'], SecurityConfig.TRUSTSTORE_PATH)

    def clean_node(self, node):
        """Remove the SSL store directory from *node* (no-op for PLAINTEXT)."""
        if self.security_protocol == SecurityConfig.SSL:
            node.account.ssh("rm -rf %s" % SecurityConfig.SSL_DIR, allow_fail=False)

    def get_property(self, prop_name, template_props=""):
        """
        Get property value from the string representation of
        a properties file.

        Splits each line on the FIRST '=' only, so values that themselves
        contain '=' (paths, passwords, base64 blobs) are handled correctly;
        if the property appears multiple times the last occurrence wins,
        matching Java properties-file semantics.

        :return: the property value as a string, or None if absent
        """
        value = None
        for line in template_props.split("\n"):
            items = line.split("=", 1)
            if len(items) == 2 and items[0].strip() == prop_name:
                value = str(items[1].strip())
        return value

    @property
    def security_protocol(self):
        """The configured transport protocol ('PLAINTEXT' or 'SSL')."""
        return self.properties['security.protocol']

    def __str__(self):
        """
        Return properties as string with line separators.
        This is used to append security config properties to
        a properties file.
        """
        # PLAINTEXT intentionally renders as an empty string so nothing is
        # appended to the template properties file.
        prop_str = ""
        if self.security_protocol == SecurityConfig.SSL:
            for key, value in self.properties.items():
                prop_str += ("\n" + key + "=" + value)
            prop_str += "\n"
        return prop_str
See Kafka docs on request.required.acks for details."); + parser.addArgument("--producer.config") + .action(store()) + .required(false) + .type(String.class) + .metavar("CONFIG_FILE") + .help("Producer config properties file."); + return parser; } @@ -143,6 +151,7 @@ public static VerifiableProducer createFromArgs(String[] args) { int maxMessages = res.getInt("maxMessages"); String topic = res.getString("topic"); int throughput = res.getInt("throughput"); + String configFile = res.getString("producer.config"); Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList")); @@ -153,6 +162,13 @@ public static VerifiableProducer createFromArgs(String[] args) { producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks"))); // No producer retries producerProps.put("retries", "0"); + if (configFile != null) { + try { + producerProps.putAll(Utils.loadProps(configFile)); + } catch (IOException e) { + throw new ArgumentParserException(e.getMessage(), parser); + } + } producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages); } catch (ArgumentParserException e) {