Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions tests/kafkatest/services/verifiable_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput
message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None,
stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO",
enable_idempotence=False, offline_nodes=[], create_time=-1, repeating_keys=None,
jaas_override_variables=None, kafka_opts_override="", client_prop_file_override=""):
jaas_override_variables=None, kafka_opts_override="", client_prop_file_override="",
retries=None):
"""
Args:
:param max_messages number of messages to be produced per producer
Expand Down Expand Up @@ -102,7 +103,7 @@ def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput
self.jaas_override_variables = jaas_override_variables or {}
self.kafka_opts_override = kafka_opts_override
self.client_prop_file_override = client_prop_file_override

self.retries = retries

def java_class_name(self):
return "VerifiableProducer"
Expand Down Expand Up @@ -145,6 +146,10 @@ def _worker(self, idx, node):
producer_prop_file += "\nmax.in.flight.requests.per.connection=5\n"
producer_prop_file += "\nretries=1000000\n"
producer_prop_file += "\nenable.idempotence=true\n"
elif self.retries is not None:
self.logger.info("VerifiableProducer (index = %d) will use retries = %s", idx, self.retries)
producer_prop_file += "\nretries=%s\n" % self.retries
producer_prop_file += "\ndelivery.timeout.ms=%s\n" % (self.request_timeout_sec * 1000 * self.retries)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retries parameter is ignored if EOS is enabled.

Also, along with passing the retries config along to the producer, we set the delivery timeout high enough to accomodate the desired retry value.


self.logger.info("verifiable_producer.properties:")
self.logger.info(producer_prop_file)
Expand Down
5 changes: 3 additions & 2 deletions tests/kafkatest/tests/streams/base_streams_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ def get_producer(self, topic, num_messages, throughput=1000, repeating_keys=None
self.kafka,
topic,
max_messages=num_messages,
acks=1,
acks=-1,
throughput=throughput,
repeating_keys=repeating_keys)
repeating_keys=repeating_keys,
retries=10)

def assert_produce_consume(self,
streams_source_topic,
Expand Down