Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion core/src/main/scala/kafka/tools/ConsumerPerformance.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import java.nio.channels.ClosedByInterruptException
import org.apache.log4j.Logger
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Utils
import kafka.utils.CommandLineUtils
import java.util.{ Random, Properties }
import kafka.consumer.Consumer
Expand Down Expand Up @@ -203,14 +204,21 @@ object ConsumerPerformance {
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val useNewConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation.")
val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
.withRequiredArg
.describedAs("config file")
.ofType(classOf[String])

val options = parser.parse(args: _*)

CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt)

val useNewConsumer = options.has(useNewConsumerOpt)

val props = new Properties
val props = if (options.has(consumerConfigOpt))
Utils.loadProps(options.valueOf(consumerConfigOpt))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be added before or after the command line options? If we load them before, the default values for options would override custom settings from the consumer.config file even if the command line parameters were not specified.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. Ideally, specified command-line settings would override the properties, but it seems like that would be a bigger change. Perhaps this should behave as the ConsoleConsumer until we can overhaul this. It looks like the change here does that, right?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, ideally if they were specified they would override, but if unspecified it would use values from the file. Not sure how easy that is to get working with this options library. I could have sworn I checked one of the other tools and found it using the behavior I expected, but now can't find it. I don't think this is make-or-break. I was just raising it since the behavior is unintuitive to me and although the performance tools are less critical, we probably wouldn't want to change their behavior that significantly after releasing a version with support for the consumer.config option.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do completely agree. However, I think all the tools currently do it this way and so I have kept it consistent. I will raise another JIRA to fix all the tools.

else
new Properties
if(useNewConsumer) {
import org.apache.kafka.clients.consumer.ConsumerConfig
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServersOpt))
Expand Down
14 changes: 10 additions & 4 deletions tests/kafkatest/sanity_checks/test_console_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from ducktape.mark import parametrize
from ducktape.mark import matrix

from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
Expand All @@ -30,16 +32,20 @@ def __init__(self, test_context):

self.topic = "topic"
self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic)

def setUp(self):
self.zk.start()

@parametrize(security_protocol='SSL', new_consumer=True)
@matrix(security_protocol=['PLAINTEXT'], new_consumer=[False, True])
def test_lifecycle(self, security_protocol, new_consumer):
self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
security_protocol=security_protocol,
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
self.kafka.start()

def test_lifecycle(self):
t0 = time.time()
self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, security_protocol=security_protocol, new_consumer=new_consumer)
self.consumer.start()
node = self.consumer.nodes[0]

Expand Down
37 changes: 27 additions & 10 deletions tests/kafkatest/services/console_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from ducktape.services.background_thread import BackgroundThreadService
from ducktape.utils.util import wait_until
from kafkatest.utils.security_config import SecurityConfig

import os
import subprocess
Expand Down Expand Up @@ -93,13 +94,15 @@ class ConsoleConsumer(BackgroundThreadService):
"collect_default": True}
}

def __init__(self, context, num_nodes, kafka, topic, message_validator=None, from_beginning=True, consumer_timeout_ms=None):
def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, new_consumer=None, message_validator=None, from_beginning=True, consumer_timeout_ms=None):
"""
Args:
context: standard context
num_nodes: number of nodes to use (this should be 1)
kafka: kafka service
topic: consume from this topic
security_protocol: security protocol for Kafka connections
new_consumer: use new Kafka consumer if True
message_validator: function which returns message or None
from_beginning: consume from beginning if True, else from the end
consumer_timeout_ms: corresponds to consumer.timeout.ms. consumer process ends if time between
Expand All @@ -109,6 +112,7 @@ def __init__(self, context, num_nodes, kafka, topic, message_validator=None, fro
"""
super(ConsoleConsumer, self).__init__(context, num_nodes)
self.kafka = kafka
self.new_consumer = new_consumer
self.args = {
'topic': topic,
}
Expand All @@ -119,6 +123,19 @@ def __init__(self, context, num_nodes, kafka, topic, message_validator=None, fro
self.message_validator = message_validator
self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}

# Process client configuration
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can simply do
self.prop_file = self.render('console_consumer.properties')

The properties file already does the "null/None" check:

{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
consumer.timeout.ms={{ consumer_timeout_ms }}
{% endif %}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@granders Have made these changes.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems not done in the latest patch.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang I think the change suggested by @granders was to remove the if-then-else around render() which was in the code earlier. That was removed in the latest patch (the code selection above is slightly confusing). Can you let me know if I have misunderstood the comment?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rajinisivaram @guozhangwang
This was correctly updated - I was referring to the block around render()

self.prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms)

# Add security properties to the config. If security protocol is not specified,
# use the default in the template properties.
self.security_config = SecurityConfig(security_protocol, self.prop_file)
self.security_protocol = self.security_config.security_protocol
if self.new_consumer is None:
self.new_consumer = self.security_protocol == SecurityConfig.SSL
if self.security_protocol == SecurityConfig.SSL and not self.new_consumer:
raise Exception("SSL protocol is supported only with the new consumer")
self.prop_file += str(self.security_config)

