Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
931b273
KAFKA-10402: WIP
nizhikov Aug 17, 2020
7271e4d
KAFKA-10402: fixing imports.
nizhikov Aug 17, 2020
5f5ed71
KAFKA-10402: has_key -> in, iteritems -> iter(items) fixes.
nizhikov Aug 17, 2020
9973345
KAFKA-10402: iteritems -> iter(dict.items), itervalues -> iter(dict.…
nizhikov Aug 17, 2020
9486c96
KAFKA-10402: basestring -> str
nizhikov Aug 18, 2020
71218d8
KAFKA-10402: syntax fixes.
nizhikov Aug 18, 2020
f0b68c0
KAFKA-10402: reduce topic_count to run tests on small servers.
nizhikov Aug 18, 2020
a814612
KAFKA-10402: fix usage iterable as array.
nizhikov Aug 18, 2020
575818b
KAFKA-10402: fix comparison of str(DEV_BRANCH)
nizhikov Aug 19, 2020
8a4ea97
KAFKA-10402: xrange -> range
nizhikov Aug 20, 2020
4522601
KAFKA-10402: division operator + import http.server
nizhikov Aug 20, 2020
b7c6833
KAFKA-10402: division operator fixes.
nizhikov Aug 20, 2020
887d428
KAFKA-10402: various syntax fixes based on the tests results.
nizhikov Aug 21, 2020
975968a
KAFKA-10402: final test fixes.
nizhikov Aug 22, 2020
a9be51f
KAFKA-10402: final test fixes.
nizhikov Aug 23, 2020
9614020
KAFKA-10402: code review fixes.
nizhikov Aug 23, 2020
e1d523e
KAFKA-10402: code review fixes.
nizhikov Aug 23, 2020
18e48af
KAFKA-10402: code review fixes.
nizhikov Aug 23, 2020
d706fc7
KAFKA-10402: code review fixes.
nizhikov Aug 24, 2020
14712e6
KAFKA-10402: code review fixes.
nizhikov Sep 3, 2020
4bb0f1d
KAFKA-10402: code review fixes.
nizhikov Sep 3, 2020
c31fd72
KAFKA-10402: code review fixes.
nizhikov Sep 3, 2020
316dc95
KAFKA-10402: decreasing topic count to run tests on docker
nizhikov Sep 17, 2020
2db3aa5
Merge branch 'trunk' into KAFKA-10402
nizhikov Oct 7, 2020
2e47ec3
KAFKA-10402: ducktape 0.8.0
nizhikov Oct 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions tests/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ LABEL ducker.creator=$ducker_creator

# Update Linux and install necessary utilities.
# we have to install git since it is included in openjdk:8 but not openjdk:11
RUN apt update && apt install -y sudo git netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
RUN python -m pip install -U pip==9.0.3;
RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.9
RUN apt update && apt install -y sudo git netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
RUN python3 -m pip install -U pip==20.2.2;
RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip3 install --upgrade ducktape==0.8.0

# Set up ssh
COPY ./ssh-config /root/.ssh/config
Expand Down
2 changes: 1 addition & 1 deletion tests/docker/ducker-ak
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ setup_custom_ducktape() {
docker_run ducker01 "${image_name}"
local running_container="$(docker ps -f=network=ducknet -q)"
must_do -v -o docker cp "${custom_ducktape}" "${running_container}:/opt/ducktape"
docker exec --user=root ducker01 bash -c 'set -x && cd /opt/kafka-dev/tests && sudo python ./setup.py develop install && cd /opt/ducktape && sudo python ./setup.py develop install'
docker exec --user=root ducker01 bash -c 'set -x && cd /opt/kafka-dev/tests && sudo python3 ./setup.py develop install && cd /opt/ducktape && sudo python3 ./setup.py develop install'
[[ $? -ne 0 ]] && die "failed to install the new ducktape."
must_do -v -o docker commit ducker01 "${image_name}"
must_do -v docker kill "${running_container}"
Expand Down
4 changes: 2 additions & 2 deletions tests/kafkatest/benchmarks/core/benchmark_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ def test_long_term_producer_throughput(self, compression_type="none",
# FIXME we should be generating a graph too
# Try to break it into 5 blocks, but fall back to a smaller number if
# there aren't even 5 elements
block_size = max(len(self.producer.stats[0]) / 5, 1)
nblocks = len(self.producer.stats[0]) / block_size
block_size = max(len(self.producer.stats[0]) // 5, 1)
nblocks = len(self.producer.stats[0]) // block_size

for i in range(nblocks):
subset = self.producer.stats[0][i*block_size:min((i+1)*block_size, len(self.producer.stats[0]))]
Expand Down
1 change: 0 additions & 1 deletion tests/kafkatest/sanity_checks/test_console_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import time

from ducktape.mark import matrix
from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/sanity_checks/test_verifiable_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, test_context):
self.num_messages = 1000
# This will produce to source kafka cluster
self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
max_messages=self.num_messages, throughput=self.num_messages/5)
max_messages=self.num_messages, throughput=self.num_messages // 5)

def setUp(self):
self.zk.start()
Expand Down
1 change: 0 additions & 1 deletion tests/kafkatest/services/console_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import os

from ducktape.cluster.remoteaccount import RemoteCommandError
Expand Down
6 changes: 3 additions & 3 deletions tests/kafkatest/services/kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from kafka import KafkaService
from util import TopicPartition
from config import KafkaConfig
from .kafka import KafkaService
from .util import TopicPartition
from .config import KafkaConfig
4 changes: 2 additions & 2 deletions tests/kafkatest/services/kafka/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import config_property
from . import config_property


class KafkaConfig(dict):
Expand All @@ -34,7 +34,7 @@ def __init__(self, **kwargs):

# Set defaults
for key, val in self.DEFAULTS.items():
if not self.has_key(key):
if key not in self:
self[key] = val

def render(self):
Expand Down
6 changes: 3 additions & 3 deletions tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from ducktape.utils.util import wait_until
from ducktape.cluster.remoteaccount import RemoteCommandError

from config import KafkaConfig
from .config import KafkaConfig
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import config_property
from kafkatest.services.monitor.jmx import JmxMixin
Expand Down Expand Up @@ -678,11 +678,11 @@ def parse_describe_topic(self, topic_description):

fields = line.split("\t")
# ["Partition: 4", "Leader: 0"] -> ["4", "0"]
fields = map(lambda x: x.split(" ")[1], fields)
fields = list(map(lambda x: x.split(" ")[1], fields))
partitions.append(
{"topic": fields[0],
"partition": int(fields[1]),
"replicas": map(int, fields[3].split(','))})
"replicas": list(map(int, fields[3].split(',')))})
return {"partitions": partitions}


