From 5e7db743a49fa332a443c41aa1015a8a4f446e4e Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 11 Mar 2016 12:19:24 -0800 Subject: [PATCH 1/8] KAFKA-3382: Add system test for ReplicationVerificationTool --- .../services/replica_verification_tool.py | 82 +++++++++++++++++ .../kafkatest/services/verifiable_producer.py | 6 +- .../tests/replica_verification_test.py | 88 +++++++++++++++++++ 3 files changed, 175 insertions(+), 1 deletion(-) create mode 100644 tests/kafkatest/services/replica_verification_tool.py create mode 100644 tests/kafkatest/tests/replica_verification_test.py diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py new file mode 100644 index 0000000000000..b6d774a83c832 --- /dev/null +++ b/tests/kafkatest/services/replica_verification_tool.py @@ -0,0 +1,82 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.services.background_thread import BackgroundThreadService + +from kafkatest.services.kafka.directory import kafka_dir +from kafkatest.services.security.security_config import SecurityConfig + +import re + +class ReplicaVerificationTool(BackgroundThreadService): + + logs = { + "producer_log": { + "path": "/mnt/replica_verification_tool.log", + "collect_default": False} + } + + def __init__(self, context, num_nodes, kafka, topic, report_interval_ms, security_protocol="PLAINTEXT"): + super(ReplicaVerificationTool, self).__init__(context, num_nodes) + + self.kafka = kafka + self.topic = topic + self.report_interval_ms = report_interval_ms + self.security_protocol = security_protocol + self.security_config = SecurityConfig(security_protocol) + self.lagged_values = [] + self.zero_lagged_values = [] + + def _worker(self, idx, node): + cmd = self.start_cmd(node) + self.logger.debug("ReplicaVerificationTool %d command: %s" % (idx, cmd)) + self.security_config.setup_node(node) + for line in node.account.ssh_capture(cmd): + lag = re.search('.*max lag is (.+?) for partition', line) + if lag: + if int(lag.group(1)) > 0: + self.logger.info("Appending to lags: %s" % line) + self.lagged_values.append(line) + else: + self.logger.info("Appending to zero lags: %s" % line) + self.zero_lagged_values.append(line) + + def num_lags(self): + return len(self.lagged_values) + + def num_zero_lags(self): + return len(self.zero_lagged_values) + + def start_cmd(self, node): + cmd = "/opt/%s/bin/" % kafka_dir(node) + cmd += "kafka-run-class.sh kafka.tools.ReplicaVerificationTool" + cmd += " --broker-list %s --topic-white-list %s --time -2 --report-interval-ms %s" % (self.kafka.bootstrap_servers(self.security_protocol), self.topic, self.report_interval_ms) + + cmd += " 2>> /mnt/replica_verification_tool.log | tee -a /mnt/replica_verification_tool.log &" + return cmd + + def stop_node(self, node): + node.account.kill_process("ReplicaVerificationTool", allow_fail=False) + if self.worker_threads is None: + return + + # block until the corresponding thread exits + if len(self.worker_threads) >= self.idx(node): + # Need to guard this because stop is preemptively called before the worker threads are added and started + self.worker_threads[self.idx(node) - 1].join() + + def clean_node(self, node): + node.account.kill_process("ReplicaVerificationTool", clean_shutdown=False, allow_fail=False) + node.account.ssh("rm -rf /mnt/replica_verification_tool.log", allow_fail=False) diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index 414da84ad9e64..3a2b26459452f 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -43,7 +43,7 @@ class VerifiableProducer(BackgroundThreadService): } def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000, - message_validator=is_int, compression_types=None, version=TRUNK): + message_validator=is_int, compression_types=None, version=TRUNK, acks=None): """ :param max_messages is a number of messages to be produced per producer :param message_validator checks for an expected format of messages produced. There are @@ -71,6 +71,7 @@ def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput self.acked_values = [] self.not_acked_values = [] self.produced_count = {} + self.acks = acks @property @@ -96,6 +97,9 @@ def _worker(self, idx, node): # Create and upload config file producer_prop_file = self.prop_file(node) + if self.acks is not None: + self.logger.info("VerifiableProducer (index = %d) will use acks = %s", idx, self.acks) + producer_prop_file += "\nacks=%s\n" % self.acks self.logger.info("verifiable_producer.properties:") self.logger.info(producer_prop_file) node.account.create_file(VerifiableProducer.CONFIG_FILE, producer_prop_file) diff --git a/tests/kafkatest/tests/replica_verification_test.py b/tests/kafkatest/tests/replica_verification_test.py new file mode 100644 index 0000000000000..4765bdef7d51d --- /dev/null +++ b/tests/kafkatest/tests/replica_verification_test.py @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ducktape.utils.util import wait_until +from ducktape.tests.test import Test +from kafkatest.services.verifiable_producer import VerifiableProducer + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.replica_verification_tool import ReplicaVerificationTool + +TOPIC = "topic-replica-verification" +REPORT_INTERVAL_MS = 1000 + +class ReplicaVerificationToolTest(Test): + """ + Tests ReplicaVerificationTool + """ + def __init__(self, test_context): + super(ReplicaVerificationToolTest, self).__init__(test_context) + self.num_zk = 1 + self.num_brokers = 2 + self.messages_received_count = 0 + self.topics = { + TOPIC: {'partitions': 1, 'replication-factor': 2} + } + + self.zk = ZookeeperService(test_context, self.num_zk) + self.kafka = None + self.producer = None + self.replica_verifier = None + + def setUp(self): + self.zk.start() + + def start_kafka(self, security_protocol, interbroker_security_protocol): + self.kafka = KafkaService( + self.test_context, self.num_brokers, + self.zk, security_protocol=security_protocol, + interbroker_security_protocol=interbroker_security_protocol, topics=self.topics) + self.kafka.start() + + def start_replica_verification_tool(self, security_protocol): + self.replica_verifier = ReplicaVerificationTool(self.test_context, 1, self.kafka, TOPIC, report_interval_ms=REPORT_INTERVAL_MS, security_protocol=security_protocol) + self.replica_verifier.start() + + def start_producer(self, max_messages, acks, timeout): + # This will produce to kafka cluster + self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, acks=acks, max_messages=max_messages) + current_acked = self.producer.num_acked + self.logger.info("current_acked = %s" % current_acked) + self.producer.start() + wait_until(lambda: acks == 0 or self.producer.num_acked >= current_acked + max_messages, timeout_sec=timeout, + err_msg="Timeout awaiting messages to be produced and acked") + + def stop_producer(self): + self.producer.stop() + + def test_replica_lags(self, security_protocol='PLAINTEXT'): + """ + Tests ReplicaVerificationTool + :return: None + """ + self.start_kafka(security_protocol, security_protocol) + self.start_replica_verification_tool(security_protocol) + self.start_producer(max_messages=10, acks=-1, timeout=15) + # Verify that there is no lag in replicas and is correctly reported by ReplicaVerificationTool + wait_until(lambda: self.replica_verifier.num_lags() == 0 and self.replica_verifier.num_zero_lags() > 0, timeout_sec=10, + err_msg="Timed out waiting to reach expected num of replica lags.") + self.stop_producer() + + self.start_producer(max_messages=1000, acks=0, timeout=5) + # Verify that there is no lag in replicas and is correctly reported by ReplicaVerificationTool + wait_until(lambda: self.replica_verifier.num_lags() > 0, timeout_sec=10, + err_msg="Timed out waiting to reach expected num of replica lags.") \ No newline at end of file From b0a68b67963d289843935a8f65ab41510590f35e Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 30 Mar 2016 16:29:03 -0700 Subject: [PATCH 2/8] Address review feedback. --- .../services/replica_verification_tool.py | 40 ++++++++----------- .../kafkatest/services/verifiable_producer.py | 2 + .../tests/replica_verification_test.py | 11 ++--- 3 files changed, 24 insertions(+), 29 deletions(-) diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py index b6d774a83c832..7f8428f82a53d 100644 --- a/tests/kafkatest/services/replica_verification_tool.py +++ b/tests/kafkatest/services/replica_verification_tool.py @@ -36,28 +36,30 @@ def __init__(self, context, num_nodes, kafka, topic, report_interval_ms, securit self.report_interval_ms = report_interval_ms self.security_protocol = security_protocol self.security_config = SecurityConfig(security_protocol) - self.lagged_values = [] - self.zero_lagged_values = [] + self.partition_max_lag = {} def _worker(self, idx, node): cmd = self.start_cmd(node) self.logger.debug("ReplicaVerificationTool %d command: %s" % (idx, cmd)) self.security_config.setup_node(node) for line in node.account.ssh_capture(cmd): - lag = re.search('.*max lag is (.+?) for partition', line) - if lag: - if int(lag.group(1)) > 0: - self.logger.info("Appending to lags: %s" % line) - self.lagged_values.append(line) + parsed = re.search('.*max lag is (.+?) for partition \[(.+?)\] at', line) + if parsed: + lag = int(parsed.group(1)) + topic_partition = parsed.group(2) + if self.partition_max_lag.get(topic_partition) is None: + self.logger.debug("Setting max lag for {} as {}".format(topic_partition, lag)) + self.partition_max_lag[topic_partition] = lag else: - self.logger.info("Appending to zero lags: %s" % line) - self.zero_lagged_values.append(line) + if self.partition_max_lag[topic_partition] < lag: + self.logger.debug("Updating max lag for {} as {}".format(topic_partition, lag)) + self.partition_max_lag[topic_partition] = lag - def num_lags(self): - return len(self.lagged_values) + def get_max_lag_for_partition(self, topic, partition): + return self.partition_max_lag[topic + ',' + str(partition)] - def num_zero_lags(self): - return len(self.zero_lagged_values) + def reset_partition_lags(self): + self.partition_max_lag.clear() def start_cmd(self, node): cmd = "/opt/%s/bin/" % kafka_dir(node) @@ -67,16 +69,6 @@ def start_cmd(self, node): cmd += " 2>> /mnt/replica_verification_tool.log | tee -a /mnt/replica_verification_tool.log &" return cmd - def stop_node(self, node): - node.account.kill_process("ReplicaVerificationTool", allow_fail=False) - if self.worker_threads is None: - return - - # block until the corresponding thread exits - if len(self.worker_threads) >= self.idx(node): - # Need to guard this because stop is preemptively called before the worker threads are added and started - self.worker_threads[self.idx(node) - 1].join() - def clean_node(self, node): - node.account.kill_process("ReplicaVerificationTool", clean_shutdown=False, allow_fail=False) + node.account.kill_process("java", clean_shutdown=False, allow_fail=False) node.account.ssh("rm -rf /mnt/replica_verification_tool.log", allow_fail=False) diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index 3a2b26459452f..500410f134f10 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -160,6 +160,8 @@ def start_cmd(self, node, idx): cmd += " --throughput %s" % str(self.throughput) if self.message_validator == is_int_with_prefix: cmd += " --value-prefix %s" % str(idx) + if self.acks is not None: + cmd += " --acks %s\n" % str(self.acks) cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE) diff --git a/tests/kafkatest/tests/replica_verification_test.py b/tests/kafkatest/tests/replica_verification_test.py index 4765bdef7d51d..6211739e61d45 100644 --- a/tests/kafkatest/tests/replica_verification_test.py +++ b/tests/kafkatest/tests/replica_verification_test.py @@ -78,11 +78,12 @@ def test_replica_lags(self, security_protocol='PLAINTEXT'): self.start_replica_verification_tool(security_protocol) self.start_producer(max_messages=10, acks=-1, timeout=15) # Verify that there is no lag in replicas and is correctly reported by ReplicaVerificationTool - wait_until(lambda: self.replica_verifier.num_lags() == 0 and self.replica_verifier.num_zero_lags() > 0, timeout_sec=10, - err_msg="Timed out waiting to reach expected num of replica lags.") + wait_until(lambda: self.replica_verifier.get_max_lag_for_partition(TOPIC, 0) == 0, timeout_sec=10, + err_msg="Timed out waiting to reach zero replica lags.") self.stop_producer() + self.replica_verifier.reset_partition_lags() self.start_producer(max_messages=1000, acks=0, timeout=5) - # Verify that there is no lag in replicas and is correctly reported by ReplicaVerificationTool - wait_until(lambda: self.replica_verifier.num_lags() > 0, timeout_sec=10, - err_msg="Timed out waiting to reach expected num of replica lags.") \ No newline at end of file + # Verify that there is lag in replicas and is correctly reported by ReplicaVerificationTool + wait_until(lambda: self.replica_verifier.get_max_lag_for_partition(TOPIC, 0) > 0, timeout_sec=10, + err_msg="Timed out waiting to reach non-zero number of replica lags.") \ No newline at end of file From 1af30f6b85caf11748c399a0c4c73436ba2d69fc Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 30 Mar 2016 16:33:51 -0700 Subject: [PATCH 3/8] Revert unwanted change. --- tests/kafkatest/services/replica_verification_tool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py index 7f8428f82a53d..de3b229e1f4e1 100644 --- a/tests/kafkatest/services/replica_verification_tool.py +++ b/tests/kafkatest/services/replica_verification_tool.py @@ -70,5 +70,5 @@ def start_cmd(self, node): return cmd def clean_node(self, node): - node.account.kill_process("java", clean_shutdown=False, allow_fail=False) - node.account.ssh("rm -rf /mnt/replica_verification_tool.log", allow_fail=False) + node.account.kill_process("ReplicaVerificationTool", clean_shutdown=False, allow_fail=False) + node.account.ssh("rm -rf /mnt/replica_verification_tool.log", allow_fail=False) \ No newline at end of file From b2a513df3044c16ca0020ab302287749941b08f6 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 4 Apr 2016 11:22:22 -0700 Subject: [PATCH 4/8] Track latest lag, instead of max lag. --- .../services/replica_verification_tool.py | 18 +++++------------- .../tests/replica_verification_test.py | 5 ++--- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py index de3b229e1f4e1..06f280b8bce94 100644 --- a/tests/kafkatest/services/replica_verification_tool.py +++ b/tests/kafkatest/services/replica_verification_tool.py @@ -36,7 +36,7 @@ def __init__(self, context, num_nodes, kafka, topic, report_interval_ms, securit self.report_interval_ms = report_interval_ms self.security_protocol = security_protocol self.security_config = SecurityConfig(security_protocol) - self.partition_max_lag = {} + self.partition_lag = {} def _worker(self, idx, node): cmd = self.start_cmd(node) @@ -47,19 +47,11 @@ def _worker(self, idx, node): if parsed: lag = int(parsed.group(1)) topic_partition = parsed.group(2) - if self.partition_max_lag.get(topic_partition) is None: - self.logger.debug("Setting max lag for {} as {}".format(topic_partition, lag)) - self.partition_max_lag[topic_partition] = lag - else: - if self.partition_max_lag[topic_partition] < lag: - self.logger.debug("Updating max lag for {} as {}".format(topic_partition, lag)) - self.partition_max_lag[topic_partition] = lag + self.logger.debug("Setting max lag for {} as {}".format(topic_partition, lag)) + self.partition_lag[topic_partition] = lag - def get_max_lag_for_partition(self, topic, partition): - return self.partition_max_lag[topic + ',' + str(partition)] - - def reset_partition_lags(self): - self.partition_max_lag.clear() + def get_lag_for_partition(self, topic, partition): + return self.partition_lag[topic + ',' + str(partition)] def start_cmd(self, node): cmd = "/opt/%s/bin/" % kafka_dir(node) diff --git a/tests/kafkatest/tests/replica_verification_test.py b/tests/kafkatest/tests/replica_verification_test.py index 6211739e61d45..1b625e94db1f2 100644 --- a/tests/kafkatest/tests/replica_verification_test.py +++ b/tests/kafkatest/tests/replica_verification_test.py @@ -78,12 +78,11 @@ def test_replica_lags(self, security_protocol='PLAINTEXT'): self.start_replica_verification_tool(security_protocol) self.start_producer(max_messages=10, acks=-1, timeout=15) # Verify that there is no lag in replicas and is correctly reported by ReplicaVerificationTool - wait_until(lambda: self.replica_verifier.get_max_lag_for_partition(TOPIC, 0) == 0, timeout_sec=10, + wait_until(lambda: self.replica_verifier.get_lag_for_partition(TOPIC, 0) == 0, timeout_sec=10, err_msg="Timed out waiting to reach zero replica lags.") self.stop_producer() - self.replica_verifier.reset_partition_lags() self.start_producer(max_messages=1000, acks=0, timeout=5) # Verify that there is lag in replicas and is correctly reported by ReplicaVerificationTool - wait_until(lambda: self.replica_verifier.get_max_lag_for_partition(TOPIC, 0) > 0, timeout_sec=10, + wait_until(lambda: self.replica_verifier.get_lag_for_partition(TOPIC, 0) > 0, timeout_sec=10, err_msg="Timed out waiting to reach non-zero number of replica lags.") \ No newline at end of file From 71bd055a9a66c856f3cf0b08dc1593bb99dd3a15 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 4 Apr 2016 11:24:35 -0700 Subject: [PATCH 5/8] Move replica_verification_test under tools --- .../tests/tools/replica_verification_test.py | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 tests/kafkatest/tests/tools/replica_verification_test.py diff --git a/tests/kafkatest/tests/tools/replica_verification_test.py b/tests/kafkatest/tests/tools/replica_verification_test.py new file mode 100644 index 0000000000000..1b625e94db1f2 --- /dev/null +++ b/tests/kafkatest/tests/tools/replica_verification_test.py @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ducktape.utils.util import wait_until +from ducktape.tests.test import Test +from kafkatest.services.verifiable_producer import VerifiableProducer + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.replica_verification_tool import ReplicaVerificationTool + +TOPIC = "topic-replica-verification" +REPORT_INTERVAL_MS = 1000 + +class ReplicaVerificationToolTest(Test): + """ + Tests ReplicaVerificationTool + """ + def __init__(self, test_context): + super(ReplicaVerificationToolTest, self).__init__(test_context) + self.num_zk = 1 + self.num_brokers = 2 + self.messages_received_count = 0 + self.topics = { + TOPIC: {'partitions': 1, 'replication-factor': 2} + } + + self.zk = ZookeeperService(test_context, self.num_zk) + self.kafka = None + self.producer = None + self.replica_verifier = None + + def setUp(self): + self.zk.start() + + def start_kafka(self, security_protocol, interbroker_security_protocol): + self.kafka = KafkaService( + self.test_context, self.num_brokers, + self.zk, security_protocol=security_protocol, + interbroker_security_protocol=interbroker_security_protocol, topics=self.topics) + self.kafka.start() + + def start_replica_verification_tool(self, security_protocol): + self.replica_verifier = ReplicaVerificationTool(self.test_context, 1, self.kafka, TOPIC, report_interval_ms=REPORT_INTERVAL_MS, security_protocol=security_protocol) + self.replica_verifier.start() + + def start_producer(self, max_messages, acks, timeout): + # This will produce to kafka cluster + self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, acks=acks, max_messages=max_messages) + current_acked = self.producer.num_acked + self.logger.info("current_acked = %s" % current_acked) + self.producer.start() + wait_until(lambda: acks == 0 or self.producer.num_acked >= current_acked + max_messages, timeout_sec=timeout, + err_msg="Timeout awaiting messages to be produced and acked") + + def stop_producer(self): + self.producer.stop() + + def test_replica_lags(self, security_protocol='PLAINTEXT'): + """ + Tests ReplicaVerificationTool + :return: None + """ + self.start_kafka(security_protocol, security_protocol) + self.start_replica_verification_tool(security_protocol) + self.start_producer(max_messages=10, acks=-1, timeout=15) + # Verify that there is no lag in replicas and is correctly reported by ReplicaVerificationTool + wait_until(lambda: self.replica_verifier.get_lag_for_partition(TOPIC, 0) == 0, timeout_sec=10, + err_msg="Timed out waiting to reach zero replica lags.") + self.stop_producer() + + self.start_producer(max_messages=1000, acks=0, timeout=5) + # Verify that there is lag in replicas and is correctly reported by ReplicaVerificationTool + wait_until(lambda: self.replica_verifier.get_lag_for_partition(TOPIC, 0) > 0, timeout_sec=10, + err_msg="Timed out waiting to reach non-zero number of replica lags.") \ No newline at end of file From 9c10c6767ef7c54a049b0cf808cd65786a3e438d Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 4 Apr 2016 12:49:43 -0700 Subject: [PATCH 6/8] Kill ReplicaVerificationTool during node cleanup --- .../services/replica_verification_tool.py | 9 +- .../tests/replica_verification_test.py | 88 ------------------- 2 files changed, 7 insertions(+), 90 deletions(-) delete mode 100644 tests/kafkatest/tests/replica_verification_test.py diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py index 06f280b8bce94..1c75326a097ec 100644 --- a/tests/kafkatest/services/replica_verification_tool.py +++ b/tests/kafkatest/services/replica_verification_tool.py @@ -43,6 +43,8 @@ def _worker(self, idx, node): self.logger.debug("ReplicaVerificationTool %d command: %s" % (idx, cmd)) self.security_config.setup_node(node) for line in node.account.ssh_capture(cmd): + self.logger.debug("Parsing line:{}".format(line)) + parsed = re.search('.*max lag is (.+?) for partition \[(.+?)\] at', line) if parsed: lag = int(parsed.group(1)) @@ -51,7 +53,10 @@ def _worker(self, idx, node): self.partition_lag[topic_partition] = lag def get_lag_for_partition(self, topic, partition): - return self.partition_lag[topic + ',' + str(partition)] + topic_partition = topic + ',' + str(partition) + lag = self.partition_lag[topic_partition] + self.logger.debug("Retuning lag for {} as {}".format(topic_partition, lag)) + return lag def start_cmd(self, node): cmd = "/opt/%s/bin/" % kafka_dir(node) @@ -62,5 +67,5 @@ def start_cmd(self, node): return cmd def clean_node(self, node): - node.account.kill_process("ReplicaVerificationTool", clean_shutdown=False, allow_fail=False) + node.account.kill_process("java", allow_fail=True) node.account.ssh("rm -rf /mnt/replica_verification_tool.log", allow_fail=False) \ No newline at end of file diff --git a/tests/kafkatest/tests/replica_verification_test.py b/tests/kafkatest/tests/replica_verification_test.py deleted file mode 100644 index 1b625e94db1f2..0000000000000 --- a/tests/kafkatest/tests/replica_verification_test.py +++ /dev/null @@ -1,88 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ducktape.utils.util import wait_until -from ducktape.tests.test import Test -from kafkatest.services.verifiable_producer import VerifiableProducer - -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService -from kafkatest.services.replica_verification_tool import ReplicaVerificationTool - -TOPIC = "topic-replica-verification" -REPORT_INTERVAL_MS = 1000 - -class ReplicaVerificationToolTest(Test): - """ - Tests ReplicaVerificationTool - """ - def __init__(self, test_context): - super(ReplicaVerificationToolTest, self).__init__(test_context) - self.num_zk = 1 - self.num_brokers = 2 - self.messages_received_count = 0 - self.topics = { - TOPIC: {'partitions': 1, 'replication-factor': 2} - } - - self.zk = ZookeeperService(test_context, self.num_zk) - self.kafka = None - self.producer = None - self.replica_verifier = None - - def setUp(self): - self.zk.start() - - def start_kafka(self, security_protocol, interbroker_security_protocol): - self.kafka = KafkaService( - self.test_context, self.num_brokers, - self.zk, security_protocol=security_protocol, - interbroker_security_protocol=interbroker_security_protocol, topics=self.topics) - self.kafka.start() - - def start_replica_verification_tool(self, security_protocol): - self.replica_verifier = ReplicaVerificationTool(self.test_context, 1, self.kafka, TOPIC, report_interval_ms=REPORT_INTERVAL_MS, security_protocol=security_protocol) - self.replica_verifier.start() - - def start_producer(self, max_messages, acks, timeout): - # This will produce to kafka cluster - self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, acks=acks, max_messages=max_messages) - current_acked = self.producer.num_acked - self.logger.info("current_acked = %s" % current_acked) - self.producer.start() - wait_until(lambda: acks == 0 or self.producer.num_acked >= current_acked + max_messages, timeout_sec=timeout, - err_msg="Timeout awaiting messages to be produced and acked") - - def stop_producer(self): - self.producer.stop() - - def test_replica_lags(self, security_protocol='PLAINTEXT'): - """ - Tests ReplicaVerificationTool - :return: None - """ - self.start_kafka(security_protocol, security_protocol) - self.start_replica_verification_tool(security_protocol) - self.start_producer(max_messages=10, acks=-1, timeout=15) - # Verify that there is no lag in replicas and is correctly reported by ReplicaVerificationTool - wait_until(lambda: self.replica_verifier.get_lag_for_partition(TOPIC, 0) == 0, timeout_sec=10, - err_msg="Timed out waiting to reach zero replica lags.") - self.stop_producer() - - self.start_producer(max_messages=1000, acks=0, timeout=5) - # Verify that there is lag in replicas and is correctly reported by ReplicaVerificationTool - wait_until(lambda: self.replica_verifier.get_lag_for_partition(TOPIC, 0) > 0, timeout_sec=10, - err_msg="Timed out waiting to reach non-zero number of replica lags.") \ No newline at end of file From e86769e2d2e1e730916e6794fd15c6e7fc72ed7c Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 21 Apr 2016 16:30:42 -0700 Subject: [PATCH 7/8] Add stop_node to replica_verification_tool --- .../kafkatest/services/replica_verification_tool.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py index 1c75326a097ec..ef025bcae388b 100644 --- a/tests/kafkatest/services/replica_verification_tool.py +++ b/tests/kafkatest/services/replica_verification_tool.py @@ -52,6 +52,13 @@ def _worker(self, idx, node): self.logger.debug("Setting max lag for {} as {}".format(topic_partition, lag)) self.partition_lag[topic_partition] = lag + """ + Get latest lag for given topic-partition + + Args: + topic: a topic + partition: a partition of the topic + """ def get_lag_for_partition(self, topic, partition): topic_partition = topic + ',' + str(partition) lag = self.partition_lag[topic_partition] @@ -66,6 +73,9 @@ def start_cmd(self, node): cmd += " 2>> /mnt/replica_verification_tool.log | tee -a /mnt/replica_verification_tool.log &" return cmd + def stop_node(self, node): + node.account.kill_process("java", clean_shutdown=True, allow_fail=True) + def clean_node(self, node): - node.account.kill_process("java", allow_fail=True) + node.account.kill_process("java", clean_shutdown=False, allow_fail=True) node.account.ssh("rm -rf /mnt/replica_verification_tool.log", allow_fail=False) \ No newline at end of file From 11c084c34c88f64f57a30445d093b1be5110f13f Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 22 Apr 2016 15:34:49 -0700 Subject: [PATCH 8/8] Move doc string inside method, python way. --- .../services/replica_verification_tool.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py index ef025bcae388b..f6374fbcc259e 100644 --- a/tests/kafkatest/services/replica_verification_tool.py +++ b/tests/kafkatest/services/replica_verification_tool.py @@ -52,14 +52,14 @@ def _worker(self, idx, node): self.logger.debug("Setting max lag for {} as {}".format(topic_partition, lag)) self.partition_lag[topic_partition] = lag - """ - Get latest lag for given topic-partition - - Args: - topic: a topic - partition: a partition of the topic - """ def get_lag_for_partition(self, topic, partition): + """ + Get latest lag for given topic-partition + + Args: + topic: a topic + partition: a partition of the topic + """ topic_partition = topic + ',' + str(partition) lag = self.partition_lag[topic_partition] self.logger.debug("Retuning lag for {} as {}".format(topic_partition, lag))