@property
def start_cmd(self):
args = self.args.copy()
Expand All @@ -129,9 +146,13 @@ def start_cmd(self):

cmd = "export LOG_DIR=%s;" % ConsoleConsumer.LOG_DIR
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsoleConsumer.LOG4J_CONFIG
cmd += " /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s --zookeeper %(zk_connect)s" \
cmd += " /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s" \
" --consumer.config %(config_file)s" % args

if self.new_consumer:
cmd += " --new-consumer --bootstrap-server %s" % self.kafka.bootstrap_servers()
else:
cmd += " --zookeeper %(zk_connect)s" % args
if self.from_beginning:
cmd += " --from-beginning"

Expand All @@ -152,15 +173,10 @@ def alive(self, node):
def _worker(self, idx, node):
node.account.ssh("mkdir -p %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)

# Create and upload config file
if self.consumer_timeout_ms is not None:
prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms)
else:
prop_file = self.render('console_consumer.properties')

self.logger.info("console_consumer.properties:")
self.logger.info(prop_file)
node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
self.logger.info(self.prop_file)
node.account.create_file(ConsoleConsumer.CONFIG_FILE, self.prop_file)
self.security_config.setup_node(node)

# Create and upload log properties
log_config = self.render('tools_log4j.properties', log_file=ConsoleConsumer.LOG_FILE)
Expand Down Expand Up @@ -190,4 +206,5 @@ def clean_node(self, node):
(self.__class__.__name__, node.account))
node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
self.security_config.clean_node(node)

20 changes: 17 additions & 3 deletions tests/kafkatest/services/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from kafkatest.utils.security_config import SecurityConfig

import json
import re
Expand All @@ -33,14 +34,21 @@ class KafkaService(Service):
"collect_default": False}
}

def __init__(self, context, num_nodes, zk, topics=None):
def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT, topics=None):
"""
:type context
:type zk: ZookeeperService
:type topics: dict
"""
super(KafkaService, self).__init__(context, num_nodes)
self.zk = zk
if security_protocol == SecurityConfig.SSL or interbroker_security_protocol == SecurityConfig.SSL:
self.security_config = SecurityConfig(SecurityConfig.SSL)
else:
self.security_config = SecurityConfig(SecurityConfig.PLAINTEXT)
self.security_protocol = security_protocol
self.interbroker_security_protocol = interbroker_security_protocol
self.port = 9092 if security_protocol == SecurityConfig.PLAINTEXT else 9093
self.topics = topics

def start(self):
Expand All @@ -56,10 +64,13 @@ def start(self):
self.create_topic(topic_cfg)

def start_node(self, node):
props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node))
props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node),
port = self.port, security_protocol = self.security_protocol,
interbroker_security_protocol=self.interbroker_security_protocol)
self.logger.info("kafka.properties:")
self.logger.info(props_file)
node.account.create_file("/mnt/kafka.properties", props_file)
self.security_config.setup_node(node)

cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid"
self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
Expand Down Expand Up @@ -97,6 +108,7 @@ def stop_node(self, node, clean_shutdown=True):
def clean_node(self, node):
node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False)
self.security_config.clean_node(node)

def create_topic(self, topic_cfg):
node = self.nodes[0] # any node is fine here
Expand Down Expand Up @@ -227,4 +239,6 @@ def leader(self, topic, partition=0):
return self.get_node(leader_idx)

def bootstrap_servers(self):
return ','.join([node.account.hostname + ":9092" for node in self.nodes])
"""Get the broker list to connect to Kafka using the specified security protocol
"""
return ','.join([node.account.hostname + ":" + `self.port` for node in self.nodes])
10 changes: 9 additions & 1 deletion tests/kafkatest/services/performance/consumer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

from kafkatest.services.performance import PerformanceService
from kafkatest.utils.security_config import SecurityConfig

import os

Expand Down Expand Up @@ -43,6 +44,7 @@ class ConsumerPerformanceService(PerformanceService):
"num-fetch-threads", "Number of fetcher threads. Defaults to 1"

