From 27e2aedee387969bd00a32ec2b858da1199a84b4 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 11 Dec 2018 16:05:57 -0600 Subject: [PATCH] MINOR: improve resilience of Streams test producers --- tests/kafkatest/services/verifiable_producer.py | 9 +++++++-- tests/kafkatest/tests/streams/base_streams_test.py | 5 +++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index f339a62313fae..744524eda0e76 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -58,7 +58,8 @@ def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None, stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO", enable_idempotence=False, offline_nodes=[], create_time=-1, repeating_keys=None, - jaas_override_variables=None, kafka_opts_override="", client_prop_file_override=""): + jaas_override_variables=None, kafka_opts_override="", client_prop_file_override="", + retries=None): """ Args: :param max_messages number of messages to be produced per producer @@ -102,7 +103,7 @@ def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput self.jaas_override_variables = jaas_override_variables or {} self.kafka_opts_override = kafka_opts_override self.client_prop_file_override = client_prop_file_override - + self.retries = retries def java_class_name(self): return "VerifiableProducer" @@ -145,6 +146,10 @@ def _worker(self, idx, node): producer_prop_file += "\nmax.in.flight.requests.per.connection=5\n" producer_prop_file += "\nretries=1000000\n" producer_prop_file += "\nenable.idempotence=true\n" + elif self.retries is not None: + self.logger.info("VerifiableProducer (index = %d) will use retries = %s", idx, self.retries) + producer_prop_file += "\nretries=%s\n" % self.retries + producer_prop_file += "\ndelivery.timeout.ms=%s\n" % (self.request_timeout_sec * 1000 * self.retries) self.logger.info("verifiable_producer.properties:") self.logger.info(producer_prop_file) diff --git a/tests/kafkatest/tests/streams/base_streams_test.py b/tests/kafkatest/tests/streams/base_streams_test.py index 9a9704e3b8423..53e4231edb419 100644 --- a/tests/kafkatest/tests/streams/base_streams_test.py +++ b/tests/kafkatest/tests/streams/base_streams_test.py @@ -44,9 +44,10 @@ def get_producer(self, topic, num_messages, throughput=1000, repeating_keys=None self.kafka, topic, max_messages=num_messages, - acks=1, + acks=-1, throughput=throughput, - repeating_keys=repeating_keys) + repeating_keys=repeating_keys, + retries=10) def assert_produce_consume(self, streams_source_topic,