Expand Down
4 changes: 2 additions & 2 deletions tests/kafkatest/services/kafka/templates/kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ inter.broker.listener.name={{ interbroker_listener.name }}
security.inter.broker.protocol={{ interbroker_listener.security_protocol }}
{% endif %}

{% for k, v in listener_security_config.client_listener_overrides.iteritems() %}
{% for k, v in listener_security_config.client_listener_overrides.items() %}
{% if listener_security_config.requires_sasl_mechanism_prefix(k) %}
listener.name.{{ security_protocol.lower() }}.{{ security_config.client_sasl_mechanism.lower() }}.{{ k }}={{ v }}
{% else %}
Expand All @@ -36,7 +36,7 @@ listener.name.{{ security_protocol.lower() }}.{{ k }}={{ v }}
{% endfor %}

{% if interbroker_listener.name != security_protocol %}
{% for k, v in listener_security_config.interbroker_listener_overrides.iteritems() %}
{% for k, v in listener_security_config.interbroker_listener_overrides.items() %}
{% if listener_security_config.requires_sasl_mechanism_prefix(k) %}
listener.name.{{ interbroker_listener.name.lower() }}.{{ security_config.interbroker_sasl_mechanism.lower() }}.{{ k }}={{ v }}
{% else %}
Expand Down
6 changes: 3 additions & 3 deletions tests/kafkatest/services/monitor/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from http.server import BaseHTTPRequestHandler, HTTPServer
from collections import defaultdict, namedtuple
import json
from threading import Thread
Expand Down Expand Up @@ -114,7 +114,7 @@ def metrics(self, host=None, client_id=None, name=None, group=None, tags=None):
Get any collected metrics that match the specified parameters, yielding each as a tuple of
(key, [<timestamp, value>, ...]) values.
"""
for k, values in self._http_metrics.iteritems():
for k, values in self._http_metrics.items():
if ((host is None or host == k.host) and
(client_id is None or client_id == k.client_id) and
(name is None or name == k.name) and
Expand Down Expand Up @@ -154,7 +154,7 @@ def do_POST(self):
name = raw_metric['name']
group = raw_metric['group']
# Convert to tuple of pairs because dicts & lists are unhashable
tags = tuple([(k, v) for k, v in raw_metric['tags'].iteritems()]),
tags = tuple((k, v) for k, v in raw_metric['tags'].items()),
value = raw_metric['value']

key = MetricKey(host=host, client_id=client_id, name=name, group=group, tags=tags)
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/services/monitor/jmx.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def read_jmx_output(self, idx, node):

for name in object_attribute_names:
aggregates_per_time = []
for time_sec in xrange(start_time_sec, end_time_sec + 1):
for time_sec in range(start_time_sec, end_time_sec + 1):
# assume that value is 0 if it is not read by jmx tool at the given time. This is appropriate for metrics such as bandwidth
values_per_node = [time_to_stats.get(time_sec, {}).get(name, 0) for time_to_stats in self.jmx_stats]
# assume that value is aggregated across nodes by sum. This is appropriate for metrics such as bandwidth
Expand Down
8 changes: 4 additions & 4 deletions tests/kafkatest/services/performance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from performance import PerformanceService, throughput, latency, compute_aggregate_throughput
from end_to_end_latency import EndToEndLatencyService
from producer_performance import ProducerPerformanceService
from consumer_performance import ConsumerPerformanceService
from .performance import PerformanceService, throughput, latency, compute_aggregate_throughput
from .end_to_end_latency import EndToEndLatencyService
from .producer_performance import ProducerPerformanceService
from .consumer_performance import ConsumerPerformanceService
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def start_cmd(self, node):
'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
'client_id': self.client_id,
'kafka_run_class': self.path.script("kafka-run-class.sh", node),
'metrics_props': ' '.join(["%s=%s" % (k, v) for k, v in self.http_metrics_client_configs.iteritems()])
'metrics_props': ' '.join("%s=%s" % (k, v) for k, v in self.http_metrics_client_configs.items())
})

cmd = ""
Expand Down
2 changes: 0 additions & 2 deletions tests/kafkatest/services/security/minikdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
# limitations under the License.

import os
import random
import uuid
from io import open
from os import remove, close
from shutil import move
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/services/security/security_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ def props(self, prefix=''):
return ""
if self.has_sasl and not self.static_jaas_conf and 'sasl.jaas.config' not in self.properties:
raise Exception("JAAS configuration property has not yet been initialized")
config_lines = (prefix + key + "=" + value for key, value in self.properties.iteritems())
config_lines = (prefix + key + "=" + value for key, value in self.properties.items())
# Extra blank lines ensure this can be appended/prepended safely
return "\n".join(itertools.chain([""], config_lines, [""]))

Expand Down
7 changes: 3 additions & 4 deletions tests/kafkatest/services/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@

import os.path
import signal
import streams_property
import consumer_property
from ducktape.cluster.remoteaccount import RemoteCommandError
from . import streams_property
from . import consumer_property
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
Expand Down Expand Up @@ -231,7 +230,7 @@ def pids(self, node):
try:
pids = [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=str)]
return [int(pid) for pid in pids]
except Exception, exception:
except Exception as exception:
self.logger.debug(str(exception))
return []

Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/services/templates/tools_log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
log4j.rootLogger = {{ log_level|default("INFO") }}, FILE

{% if loggers is defined %}
{% for logger, log_level in loggers.iteritems() %}
{% for logger, log_level in loggers.items() %}
log4j.logger.{{ logger }}={{ log_level }}
{% endfor %}
{% endif %}
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/services/trogdor/task_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def to_node_names(nodes):
"""
node_names = []
for obj in nodes:
if isinstance(obj, basestring):
if isinstance(obj, str):
node_names.append(obj)
else:
node_names.append(obj.name)
Expand Down
1 change: 0 additions & 1 deletion tests/kafkatest/services/verifiable_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from ducktape.cluster.remoteaccount import RemoteCommandError