"new-consumer", "Use the new consumer implementation."
"consumer.config", "Consumer config properties file."
"""

# Root directory for persistent output
Expand All @@ -51,6 +53,7 @@ class ConsumerPerformanceService(PerformanceService):
STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "consumer_performance.stdout")
LOG_FILE = os.path.join(LOG_DIR, "consumer_performance.log")
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "consumer.properties")

logs = {
"consumer_performance_output": {
Expand All @@ -62,9 +65,11 @@ class ConsumerPerformanceService(PerformanceService):
"collect_default": True}
}

def __init__(self, context, num_nodes, kafka, topic, messages, new_consumer=False, settings={}):
def __init__(self, context, num_nodes, kafka, security_protocol, topic, messages, new_consumer=False, settings={}):
super(ConsumerPerformanceService, self).__init__(context, num_nodes)
self.kafka = kafka
self.security_config = SecurityConfig(security_protocol)
self.security_protocol = security_protocol
self.topic = topic
self.messages = messages
self.new_consumer = new_consumer
Expand Down Expand Up @@ -119,6 +124,7 @@ def start_cmd(self):
cmd += " /opt/kafka/bin/kafka-consumer-perf-test.sh"
for key, value in self.args.items():
cmd += " --%s %s" % (key, value)
cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE

for key, value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
Expand All @@ -131,6 +137,8 @@ def _worker(self, idx, node):

log_config = self.render('tools_log4j.properties', log_file=ConsumerPerformanceService.LOG_FILE)
node.account.create_file(ConsumerPerformanceService.LOG4J_CONFIG, log_config)
node.account.create_file(ConsumerPerformanceService.CONFIG_FILE, str(self.security_config))
self.security_config.setup_node(node)

cmd = self.start_cmd
self.logger.debug("Consumer performance %d command: %s", idx, cmd)
Expand Down
14 changes: 12 additions & 2 deletions tests/kafkatest/services/performance/end_to_end_latency.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

from kafkatest.services.performance import PerformanceService
from kafkatest.utils.security_config import SecurityConfig


class EndToEndLatencyService(PerformanceService):
Expand All @@ -24,9 +25,11 @@ class EndToEndLatencyService(PerformanceService):
"collect_default": True},
}

def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, consumer_fetch_max_wait=100, acks=1):
super(EndToEndLatencyService, self).__init__(context, num_nodes)
self.kafka = kafka
self.security_config = SecurityConfig(security_protocol)
self.security_protocol = security_protocol
self.args = {
'topic': topic,
'num_records': num_records,
Expand All @@ -36,14 +39,21 @@ def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch

def _worker(self, idx, node):
args = self.args.copy()
self.security_config.setup_node(node)
if self.security_protocol == SecurityConfig.SSL:
ssl_config_file = SecurityConfig.SSL_DIR + "/security.properties"
node.account.create_file(ssl_config_file, str(self.security_config))
else:
ssl_config_file = ""
args.update({
'zk_connect': self.kafka.zk.connect_setting(),
'bootstrap_servers': self.kafka.bootstrap_servers(),
'ssl_config_file': ssl_config_file
})

cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
"%(bootstrap_servers)s %(topic)s %(num_records)d "\
"%(acks)d 20" % args
"%(acks)d 20 %(ssl_config_file)s" % args

cmd += " | tee /mnt/end-to-end-latency.log"

Expand Down
13 changes: 10 additions & 3 deletions tests/kafkatest/services/performance/producer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

from kafkatest.services.performance import PerformanceService
from kafkatest.utils.security_config import SecurityConfig


class ProducerPerformanceService(PerformanceService):
Expand All @@ -24,9 +25,11 @@ class ProducerPerformanceService(PerformanceService):
"collect_default": True},
}

def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
def __init__(self, context, num_nodes, kafka, security_protocol, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
super(ProducerPerformanceService, self).__init__(context, num_nodes)
self.kafka = kafka
self.security_config = SecurityConfig(security_protocol)
self.security_protocol = security_protocol
self.args = {
'topic': topic,
'num_records': num_records,
Expand All @@ -40,11 +43,15 @@ def _worker(self, idx, node):
args = self.args.copy()
args.update({'bootstrap_servers': self.kafka.bootstrap_servers()})
cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\
"%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s"\
" | tee /mnt/producer-performance.log" % args
"%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s" % args

self.security_config.setup_node(node)
if self.security_protocol == SecurityConfig.SSL:
self.settings.update(self.security_config.properties)
for key, value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
cmd += " | tee /mnt/producer-performance.log"

self.logger.debug("Producer performance %d command: %s", idx, cmd)

def parse_stats(line):
Expand Down
17 changes: 17 additions & 0 deletions tests/kafkatest/services/templates/kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ port=9092
#host.name=localhost
advertised.host.name={{ node.account.hostname }}
#advertised.port=<port accessible by clients>
{% if security_protocol == interbroker_security_protocol %}
listeners={{ security_protocol }}://:{{ port }}
advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:{{ port }}
{% else %}
listeners=PLAINTEXT://:9092,SSL://:9093
advertised.listeners=PLAINTEXT://{{ node.account.hostname }}:9092,SSL://{{ node.account.hostname }}:9093
{% endif %}
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
Expand All @@ -39,3 +46,13 @@ log.cleaner.enable=false

zookeeper.connect={{ zk.connect_setting() }}
zookeeper.connection.timeout.ms=2000

security.inter.broker.protocol={{ interbroker_security_protocol }}
ssl.keystore.location=/mnt/ssl/test.keystore.jks
ssl.keystore.password=test-ks-passwd
ssl.key.password=test-key-passwd
ssl.keystore.type=JKS
ssl.truststore.location=/mnt/ssl/test.truststore.jks
ssl.truststore.password=test-ts-passwd
ssl.truststore.type=JKS

Loading