diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index f7d3c60ba300f..8e8169aec3d1b 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -33,9 +33,9 @@ LABEL ducker.creator=$ducker_creator # Update Linux and install necessary utilities. # we have to install git since it is included in openjdk:8 but not openjdk:11 -RUN apt update && apt install -y sudo git netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean -RUN python -m pip install -U pip==9.0.3; -RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.9 +RUN apt update && apt install -y sudo git netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean +RUN python3 -m pip install -U pip==20.2.2; +RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip3 install --upgrade ducktape==0.8.0 # Set up ssh COPY ./ssh-config /root/.ssh/config diff --git a/tests/docker/ducker-ak b/tests/docker/ducker-ak index ca227917d7294..0a7604a44dfd9 100755 --- a/tests/docker/ducker-ak +++ b/tests/docker/ducker-ak @@ -270,7 +270,7 @@ setup_custom_ducktape() { docker_run ducker01 "${image_name}" local running_container="$(docker ps -f=network=ducknet -q)" must_do -v -o docker cp "${custom_ducktape}" "${running_container}:/opt/ducktape" - docker exec --user=root ducker01 bash -c 'set -x && cd /opt/kafka-dev/tests && sudo python ./setup.py develop install && cd /opt/ducktape && sudo python ./setup.py develop install' + docker exec --user=root ducker01 bash -c 'set -x && cd /opt/kafka-dev/tests && sudo python3 ./setup.py develop install && cd /opt/ducktape && sudo python3 ./setup.py develop install' [[ $? -ne 0 ]] && die "failed to install the new ducktape." must_do -v -o docker commit ducker01 "${image_name}" must_do -v docker kill "${running_container}" diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py index 6bab304f3037b..5e7bddeb5d107 100644 --- a/tests/kafkatest/benchmarks/core/benchmark_test.py +++ b/tests/kafkatest/benchmarks/core/benchmark_test.py @@ -140,8 +140,8 @@ def test_long_term_producer_throughput(self, compression_type="none", # FIXME we should be generating a graph too # Try to break it into 5 blocks, but fall back to a smaller number if # there aren't even 5 elements - block_size = max(len(self.producer.stats[0]) / 5, 1) - nblocks = len(self.producer.stats[0]) / block_size + block_size = max(len(self.producer.stats[0]) // 5, 1) + nblocks = len(self.producer.stats[0]) // block_size for i in range(nblocks): subset = self.producer.stats[0][i*block_size:min((i+1)*block_size, len(self.producer.stats[0]))] diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py index acf1184e0595d..686cd42e123e5 100644 --- a/tests/kafkatest/sanity_checks/test_console_consumer.py +++ b/tests/kafkatest/sanity_checks/test_console_consumer.py @@ -16,7 +16,6 @@ import time from ducktape.mark import matrix -from ducktape.mark import parametrize from ducktape.mark.resource import cluster from ducktape.tests.test import Test from ducktape.utils.util import wait_until diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py index a008532cb1135..fde2abf1f819f 100644 --- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py +++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py @@ -39,7 +39,7 @@ def __init__(self, test_context): self.num_messages = 1000 # This will produce to source kafka cluster self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, - max_messages=self.num_messages, throughput=self.num_messages/5) + max_messages=self.num_messages, throughput=self.num_messages // 5) def setUp(self): self.zk.start() diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 47c2456ebd1cf..14b450c0871e2 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import itertools import os from ducktape.cluster.remoteaccount import RemoteCommandError diff --git a/tests/kafkatest/services/kafka/__init__.py b/tests/kafkatest/services/kafka/__init__.py index 5abbbee6029c9..5ae879db65e3c 100644 --- a/tests/kafkatest/services/kafka/__init__.py +++ b/tests/kafkatest/services/kafka/__init__.py @@ -13,6 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from kafka import KafkaService -from util import TopicPartition -from config import KafkaConfig +from .kafka import KafkaService +from .util import TopicPartition +from .config import KafkaConfig diff --git a/tests/kafkatest/services/kafka/config.py b/tests/kafkatest/services/kafka/config.py index 251a61f0a8d9b..64e96e3886aa5 100644 --- a/tests/kafkatest/services/kafka/config.py +++ b/tests/kafkatest/services/kafka/config.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import config_property +from . import config_property class KafkaConfig(dict): @@ -34,7 +34,7 @@ def __init__(self, **kwargs): # Set defaults for key, val in self.DEFAULTS.items(): - if not self.has_key(key): + if key not in self: self[key] = val def render(self): diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index e164c917a6c83..be56b859c37f5 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -23,7 +23,7 @@ from ducktape.utils.util import wait_until from ducktape.cluster.remoteaccount import RemoteCommandError -from config import KafkaConfig +from .config import KafkaConfig from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.kafka import config_property from kafkatest.services.monitor.jmx import JmxMixin @@ -678,11 +678,11 @@ def parse_describe_topic(self, topic_description): fields = line.split("\t") # ["Partition: 4", "Leader: 0"] -> ["4", "0"] - fields = map(lambda x: x.split(" ")[1], fields) + fields = list(map(lambda x: x.split(" ")[1], fields)) partitions.append( {"topic": fields[0], "partition": int(fields[1]), - "replicas": map(int, fields[3].split(','))}) + "replicas": list(map(int, fields[3].split(',')))}) return {"partitions": partitions} diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index 0c028aac7073c..96acc2cc96020 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -27,7 +27,7 @@ inter.broker.listener.name={{ interbroker_listener.name }} security.inter.broker.protocol={{ interbroker_listener.security_protocol }} {% endif %} -{% for k, v in listener_security_config.client_listener_overrides.iteritems() %} +{% for k, v in listener_security_config.client_listener_overrides.items() %} {% if listener_security_config.requires_sasl_mechanism_prefix(k) %} listener.name.{{ security_protocol.lower() }}.{{ security_config.client_sasl_mechanism.lower() }}.{{ k }}={{ v }} {% else %} @@ -36,7 +36,7 @@ listener.name.{{ security_protocol.lower() }}.{{ k }}={{ v }} {% endfor %} {% if interbroker_listener.name != security_protocol %} -{% for k, v in listener_security_config.interbroker_listener_overrides.iteritems() %} +{% for k, v in listener_security_config.interbroker_listener_overrides.items() %} {% if listener_security_config.requires_sasl_mechanism_prefix(k) %} listener.name.{{ interbroker_listener.name.lower() }}.{{ security_config.interbroker_sasl_mechanism.lower() }}.{{ k }}={{ v }} {% else %} diff --git a/tests/kafkatest/services/monitor/http.py b/tests/kafkatest/services/monitor/http.py index 88ddb2ef68f3a..0293fbd8c4f98 100644 --- a/tests/kafkatest/services/monitor/http.py +++ b/tests/kafkatest/services/monitor/http.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer +from http.server import BaseHTTPRequestHandler, HTTPServer from collections import defaultdict, namedtuple import json from threading import Thread @@ -114,7 +114,7 @@ def metrics(self, host=None, client_id=None, name=None, group=None, tags=None): Get any collected metrics that match the specified parameters, yielding each as a tuple of (key, [, ...]) values. """ - for k, values in self._http_metrics.iteritems(): + for k, values in self._http_metrics.items(): if ((host is None or host == k.host) and (client_id is None or client_id == k.client_id) and (name is None or name == k.name) and @@ -154,7 +154,7 @@ def do_POST(self): name = raw_metric['name'] group = raw_metric['group'] # Convert to tuple of pairs because dicts & lists are unhashable - tags = tuple([(k, v) for k, v in raw_metric['tags'].iteritems()]), + tags = tuple((k, v) for k, v in raw_metric['tags'].items()), value = raw_metric['value'] key = MetricKey(host=host, client_id=client_id, name=name, group=group, tags=tags) diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index 2dcd369512dac..bff1878fada62 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -128,7 +128,7 @@ def read_jmx_output(self, idx, node): for name in object_attribute_names: aggregates_per_time = [] - for time_sec in xrange(start_time_sec, end_time_sec + 1): + for time_sec in range(start_time_sec, end_time_sec + 1): # assume that value is 0 if it is not read by jmx tool at the given time. This is appropriate for metrics such as bandwidth values_per_node = [time_to_stats.get(time_sec, {}).get(name, 0) for time_to_stats in self.jmx_stats] # assume that value is aggregated across nodes by sum. This is appropriate for metrics such as bandwidth diff --git a/tests/kafkatest/services/performance/__init__.py b/tests/kafkatest/services/performance/__init__.py index 9eddcaa6dbcc1..69686f750519c 100644 --- a/tests/kafkatest/services/performance/__init__.py +++ b/tests/kafkatest/services/performance/__init__.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from performance import PerformanceService, throughput, latency, compute_aggregate_throughput -from end_to_end_latency import EndToEndLatencyService -from producer_performance import ProducerPerformanceService -from consumer_performance import ConsumerPerformanceService +from .performance import PerformanceService, throughput, latency, compute_aggregate_throughput +from .end_to_end_latency import EndToEndLatencyService +from .producer_performance import ProducerPerformanceService +from .consumer_performance import ConsumerPerformanceService diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py index 9cd56ec6e096d..3c4369883040d 100644 --- a/tests/kafkatest/services/performance/producer_performance.py +++ b/tests/kafkatest/services/performance/producer_performance.py @@ -78,7 +78,7 @@ def start_cmd(self, node): 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol), 'client_id': self.client_id, 'kafka_run_class': self.path.script("kafka-run-class.sh", node), - 'metrics_props': ' '.join(["%s=%s" % (k, v) for k, v in self.http_metrics_client_configs.iteritems()]) + 'metrics_props': ' '.join("%s=%s" % (k, v) for k, v in self.http_metrics_client_configs.items()) }) cmd = "" diff --git a/tests/kafkatest/services/security/minikdc.py b/tests/kafkatest/services/security/minikdc.py index f26b379a52a73..23327db988379 100644 --- a/tests/kafkatest/services/security/minikdc.py +++ b/tests/kafkatest/services/security/minikdc.py @@ -14,8 +14,6 @@ # limitations under the License. import os -import random -import uuid from io import open from os import remove, close from shutil import move diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py index d619630ee716e..437084ba16cc8 100644 --- a/tests/kafkatest/services/security/security_config.py +++ b/tests/kafkatest/services/security/security_config.py @@ -386,7 +386,7 @@ def props(self, prefix=''): return "" if self.has_sasl and not self.static_jaas_conf and 'sasl.jaas.config' not in self.properties: raise Exception("JAAS configuration property has not yet been initialized") - config_lines = (prefix + key + "=" + value for key, value in self.properties.iteritems()) + config_lines = (prefix + key + "=" + value for key, value in self.properties.items()) # Extra blank lines ensure this can be appended/prepended safely return "\n".join(itertools.chain([""], config_lines, [""])) diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 66be72df107aa..dc6facca26841 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -15,9 +15,8 @@ import os.path import signal -import streams_property -import consumer_property -from ducktape.cluster.remoteaccount import RemoteCommandError +from . import streams_property +from . import consumer_property from ducktape.services.service import Service from ducktape.utils.util import wait_until from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin @@ -231,7 +230,7 @@ def pids(self, node): try: pids = [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=str)] return [int(pid) for pid in pids] - except Exception, exception: + except Exception as exception: self.logger.debug(str(exception)) return [] diff --git a/tests/kafkatest/services/templates/tools_log4j.properties b/tests/kafkatest/services/templates/tools_log4j.properties index f1cf342fff422..3f83b4220a1f5 100644 --- a/tests/kafkatest/services/templates/tools_log4j.properties +++ b/tests/kafkatest/services/templates/tools_log4j.properties @@ -17,7 +17,7 @@ log4j.rootLogger = {{ log_level|default("INFO") }}, FILE {% if loggers is defined %} -{% for logger, log_level in loggers.iteritems() %} +{% for logger, log_level in loggers.items() %} log4j.logger.{{ logger }}={{ log_level }} {% endfor %} {% endif %} diff --git a/tests/kafkatest/services/trogdor/task_spec.py b/tests/kafkatest/services/trogdor/task_spec.py index ae457aebc06d0..aa5766ee81654 100644 --- a/tests/kafkatest/services/trogdor/task_spec.py +++ b/tests/kafkatest/services/trogdor/task_spec.py @@ -44,7 +44,7 @@ def to_node_names(nodes): """ node_names = [] for obj in nodes: - if isinstance(obj, basestring): + if isinstance(obj, str): node_names.append(obj) else: node_names.append(obj.name) diff --git a/tests/kafkatest/services/verifiable_client.py b/tests/kafkatest/services/verifiable_client.py index 72114a4ccdf4d..a2cbf96ef3646 100644 --- a/tests/kafkatest/services/verifiable_client.py +++ b/tests/kafkatest/services/verifiable_client.py @@ -18,7 +18,6 @@ from ducktape.cluster.remoteaccount import RemoteCommandError import importlib -import os import subprocess import signal from kafkatest.services.kafka.util import fix_opts_for_new_jvm diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py index 433823bbab1ad..93d9446fb9bba 100644 --- a/tests/kafkatest/services/verifiable_consumer.py +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -361,7 +361,7 @@ def clean_node(self, node): def current_assignment(self): with self.lock: - return { handler.node: handler.current_assignment() for handler in self.event_handlers.itervalues() } + return { handler.node: handler.current_assignment() for handler in self.event_handlers.values() } def current_position(self, tp): with self.lock: @@ -372,7 +372,7 @@ def current_position(self, tp): def owner(self, tp): with self.lock: - for handler in self.event_handlers.itervalues(): + for handler in self.event_handlers.values(): if tp in handler.current_assignment(): return handler.node return None @@ -386,33 +386,33 @@ def last_commit(self, tp): def total_consumed(self): with self.lock: - return sum(handler.total_consumed for handler in self.event_handlers.itervalues()) + return sum(handler.total_consumed for handler in self.event_handlers.values()) def num_rebalances(self): with self.lock: - return max(handler.assigned_count for handler in self.event_handlers.itervalues()) + return max(handler.assigned_count for handler in self.event_handlers.values()) def num_revokes_for_alive(self, keep_alive=1): with self.lock: - return max([handler.revoked_count for handler in self.event_handlers.itervalues() - if handler.idx <= keep_alive]) + return max(handler.revoked_count for handler in self.event_handlers.values() + if handler.idx <= keep_alive) def joined_nodes(self): with self.lock: - return [handler.node for handler in self.event_handlers.itervalues() + return [handler.node for handler in self.event_handlers.values() if handler.state == ConsumerState.Joined] def rebalancing_nodes(self): with self.lock: - return [handler.node for handler in self.event_handlers.itervalues() + return [handler.node for handler in self.event_handlers.values() if handler.state == ConsumerState.Rebalancing] def dead_nodes(self): with self.lock: - return [handler.node for handler in self.event_handlers.itervalues() + return [handler.node for handler in self.event_handlers.values() if handler.state == ConsumerState.Dead] def alive_nodes(self): with self.lock: - return [handler.node for handler in self.event_handlers.itervalues() + return [handler.node for handler in self.event_handlers.values() if handler.state != ConsumerState.Dead] diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py index b8cb9b69cbd82..51f2b31de2dcf 100644 --- a/tests/kafkatest/services/zookeeper.py +++ b/tests/kafkatest/services/zookeeper.py @@ -16,7 +16,6 @@ import os import re -import time from ducktape.services.service import Service from ducktape.utils.util import wait_until diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py index 50f4f7f8af69b..f815326868032 100644 --- a/tests/kafkatest/tests/client/client_compatibility_features_test.py +++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py @@ -14,6 +14,8 @@ # limitations under the License. import os + +import errno import time from random import randint @@ -53,7 +55,7 @@ def run_command(node, cmd, ssh_log_file): f.write(line) except Exception as e: f.write("** Command failed!") - print e + print(e) raise @@ -86,14 +88,14 @@ def invoke_compatibility_program(self, features): "--topic %s " % (self.zk.path.script("kafka-run-class.sh", node), self.kafka.bootstrap_servers(), len(self.kafka.nodes), - self.topics.keys()[0])) - for k, v in features.iteritems(): + list(self.topics.keys())[0])) + for k, v in features.items(): cmd = cmd + ("--%s %s " % (k, v)) results_dir = TestContext.results_dir(self.test_context, 0) try: os.makedirs(results_dir) except OSError as e: - if e.errno == errno.EEXIST and os.path.isdir(path): + if e.errno == errno.EEXIST and os.path.isdir(results_dir): pass else: raise diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py index 394a0f3b5718d..204cabbecdf41 100644 --- a/tests/kafkatest/tests/client/quota_test.py +++ b/tests/kafkatest/tests/client/quota_test.py @@ -162,7 +162,7 @@ def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_n jmx_attributes=['bytes-consumed-rate'], version=client_version) consumer.run() - for idx, messages in consumer.messages_consumed.iteritems(): + for idx, messages in consumer.messages_consumed.items(): assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx success, msg = self.validate(self.kafka, producer, consumer) diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index 107c0d4290597..684cbc6da56da 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -26,6 +26,7 @@ from kafkatest.services.security.security_config import SecurityConfig from kafkatest.version import DEV_BRANCH, LATEST_2_3, LATEST_2_2, LATEST_2_1, LATEST_2_0, LATEST_1_1, LATEST_1_0, LATEST_0_11_0, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, KafkaVersion +from functools import reduce from collections import Counter, namedtuple import itertools import json @@ -227,9 +228,9 @@ def test_pause_and_resume_source(self, connect_protocol): err_msg="Failed to see connector transition to the PAUSED state") # verify that we do not produce new messages while paused - num_messages = len(self.source.sent_messages()) + num_messages = len(list(self.source.sent_messages())) time.sleep(10) - assert num_messages == len(self.source.sent_messages()), "Paused source connector should not produce any messages" + assert num_messages == len(list(self.source.sent_messages())), "Paused source connector should not produce any messages" self.cc.resume_connector(self.source.name) @@ -238,7 +239,7 @@ def test_pause_and_resume_source(self, connect_protocol): err_msg="Failed to see connector transition to the RUNNING state") # after resuming, we should see records produced again - wait_until(lambda: len(self.source.sent_messages()) > num_messages, timeout_sec=30, + wait_until(lambda: len(list(self.source.sent_messages())) > num_messages, timeout_sec=30, err_msg="Failed to produce messages after resuming source connector") @cluster(num_nodes=5) @@ -258,7 +259,7 @@ def test_pause_and_resume_sink(self, connect_protocol): self.source = VerifiableSource(self.cc, topic=self.TOPIC) self.source.start() - wait_until(lambda: len(self.source.committed_messages()) > 0, timeout_sec=30, + wait_until(lambda: len(list(self.source.committed_messages())) > 0, timeout_sec=30, err_msg="Timeout expired waiting for source task to produce a message") self.sink = VerifiableSink(self.cc, topics=[self.TOPIC]) @@ -275,9 +276,9 @@ def test_pause_and_resume_sink(self, connect_protocol): err_msg="Failed to see connector transition to the PAUSED state") # verify that we do not consume new messages while paused - num_messages = len(self.sink.received_messages()) + num_messages = len(list(self.sink.received_messages())) time.sleep(10) - assert num_messages == len(self.sink.received_messages()), "Paused sink connector should not consume any messages" + assert num_messages == len(list(self.sink.received_messages())), "Paused sink connector should not consume any messages" self.cc.resume_connector(self.sink.name) @@ -286,7 +287,7 @@ def test_pause_and_resume_sink(self, connect_protocol): err_msg="Failed to see connector transition to the RUNNING state") # after resuming, we should see records consumed again - wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30, + wait_until(lambda: len(list(self.sink.received_messages())) > num_messages, timeout_sec=30, err_msg="Failed to consume messages after resuming sink connector") @cluster(num_nodes=5) @@ -420,11 +421,11 @@ def test_bounce(self, clean, connect_protocol): src_seqnos = [msg['seqno'] for msg in src_messages if msg['task'] == task] # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because clean # bouncing should commit on rebalance. - src_seqno_max = max(src_seqnos) + src_seqno_max = max(src_seqnos) if len(src_seqnos) else 0 self.logger.debug("Max source seqno: %d", src_seqno_max) src_seqno_counts = Counter(src_seqnos) missing_src_seqnos = sorted(set(range(src_seqno_max)).difference(set(src_seqnos))) - duplicate_src_seqnos = sorted([seqno for seqno,count in src_seqno_counts.iteritems() if count > 1]) + duplicate_src_seqnos = sorted(seqno for seqno,count in src_seqno_counts.items() if count > 1) if missing_src_seqnos: self.logger.error("Missing source sequence numbers for task " + str(task)) @@ -440,11 +441,11 @@ def test_bounce(self, clean, connect_protocol): sink_seqnos = [msg['seqno'] for msg in sink_messages if msg['task'] == task] # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because # clean bouncing should commit on rebalance. - sink_seqno_max = max(sink_seqnos) + sink_seqno_max = max(sink_seqnos) if len(sink_seqnos) else 0 self.logger.debug("Max sink seqno: %d", sink_seqno_max) sink_seqno_counts = Counter(sink_seqnos) missing_sink_seqnos = sorted(set(range(sink_seqno_max)).difference(set(sink_seqnos))) - duplicate_sink_seqnos = sorted([seqno for seqno,count in sink_seqno_counts.iteritems() if count > 1]) + duplicate_sink_seqnos = sorted(seqno for seqno,count in iter(sink_seqno_counts.items()) if count > 1) if missing_sink_seqnos: self.logger.error("Missing sink sequence numbers for task " + str(task)) diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py index 9a1ff1bb63123..580d9f376c627 100644 --- a/tests/kafkatest/tests/connect/connect_test.py +++ b/tests/kafkatest/tests/connect/connect_test.py @@ -16,7 +16,7 @@ from ducktape.tests.test import Test from ducktape.mark.resource import cluster from ducktape.utils.util import wait_until -from ducktape.mark import parametrize, matrix +from ducktape.mark import parametrize from ducktape.cluster.remoteaccount import RemoteCommandError from ducktape.errors import TimeoutError @@ -28,7 +28,6 @@ import hashlib import json -import os.path class ConnectStandaloneFileTest(Test): @@ -130,7 +129,7 @@ def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.Jso def validate_output(self, value): try: output_hash = list(self.sink.node.account.ssh_capture("md5sum " + self.OUTPUT_FILE))[0].strip().split()[0] - return output_hash == hashlib.md5(value).hexdigest() + return output_hash == hashlib.md5(value.encode('utf-8')).hexdigest() except RemoteCommandError: return False diff --git a/tests/kafkatest/tests/core/downgrade_test.py b/tests/kafkatest/tests/core/downgrade_test.py index 7ed3cfbe55b95..250e5639dacb5 100644 --- a/tests/kafkatest/tests/core/downgrade_test.py +++ b/tests/kafkatest/tests/core/downgrade_test.py @@ -17,14 +17,9 @@ from ducktape.mark.resource import cluster from ducktape.utils.util import wait_until -from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.kafka import KafkaService from kafkatest.services.kafka import config_property -from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.end_to_end import EndToEndTest -from kafkatest.utils import is_int -from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion +from kafkatest.version import LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_BRANCH, KafkaVersion class TestDowngrade(EndToEndTest): PARTITIONS = 3 diff --git a/tests/kafkatest/tests/core/get_offset_shell_test.py b/tests/kafkatest/tests/core/get_offset_shell_test.py index 8fbba17e53fff..82291d7db48c6 100644 --- a/tests/kafkatest/tests/core/get_offset_shell_test.py +++ b/tests/kafkatest/tests/core/get_offset_shell_test.py @@ -22,7 +22,6 @@ from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.security.security_config import SecurityConfig TOPIC = "topic-get-offset-shell" MAX_MESSAGES = 100 diff --git a/tests/kafkatest/tests/core/mirror_maker_test.py b/tests/kafkatest/tests/core/mirror_maker_test.py index c33f103e50e9e..05cdb4b030818 100644 --- a/tests/kafkatest/tests/core/mirror_maker_test.py +++ b/tests/kafkatest/tests/core/mirror_maker_test.py @@ -14,7 +14,7 @@ # limitations under the License. from ducktape.utils.util import wait_until -from ducktape.mark import parametrize, matrix +from ducktape.mark import matrix from ducktape.mark.resource import cluster from kafkatest.services.zookeeper import ZookeeperService diff --git a/tests/kafkatest/tests/core/network_degrade_test.py b/tests/kafkatest/tests/core/network_degrade_test.py index 5b77d99c62da3..76af8b0dd72c0 100644 --- a/tests/kafkatest/tests/core/network_degrade_test.py +++ b/tests/kafkatest/tests/core/network_degrade_test.py @@ -129,7 +129,7 @@ def test_rate(self, task_name, device_name, latency_ms, rate_limit_kbit): self.logger.info("Measured rates: %s" % measured_rates) # We expect to see measured rates within an order of magnitude of our target rate - low_kbps = rate_limit_kbit / 10 + low_kbps = rate_limit_kbit // 10 high_kbps = rate_limit_kbit * 10 acceptable_rates = [r for r in measured_rates if low_kbps < r < high_kbps] diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py b/tests/kafkatest/tests/core/reassign_partitions_test.py index c4397db899b5e..f36f77473557c 100644 --- a/tests/kafkatest/tests/core/reassign_partitions_test.py +++ b/tests/kafkatest/tests/core/reassign_partitions_test.py @@ -84,7 +84,7 @@ def reassign_partitions(self, bounce_brokers): self.logger.debug("Jumble partition assignment with seed " + str(seed)) random.seed(seed) # The list may still be in order, but that's ok - shuffled_list = range(0, self.num_partitions) + shuffled_list = list(range(0, self.num_partitions)) random.shuffle(shuffled_list) for i in range(0, self.num_partitions): diff --git a/tests/kafkatest/tests/core/replica_scale_test.py b/tests/kafkatest/tests/core/replica_scale_test.py index fbbf2069a29f6..dd8f89fbeb9c7 100644 --- a/tests/kafkatest/tests/core/replica_scale_test.py +++ b/tests/kafkatest/tests/core/replica_scale_test.py @@ -16,7 +16,6 @@ from ducktape.mark.resource import cluster from ducktape.mark import parametrize from ducktape.tests.test import Test -from ducktape.utils.util import wait_until from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec @@ -25,7 +24,6 @@ from kafkatest.services.trogdor.trogdor import TrogdorService from kafkatest.services.zookeeper import ZookeeperService -import json import time @@ -48,7 +46,7 @@ def teardown(self): self.zk.stop() @cluster(num_nodes=12) - @parametrize(topic_count=500, partition_count=34, replication_factor=3) + @parametrize(topic_count=50, partition_count=34, replication_factor=3) def test_produce_consume(self, topic_count, partition_count, replication_factor): topics_create_start_time = time.time() for i in range(topic_count): @@ -103,7 +101,7 @@ def test_produce_consume(self, topic_count, partition_count, replication_factor) trogdor.stop() @cluster(num_nodes=12) - @parametrize(topic_count=500, partition_count=34, replication_factor=3) + @parametrize(topic_count=50, partition_count=34, replication_factor=3) def test_clean_bounce(self, topic_count, partition_count, replication_factor): topics_create_start_time = time.time() for i in range(topic_count): diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py index f29ec2b63cb42..1a0c1c6ec0940 100644 --- a/tests/kafkatest/tests/core/throttling_test.py +++ b/tests/kafkatest/tests/core/throttling_test.py @@ -73,7 +73,7 @@ def __init__(self, test_context): self.num_records = 2000 self.record_size = 4096 * 100 # 400 KB # 1 MB per partition on average. - self.partition_size = (self.num_records * self.record_size) / self.num_partitions + self.partition_size = (self.num_records * self.record_size) // self.num_partitions self.num_producers = 2 self.num_consumers = 1 self.throttle = 4 * 1024 * 1024 # 4 MB/s diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index 9aeff230fd16d..ad8d0a7dd01f5 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -248,7 +248,7 @@ def test_transactions(self, failure_mode, bounce_target, check_order, use_group_ # We reduce the number of seed messages to copy to account for the fewer output # partitions, and thus lower parallelism. This helps keep the test # time shorter. - self.num_seed_messages = self.num_seed_messages / 3 + self.num_seed_messages = self.num_seed_messages // 3 self.num_input_partitions = 1 self.num_output_partitions = 1 diff --git a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py index 7d610daf39b8a..15eac3b58c17f 100644 --- a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py +++ b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.mark import matrix, ignore +from ducktape.mark import matrix from ducktape.mark.resource import cluster from kafkatest.services.zookeeper import ZookeeperService diff --git a/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py b/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py index 7e97a4cd685fc..d1ec11009568f 100644 --- a/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py +++ b/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.mark import matrix, ignore from ducktape.mark.resource import cluster from kafkatest.services.zookeeper import ZookeeperService @@ -23,8 +22,6 @@ from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int -import logging - class ZookeeperTlsEncryptOnlyTest(ProduceConsumeValidateTest): """Tests TLS encryption-only (ssl.clientAuth=none) connectivity to zookeeper. """ diff --git a/tests/kafkatest/tests/core/zookeeper_tls_test.py b/tests/kafkatest/tests/core/zookeeper_tls_test.py index a3d8ab5109fae..ea8e34492d8a2 100644 --- a/tests/kafkatest/tests/core/zookeeper_tls_test.py +++ b/tests/kafkatest/tests/core/zookeeper_tls_test.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.mark import matrix, ignore from ducktape.mark.resource import cluster from kafkatest.services.zookeeper import ZookeeperService @@ -23,8 +22,6 @@ from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int -import logging - class ZookeeperTlsTest(ProduceConsumeValidateTest): """Tests TLS connectivity to zookeeper. """ diff --git a/tests/kafkatest/tests/end_to_end.py b/tests/kafkatest/tests/end_to_end.py index f385b996e4101..7ef6b974f6905 100644 --- a/tests/kafkatest/tests/end_to_end.py +++ b/tests/kafkatest/tests/end_to_end.py @@ -23,8 +23,6 @@ from kafkatest.services.zookeeper import ZookeeperService from kafkatest.utils import validate_delivery -import time - class EndToEndTest(Test): """This class provides a shared template for tests which follow the common pattern of: @@ -87,7 +85,7 @@ def on_record_consumed(self, record, node): def await_consumed_offsets(self, last_acked_offsets, timeout_sec): def has_finished_consuming(): - for partition, offset in last_acked_offsets.iteritems(): + for partition, offset in last_acked_offsets.items(): if not partition in self.last_consumed_offsets: return False last_commit = self.consumer.last_commit(partition) diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py index a0e69376043a8..d899280812764 100644 --- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py @@ -17,7 +17,7 @@ from ducktape.tests.test import Test from ducktape.mark.resource import cluster from ducktape.mark import matrix -from ducktape.mark import parametrize, ignore +from ducktape.mark import ignore from kafkatest.services.kafka import KafkaService from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService @@ -119,7 +119,7 @@ def __init__(self, test_context): def fail_broker_type(self, failure_mode, broker_type): # Pick a random topic and bounce it's leader topic_index = randint(0, len(self.topics.keys()) - 1) - topic = self.topics.keys()[topic_index] + topic = list(self.topics.keys())[topic_index] failures[failure_mode](self, topic, broker_type) def fail_many_brokers(self, failure_mode, num_failures): diff --git a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py index c5dd169338eb4..461573a0a9e1a 100644 --- a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py @@ -20,8 +20,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ - LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \ - DEV_BRANCH, DEV_VERSION, KafkaVersion + LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3 from kafkatest.services.streams import CooperativeRebalanceUpgradeService from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running diff --git a/tests/kafkatest/tests/streams/templates/log4j_template.properties b/tests/kafkatest/tests/streams/templates/log4j_template.properties index f1cf342fff422..3f83b4220a1f5 100644 --- a/tests/kafkatest/tests/streams/templates/log4j_template.properties +++ b/tests/kafkatest/tests/streams/templates/log4j_template.properties @@ -17,7 +17,7 @@ log4j.rootLogger = {{ log_level|default("INFO") }}, FILE {% if loggers is defined %} -{% for logger, log_level in loggers.iteritems() %} +{% for logger, log_level in loggers.items() %} log4j.logger.{{ logger }}={{ log_level }} {% endfor %} {% endif %} diff --git a/tests/kafkatest/tests/streams/utils/__init__.py b/tests/kafkatest/tests/streams/utils/__init__.py index ed36ca2dcdf87..dcc4cba4966d4 100644 --- a/tests/kafkatest/tests/streams/utils/__init__.py +++ b/tests/kafkatest/tests/streams/utils/__init__.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -from util import verify_running, verify_stopped, stop_processors, extract_generation_from_logs, extract_generation_id +from .util import verify_running, verify_stopped, stop_processors, extract_generation_from_logs, extract_generation_id diff --git a/tests/kafkatest/tests/tools/log4j_appender_test.py b/tests/kafkatest/tests/tools/log4j_appender_test.py index 03879e15d488c..61a5d2a8932ba 100644 --- a/tests/kafkatest/tests/tools/log4j_appender_test.py +++ b/tests/kafkatest/tests/tools/log4j_appender_test.py @@ -23,7 +23,6 @@ from kafkatest.services.kafka import KafkaService from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.kafka_log4j_appender import KafkaLog4jAppender -from kafkatest.services.security.security_config import SecurityConfig TOPIC = "topic-log4j-appender" MAX_MESSAGES = 100 diff --git a/tests/kafkatest/tests/verifiable_consumer_test.py b/tests/kafkatest/tests/verifiable_consumer_test.py index 071439db8f207..e416690bfc402 100644 --- a/tests/kafkatest/tests/verifiable_consumer_test.py +++ b/tests/kafkatest/tests/verifiable_consumer_test.py @@ -40,7 +40,7 @@ def _all_partitions(self, topic, num_partitions): def _partitions(self, assignment): partitions = [] - for parts in assignment.itervalues(): + for parts in assignment.values(): partitions += parts return partitions diff --git a/tests/kafkatest/utils/__init__.py b/tests/kafkatest/utils/__init__.py index 1c1d5e02a5983..5d5d38dfe0aad 100644 --- a/tests/kafkatest/utils/__init__.py +++ b/tests/kafkatest/utils/__init__.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -from util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable, validate_delivery +from .util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable, validate_delivery diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py index 60f60075c1263..af986ba1ee54a 100644 --- a/tests/kafkatest/utils/util.py +++ b/tests/kafkatest/utils/util.py @@ -14,9 +14,7 @@ from kafkatest import __version__ as __kafkatest_version__ -import math import re -import time def kafkatest_version(): diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index adcad440f7673..77bbccb49c82f 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -49,6 +49,19 @@ def __str__(self): else: return LooseVersion.__str__(self) + def _cmp(self, other): + if isinstance(other, str): + other = KafkaVersion(other) + + if other.is_dev: + if self.is_dev: + return 0 + return -1 + elif self.is_dev: + return 1 + + return LooseVersion._cmp(self, other) + def supports_named_listeners(self): return self >= V_0_10_2_0