import importlib
import os
import subprocess
import signal
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
Expand Down
20 changes: 10 additions & 10 deletions tests/kafkatest/services/verifiable_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ def clean_node(self, node):

def current_assignment(self):
with self.lock:
return { handler.node: handler.current_assignment() for handler in self.event_handlers.itervalues() }
return { handler.node: handler.current_assignment() for handler in self.event_handlers.values() }

def current_position(self, tp):
with self.lock:
Expand All @@ -372,7 +372,7 @@ def current_position(self, tp):

def owner(self, tp):
with self.lock:
for handler in self.event_handlers.itervalues():
for handler in self.event_handlers.values():
if tp in handler.current_assignment():
return handler.node
return None
Expand All @@ -386,33 +386,33 @@ def last_commit(self, tp):

def total_consumed(self):
with self.lock:
return sum(handler.total_consumed for handler in self.event_handlers.itervalues())
return sum(handler.total_consumed for handler in self.event_handlers.values())

def num_rebalances(self):
with self.lock:
return max(handler.assigned_count for handler in self.event_handlers.itervalues())
return max(handler.assigned_count for handler in self.event_handlers.values())

def num_revokes_for_alive(self, keep_alive=1):
with self.lock:
return max([handler.revoked_count for handler in self.event_handlers.itervalues()
if handler.idx <= keep_alive])
return max(handler.revoked_count for handler in self.event_handlers.values()
if handler.idx <= keep_alive)

