From 931b2737701818da8e898aac00d03ba48489500e Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Mon, 17 Aug 2020 12:21:11 +0300 Subject: [PATCH 01/24] KAFKA-10402: WIP --- tests/docker/Dockerfile | 62 ++++++++++--------- tests/docker/ducker-ak | 2 +- tests/docker/run_tests.sh | 2 +- .../tests/verifiable_consumer_test.py | 2 +- 4 files changed, 35 insertions(+), 33 deletions(-) diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index f7d3c60ba300f..92b92448a44eb 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -33,9 +33,11 @@ LABEL ducker.creator=$ducker_creator # Update Linux and install necessary utilities. # we have to install git since it is included in openjdk:8 but not openjdk:11 +RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib RUN apt update && apt install -y sudo git netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean -RUN python -m pip install -U pip==9.0.3; -RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.9 +RUN python3 -m pip install -U pip==20.2.2; +RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 +#RUN pip3 install git+https://github.com/confluentinc/ducktape # Set up ssh COPY ./ssh-config /root/.ssh/config @@ -46,36 +48,36 @@ RUN echo 'PermitUserEnvironment yes' >> /etc/ssh/sshd_config # Install binary test dependencies. # we use the same versions as in vagrant/base.sh ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages" -RUN mkdir -p "/opt/kafka-0.8.2.2" && chmod a+rw /opt/kafka-0.8.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2" -RUN mkdir -p "/opt/kafka-0.9.0.1" && chmod a+rw /opt/kafka-0.9.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1" -RUN mkdir -p "/opt/kafka-0.10.0.1" && chmod a+rw /opt/kafka-0.10.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1" -RUN mkdir -p "/opt/kafka-0.10.1.1" && chmod a+rw /opt/kafka-0.10.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1" -RUN mkdir -p "/opt/kafka-0.10.2.2" && chmod a+rw /opt/kafka-0.10.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.2" -RUN mkdir -p "/opt/kafka-0.11.0.3" && chmod a+rw /opt/kafka-0.11.0.3 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.3.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.3" -RUN mkdir -p "/opt/kafka-1.0.2" && chmod a+rw /opt/kafka-1.0.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.2" -RUN mkdir -p "/opt/kafka-1.1.1" && chmod a+rw /opt/kafka-1.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.1.1" -RUN mkdir -p "/opt/kafka-2.0.1" && chmod a+rw /opt/kafka-2.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.0.1" -RUN mkdir -p "/opt/kafka-2.1.1" && chmod a+rw /opt/kafka-2.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.1.1" -RUN mkdir -p "/opt/kafka-2.2.2" && chmod a+rw /opt/kafka-2.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.2.2" -RUN mkdir -p "/opt/kafka-2.3.1" && chmod a+rw /opt/kafka-2.3.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.3.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.3.1" -RUN mkdir -p "/opt/kafka-2.4.1" && chmod a+rw /opt/kafka-2.4.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.4.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.4.1" -RUN mkdir -p "/opt/kafka-2.5.1" && chmod a+rw /opt/kafka-2.5.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.5.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.5.1" -RUN mkdir -p "/opt/kafka-2.6.0" && chmod a+rw /opt/kafka-2.6.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.6.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.6.0" +#RUN mkdir -p "/opt/kafka-0.8.2.2" && chmod a+rw /opt/kafka-0.8.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2" +#RUN mkdir -p "/opt/kafka-0.9.0.1" && chmod a+rw /opt/kafka-0.9.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1" +#RUN mkdir -p "/opt/kafka-0.10.0.1" && chmod a+rw /opt/kafka-0.10.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1" +#RUN mkdir -p "/opt/kafka-0.10.1.1" && chmod a+rw /opt/kafka-0.10.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1" +#RUN mkdir -p "/opt/kafka-0.10.2.2" && chmod a+rw /opt/kafka-0.10.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.2" +#RUN mkdir -p "/opt/kafka-0.11.0.3" && chmod a+rw /opt/kafka-0.11.0.3 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.3.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.3" +#RUN mkdir -p "/opt/kafka-1.0.2" && chmod a+rw /opt/kafka-1.0.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.2" +#RUN mkdir -p "/opt/kafka-1.1.1" && chmod a+rw /opt/kafka-1.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.1.1" +#RUN mkdir -p "/opt/kafka-2.0.1" && chmod a+rw /opt/kafka-2.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.0.1" +#RUN mkdir -p "/opt/kafka-2.1.1" && chmod a+rw /opt/kafka-2.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.1.1" +#RUN mkdir -p "/opt/kafka-2.2.2" && chmod a+rw /opt/kafka-2.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.2.2" +#RUN mkdir -p "/opt/kafka-2.3.1" && chmod a+rw /opt/kafka-2.3.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.3.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.3.1" +#RUN mkdir -p "/opt/kafka-2.4.1" && chmod a+rw /opt/kafka-2.4.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.4.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.4.1" +#RUN mkdir -p "/opt/kafka-2.5.1" && chmod a+rw /opt/kafka-2.5.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.5.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.5.1" +#RUN mkdir -p "/opt/kafka-2.6.0" && chmod a+rw /opt/kafka-2.6.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.6.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.6.0" # Streams test dependencies -RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.1.1-test.jar" -o /opt/kafka-0.10.1.1/libs/kafka-streams-0.10.1.1-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.2-test.jar" -o /opt/kafka-0.10.2.2/libs/kafka-streams-0.10.2.2-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.3-test.jar" -o /opt/kafka-0.11.0.3/libs/kafka-streams-0.11.0.3-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.2-test.jar" -o /opt/kafka-1.0.2/libs/kafka-streams-1.0.2-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.1.1-test.jar" -o /opt/kafka-1.1.1/libs/kafka-streams-1.1.1-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.0.1-test.jar" -o /opt/kafka-2.0.1/libs/kafka-streams-2.0.1-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.1.1-test.jar" -o /opt/kafka-2.1.1/libs/kafka-streams-2.1.1-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.2.2-test.jar" -o /opt/kafka-2.2.2/libs/kafka-streams-2.2.2-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.3.1-test.jar" -o /opt/kafka-2.3.1/libs/kafka-streams-2.3.1-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.4.1-test.jar" -o /opt/kafka-2.4.1/libs/kafka-streams-2.4.1-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.5.1-test.jar" -o /opt/kafka-2.5.1/libs/kafka-streams-2.5.1-test.jar -RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.6.0-test.jar" -o /opt/kafka-2.6.0/libs/kafka-streams-2.6.0-test.jar +#RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar +#RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.1.1-test.jar" -o /opt/kafka-0.10.1.1/libs/kafka-streams-0.10.1.1-test.jar +#RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.2-test.jar" -o /opt/kafka-0.10.2.2/libs/kafka-streams-0.10.2.2-test.jar +#RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.3-test.jar" -o /opt/kafka-0.11.0.3/libs/kafka-streams-0.11.0.3-test.jar +#RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.2-test.jar" -o /opt/kafka-1.0.2/libs/kafka-streams-1.0.2-test.jar +#RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.1.1-test.jar" -o /opt/kafka-1.1.1/libs/kafka-streams-1.1.1-test.jar +#RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.0.1-test.jar" -o /opt/kafka-2.0.1/libs/kafka-streams-2.0.1-test.jar +#RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.1.1-test.jar" -o /opt/kafka-2.1.1/libs/kafka-streams-2.1.1-test.jar +#RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.2.2-test.jar" -o /opt/kafka-2.2.2/libs/kafka-streams-2.2.2-test.jar +#RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.3.1-test.jar" -o /opt/kafka-2.3.1/libs/kafka-streams-2.3.1-test.jar +#RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.4.1-test.jar" -o /opt/kafka-2.4.1/libs/kafka-streams-2.4.1-test.jar +#RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.5.1-test.jar" -o /opt/kafka-2.5.1/libs/kafka-streams-2.5.1-test.jar +#RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.6.0-test.jar" -o /opt/kafka-2.6.0/libs/kafka-streams-2.6.0-test.jar # The version of Kibosh to use for testing. # If you update this, also update vagrant/base.sh diff --git a/tests/docker/ducker-ak b/tests/docker/ducker-ak index ca227917d7294..0a7604a44dfd9 100755 --- a/tests/docker/ducker-ak +++ b/tests/docker/ducker-ak @@ -270,7 +270,7 @@ setup_custom_ducktape() { docker_run ducker01 "${image_name}" local running_container="$(docker ps -f=network=ducknet -q)" must_do -v -o docker cp "${custom_ducktape}" "${running_container}:/opt/ducktape" - docker exec --user=root ducker01 bash -c 'set -x && cd /opt/kafka-dev/tests && sudo python ./setup.py develop install && cd /opt/ducktape && sudo python ./setup.py develop install' + docker exec --user=root ducker01 bash -c 'set -x && cd /opt/kafka-dev/tests && sudo python3 ./setup.py develop install && cd /opt/ducktape && sudo python3 ./setup.py develop install' [[ $? -ne 0 ]] && die "failed to install the new ducktape." must_do -v -o docker commit ducker01 "${image_name}" must_do -v docker kill "${running_container}" diff --git a/tests/docker/run_tests.sh b/tests/docker/run_tests.sh index 063e24d178765..120249d15be2e 100755 --- a/tests/docker/run_tests.sh +++ b/tests/docker/run_tests.sh @@ -16,7 +16,7 @@ # limitations under the License. SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" -KAFKA_NUM_CONTAINERS=${KAFKA_NUM_CONTAINERS:-14} +KAFKA_NUM_CONTAINERS=${KAFKA_NUM_CONTAINERS:-2} TC_PATHS=${TC_PATHS:-./kafkatest/} REBUILD=${REBUILD:f} diff --git a/tests/kafkatest/tests/verifiable_consumer_test.py b/tests/kafkatest/tests/verifiable_consumer_test.py index 071439db8f207..02b64db41fc46 100644 --- a/tests/kafkatest/tests/verifiable_consumer_test.py +++ b/tests/kafkatest/tests/verifiable_consumer_test.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.utils.util import wait_until +#from ducktape.utils.util import wait_until from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.verifiable_producer import VerifiableProducer From 7271e4dc727d8ba778bb375a74ede6e9427832ce Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Mon, 17 Aug 2020 16:44:50 +0300 Subject: [PATCH 02/24] KAFKA-10402: fixing imports. --- tests/kafkatest/sanity_checks/test_console_consumer.py | 1 - tests/kafkatest/services/console_consumer.py | 1 - tests/kafkatest/services/kafka/__init__.py | 6 +++--- tests/kafkatest/services/kafka/config.py | 2 +- tests/kafkatest/services/kafka/kafka.py | 2 +- tests/kafkatest/services/performance/__init__.py | 8 ++++---- tests/kafkatest/services/security/minikdc.py | 2 -- tests/kafkatest/services/streams.py | 5 ++--- tests/kafkatest/services/verifiable_client.py | 1 - tests/kafkatest/services/zookeeper.py | 1 - tests/kafkatest/tests/connect/connect_test.py | 2 +- tests/kafkatest/tests/core/downgrade_test.py | 7 +------ tests/kafkatest/tests/core/get_offset_shell_test.py | 1 - tests/kafkatest/tests/core/mirror_maker_test.py | 2 +- tests/kafkatest/tests/core/replica_scale_test.py | 2 -- .../tests/core/zookeeper_security_upgrade_test.py | 2 +- .../tests/core/zookeeper_tls_encrypt_only_test.py | 3 --- tests/kafkatest/tests/core/zookeeper_tls_test.py | 3 --- tests/kafkatest/tests/end_to_end.py | 2 -- .../kafkatest/tests/streams/streams_broker_bounce_test.py | 2 +- .../streams/streams_cooperative_rebalance_upgrade_test.py | 3 +-- tests/kafkatest/tests/streams/utils/__init__.py | 2 +- tests/kafkatest/tests/tools/log4j_appender_test.py | 1 - tests/kafkatest/tests/verifiable_consumer_test.py | 2 +- tests/kafkatest/utils/__init__.py | 2 +- tests/kafkatest/utils/util.py | 2 -- 26 files changed, 20 insertions(+), 47 deletions(-) diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py index acf1184e0595d..686cd42e123e5 100644 --- a/tests/kafkatest/sanity_checks/test_console_consumer.py +++ b/tests/kafkatest/sanity_checks/test_console_consumer.py @@ -16,7 +16,6 @@ import time from ducktape.mark import matrix -from ducktape.mark import parametrize from ducktape.mark.resource import cluster from ducktape.tests.test import Test from ducktape.utils.util import wait_until diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 47c2456ebd1cf..14b450c0871e2 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import itertools import os from ducktape.cluster.remoteaccount import RemoteCommandError diff --git a/tests/kafkatest/services/kafka/__init__.py b/tests/kafkatest/services/kafka/__init__.py index 5abbbee6029c9..5ae879db65e3c 100644 --- a/tests/kafkatest/services/kafka/__init__.py +++ b/tests/kafkatest/services/kafka/__init__.py @@ -13,6 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from kafka import KafkaService -from util import TopicPartition -from config import KafkaConfig +from .kafka import KafkaService +from .util import TopicPartition +from .config import KafkaConfig diff --git a/tests/kafkatest/services/kafka/config.py b/tests/kafkatest/services/kafka/config.py index 251a61f0a8d9b..a2097d9dbfea4 100644 --- a/tests/kafkatest/services/kafka/config.py +++ b/tests/kafkatest/services/kafka/config.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import config_property +from . import config_property class KafkaConfig(dict): diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index e164c917a6c83..3bc36e8c4f4cb 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -23,7 +23,7 @@ from ducktape.utils.util import wait_until from ducktape.cluster.remoteaccount import RemoteCommandError -from config import KafkaConfig +from .config import KafkaConfig from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.kafka import config_property from kafkatest.services.monitor.jmx import JmxMixin diff --git a/tests/kafkatest/services/performance/__init__.py b/tests/kafkatest/services/performance/__init__.py index 9eddcaa6dbcc1..69686f750519c 100644 --- a/tests/kafkatest/services/performance/__init__.py +++ b/tests/kafkatest/services/performance/__init__.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from performance import PerformanceService, throughput, latency, compute_aggregate_throughput -from end_to_end_latency import EndToEndLatencyService -from producer_performance import ProducerPerformanceService -from consumer_performance import ConsumerPerformanceService +from .performance import PerformanceService, throughput, latency, compute_aggregate_throughput +from .end_to_end_latency import EndToEndLatencyService +from .producer_performance import ProducerPerformanceService +from .consumer_performance import ConsumerPerformanceService diff --git a/tests/kafkatest/services/security/minikdc.py b/tests/kafkatest/services/security/minikdc.py index f26b379a52a73..23327db988379 100644 --- a/tests/kafkatest/services/security/minikdc.py +++ b/tests/kafkatest/services/security/minikdc.py @@ -14,8 +14,6 @@ # limitations under the License. import os -import random -import uuid from io import open from os import remove, close from shutil import move diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 66be72df107aa..c7c471e31ed3e 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -15,9 +15,8 @@ import os.path import signal -import streams_property -import consumer_property -from ducktape.cluster.remoteaccount import RemoteCommandError +from . import streams_property +from . import consumer_property from ducktape.services.service import Service from ducktape.utils.util import wait_until from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin diff --git a/tests/kafkatest/services/verifiable_client.py b/tests/kafkatest/services/verifiable_client.py index 72114a4ccdf4d..a2cbf96ef3646 100644 --- a/tests/kafkatest/services/verifiable_client.py +++ b/tests/kafkatest/services/verifiable_client.py @@ -18,7 +18,6 @@ from ducktape.cluster.remoteaccount import RemoteCommandError import importlib -import os import subprocess import signal from kafkatest.services.kafka.util import fix_opts_for_new_jvm diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py index b8cb9b69cbd82..51f2b31de2dcf 100644 --- a/tests/kafkatest/services/zookeeper.py +++ b/tests/kafkatest/services/zookeeper.py @@ -16,7 +16,6 @@ import os import re -import time from ducktape.services.service import Service from ducktape.utils.util import wait_until diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py index 9a1ff1bb63123..4009f751bec97 100644 --- a/tests/kafkatest/tests/connect/connect_test.py +++ b/tests/kafkatest/tests/connect/connect_test.py @@ -16,7 +16,7 @@ from ducktape.tests.test import Test from ducktape.mark.resource import cluster from ducktape.utils.util import wait_until -from ducktape.mark import parametrize, matrix +from ducktape.mark import parametrize from ducktape.cluster.remoteaccount import RemoteCommandError from ducktape.errors import TimeoutError diff --git a/tests/kafkatest/tests/core/downgrade_test.py b/tests/kafkatest/tests/core/downgrade_test.py index 7ed3cfbe55b95..250e5639dacb5 100644 --- a/tests/kafkatest/tests/core/downgrade_test.py +++ b/tests/kafkatest/tests/core/downgrade_test.py @@ -17,14 +17,9 @@ from ducktape.mark.resource import cluster from ducktape.utils.util import wait_until -from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.kafka import KafkaService from kafkatest.services.kafka import config_property -from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.end_to_end import EndToEndTest -from kafkatest.utils import is_int -from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion +from kafkatest.version import LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_BRANCH, KafkaVersion class TestDowngrade(EndToEndTest): PARTITIONS = 3 diff --git a/tests/kafkatest/tests/core/get_offset_shell_test.py b/tests/kafkatest/tests/core/get_offset_shell_test.py index 8fbba17e53fff..82291d7db48c6 100644 --- a/tests/kafkatest/tests/core/get_offset_shell_test.py +++ b/tests/kafkatest/tests/core/get_offset_shell_test.py @@ -22,7 +22,6 @@ from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.security.security_config import SecurityConfig TOPIC = "topic-get-offset-shell" MAX_MESSAGES = 100 diff --git a/tests/kafkatest/tests/core/mirror_maker_test.py b/tests/kafkatest/tests/core/mirror_maker_test.py index c33f103e50e9e..05cdb4b030818 100644 --- a/tests/kafkatest/tests/core/mirror_maker_test.py +++ b/tests/kafkatest/tests/core/mirror_maker_test.py @@ -14,7 +14,7 @@ # limitations under the License. from ducktape.utils.util import wait_until -from ducktape.mark import parametrize, matrix +from ducktape.mark import matrix from ducktape.mark.resource import cluster from kafkatest.services.zookeeper import ZookeeperService diff --git a/tests/kafkatest/tests/core/replica_scale_test.py b/tests/kafkatest/tests/core/replica_scale_test.py index fbbf2069a29f6..0fb803dadd416 100644 --- a/tests/kafkatest/tests/core/replica_scale_test.py +++ b/tests/kafkatest/tests/core/replica_scale_test.py @@ -16,7 +16,6 @@ from ducktape.mark.resource import cluster from ducktape.mark import parametrize from ducktape.tests.test import Test -from ducktape.utils.util import wait_until from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec @@ -25,7 +24,6 @@ from kafkatest.services.trogdor.trogdor import TrogdorService from kafkatest.services.zookeeper import ZookeeperService -import json import time diff --git a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py index 7d610daf39b8a..15eac3b58c17f 100644 --- a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py +++ b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.mark import matrix, ignore +from ducktape.mark import matrix from ducktape.mark.resource import cluster from kafkatest.services.zookeeper import ZookeeperService diff --git a/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py b/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py index 7e97a4cd685fc..d1ec11009568f 100644 --- a/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py +++ b/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.mark import matrix, ignore from ducktape.mark.resource import cluster from kafkatest.services.zookeeper import ZookeeperService @@ -23,8 +22,6 @@ from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int -import logging - class ZookeeperTlsEncryptOnlyTest(ProduceConsumeValidateTest): """Tests TLS encryption-only (ssl.clientAuth=none) connectivity to zookeeper. """ diff --git a/tests/kafkatest/tests/core/zookeeper_tls_test.py b/tests/kafkatest/tests/core/zookeeper_tls_test.py index a3d8ab5109fae..ea8e34492d8a2 100644 --- a/tests/kafkatest/tests/core/zookeeper_tls_test.py +++ b/tests/kafkatest/tests/core/zookeeper_tls_test.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.mark import matrix, ignore from ducktape.mark.resource import cluster from kafkatest.services.zookeeper import ZookeeperService @@ -23,8 +22,6 @@ from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int -import logging - class ZookeeperTlsTest(ProduceConsumeValidateTest): """Tests TLS connectivity to zookeeper. """ diff --git a/tests/kafkatest/tests/end_to_end.py b/tests/kafkatest/tests/end_to_end.py index f385b996e4101..2bafa76386f04 100644 --- a/tests/kafkatest/tests/end_to_end.py +++ b/tests/kafkatest/tests/end_to_end.py @@ -23,8 +23,6 @@ from kafkatest.services.zookeeper import ZookeeperService from kafkatest.utils import validate_delivery -import time - class EndToEndTest(Test): """This class provides a shared template for tests which follow the common pattern of: diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py index a0e69376043a8..c1f36229bc726 100644 --- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py @@ -17,7 +17,7 @@ from ducktape.tests.test import Test from ducktape.mark.resource import cluster from ducktape.mark import matrix -from ducktape.mark import parametrize, ignore +from ducktape.mark import ignore from kafkatest.services.kafka import KafkaService from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService diff --git a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py index c5dd169338eb4..461573a0a9e1a 100644 --- a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py @@ -20,8 +20,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ - LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \ - DEV_BRANCH, DEV_VERSION, KafkaVersion + LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3 from kafkatest.services.streams import CooperativeRebalanceUpgradeService from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running diff --git a/tests/kafkatest/tests/streams/utils/__init__.py b/tests/kafkatest/tests/streams/utils/__init__.py index ed36ca2dcdf87..dcc4cba4966d4 100644 --- a/tests/kafkatest/tests/streams/utils/__init__.py +++ b/tests/kafkatest/tests/streams/utils/__init__.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -from util import verify_running, verify_stopped, stop_processors, extract_generation_from_logs, extract_generation_id +from .util import verify_running, verify_stopped, stop_processors, extract_generation_from_logs, extract_generation_id diff --git a/tests/kafkatest/tests/tools/log4j_appender_test.py b/tests/kafkatest/tests/tools/log4j_appender_test.py index 03879e15d488c..61a5d2a8932ba 100644 --- a/tests/kafkatest/tests/tools/log4j_appender_test.py +++ b/tests/kafkatest/tests/tools/log4j_appender_test.py @@ -23,7 +23,6 @@ from kafkatest.services.kafka import KafkaService from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.kafka_log4j_appender import KafkaLog4jAppender -from kafkatest.services.security.security_config import SecurityConfig TOPIC = "topic-log4j-appender" MAX_MESSAGES = 100 diff --git a/tests/kafkatest/tests/verifiable_consumer_test.py b/tests/kafkatest/tests/verifiable_consumer_test.py index 02b64db41fc46..071439db8f207 100644 --- a/tests/kafkatest/tests/verifiable_consumer_test.py +++ b/tests/kafkatest/tests/verifiable_consumer_test.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -#from ducktape.utils.util import wait_until +from ducktape.utils.util import wait_until from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.verifiable_producer import VerifiableProducer diff --git a/tests/kafkatest/utils/__init__.py b/tests/kafkatest/utils/__init__.py index 1c1d5e02a5983..72a2858583b5e 100644 --- a/tests/kafkatest/utils/__init__.py +++ b/tests/kafkatest/utils/__init__.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -from util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable, validate_delivery +from .util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable, validate_delivery \ No newline at end of file diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py index 60f60075c1263..af986ba1ee54a 100644 --- a/tests/kafkatest/utils/util.py +++ b/tests/kafkatest/utils/util.py @@ -14,9 +14,7 @@ from kafkatest import __version__ as __kafkatest_version__ -import math import re -import time def kafkatest_version(): From 5f5ed7100f439a8e4a389cecb97e7aad0a2e0035 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Mon, 17 Aug 2020 18:05:13 +0300 Subject: [PATCH 03/24] KAFKA-10402: has_key -> in, iteritems -> iter(items) fixes. --- tests/docker/Dockerfile | 2 +- tests/docker/run_tests.sh | 2 +- tests/kafkatest/services/kafka/config.py | 2 +- tests/kafkatest/services/kafka/templates/kafka.properties | 4 ++-- tests/kafkatest/services/monitor/http.py | 4 ++-- tests/kafkatest/services/performance/producer_performance.py | 2 +- tests/kafkatest/services/security/security_config.py | 2 +- tests/kafkatest/services/templates/tools_log4j.properties | 2 +- .../tests/client/client_compatibility_features_test.py | 2 +- tests/kafkatest/tests/client/quota_test.py | 2 +- tests/kafkatest/tests/connect/connect_distributed_test.py | 4 ++-- tests/kafkatest/tests/end_to_end.py | 2 +- .../tests/streams/templates/log4j_template.properties | 2 +- 13 files changed, 16 insertions(+), 16 deletions(-) diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 92b92448a44eb..19bb100d08715 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -37,7 +37,7 @@ RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal libpython2 RUN apt update && apt install -y sudo git netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean RUN python3 -m pip install -U pip==20.2.2; RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 -#RUN pip3 install git+https://github.com/confluentinc/ducktape +RUN pip3 install git+https://github.com/confluentinc/ducktape # Set up ssh COPY ./ssh-config /root/.ssh/config diff --git a/tests/docker/run_tests.sh b/tests/docker/run_tests.sh index 120249d15be2e..063e24d178765 100755 --- a/tests/docker/run_tests.sh +++ b/tests/docker/run_tests.sh @@ -16,7 +16,7 @@ # limitations under the License. SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" -KAFKA_NUM_CONTAINERS=${KAFKA_NUM_CONTAINERS:-2} +KAFKA_NUM_CONTAINERS=${KAFKA_NUM_CONTAINERS:-14} TC_PATHS=${TC_PATHS:-./kafkatest/} REBUILD=${REBUILD:f} diff --git a/tests/kafkatest/services/kafka/config.py b/tests/kafkatest/services/kafka/config.py index a2097d9dbfea4..64e96e3886aa5 100644 --- a/tests/kafkatest/services/kafka/config.py +++ b/tests/kafkatest/services/kafka/config.py @@ -34,7 +34,7 @@ def __init__(self, **kwargs): # Set defaults for key, val in self.DEFAULTS.items(): - if not self.has_key(key): + if key not in self: self[key] = val def render(self): diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index 0c028aac7073c..86adbce1bbd60 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -27,7 +27,7 @@ inter.broker.listener.name={{ interbroker_listener.name }} security.inter.broker.protocol={{ interbroker_listener.security_protocol }} {% endif %} -{% for k, v in listener_security_config.client_listener_overrides.iteritems() %} +{% for k, v in iter(listener_security_config.client_listener_overrides.items()) %} {% if listener_security_config.requires_sasl_mechanism_prefix(k) %} listener.name.{{ security_protocol.lower() }}.{{ security_config.client_sasl_mechanism.lower() }}.{{ k }}={{ v }} {% else %} @@ -36,7 +36,7 @@ listener.name.{{ security_protocol.lower() }}.{{ k }}={{ v }} {% endfor %} {% if interbroker_listener.name != security_protocol %} -{% for k, v in listener_security_config.interbroker_listener_overrides.iteritems() %} +{% for k, v in iter(listener_security_config.interbroker_listener_overrides.items()) %} {% if listener_security_config.requires_sasl_mechanism_prefix(k) %} listener.name.{{ interbroker_listener.name.lower() }}.{{ security_config.interbroker_sasl_mechanism.lower() }}.{{ k }}={{ v }} {% else %} diff --git a/tests/kafkatest/services/monitor/http.py b/tests/kafkatest/services/monitor/http.py index 88ddb2ef68f3a..8182b2edacc1e 100644 --- a/tests/kafkatest/services/monitor/http.py +++ b/tests/kafkatest/services/monitor/http.py @@ -114,7 +114,7 @@ def metrics(self, host=None, client_id=None, name=None, group=None, tags=None): Get any collected metrics that match the specified parameters, yielding each as a tuple of (key, [, ...]) values. """ - for k, values in self._http_metrics.iteritems(): + for k, values in iter(self._http_metrics.items()): if ((host is None or host == k.host) and (client_id is None or client_id == k.client_id) and (name is None or name == k.name) and @@ -154,7 +154,7 @@ def do_POST(self): name = raw_metric['name'] group = raw_metric['group'] # Convert to tuple of pairs because dicts & lists are unhashable - tags = tuple([(k, v) for k, v in raw_metric['tags'].iteritems()]), + tags = tuple([(k, v) for k, v in iter(raw_metric['tags'].items())]), value = raw_metric['value'] key = MetricKey(host=host, client_id=client_id, name=name, group=group, tags=tags) diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py index 9cd56ec6e096d..7feab180c69fd 100644 --- a/tests/kafkatest/services/performance/producer_performance.py +++ b/tests/kafkatest/services/performance/producer_performance.py @@ -78,7 +78,7 @@ def start_cmd(self, node): 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol), 'client_id': self.client_id, 'kafka_run_class': self.path.script("kafka-run-class.sh", node), - 'metrics_props': ' '.join(["%s=%s" % (k, v) for k, v in self.http_metrics_client_configs.iteritems()]) + 'metrics_props': ' '.join(["%s=%s" % (k, v) for k, v in iter(self.http_metrics_client_configs.items())]) }) cmd = "" diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py index d619630ee716e..e744026acca2a 100644 --- a/tests/kafkatest/services/security/security_config.py +++ b/tests/kafkatest/services/security/security_config.py @@ -386,7 +386,7 @@ def props(self, prefix=''): return "" if self.has_sasl and not self.static_jaas_conf and 'sasl.jaas.config' not in self.properties: raise Exception("JAAS configuration property has not yet been initialized") - config_lines = (prefix + key + "=" + value for key, value in self.properties.iteritems()) + config_lines = (prefix + key + "=" + value for key, value in iter(self.properties.items())) # Extra blank lines ensure this can be appended/prepended safely return "\n".join(itertools.chain([""], config_lines, [""])) diff --git a/tests/kafkatest/services/templates/tools_log4j.properties b/tests/kafkatest/services/templates/tools_log4j.properties index f1cf342fff422..a5fb4d125166d 100644 --- a/tests/kafkatest/services/templates/tools_log4j.properties +++ b/tests/kafkatest/services/templates/tools_log4j.properties @@ -17,7 +17,7 @@ log4j.rootLogger = {{ log_level|default("INFO") }}, FILE {% if loggers is defined %} -{% for logger, log_level in loggers.iteritems() %} +{% for logger, log_level in iter(loggers.items()) %} log4j.logger.{{ logger }}={{ log_level }} {% endfor %} {% endif %} diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py index 50f4f7f8af69b..88c6c7d2691d0 100644 --- a/tests/kafkatest/tests/client/client_compatibility_features_test.py +++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py @@ -87,7 +87,7 @@ def invoke_compatibility_program(self, features): self.kafka.bootstrap_servers(), len(self.kafka.nodes), self.topics.keys()[0])) - for k, v in features.iteritems(): + for k, v in iter(features.items()): cmd = cmd + ("--%s %s " % (k, v)) results_dir = TestContext.results_dir(self.test_context, 0) try: diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py index 394a0f3b5718d..5ccf345a2886d 100644 --- a/tests/kafkatest/tests/client/quota_test.py +++ b/tests/kafkatest/tests/client/quota_test.py @@ -162,7 +162,7 @@ def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_n jmx_attributes=['bytes-consumed-rate'], version=client_version) consumer.run() - for idx, messages in consumer.messages_consumed.iteritems(): + for idx, messages in iter(consumer.messages_consumed.items()): assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx success, msg = self.validate(self.kafka, producer, consumer) diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index 107c0d4290597..7cc064c26f708 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -424,7 +424,7 @@ def test_bounce(self, clean, connect_protocol): self.logger.debug("Max source seqno: %d", src_seqno_max) src_seqno_counts = Counter(src_seqnos) missing_src_seqnos = sorted(set(range(src_seqno_max)).difference(set(src_seqnos))) - duplicate_src_seqnos = sorted([seqno for seqno,count in src_seqno_counts.iteritems() if count > 1]) + duplicate_src_seqnos = sorted([seqno for seqno,count in iter(src_seqno_counts.items()) if count > 1]) if missing_src_seqnos: self.logger.error("Missing source sequence numbers for task " + str(task)) @@ -444,7 +444,7 @@ def test_bounce(self, clean, connect_protocol): self.logger.debug("Max sink seqno: %d", sink_seqno_max) sink_seqno_counts = Counter(sink_seqnos) missing_sink_seqnos = sorted(set(range(sink_seqno_max)).difference(set(sink_seqnos))) - duplicate_sink_seqnos = sorted([seqno for seqno,count in sink_seqno_counts.iteritems() if count > 1]) + duplicate_sink_seqnos = sorted([seqno for seqno,count in iter(sink_seqno_counts.items()) if count > 1]) if missing_sink_seqnos: self.logger.error("Missing sink sequence numbers for task " + str(task)) diff --git a/tests/kafkatest/tests/end_to_end.py b/tests/kafkatest/tests/end_to_end.py index 2bafa76386f04..6da11b56fb47a 100644 --- a/tests/kafkatest/tests/end_to_end.py +++ b/tests/kafkatest/tests/end_to_end.py @@ -85,7 +85,7 @@ def on_record_consumed(self, record, node): def await_consumed_offsets(self, last_acked_offsets, timeout_sec): def has_finished_consuming(): - for partition, offset in last_acked_offsets.iteritems(): + for partition, offset in iter(last_acked_offsets.items()): if not partition in self.last_consumed_offsets: return False last_commit = self.consumer.last_commit(partition) diff --git a/tests/kafkatest/tests/streams/templates/log4j_template.properties b/tests/kafkatest/tests/streams/templates/log4j_template.properties index f1cf342fff422..a5fb4d125166d 100644 --- a/tests/kafkatest/tests/streams/templates/log4j_template.properties +++ b/tests/kafkatest/tests/streams/templates/log4j_template.properties @@ -17,7 +17,7 @@ log4j.rootLogger = {{ log_level|default("INFO") }}, FILE {% if loggers is defined %} -{% for logger, log_level in loggers.iteritems() %} +{% for logger, log_level in iter(loggers.items()) %} log4j.logger.{{ logger }}={{ log_level }} {% endfor %} {% endif %} From 9973345959483f3ee9bceafb8d302f192e6e539f Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Mon, 17 Aug 2020 18:56:21 +0300 Subject: [PATCH 04/24] KAFKA-10402: iteriterms -> iter(dict.items), itervalues -> iter(dict.values). --- .../services/kafka/templates/kafka.properties | 4 ++-- .../services/templates/tools_log4j.properties | 2 +- .../kafkatest/services/verifiable_consumer.py | 18 +++++++++--------- .../templates/log4j_template.properties | 2 +- .../tests/verifiable_consumer_test.py | 2 +- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index 86adbce1bbd60..96acc2cc96020 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -27,7 +27,7 @@ inter.broker.listener.name={{ interbroker_listener.name }} security.inter.broker.protocol={{ interbroker_listener.security_protocol }} {% endif %} -{% for k, v in iter(listener_security_config.client_listener_overrides.items()) %} +{% for k, v in listener_security_config.client_listener_overrides.items() %} {% if listener_security_config.requires_sasl_mechanism_prefix(k) %} listener.name.{{ security_protocol.lower() }}.{{ security_config.client_sasl_mechanism.lower() }}.{{ k }}={{ v }} {% else %} @@ -36,7 +36,7 @@ listener.name.{{ security_protocol.lower() }}.{{ k }}={{ v }} {% endfor %} {% if interbroker_listener.name != security_protocol %} -{% for k, v in iter(listener_security_config.interbroker_listener_overrides.items()) %} +{% for k, v in listener_security_config.interbroker_listener_overrides.items() %} {% if listener_security_config.requires_sasl_mechanism_prefix(k) %} listener.name.{{ interbroker_listener.name.lower() }}.{{ security_config.interbroker_sasl_mechanism.lower() }}.{{ k }}={{ v }} {% else %} diff --git a/tests/kafkatest/services/templates/tools_log4j.properties b/tests/kafkatest/services/templates/tools_log4j.properties index a5fb4d125166d..3f83b4220a1f5 100644 --- a/tests/kafkatest/services/templates/tools_log4j.properties +++ b/tests/kafkatest/services/templates/tools_log4j.properties @@ -17,7 +17,7 @@ log4j.rootLogger = {{ log_level|default("INFO") }}, FILE {% if loggers is defined %} -{% for logger, log_level in iter(loggers.items()) %} +{% for logger, log_level in loggers.items() %} log4j.logger.{{ logger }}={{ log_level }} {% endfor %} {% endif %} diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py index 433823bbab1ad..5471a0c0716f4 100644 --- a/tests/kafkatest/services/verifiable_consumer.py +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -361,7 +361,7 @@ def clean_node(self, node): def current_assignment(self): with self.lock: - return { handler.node: handler.current_assignment() for handler in self.event_handlers.itervalues() } + return { handler.node: handler.current_assignment() for handler in iter(self.event_handlers.values()) } def current_position(self, tp): with self.lock: @@ -372,7 +372,7 @@ def current_position(self, tp): def owner(self, tp): with self.lock: - for handler in self.event_handlers.itervalues(): + for handler in iter(self.event_handlers.values()): if tp in handler.current_assignment(): return handler.node return None @@ -386,33 +386,33 @@ def last_commit(self, tp): def total_consumed(self): with self.lock: - return sum(handler.total_consumed for handler in self.event_handlers.itervalues()) + return sum(handler.total_consumed for handler in iter(self.event_handlers.values())) def num_rebalances(self): with self.lock: - return max(handler.assigned_count for handler in self.event_handlers.itervalues()) + return max(handler.assigned_count for handler in iter(self.event_handlers.values())) def num_revokes_for_alive(self, keep_alive=1): with self.lock: - return max([handler.revoked_count for handler in self.event_handlers.itervalues() + return max([handler.revoked_count for handler in iter(self.event_handlers.values()) if handler.idx <= keep_alive]) def joined_nodes(self): with self.lock: - return [handler.node for handler in self.event_handlers.itervalues() + return [handler.node for handler in iter(self.event_handlers.values()) if handler.state == ConsumerState.Joined] def rebalancing_nodes(self): with self.lock: - return [handler.node for handler in self.event_handlers.itervalues() + return [handler.node for handler in iter(self.event_handlers.values()) if handler.state == ConsumerState.Rebalancing] def dead_nodes(self): with self.lock: - return [handler.node for handler in self.event_handlers.itervalues() + return [handler.node for handler in iter(self.event_handlers.values()) if handler.state == ConsumerState.Dead] def alive_nodes(self): with self.lock: - return [handler.node for handler in self.event_handlers.itervalues() + return [handler.node for handler in iter(self.event_handlers.values()) if handler.state != ConsumerState.Dead] diff --git a/tests/kafkatest/tests/streams/templates/log4j_template.properties b/tests/kafkatest/tests/streams/templates/log4j_template.properties index a5fb4d125166d..3f83b4220a1f5 100644 --- a/tests/kafkatest/tests/streams/templates/log4j_template.properties +++ b/tests/kafkatest/tests/streams/templates/log4j_template.properties @@ -17,7 +17,7 @@ log4j.rootLogger = {{ log_level|default("INFO") }}, FILE {% if loggers is defined %} -{% for logger, log_level in iter(loggers.items()) %} +{% for logger, log_level in loggers.items() %} log4j.logger.{{ logger }}={{ log_level }} {% endfor %} {% endif %} diff --git a/tests/kafkatest/tests/verifiable_consumer_test.py b/tests/kafkatest/tests/verifiable_consumer_test.py index 071439db8f207..0783768c504c7 100644 --- a/tests/kafkatest/tests/verifiable_consumer_test.py +++ b/tests/kafkatest/tests/verifiable_consumer_test.py @@ -40,7 +40,7 @@ def _all_partitions(self, topic, num_partitions): def _partitions(self, assignment): partitions = [] - for parts in assignment.itervalues(): + for parts in iter(assignment.values()): partitions += parts return partitions From 9486c964b0454ac5488a7ca41e9afcbd98e6bd6d Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Tue, 18 Aug 2020 18:06:28 +0300 Subject: [PATCH 05/24] KAFKA-10402: basestring -> str --- tests/kafkatest/services/trogdor/task_spec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafkatest/services/trogdor/task_spec.py b/tests/kafkatest/services/trogdor/task_spec.py index ae457aebc06d0..aa5766ee81654 100644 --- a/tests/kafkatest/services/trogdor/task_spec.py +++ b/tests/kafkatest/services/trogdor/task_spec.py @@ -44,7 +44,7 @@ def to_node_names(nodes): """ node_names = [] for obj in nodes: - if isinstance(obj, basestring): + if isinstance(obj, str): node_names.append(obj) else: node_names.append(obj.name) From 71218d875780acf67ece492aed20a12bf109efdf Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Tue, 18 Aug 2020 18:37:08 +0300 Subject: [PATCH 06/24] KAFKA-10402: syntax fixes. --- tests/kafkatest/services/streams.py | 2 +- .../tests/client/client_compatibility_features_test.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index c7c471e31ed3e..dc6facca26841 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -230,7 +230,7 @@ def pids(self, node): try: pids = [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=str)] return [int(pid) for pid in pids] - except Exception, exception: + except Exception as exception: self.logger.debug(str(exception)) return [] diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py index 88c6c7d2691d0..0dc6044bfd501 100644 --- a/tests/kafkatest/tests/client/client_compatibility_features_test.py +++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py @@ -14,6 +14,8 @@ # limitations under the License. import os + +import errno import time from random import randint @@ -53,7 +55,7 @@ def run_command(node, cmd, ssh_log_file): f.write(line) except Exception as e: f.write("** Command failed!") - print e + print(e) raise @@ -93,7 +95,7 @@ def invoke_compatibility_program(self, features): try: os.makedirs(results_dir) except OSError as e: - if e.errno == errno.EEXIST and os.path.isdir(path): + if e.errno == errno.EEXIST and os.path.isdir(results_dir): pass else: raise From f0b68c095be8f5604faa19a8084222d77a6071d6 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Tue, 18 Aug 2020 18:42:29 +0300 Subject: [PATCH 07/24] KAFKA-10402: reduce topic_count to run tests on small servers. --- tests/kafkatest/tests/core/replica_scale_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/kafkatest/tests/core/replica_scale_test.py b/tests/kafkatest/tests/core/replica_scale_test.py index 0fb803dadd416..5b6e5b9582dda 100644 --- a/tests/kafkatest/tests/core/replica_scale_test.py +++ b/tests/kafkatest/tests/core/replica_scale_test.py @@ -46,7 +46,7 @@ def teardown(self): self.zk.stop() @cluster(num_nodes=12) - @parametrize(topic_count=500, partition_count=34, replication_factor=3) + @parametrize(topic_count=100, partition_count=34, replication_factor=3) def test_produce_consume(self, topic_count, partition_count, replication_factor): topics_create_start_time = time.time() for i in range(topic_count): @@ -101,7 +101,7 @@ def test_produce_consume(self, topic_count, partition_count, replication_factor) trogdor.stop() @cluster(num_nodes=12) - @parametrize(topic_count=500, partition_count=34, replication_factor=3) + @parametrize(topic_count=100, partition_count=34, replication_factor=3) def test_clean_bounce(self, topic_count, partition_count, replication_factor): topics_create_start_time = time.time() for i in range(topic_count): From a814612a0e30513e70b48eb1a0e8cbe07eb03f93 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Tue, 18 Aug 2020 19:49:04 +0300 Subject: [PATCH 08/24] KAFKA-10402: fix usage iterable as array. --- .../tests/client/client_compatibility_features_test.py | 2 +- tests/kafkatest/tests/streams/streams_broker_bounce_test.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py index 0dc6044bfd501..1b9d5248baa44 100644 --- a/tests/kafkatest/tests/client/client_compatibility_features_test.py +++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py @@ -88,7 +88,7 @@ def invoke_compatibility_program(self, features): "--topic %s " % (self.zk.path.script("kafka-run-class.sh", node), self.kafka.bootstrap_servers(), len(self.kafka.nodes), - self.topics.keys()[0])) + [*self.topics.keys()][0])) for k, v in iter(features.items()): cmd = cmd + ("--%s %s " % (k, v)) results_dir = TestContext.results_dir(self.test_context, 0) diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py index c1f36229bc726..384d71ffc5b93 100644 --- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py @@ -119,7 +119,7 @@ def __init__(self, test_context): def fail_broker_type(self, failure_mode, broker_type): # Pick a random topic and bounce it's leader topic_index = randint(0, len(self.topics.keys()) - 1) - topic = self.topics.keys()[topic_index] + topic = [*self.topics.keys()][topic_index] failures[failure_mode](self, topic, broker_type) def fail_many_brokers(self, failure_mode, num_failures): From 575818b55229196b2c874c97be0587e7f5d0fac1 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Wed, 19 Aug 2020 16:18:26 +0300 Subject: [PATCH 09/24] KAFKA-10402: fix comparation of str(DEV_BRANCH) --- tests/kafkatest/version.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index adcad440f7673..a23001d45b026 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -49,6 +49,34 @@ def __str__(self): else: return LooseVersion.__str__(self) + def __eq__(self, other): + return self._cmp(other) == 0 + + def __lt__(self, other): + return self._cmp(other) < 0 + + def __le__(self, other): + return self._cmp(other) <= 0 + + def __gt__(self, other): + return self._cmp(other) > 0 + + def __ge__(self, other): + return self._cmp(other) >= 0 + + def _cmp(self, other): + if isinstance(other, str): + other = KafkaVersion(other) + + if other.is_dev: + if self.is_dev: + return 0 + return -1 + elif self.is_dev: + return 1 + + return LooseVersion._cmp(self, other) + def supports_named_listeners(self): return self >= V_0_10_2_0 From 8a4ea97fc9fe5f88982ee00a17ffe4c7acff0525 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Thu, 20 Aug 2020 10:55:49 +0300 Subject: [PATCH 10/24] KAFKA-10402: xrange -> range --- tests/kafkatest/services/monitor/jmx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index 2dcd369512dac..bff1878fada62 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -128,7 +128,7 @@ def read_jmx_output(self, idx, node): for name in object_attribute_names: aggregates_per_time = [] - for time_sec in xrange(start_time_sec, end_time_sec + 1): + for time_sec in range(start_time_sec, end_time_sec + 1): # assume that value is 0 if it is not read by jmx tool at the given time. This is appropriate for metrics such as bandwidth values_per_node = [time_to_stats.get(time_sec, {}).get(name, 0) for time_to_stats in self.jmx_stats] # assume that value is aggregated across nodes by sum. This is appropriate for metrics such as bandwidth From 4522601f3450a163c33d91205af1ca7be881a4e2 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Thu, 20 Aug 2020 13:54:00 +0300 Subject: [PATCH 11/24] KAFKA-10402: division operator + import http.server --- tests/kafkatest/services/monitor/http.py | 2 +- tests/kafkatest/tests/core/transactions_test.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/kafkatest/services/monitor/http.py b/tests/kafkatest/services/monitor/http.py index 8182b2edacc1e..3e1a989a662e1 100644 --- a/tests/kafkatest/services/monitor/http.py +++ b/tests/kafkatest/services/monitor/http.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer +from http.server import BaseHTTPRequestHandler, HTTPServer from collections import defaultdict, namedtuple import json from threading import Thread diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index 9aeff230fd16d..ad8d0a7dd01f5 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -248,7 +248,7 @@ def test_transactions(self, failure_mode, bounce_target, check_order, use_group_ # We reduce the number of seed messages to copy to account for the fewer output # partitions, and thus lower parallelism. This helps keep the test # time shorter. - self.num_seed_messages = self.num_seed_messages / 3 + self.num_seed_messages = self.num_seed_messages // 3 self.num_input_partitions = 1 self.num_output_partitions = 1 From b7c6833723960e26e2ec94d7430d37fc4d61d1c4 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Thu, 20 Aug 2020 17:15:32 +0300 Subject: [PATCH 12/24] KAFKA-10402: division operator fixes. --- tests/kafkatest/tests/core/network_degrade_test.py | 4 ++-- tests/kafkatest/tests/core/throttling_test.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/kafkatest/tests/core/network_degrade_test.py b/tests/kafkatest/tests/core/network_degrade_test.py index 5b77d99c62da3..808b4f9faef07 100644 --- a/tests/kafkatest/tests/core/network_degrade_test.py +++ b/tests/kafkatest/tests/core/network_degrade_test.py @@ -129,10 +129,10 @@ def test_rate(self, task_name, device_name, latency_ms, rate_limit_kbit): self.logger.info("Measured rates: %s" % measured_rates) # We expect to see measured rates within an order of magnitude of our target rate - low_kbps = rate_limit_kbit / 10 + low_kbps = rate_limit_kbit // 10 high_kbps = rate_limit_kbit * 10 acceptable_rates = [r for r in measured_rates if low_kbps < r < high_kbps] msg = "Expected most of the measured rates to be within an order of magnitude of target %d." % rate_limit_kbit - msg += " This means `tc` did not limit the bandwidth as expected." + msg += " This means `tc` did not limit the bandwidth as expected. Measured rates %s" % str(measured_rates) assert len(acceptable_rates) > 5, msg diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py index f29ec2b63cb42..1a0c1c6ec0940 100644 --- a/tests/kafkatest/tests/core/throttling_test.py +++ b/tests/kafkatest/tests/core/throttling_test.py @@ -73,7 +73,7 @@ def __init__(self, test_context): self.num_records = 2000 self.record_size = 4096 * 100 # 400 KB # 1 MB per partition on average. - self.partition_size = (self.num_records * self.record_size) / self.num_partitions + self.partition_size = (self.num_records * self.record_size) // self.num_partitions self.num_producers = 2 self.num_consumers = 1 self.throttle = 4 * 1024 * 1024 # 4 MB/s From 887d4284819bb53f92442b09ea106857b6a6cdfa Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Fri, 21 Aug 2020 13:04:03 +0300 Subject: [PATCH 13/24] KAFKA-10402: various syntax fixes based on the tests results. --- tests/kafkatest/benchmarks/core/benchmark_test.py | 6 +++--- tests/kafkatest/sanity_checks/test_verifiable_producer.py | 2 +- tests/kafkatest/services/kafka/kafka.py | 4 ++-- tests/kafkatest/tests/connect/connect_distributed_test.py | 3 ++- tests/kafkatest/tests/connect/connect_test.py | 3 +-- tests/kafkatest/tests/core/reassign_partitions_test.py | 2 +- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py index 6bab304f3037b..b53e629c4c143 100644 --- a/tests/kafkatest/benchmarks/core/benchmark_test.py +++ b/tests/kafkatest/benchmarks/core/benchmark_test.py @@ -88,7 +88,7 @@ def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DE self.validate_versions(client_version, broker_version) self.start_kafka(security_protocol, security_protocol, broker_version, tls_version) # Always generate the same total amount of data - nrecords = int(self.target_data_size / message_size) + nrecords = int(self.target_data_size // message_size) self.producer = ProducerPerformanceService( self.test_context, num_producers, self.kafka, topic=topic, @@ -140,8 +140,8 @@ def test_long_term_producer_throughput(self, compression_type="none", # FIXME we should be generating a graph too # Try to break it into 5 blocks, but fall back to a smaller number if # there aren't even 5 elements - block_size = max(len(self.producer.stats[0]) / 5, 1) - nblocks = len(self.producer.stats[0]) / block_size + block_size = max(len(self.producer.stats[0]) // 5, 1) + nblocks = len(self.producer.stats[0]) // block_size for i in range(nblocks): subset = self.producer.stats[0][i*block_size:min((i+1)*block_size, len(self.producer.stats[0]))] diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py index a008532cb1135..fde2abf1f819f 100644 --- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py +++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py @@ -39,7 +39,7 @@ def __init__(self, test_context): self.num_messages = 1000 # This will produce to source kafka cluster self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, - max_messages=self.num_messages, throughput=self.num_messages/5) + max_messages=self.num_messages, throughput=self.num_messages // 5) def setUp(self): self.zk.start() diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 3bc36e8c4f4cb..be56b859c37f5 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -678,11 +678,11 @@ def parse_describe_topic(self, topic_description): fields = line.split("\t") # ["Partition: 4", "Leader: 0"] -> ["4", "0"] - fields = map(lambda x: x.split(" ")[1], fields) + fields = list(map(lambda x: x.split(" ")[1], fields)) partitions.append( {"topic": fields[0], "partition": int(fields[1]), - "replicas": map(int, fields[3].split(','))}) + "replicas": list(map(int, fields[3].split(',')))}) return {"partitions": partitions} diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index 7cc064c26f708..d0a5a22506a80 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -26,6 +26,7 @@ from kafkatest.services.security.security_config import SecurityConfig from kafkatest.version import DEV_BRANCH, LATEST_2_3, LATEST_2_2, LATEST_2_1, LATEST_2_0, LATEST_1_1, LATEST_1_0, LATEST_0_11_0, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, KafkaVersion +from functools import reduce from collections import Counter, namedtuple import itertools import json @@ -258,7 +259,7 @@ def test_pause_and_resume_sink(self, connect_protocol): self.source = VerifiableSource(self.cc, topic=self.TOPIC) self.source.start() - wait_until(lambda: len(self.source.committed_messages()) > 0, timeout_sec=30, + wait_until(lambda: len(list(self.source.committed_messages())) > 0, timeout_sec=30, err_msg="Timeout expired waiting for source task to produce a message") self.sink = VerifiableSink(self.cc, topics=[self.TOPIC]) diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py index 4009f751bec97..580d9f376c627 100644 --- a/tests/kafkatest/tests/connect/connect_test.py +++ b/tests/kafkatest/tests/connect/connect_test.py @@ -28,7 +28,6 @@ import hashlib import json -import os.path class ConnectStandaloneFileTest(Test): @@ -130,7 +129,7 @@ def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.Jso def validate_output(self, value): try: output_hash = list(self.sink.node.account.ssh_capture("md5sum " + self.OUTPUT_FILE))[0].strip().split()[0] - return output_hash == hashlib.md5(value).hexdigest() + return output_hash == hashlib.md5(value.encode('utf-8')).hexdigest() except RemoteCommandError: return False diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py b/tests/kafkatest/tests/core/reassign_partitions_test.py index c4397db899b5e..f36f77473557c 100644 --- a/tests/kafkatest/tests/core/reassign_partitions_test.py +++ b/tests/kafkatest/tests/core/reassign_partitions_test.py @@ -84,7 +84,7 @@ def reassign_partitions(self, bounce_brokers): self.logger.debug("Jumble partition assignment with seed " + str(seed)) random.seed(seed) # The list may still be in order, but that's ok - shuffled_list = range(0, self.num_partitions) + shuffled_list = list(range(0, self.num_partitions)) random.shuffle(shuffled_list) for i in range(0, self.num_partitions): From 975968a98ac1bd3efec429ea1fe1f74d68606b75 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Sat, 22 Aug 2020 11:59:53 +0300 Subject: [PATCH 14/24] KAFKA-10402: final test fixes. --- .../tests/connect/connect_distributed_test.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index d0a5a22506a80..b709b9e10492b 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -228,9 +228,9 @@ def test_pause_and_resume_source(self, connect_protocol): err_msg="Failed to see connector transition to the PAUSED state") # verify that we do not produce new messages while paused - num_messages = len(self.source.sent_messages()) + num_messages = len(list(self.source.sent_messages())) time.sleep(10) - assert num_messages == len(self.source.sent_messages()), "Paused source connector should not produce any messages" + assert num_messages == len(list(self.source.sent_messages())), "Paused source connector should not produce any messages" self.cc.resume_connector(self.source.name) @@ -239,7 +239,7 @@ def test_pause_and_resume_source(self, connect_protocol): err_msg="Failed to see connector transition to the RUNNING state") # after resuming, we should see records produced again - wait_until(lambda: len(self.source.sent_messages()) > num_messages, timeout_sec=30, + wait_until(lambda: len(list(self.source.sent_messages())) > num_messages, timeout_sec=30, err_msg="Failed to produce messages after resuming source connector") @cluster(num_nodes=5) @@ -276,9 +276,9 @@ def test_pause_and_resume_sink(self, connect_protocol): err_msg="Failed to see connector transition to the PAUSED state") # verify that we do not consume new messages while paused - num_messages = len(self.sink.received_messages()) + num_messages = len(list(self.sink.received_messages())) time.sleep(10) - assert num_messages == len(self.sink.received_messages()), "Paused sink connector should not consume any messages" + assert num_messages == len(list(self.sink.received_messages())), "Paused sink connector should not consume any messages" self.cc.resume_connector(self.sink.name) @@ -287,7 +287,7 @@ def test_pause_and_resume_sink(self, connect_protocol): err_msg="Failed to see connector transition to the RUNNING state") # after resuming, we should see records consumed again - wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30, + wait_until(lambda: len(list(self.sink.received_messages())) > num_messages, timeout_sec=30, err_msg="Failed to consume messages after resuming sink connector") @cluster(num_nodes=5) @@ -421,7 +421,10 @@ def test_bounce(self, clean, connect_protocol): src_seqnos = [msg['seqno'] for msg in src_messages if msg['task'] == task] # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because clean # bouncing should commit on rebalance. - src_seqno_max = max(src_seqnos) + if len(src_seqnos) == 0: + src_seqno_max = 0 + else: + src_seqno_max = max(src_seqnos) self.logger.debug("Max source seqno: %d", src_seqno_max) src_seqno_counts = Counter(src_seqnos) missing_src_seqnos = sorted(set(range(src_seqno_max)).difference(set(src_seqnos))) From a9be51f9f9dedf920f272b0c6b8ef42ab373b16f Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Sun, 23 Aug 2020 10:58:02 +0300 Subject: [PATCH 15/24] KAFKA-10402: final test fixes. --- tests/kafkatest/tests/connect/connect_distributed_test.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index b709b9e10492b..eabe9a61f8a72 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -444,7 +444,11 @@ def test_bounce(self, clean, connect_protocol): sink_seqnos = [msg['seqno'] for msg in sink_messages if msg['task'] == task] # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because # clean bouncing should commit on rebalance. - sink_seqno_max = max(sink_seqnos) + + if len(sink_seqnos) == 0: + sink_seqno_max = 0 + else: + sink_seqno_max = max(sink_seqnos) self.logger.debug("Max sink seqno: %d", sink_seqno_max) sink_seqno_counts = Counter(sink_seqnos) missing_sink_seqnos = sorted(set(range(sink_seqno_max)).difference(set(sink_seqnos))) From 9614020d7b5fb0332fb4e3229bfe6e269454c079 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Sun, 23 Aug 2020 11:20:09 +0300 Subject: [PATCH 16/24] KAFKA-10402: code review fixes. --- .../benchmarks/core/benchmark_test.py | 2 +- tests/kafkatest/services/monitor/http.py | 4 ++-- .../performance/producer_performance.py | 2 +- .../services/security/security_config.py | 2 +- .../kafkatest/services/verifiable_consumer.py | 18 +++++++++--------- .../client_compatibility_features_test.py | 4 ++-- tests/kafkatest/tests/client/quota_test.py | 2 +- .../tests/connect/connect_distributed_test.py | 13 +++---------- tests/kafkatest/tests/end_to_end.py | 2 +- .../streams/streams_broker_bounce_test.py | 2 +- 10 files changed, 22 insertions(+), 29 deletions(-) diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py index b53e629c4c143..5e7bddeb5d107 100644 --- a/tests/kafkatest/benchmarks/core/benchmark_test.py +++ b/tests/kafkatest/benchmarks/core/benchmark_test.py @@ -88,7 +88,7 @@ def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DE self.validate_versions(client_version, broker_version) self.start_kafka(security_protocol, security_protocol, broker_version, tls_version) # Always generate the same total amount of data - nrecords = int(self.target_data_size // message_size) + nrecords = int(self.target_data_size / message_size) self.producer = ProducerPerformanceService( self.test_context, num_producers, self.kafka, topic=topic, diff --git a/tests/kafkatest/services/monitor/http.py b/tests/kafkatest/services/monitor/http.py index 3e1a989a662e1..63ca3424182b2 100644 --- a/tests/kafkatest/services/monitor/http.py +++ b/tests/kafkatest/services/monitor/http.py @@ -114,7 +114,7 @@ def metrics(self, host=None, client_id=None, name=None, group=None, tags=None): Get any collected metrics that match the specified parameters, yielding each as a tuple of (key, [, ...]) values. """ - for k, values in iter(self._http_metrics.items()): + for k, values in self._http_metrics.items(): if ((host is None or host == k.host) and (client_id is None or client_id == k.client_id) and (name is None or name == k.name) and @@ -154,7 +154,7 @@ def do_POST(self): name = raw_metric['name'] group = raw_metric['group'] # Convert to tuple of pairs because dicts & lists are unhashable - tags = tuple([(k, v) for k, v in iter(raw_metric['tags'].items())]), + tags = tuple([(k, v) for k, v in raw_metric['tags'].items()]), value = raw_metric['value'] key = MetricKey(host=host, client_id=client_id, name=name, group=group, tags=tags) diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py index 7feab180c69fd..87d4bcfd96e6a 100644 --- a/tests/kafkatest/services/performance/producer_performance.py +++ b/tests/kafkatest/services/performance/producer_performance.py @@ -78,7 +78,7 @@ def start_cmd(self, node): 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol), 'client_id': self.client_id, 'kafka_run_class': self.path.script("kafka-run-class.sh", node), - 'metrics_props': ' '.join(["%s=%s" % (k, v) for k, v in iter(self.http_metrics_client_configs.items())]) + 'metrics_props': ' '.join(["%s=%s" % (k, v) for k, v in self.http_metrics_client_configs.items()]) }) cmd = "" diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py index e744026acca2a..437084ba16cc8 100644 --- a/tests/kafkatest/services/security/security_config.py +++ b/tests/kafkatest/services/security/security_config.py @@ -386,7 +386,7 @@ def props(self, prefix=''): return "" if self.has_sasl and not self.static_jaas_conf and 'sasl.jaas.config' not in self.properties: raise Exception("JAAS configuration property has not yet been initialized") - config_lines = (prefix + key + "=" + value for key, value in iter(self.properties.items())) + config_lines = (prefix + key + "=" + value for key, value in self.properties.items()) # Extra blank lines ensure this can be appended/prepended safely return "\n".join(itertools.chain([""], config_lines, [""])) diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py index 5471a0c0716f4..8f994bda87d3b 100644 --- a/tests/kafkatest/services/verifiable_consumer.py +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -361,7 +361,7 @@ def clean_node(self, node): def current_assignment(self): with self.lock: - return { handler.node: handler.current_assignment() for handler in iter(self.event_handlers.values()) } + return { handler.node: handler.current_assignment() for handler in self.event_handlers.values() } def current_position(self, tp): with self.lock: @@ -372,7 +372,7 @@ def current_position(self, tp): def owner(self, tp): with self.lock: - for handler in iter(self.event_handlers.values()): + for handler in self.event_handlers.values(): if tp in handler.current_assignment(): return handler.node return None @@ -386,33 +386,33 @@ def last_commit(self, tp): def total_consumed(self): with self.lock: - return sum(handler.total_consumed for handler in iter(self.event_handlers.values())) + return sum(handler.total_consumed for handler in self.event_handlers.values()) def num_rebalances(self): with self.lock: - return max(handler.assigned_count for handler in iter(self.event_handlers.values())) + return max(handler.assigned_count for handler in self.event_handlers.values()) def num_revokes_for_alive(self, keep_alive=1): with self.lock: - return max([handler.revoked_count for handler in iter(self.event_handlers.values()) + return max([handler.revoked_count for handler in self.event_handlers.values() if handler.idx <= keep_alive]) def joined_nodes(self): with self.lock: - return [handler.node for handler in iter(self.event_handlers.values()) + return [handler.node for handler in self.event_handlers.values() if handler.state == ConsumerState.Joined] def rebalancing_nodes(self): with self.lock: - return [handler.node for handler in iter(self.event_handlers.values()) + return [handler.node for handler in self.event_handlers.values() if handler.state == ConsumerState.Rebalancing] def dead_nodes(self): with self.lock: - return [handler.node for handler in iter(self.event_handlers.values()) + return [handler.node for handler in self.event_handlers.values() if handler.state == ConsumerState.Dead] def alive_nodes(self): with self.lock: - return [handler.node for handler in iter(self.event_handlers.values()) + return [handler.node for handler in self.event_handlers.values() if handler.state != ConsumerState.Dead] diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py index 1b9d5248baa44..f815326868032 100644 --- a/tests/kafkatest/tests/client/client_compatibility_features_test.py +++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py @@ -88,8 +88,8 @@ def invoke_compatibility_program(self, features): "--topic %s " % (self.zk.path.script("kafka-run-class.sh", node), self.kafka.bootstrap_servers(), len(self.kafka.nodes), - [*self.topics.keys()][0])) - for k, v in iter(features.items()): + list(self.topics.keys())[0])) + for k, v in features.items(): cmd = cmd + ("--%s %s " % (k, v)) results_dir = TestContext.results_dir(self.test_context, 0) try: diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py index 5ccf345a2886d..204cabbecdf41 100644 --- a/tests/kafkatest/tests/client/quota_test.py +++ b/tests/kafkatest/tests/client/quota_test.py @@ -162,7 +162,7 @@ def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_n jmx_attributes=['bytes-consumed-rate'], version=client_version) consumer.run() - for idx, messages in iter(consumer.messages_consumed.items()): + for idx, messages in consumer.messages_consumed.items(): assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx success, msg = self.validate(self.kafka, producer, consumer) diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index eabe9a61f8a72..479436236e9a5 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -421,14 +421,11 @@ def test_bounce(self, clean, connect_protocol): src_seqnos = [msg['seqno'] for msg in src_messages if msg['task'] == task] # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because clean # bouncing should commit on rebalance. - if len(src_seqnos) == 0: - src_seqno_max = 0 - else: - src_seqno_max = max(src_seqnos) + src_seqno_max = max(src_seqnos) if len(src_seqnos) else 0 self.logger.debug("Max source seqno: %d", src_seqno_max) src_seqno_counts = Counter(src_seqnos) missing_src_seqnos = sorted(set(range(src_seqno_max)).difference(set(src_seqnos))) - duplicate_src_seqnos = sorted([seqno for seqno,count in iter(src_seqno_counts.items()) if count > 1]) + duplicate_src_seqnos = sorted([seqno for seqno,count in src_seqno_counts.items() if count > 1]) if missing_src_seqnos: self.logger.error("Missing source sequence numbers for task " + str(task)) @@ -444,11 +441,7 @@ def test_bounce(self, clean, connect_protocol): sink_seqnos = [msg['seqno'] for msg in sink_messages if msg['task'] == task] # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because # clean bouncing should commit on rebalance. - - if len(sink_seqnos) == 0: - sink_seqno_max = 0 - else: - sink_seqno_max = max(sink_seqnos) + sink_seqno_max = max(sink_seqnos) if len(sink_seqnos) else 0 self.logger.debug("Max sink seqno: %d", sink_seqno_max) sink_seqno_counts = Counter(sink_seqnos) missing_sink_seqnos = sorted(set(range(sink_seqno_max)).difference(set(sink_seqnos))) diff --git a/tests/kafkatest/tests/end_to_end.py b/tests/kafkatest/tests/end_to_end.py index 6da11b56fb47a..7ef6b974f6905 100644 --- a/tests/kafkatest/tests/end_to_end.py +++ b/tests/kafkatest/tests/end_to_end.py @@ -85,7 +85,7 @@ def on_record_consumed(self, record, node): def await_consumed_offsets(self, last_acked_offsets, timeout_sec): def has_finished_consuming(): - for partition, offset in iter(last_acked_offsets.items()): + for partition, offset in last_acked_offsets.items(): if not partition in self.last_consumed_offsets: return False last_commit = self.consumer.last_commit(partition) diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py index 384d71ffc5b93..d899280812764 100644 --- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py @@ -119,7 +119,7 @@ def __init__(self, test_context): def fail_broker_type(self, failure_mode, broker_type): # Pick a random topic and bounce it's leader topic_index = randint(0, len(self.topics.keys()) - 1) - topic = [*self.topics.keys()][topic_index] + topic = list(self.topics.keys())[topic_index] failures[failure_mode](self, topic, broker_type) def fail_many_brokers(self, failure_mode, num_failures): From e1d523efcefcf47ffd5d1f4b77833bc0f706e817 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Sun, 23 Aug 2020 11:49:16 +0300 Subject: [PATCH 17/24] KAFKA-10402: code review fixes. --- tests/kafkatest/version.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index a23001d45b026..77bbccb49c82f 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -49,21 +49,6 @@ def __str__(self): else: return LooseVersion.__str__(self) - def __eq__(self, other): - return self._cmp(other) == 0 - - def __lt__(self, other): - return self._cmp(other) < 0 - - def __le__(self, other): - return self._cmp(other) <= 0 - - def __gt__(self, other): - return self._cmp(other) > 0 - - def __ge__(self, other): - return self._cmp(other) >= 0 - def _cmp(self, other): if isinstance(other, str): other = KafkaVersion(other) From 18e48af0a67f1b8a8a96455de5092583890e690d Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Sun, 23 Aug 2020 14:46:08 +0300 Subject: [PATCH 18/24] KAFKA-10402: code review fixes. --- tests/kafkatest/services/monitor/http.py | 2 +- tests/kafkatest/services/performance/producer_performance.py | 2 +- tests/kafkatest/services/verifiable_consumer.py | 4 ++-- tests/kafkatest/tests/connect/connect_distributed_test.py | 4 ++-- tests/kafkatest/tests/verifiable_consumer_test.py | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/kafkatest/services/monitor/http.py b/tests/kafkatest/services/monitor/http.py index 63ca3424182b2..0293fbd8c4f98 100644 --- a/tests/kafkatest/services/monitor/http.py +++ b/tests/kafkatest/services/monitor/http.py @@ -154,7 +154,7 @@ def do_POST(self): name = raw_metric['name'] group = raw_metric['group'] # Convert to tuple of pairs because dicts & lists are unhashable - tags = tuple([(k, v) for k, v in raw_metric['tags'].items()]), + tags = tuple((k, v) for k, v in raw_metric['tags'].items()), value = raw_metric['value'] key = MetricKey(host=host, client_id=client_id, name=name, group=group, tags=tags) diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py index 87d4bcfd96e6a..3c4369883040d 100644 --- a/tests/kafkatest/services/performance/producer_performance.py +++ b/tests/kafkatest/services/performance/producer_performance.py @@ -78,7 +78,7 @@ def start_cmd(self, node): 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol), 'client_id': self.client_id, 'kafka_run_class': self.path.script("kafka-run-class.sh", node), - 'metrics_props': ' '.join(["%s=%s" % (k, v) for k, v in self.http_metrics_client_configs.items()]) + 'metrics_props': ' '.join("%s=%s" % (k, v) for k, v in self.http_metrics_client_configs.items()) }) cmd = "" diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py index 8f994bda87d3b..93d9446fb9bba 100644 --- a/tests/kafkatest/services/verifiable_consumer.py +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -394,8 +394,8 @@ def num_rebalances(self): def num_revokes_for_alive(self, keep_alive=1): with self.lock: - return max([handler.revoked_count for handler in self.event_handlers.values() - if handler.idx <= keep_alive]) + return max(handler.revoked_count for handler in self.event_handlers.values() + if handler.idx <= keep_alive) def joined_nodes(self): with self.lock: diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index 479436236e9a5..684cbc6da56da 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -425,7 +425,7 @@ def test_bounce(self, clean, connect_protocol): self.logger.debug("Max source seqno: %d", src_seqno_max) src_seqno_counts = Counter(src_seqnos) missing_src_seqnos = sorted(set(range(src_seqno_max)).difference(set(src_seqnos))) - duplicate_src_seqnos = sorted([seqno for seqno,count in src_seqno_counts.items() if count > 1]) + duplicate_src_seqnos = sorted(seqno for seqno,count in src_seqno_counts.items() if count > 1) if missing_src_seqnos: self.logger.error("Missing source sequence numbers for task " + str(task)) @@ -445,7 +445,7 @@ def test_bounce(self, clean, connect_protocol): self.logger.debug("Max sink seqno: %d", sink_seqno_max) sink_seqno_counts = Counter(sink_seqnos) missing_sink_seqnos = sorted(set(range(sink_seqno_max)).difference(set(sink_seqnos))) - duplicate_sink_seqnos = sorted([seqno for seqno,count in iter(sink_seqno_counts.items()) if count > 1]) + duplicate_sink_seqnos = sorted(seqno for seqno,count in iter(sink_seqno_counts.items()) if count > 1) if missing_sink_seqnos: self.logger.error("Missing sink sequence numbers for task " + str(task)) diff --git a/tests/kafkatest/tests/verifiable_consumer_test.py b/tests/kafkatest/tests/verifiable_consumer_test.py index 0783768c504c7..e416690bfc402 100644 --- a/tests/kafkatest/tests/verifiable_consumer_test.py +++ b/tests/kafkatest/tests/verifiable_consumer_test.py @@ -40,7 +40,7 @@ def _all_partitions(self, topic, num_partitions): def _partitions(self, assignment): partitions = [] - for parts in iter(assignment.values()): + for parts in assignment.values(): partitions += parts return partitions From d706fc73d06b4edabd0979dcd2c3f1e75f5736b0 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Mon, 24 Aug 2020 11:32:29 +0300 Subject: [PATCH 19/24] KAFKA-10402: code review fixes. --- tests/kafkatest/utils/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafkatest/utils/__init__.py b/tests/kafkatest/utils/__init__.py index 72a2858583b5e..5d5d38dfe0aad 100644 --- a/tests/kafkatest/utils/__init__.py +++ b/tests/kafkatest/utils/__init__.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable, validate_delivery \ No newline at end of file +from .util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable, validate_delivery From 14712e64299d700bfd19279cf0ddc878b8d17ab2 Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Thu, 3 Sep 2020 12:42:03 +0300 Subject: [PATCH 20/24] KAFKA-10402: code review fixes. --- tests/kafkatest/tests/core/replica_scale_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/kafkatest/tests/core/replica_scale_test.py b/tests/kafkatest/tests/core/replica_scale_test.py index 5b6e5b9582dda..0fb803dadd416 100644 --- a/tests/kafkatest/tests/core/replica_scale_test.py +++ b/tests/kafkatest/tests/core/replica_scale_test.py @@ -46,7 +46,7 @@ def teardown(self): self.zk.stop() @cluster(num_nodes=12) - @parametrize(topic_count=100, partition_count=34, replication_factor=3) + @parametrize(topic_count=500, partition_count=34, replication_factor=3) def test_produce_consume(self, topic_count, partition_count, replication_factor): topics_create_start_time = time.time() for i in range(topic_count): @@ -101,7 +101,7 @@ def test_produce_consume(self, topic_count, partition_count, replication_factor) trogdor.stop() @cluster(num_nodes=12) - @parametrize(topic_count=100, partition_count=34, replication_factor=3) + @parametrize(topic_count=500, partition_count=34, replication_factor=3) def test_clean_bounce(self, topic_count, partition_count, replication_factor): topics_create_start_time = time.time() for i in range(topic_count): From 4bb0f1d7568d56640915ecb49fe47de5e9550c0c Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Thu, 3 Sep 2020 12:56:18 +0300 Subject: [PATCH 21/24] KAFKA-10402: code review fixes. --- tests/kafkatest/tests/core/network_degrade_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafkatest/tests/core/network_degrade_test.py b/tests/kafkatest/tests/core/network_degrade_test.py index 808b4f9faef07..76af8b0dd72c0 100644 --- a/tests/kafkatest/tests/core/network_degrade_test.py +++ b/tests/kafkatest/tests/core/network_degrade_test.py @@ -134,5 +134,5 @@ def test_rate(self, task_name, device_name, latency_ms, rate_limit_kbit): acceptable_rates = [r for r in measured_rates if low_kbps < r < high_kbps] msg = "Expected most of the measured rates to be within an order of magnitude of target %d." % rate_limit_kbit - msg += " This means `tc` did not limit the bandwidth as expected. Measured rates %s" % str(measured_rates) + msg += " This means `tc` did not limit the bandwidth as expected." assert len(acceptable_rates) > 5, msg From c31fd72593cb93b0b5785c31bd973f6a12b5c04c Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Thu, 3 Sep 2020 14:36:07 +0300 Subject: [PATCH 22/24] KAFKA-10402: code review fixes. --- tests/docker/Dockerfile | 59 ++++++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 19bb100d08715..238c1c11bad58 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -33,8 +33,7 @@ LABEL ducker.creator=$ducker_creator # Update Linux and install necessary utilities. # we have to install git since it is included in openjdk:8 but not openjdk:11 -RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib -RUN apt update && apt install -y sudo git netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean +RUN apt update && apt install -y sudo git netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean RUN python3 -m pip install -U pip==20.2.2; RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 RUN pip3 install git+https://github.com/confluentinc/ducktape @@ -48,36 +47,36 @@ RUN echo 'PermitUserEnvironment yes' >> /etc/ssh/sshd_config # Install binary test dependencies. # we use the same versions as in vagrant/base.sh ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages" -#RUN mkdir -p "/opt/kafka-0.8.2.2" && chmod a+rw /opt/kafka-0.8.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2" -#RUN mkdir -p "/opt/kafka-0.9.0.1" && chmod a+rw /opt/kafka-0.9.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1" -#RUN mkdir -p "/opt/kafka-0.10.0.1" && chmod a+rw /opt/kafka-0.10.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1" -#RUN mkdir -p "/opt/kafka-0.10.1.1" && chmod a+rw /opt/kafka-0.10.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1" -#RUN mkdir -p "/opt/kafka-0.10.2.2" && chmod a+rw /opt/kafka-0.10.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.2" -#RUN mkdir -p "/opt/kafka-0.11.0.3" && chmod a+rw /opt/kafka-0.11.0.3 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.3.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.3" -#RUN mkdir -p "/opt/kafka-1.0.2" && chmod a+rw /opt/kafka-1.0.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.2" -#RUN mkdir -p "/opt/kafka-1.1.1" && chmod a+rw /opt/kafka-1.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.1.1" -#RUN mkdir -p "/opt/kafka-2.0.1" && chmod a+rw /opt/kafka-2.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.0.1" -#RUN mkdir -p "/opt/kafka-2.1.1" && chmod a+rw /opt/kafka-2.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.1.1" -#RUN mkdir -p "/opt/kafka-2.2.2" && chmod a+rw /opt/kafka-2.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.2.2" -#RUN mkdir -p "/opt/kafka-2.3.1" && chmod a+rw /opt/kafka-2.3.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.3.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.3.1" -#RUN mkdir -p "/opt/kafka-2.4.1" && chmod a+rw /opt/kafka-2.4.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.4.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.4.1" -#RUN mkdir -p "/opt/kafka-2.5.1" && chmod a+rw /opt/kafka-2.5.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.5.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.5.1" -#RUN mkdir -p "/opt/kafka-2.6.0" && chmod a+rw /opt/kafka-2.6.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.6.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.6.0" +RUN mkdir -p "/opt/kafka-0.8.2.2" && chmod a+rw /opt/kafka-0.8.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2" +RUN mkdir -p "/opt/kafka-0.9.0.1" && chmod a+rw /opt/kafka-0.9.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1" +RUN mkdir -p "/opt/kafka-0.10.0.1" && chmod a+rw /opt/kafka-0.10.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1" +RUN mkdir -p "/opt/kafka-0.10.1.1" && chmod a+rw /opt/kafka-0.10.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1" +RUN mkdir -p "/opt/kafka-0.10.2.2" && chmod a+rw /opt/kafka-0.10.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.2" +RUN mkdir -p "/opt/kafka-0.11.0.3" && chmod a+rw /opt/kafka-0.11.0.3 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.3.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.3" +RUN mkdir -p "/opt/kafka-1.0.2" && chmod a+rw /opt/kafka-1.0.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.2" +RUN mkdir -p "/opt/kafka-1.1.1" && chmod a+rw /opt/kafka-1.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.1.1" +RUN mkdir -p "/opt/kafka-2.0.1" && chmod a+rw /opt/kafka-2.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.0.1" +RUN mkdir -p "/opt/kafka-2.1.1" && chmod a+rw /opt/kafka-2.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.1.1" +RUN mkdir -p "/opt/kafka-2.2.2" && chmod a+rw /opt/kafka-2.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.2.2" +RUN mkdir -p "/opt/kafka-2.3.1" && chmod a+rw /opt/kafka-2.3.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.3.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.3.1" +RUN mkdir -p "/opt/kafka-2.4.1" && chmod a+rw /opt/kafka-2.4.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.4.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.4.1" +RUN mkdir -p "/opt/kafka-2.5.1" && chmod a+rw /opt/kafka-2.5.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.5.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.5.1" +RUN mkdir -p "/opt/kafka-2.6.0" && chmod a+rw /opt/kafka-2.6.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.6.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.6.0" # Streams test dependencies -#RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar -#RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.1.1-test.jar" -o /opt/kafka-0.10.1.1/libs/kafka-streams-0.10.1.1-test.jar -#RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.2-test.jar" -o /opt/kafka-0.10.2.2/libs/kafka-streams-0.10.2.2-test.jar -#RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.3-test.jar" -o /opt/kafka-0.11.0.3/libs/kafka-streams-0.11.0.3-test.jar -#RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.2-test.jar" -o /opt/kafka-1.0.2/libs/kafka-streams-1.0.2-test.jar -#RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.1.1-test.jar" -o /opt/kafka-1.1.1/libs/kafka-streams-1.1.1-test.jar -#RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.0.1-test.jar" -o /opt/kafka-2.0.1/libs/kafka-streams-2.0.1-test.jar -#RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.1.1-test.jar" -o /opt/kafka-2.1.1/libs/kafka-streams-2.1.1-test.jar -#RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.2.2-test.jar" -o /opt/kafka-2.2.2/libs/kafka-streams-2.2.2-test.jar -#RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.3.1-test.jar" -o /opt/kafka-2.3.1/libs/kafka-streams-2.3.1-test.jar -#RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.4.1-test.jar" -o /opt/kafka-2.4.1/libs/kafka-streams-2.4.1-test.jar -#RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.5.1-test.jar" -o /opt/kafka-2.5.1/libs/kafka-streams-2.5.1-test.jar -#RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.6.0-test.jar" -o /opt/kafka-2.6.0/libs/kafka-streams-2.6.0-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.1.1-test.jar" -o /opt/kafka-0.10.1.1/libs/kafka-streams-0.10.1.1-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.2-test.jar" -o /opt/kafka-0.10.2.2/libs/kafka-streams-0.10.2.2-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.3-test.jar" -o /opt/kafka-0.11.0.3/libs/kafka-streams-0.11.0.3-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.2-test.jar" -o /opt/kafka-1.0.2/libs/kafka-streams-1.0.2-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.1.1-test.jar" -o /opt/kafka-1.1.1/libs/kafka-streams-1.1.1-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.0.1-test.jar" -o /opt/kafka-2.0.1/libs/kafka-streams-2.0.1-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.1.1-test.jar" -o /opt/kafka-2.1.1/libs/kafka-streams-2.1.1-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.2.2-test.jar" -o /opt/kafka-2.2.2/libs/kafka-streams-2.2.2-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.3.1-test.jar" -o /opt/kafka-2.3.1/libs/kafka-streams-2.3.1-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.4.1-test.jar" -o /opt/kafka-2.4.1/libs/kafka-streams-2.4.1-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.5.1-test.jar" -o /opt/kafka-2.5.1/libs/kafka-streams-2.5.1-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.6.0-test.jar" -o /opt/kafka-2.6.0/libs/kafka-streams-2.6.0-test.jar # The version of Kibosh to use for testing. # If you update this, also update vagrant/base.sh From 316dc95a0dc4ae6d49327a34ea31e2824726bbed Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Thu, 17 Sep 2020 16:49:51 +0300 Subject: [PATCH 23/24] KAFKA-10402: decreasing topic count to run tests on docker --- tests/kafkatest/tests/core/replica_scale_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/kafkatest/tests/core/replica_scale_test.py b/tests/kafkatest/tests/core/replica_scale_test.py index 0fb803dadd416..dd8f89fbeb9c7 100644 --- a/tests/kafkatest/tests/core/replica_scale_test.py +++ b/tests/kafkatest/tests/core/replica_scale_test.py @@ -46,7 +46,7 @@ def teardown(self): self.zk.stop() @cluster(num_nodes=12) - @parametrize(topic_count=500, partition_count=34, replication_factor=3) + @parametrize(topic_count=50, partition_count=34, replication_factor=3) def test_produce_consume(self, topic_count, partition_count, replication_factor): topics_create_start_time = time.time() for i in range(topic_count): @@ -101,7 +101,7 @@ def test_produce_consume(self, topic_count, partition_count, replication_factor) trogdor.stop() @cluster(num_nodes=12) - @parametrize(topic_count=500, partition_count=34, replication_factor=3) + @parametrize(topic_count=50, partition_count=34, replication_factor=3) def test_clean_bounce(self, topic_count, partition_count, replication_factor): topics_create_start_time = time.time() for i in range(topic_count): From 2e47ec3772a39983c15ea7ae698915e78429454b Mon Sep 17 00:00:00 2001 From: Nikolay Izhikov Date: Wed, 7 Oct 2020 16:50:38 +0300 Subject: [PATCH 24/24] KAFKA-10402: ducktape 0.8.0 --- tests/docker/Dockerfile | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 238c1c11bad58..8e8169aec3d1b 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -35,8 +35,7 @@ LABEL ducker.creator=$ducker_creator # we have to install git since it is included in openjdk:8 but not openjdk:11 RUN apt update && apt install -y sudo git netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean RUN python3 -m pip install -U pip==20.2.2; -RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 -RUN pip3 install git+https://github.com/confluentinc/ducktape +RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip3 install --upgrade ducktape==0.8.0 # Set up ssh COPY ./ssh-config /root/.ssh/config