Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions tests/kafkatest/services/replica_verification_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ducktape.services.background_thread import BackgroundThreadService

from kafkatest.services.kafka.directory import kafka_dir
from kafkatest.services.security.security_config import SecurityConfig

import re

class ReplicaVerificationTool(BackgroundThreadService):

logs = {
"producer_log": {
"path": "/mnt/replica_verification_tool.log",
"collect_default": False}
}

def __init__(self, context, num_nodes, kafka, topic, report_interval_ms, security_protocol="PLAINTEXT"):
super(ReplicaVerificationTool, self).__init__(context, num_nodes)

self.kafka = kafka
self.topic = topic
self.report_interval_ms = report_interval_ms
self.security_protocol = security_protocol
self.security_config = SecurityConfig(security_protocol)
self.partition_lag = {}

def _worker(self, idx, node):
cmd = self.start_cmd(node)
self.logger.debug("ReplicaVerificationTool %d command: %s" % (idx, cmd))
self.security_config.setup_node(node)
for line in node.account.ssh_capture(cmd):
self.logger.debug("Parsing line:{}".format(line))

parsed = re.search('.*max lag is (.+?) for partition \[(.+?)\] at', line)
if parsed:
lag = int(parsed.group(1))
topic_partition = parsed.group(2)
self.logger.debug("Setting max lag for {} as {}".format(topic_partition, lag))
self.partition_lag[topic_partition] = lag

def get_lag_for_partition(self, topic, partition):
"""
Get latest lag for given topic-partition

Args:
topic: a topic
partition: a partition of the topic
"""
topic_partition = topic + ',' + str(partition)
lag = self.partition_lag[topic_partition]
self.logger.debug("Retuning lag for {} as {}".format(topic_partition, lag))
return lag

def start_cmd(self, node):
cmd = "/opt/%s/bin/" % kafka_dir(node)
cmd += "kafka-run-class.sh kafka.tools.ReplicaVerificationTool"
cmd += " --broker-list %s --topic-white-list %s --time -2 --report-interval-ms %s" % (self.kafka.bootstrap_servers(self.security_protocol), self.topic, self.report_interval_ms)

cmd += " 2>> /mnt/replica_verification_tool.log | tee -a /mnt/replica_verification_tool.log &"
return cmd

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was there a stop_node method here before? It's a good idea to have both stop_node and clean_node defined.

Or, if you expect the process to actually be gone by the time we hit the "stop" portion of the service lifecycle (stop_node is the "graceful shutdown" option), it's helpful to do a check that the process is actually gone (i.e. check that the actual state matches our expectations of what the state should be) and at least log a warning if the process is still running

def stop_node(self, node):
node.account.kill_process("java", clean_shutdown=True, allow_fail=True)

def clean_node(self, node):
node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf /mnt/replica_verification_tool.log", allow_fail=False)
8 changes: 7 additions & 1 deletion tests/kafkatest/services/verifiable_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class VerifiableProducer(BackgroundThreadService):
}

def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000,
message_validator=is_int, compression_types=None, version=TRUNK):
message_validator=is_int, compression_types=None, version=TRUNK, acks=None):
"""
:param max_messages is a number of messages to be produced per producer
:param message_validator checks for an expected format of messages produced. There are
Expand Down Expand Up @@ -71,6 +71,7 @@ def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput
self.acked_values = []
self.not_acked_values = []
self.produced_count = {}
self.acks = acks


@property
Expand All @@ -96,6 +97,9 @@ def _worker(self, idx, node):

# Create and upload config file
producer_prop_file = self.prop_file(node)
if self.acks is not None:
self.logger.info("VerifiableProducer (index = %d) will use acks = %s", idx, self.acks)
producer_prop_file += "\nacks=%s\n" % self.acks
self.logger.info("verifiable_producer.properties:")
self.logger.info(producer_prop_file)
node.account.create_file(VerifiableProducer.CONFIG_FILE, producer_prop_file)
Expand Down Expand Up @@ -156,6 +160,8 @@ def start_cmd(self, node, idx):
cmd += " --throughput %s" % str(self.throughput)
if self.message_validator == is_int_with_prefix:
cmd += " --value-prefix %s" % str(idx)
if self.acks is not None:
cmd += " --acks %s\n" % str(self.acks)

cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
Expand Down
88 changes: 88 additions & 0 deletions tests/kafkatest/tests/tools/replica_verification_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from ducktape.utils.util import wait_until
from ducktape.tests.test import Test
from kafkatest.services.verifiable_producer import VerifiableProducer

from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.replica_verification_tool import ReplicaVerificationTool

TOPIC = "topic-replica-verification"
REPORT_INTERVAL_MS = 1000

class ReplicaVerificationToolTest(Test):
"""
Tests ReplicaVerificationTool
"""
def __init__(self, test_context):
super(ReplicaVerificationToolTest, self).__init__(test_context)
self.num_zk = 1
self.num_brokers = 2
self.messages_received_count = 0
self.topics = {
TOPIC: {'partitions': 1, 'replication-factor': 2}
}

self.zk = ZookeeperService(test_context, self.num_zk)
self.kafka = None
self.producer = None
self.replica_verifier = None

def setUp(self):
self.zk.start()

def start_kafka(self, security_protocol, interbroker_security_protocol):
self.kafka = KafkaService(
self.test_context, self.num_brokers,
self.zk, security_protocol=security_protocol,
interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
self.kafka.start()

def start_replica_verification_tool(self, security_protocol):
self.replica_verifier = ReplicaVerificationTool(self.test_context, 1, self.kafka, TOPIC, report_interval_ms=REPORT_INTERVAL_MS, security_protocol=security_protocol)
self.replica_verifier.start()

def start_producer(self, max_messages, acks, timeout):
# This will produce to kafka cluster
self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, acks=acks, max_messages=max_messages)
current_acked = self.producer.num_acked
self.logger.info("current_acked = %s" % current_acked)
self.producer.start()
wait_until(lambda: acks == 0 or self.producer.num_acked >= current_acked + max_messages, timeout_sec=timeout,
err_msg="Timeout awaiting messages to be produced and acked")

def stop_producer(self):
self.producer.stop()

def test_replica_lags(self, security_protocol='PLAINTEXT'):
"""
Tests ReplicaVerificationTool
:return: None
"""
self.start_kafka(security_protocol, security_protocol)
self.start_replica_verification_tool(security_protocol)
self.start_producer(max_messages=10, acks=-1, timeout=15)
# Verify that there is no lag in replicas and is correctly reported by ReplicaVerificationTool
wait_until(lambda: self.replica_verifier.get_lag_for_partition(TOPIC, 0) == 0, timeout_sec=10,
err_msg="Timed out waiting to reach zero replica lags.")
self.stop_producer()

self.start_producer(max_messages=1000, acks=0, timeout=5)
# Verify that there is lag in replicas and is correctly reported by ReplicaVerificationTool
wait_until(lambda: self.replica_verifier.get_lag_for_partition(TOPIC, 0) > 0, timeout_sec=10,
err_msg="Timed out waiting to reach non-zero number of replica lags.")