def joined_nodes(self):
with self.lock:
return [handler.node for handler in self.event_handlers.itervalues()
return [handler.node for handler in self.event_handlers.values()
if handler.state == ConsumerState.Joined]

def rebalancing_nodes(self):
with self.lock:
return [handler.node for handler in self.event_handlers.itervalues()
return [handler.node for handler in self.event_handlers.values()
if handler.state == ConsumerState.Rebalancing]

def dead_nodes(self):
with self.lock:
return [handler.node for handler in self.event_handlers.itervalues()
return [handler.node for handler in self.event_handlers.values()
if handler.state == ConsumerState.Dead]

def alive_nodes(self):
with self.lock:
return [handler.node for handler in self.event_handlers.itervalues()
return [handler.node for handler in self.event_handlers.values()
if handler.state != ConsumerState.Dead]
1 change: 0 additions & 1 deletion tests/kafkatest/services/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import os
import re
import time

from ducktape.services.service import Service
from ducktape.utils.util import wait_until
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# limitations under the License.

import os

import errno
import time
from random import randint

Expand Down Expand Up @@ -53,7 +55,7 @@ def run_command(node, cmd, ssh_log_file):
f.write(line)
except Exception as e:
f.write("** Command failed!")
print e
print(e)
raise


Expand Down Expand Up @@ -86,14 +88,14 @@ def invoke_compatibility_program(self, features):
"--topic %s " % (self.zk.path.script("kafka-run-class.sh", node),
self.kafka.bootstrap_servers(),
len(self.kafka.nodes),
self.topics.keys()[0]))
for k, v in features.iteritems():
list(self.topics.keys())[0]))
for k, v in features.items():
cmd = cmd + ("--%s %s " % (k, v))
results_dir = TestContext.results_dir(self.test_context, 0)
try:
os.makedirs(results_dir)
except OSError as e:
if e.errno == errno.EEXIST and os.path.isdir(path):
if e.errno == errno.EEXIST and os.path.isdir(results_dir):
pass
else:
raise
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/tests/client/quota_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_n
jmx_attributes=['bytes-consumed-rate'], version=client_version)
consumer.run()

for idx, messages in consumer.messages_consumed.iteritems():
for idx, messages in consumer.messages_consumed.items():
assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx

success, msg = self.validate(self.kafka, producer, consumer)
Expand Down
Loading