From a74b2fc63c3a12f433fb0383c062b13542c3043f Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Wed, 4 Dec 2024 22:49:43 +0800 Subject: [PATCH 1/8] KAFKA-18156: VerifiableConsumer should ignore "--session-timeout" when using CONSUMER protocol --- .../org/apache/kafka/clients/CommonClientConfigs.java | 3 ++- tests/kafkatest/services/verifiable_client.py | 2 +- .../java/org/apache/kafka/tools/VerifiableConsumer.java | 8 ++++++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index de01396c8c70b..aa3b5c9d628c9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -206,7 +206,8 @@ public class CommonClientConfigs { + "to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, " + "then the broker will remove this client from the group and initiate a rebalance. Note that the value " + "must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms " - + "and group.max.session.timeout.ms."; + + "and group.max.session.timeout.ms. Note that this configuration is not supported when group.protocol " + + "is set to \"consumer\"."; public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms"; public static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer " diff --git a/tests/kafkatest/services/verifiable_client.py b/tests/kafkatest/services/verifiable_client.py index 4971136a64e78..9c6daa2f218f3 100644 --- a/tests/kafkatest/services/verifiable_client.py +++ b/tests/kafkatest/services/verifiable_client.py @@ -70,7 +70,7 @@ * `--group-id ` * `--topic ` * `--broker-list ` - * `--session-timeout ` + * `--session-timeout ` - note that session timeout cannot be set when group protocol is consumer * `--enable-autocommit` * `--max-messages ` * `--assignment-strategy ` diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index 0436e0d85e8f4..63fb1c01762ac 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -578,7 +578,7 @@ private static ArgumentParser argParser() { .type(Integer.class) .metavar("TIMEOUT_MS") .dest("sessionTimeout") - .help("Set the consumer's session timeout"); + .help("Set the consumer's session timeout, note that session timeout cannot be set when group protocol is consumer"); parser.addArgument("--verbose") .action(storeTrue()) @@ -664,7 +664,11 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[] consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy")); - consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout"))); + + // session.timeout.ms cannot be set when group.protocol=CONSUMER + if (!groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) { + consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout"))); + } StringDeserializer deserializer = new StringDeserializer(); KafkaConsumer consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer); From 64c61d5ee820ccc79a9f40470d1fc584d0466b8d Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 7 Dec 2024 10:15:18 +0800 Subject: [PATCH 2/8] Address comment --- .../java/org/apache/kafka/tools/VerifiableConsumer.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index 63fb1c01762ac..bf7684930b984 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -649,8 +649,9 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[] if (groupRemoteAssignor != null) consumerProps.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, groupRemoteAssignor); } else { - // This means we're using the old consumer group protocol. + // This means we're using the CLASSIC consumer group protocol. consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, res.getString("assignmentStrategy")); + consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout"))); } consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId")); @@ -665,11 +666,6 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[] consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy")); - // session.timeout.ms cannot be set when group.protocol=CONSUMER - if (!groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) { - consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout"))); - } - StringDeserializer deserializer = new StringDeserializer(); KafkaConsumer consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer); From 8a3ccc4d4b5d249d21d4ae190398bd539c126fe5 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 7 Dec 2024 14:05:39 +0800 Subject: [PATCH 3/8] Address comment --- .../java/org/apache/kafka/tools/VerifiableConsumer.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index bf7684930b984..fbb0f102d6468 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -574,7 +574,6 @@ private static ArgumentParser argParser() { parser.addArgument("--session-timeout") .action(store()) .required(false) - .setDefault(30000) .type(Integer.class) .metavar("TIMEOUT_MS") .dest("sessionTimeout") @@ -651,7 +650,10 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[] } else { // This means we're using the CLASSIC consumer group protocol. consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, res.getString("assignmentStrategy")); - consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout"))); + Integer sessionTimeout = res.getInt("sessionTimeout"); + if (sessionTimeout != null) { + consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(sessionTimeout)); + } } consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId")); From 892a08ddf9a0aae548a73cbe36c675cf6b990846 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 7 Dec 2024 14:09:02 +0800 Subject: [PATCH 4/8] Enhance config desc --- tests/kafkatest/services/verifiable_client.py | 2 +- .../main/java/org/apache/kafka/tools/VerifiableConsumer.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/kafkatest/services/verifiable_client.py b/tests/kafkatest/services/verifiable_client.py index 9c6daa2f218f3..4a3ea5e17da0b 100644 --- a/tests/kafkatest/services/verifiable_client.py +++ b/tests/kafkatest/services/verifiable_client.py @@ -70,7 +70,7 @@ * `--group-id ` * `--topic ` * `--broker-list ` - * `--session-timeout ` - note that session timeout cannot be set when group protocol is consumer + * `--session-timeout ` - note that this configuration is not supported when group protocol is consumer * `--enable-autocommit` * `--max-messages ` * `--assignment-strategy ` diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index fbb0f102d6468..de6f541154147 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -577,7 +577,7 @@ private static ArgumentParser argParser() { .type(Integer.class) .metavar("TIMEOUT_MS") .dest("sessionTimeout") - .help("Set the consumer's session timeout, note that session timeout cannot be set when group protocol is consumer"); + .help("Set the consumer's session timeout, note that this configuration is not supported when group protocol is consumer"); parser.addArgument("--verbose") .action(storeTrue()) From 5d77e9e1b1880cdb6c9e9c34f14306c894ad0343 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 7 Dec 2024 14:38:42 +0800 Subject: [PATCH 5/8] Address comments --- tests/kafkatest/services/verifiable_consumer.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py index 7e81ca1f7ceea..977ca64ea77f7 100644 --- a/tests/kafkatest/services/verifiable_consumer.py +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -231,7 +231,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou } def __init__(self, context, num_nodes, kafka, topic, group_id, - static_membership=False, max_messages=-1, session_timeout_sec=30, enable_autocommit=False, + static_membership=False, max_messages=-1, session_timeout_sec=0, enable_autocommit=False, assignment_strategy=None, group_protocol=None, group_remote_assignor=None, version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO", jaas_override_variables=None, on_record_consumed=None, reset_policy="earliest", verify_offsets=True): @@ -417,10 +417,13 @@ def start_cmd(self, node): else: cmd += " --bootstrap-server %s" % self.kafka.bootstrap_servers(self.security_config.security_protocol) - cmd += " --reset-policy %s --group-id %s --topic %s --session-timeout %s" % \ - (self.reset_policy, self.group_id, self.topic, - self.session_timeout_sec*1000) - + cmd += " --reset-policy %s --group-id %s --topic %s" % \ + (self.reset_policy, self.group_id, self.topic) + + # session timeout is not supported when using CONSUMER group protocol + if self.session_timeout_sec > 0 and self.is_consumer_group_protocol_enabled(): + cmd += " --session-timeout %s" % self.session_timeout_sec*1000 + if self.max_messages > 0: cmd += " --max-messages %s" % str(self.max_messages) From eb5047e02a1de24868b73426c630f15212840ce8 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sun, 8 Dec 2024 21:46:23 +0800 Subject: [PATCH 6/8] Address comment --- tests/kafkatest/services/verifiable_consumer.py | 5 +---- tests/kafkatest/tests/verifiable_consumer_test.py | 11 +++++------ 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py index 977ca64ea77f7..1dc1e4bd4ecd3 100644 --- a/tests/kafkatest/services/verifiable_consumer.py +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -251,8 +251,6 @@ def __init__(self, context, num_nodes, kafka, topic, group_id, self.session_timeout_sec = session_timeout_sec self.enable_autocommit = enable_autocommit self.assignment_strategy = assignment_strategy - self.group_protocol = group_protocol - self.group_remote_assignor = group_remote_assignor self.prop_file = "" self.stop_timeout_sec = stop_timeout_sec self.on_record_consumed = on_record_consumed @@ -420,8 +418,7 @@ def start_cmd(self, node): cmd += " --reset-policy %s --group-id %s --topic %s" % \ (self.reset_policy, self.group_id, self.topic) - # session timeout is not supported when using CONSUMER group protocol - if self.session_timeout_sec > 0 and self.is_consumer_group_protocol_enabled(): + if self.session_timeout_sec > 0: cmd += " --session-timeout %s" % self.session_timeout_sec*1000 if self.max_messages > 0: diff --git a/tests/kafkatest/tests/verifiable_consumer_test.py b/tests/kafkatest/tests/verifiable_consumer_test.py index 08da754732d46..e79c316c3a297 100644 --- a/tests/kafkatest/tests/verifiable_consumer_test.py +++ b/tests/kafkatest/tests/verifiable_consumer_test.py @@ -24,13 +24,12 @@ class VerifiableConsumerTest(KafkaTest): PRODUCER_REQUEST_TIMEOUT_SEC = 30 def __init__(self, test_context, num_consumers=1, num_producers=0, - group_id="test_group_id", session_timeout_sec=10, **kwargs): + group_id="test_group_id", **kwargs): super(VerifiableConsumerTest, self).__init__(test_context, **kwargs) self.num_consumers = num_consumers self.num_producers = num_producers self.group_id = group_id - self.session_timeout_sec = session_timeout_sec - self.consumption_timeout_sec = max(self.PRODUCER_REQUEST_TIMEOUT_SEC + 5, 2 * session_timeout_sec) + self.consumption_timeout_sec = self.PRODUCER_REQUEST_TIMEOUT_SEC + 5 def _all_partitions(self, topic, num_partitions): partitions = set() @@ -56,7 +55,7 @@ def min_cluster_size(self): def setup_consumer(self, topic, static_membership=False, enable_autocommit=False, assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor", group_remote_assignor="range", **kwargs): return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka, - topic, self.group_id, static_membership=static_membership, session_timeout_sec=self.session_timeout_sec, + topic, self.group_id, static_membership=static_membership, assignment_strategy=assignment_strategy, enable_autocommit=enable_autocommit, group_remote_assignor=group_remote_assignor, log_level="TRACE", **kwargs) @@ -81,9 +80,9 @@ def await_consumed_messages(self, consumer, min_messages=1): def await_members(self, consumer, num_consumers): # Wait until all members have joined the group wait_until(lambda: len(consumer.joined_nodes()) == num_consumers, - timeout_sec=self.session_timeout_sec*2, + timeout_sec=self.PRODUCER_REQUEST_TIMEOUT_SEC*2, err_msg="Consumers failed to join in a reasonable amount of time") - + def await_all_members(self, consumer): self.await_members(consumer, self.num_consumers) From bc10813bd89515524225986563798025fb3ccef8 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sun, 8 Dec 2024 22:49:11 +0800 Subject: [PATCH 7/8] Address comment --- .../java/org/apache/kafka/tools/VerifiableConsumer.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index de6f541154147..825e5ed2c6ec0 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -650,10 +650,11 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[] } else { // This means we're using the CLASSIC consumer group protocol. consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, res.getString("assignmentStrategy")); - Integer sessionTimeout = res.getInt("sessionTimeout"); - if (sessionTimeout != null) { - consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(sessionTimeout)); - } + } + + Integer sessionTimeout = res.getInt("sessionTimeout"); + if (sessionTimeout != null) { + consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(sessionTimeout)); } consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId")); From 84665d27b268697fd76bfda97b8bce7ea3b10932 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Mon, 9 Dec 2024 23:45:13 +0800 Subject: [PATCH 8/8] Address comments --- .../consumer_protocol_migration_test.py | 2 +- tests/kafkatest/tests/client/consumer_test.py | 21 +++++-------------- .../kafkatest/tests/client/pluggable_test.py | 2 +- .../tests/verifiable_consumer_test.py | 2 +- 4 files changed, 8 insertions(+), 19 deletions(-) diff --git a/tests/kafkatest/tests/client/consumer_protocol_migration_test.py b/tests/kafkatest/tests/client/consumer_protocol_migration_test.py index 07f501fe0c69b..a03228b617a2a 100644 --- a/tests/kafkatest/tests/client/consumer_protocol_migration_test.py +++ b/tests/kafkatest/tests/client/consumer_protocol_migration_test.py @@ -77,7 +77,7 @@ def rolling_bounce_consumers(self, consumer, clean_shutdown=True): consumer.stop_node(node, clean_shutdown) wait_until(lambda: len(consumer.dead_nodes()) == 1, - timeout_sec=self.session_timeout_sec+5, + timeout_sec=60, err_msg="Timed out waiting for the consumer to shutdown") consumer.start_node(node) diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py index 4bd680dd2a00e..a96d90bded804 100644 --- a/tests/kafkatest/tests/client/consumer_test.py +++ b/tests/kafkatest/tests/client/consumer_test.py @@ -39,7 +39,7 @@ def rolling_bounce_consumers(self, consumer, keep_alive=0, num_bounces=5, clean_ consumer.stop_node(node, clean_shutdown) wait_until(lambda: len(consumer.dead_nodes()) == 1, - timeout_sec=self.session_timeout_sec+5, + timeout_sec=60, err_msg="Timed out waiting for the consumer to shutdown") consumer.start_node(node) @@ -101,14 +101,6 @@ def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, use_new_coordina partition = TopicPartition(self.TOPIC, 0) producer = self.setup_producer(self.TOPIC) - # The consumers' session timeouts must exceed the time it takes for a broker to roll. Consumers are likely - # to see cluster metadata consisting of just a single alive broker in the case where the cluster has just 2 - # brokers and the cluster is rolling (which is what is happening here). When the consumer sees a single alive - # broker, and then that broker rolls, the consumer will be unable to connect to the cluster until that broker - # completes its roll. In the meantime, the consumer group will move to the group coordinator on the other - # broker, and that coordinator will fail the consumer and trigger a group rebalance if its session times out. - # This test is asserting that no rebalances occur, so we increase the session timeout for this to be the case. - self.session_timeout_sec = 30 consumer = self.setup_consumer(self.TOPIC, group_protocol=group_protocol) producer.start() @@ -229,7 +221,6 @@ def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, stat producer.start() self.await_produced_messages(producer) - self.session_timeout_sec = 60 consumer = self.setup_consumer(self.TOPIC, static_membership=static_membership, group_protocol=group_protocol, assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor") @@ -295,7 +286,6 @@ def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quor producer = self.setup_producer(self.TOPIC) producer.start() self.await_produced_messages(producer) - self.session_timeout_sec = 60 consumer = self.setup_consumer(self.TOPIC, static_membership=True, group_protocol=group_protocol) consumer.start() self.await_all_members(consumer) @@ -340,7 +330,6 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me producer.start() self.await_produced_messages(producer) - self.session_timeout_sec = 60 consumer = self.setup_consumer(self.TOPIC, static_membership=True, group_protocol=group_protocol) self.num_consumers = num_conflict_consumers @@ -372,7 +361,7 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me # Stop existing nodes, so conflicting ones should be able to join. consumer.stop_all() wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes), - timeout_sec=self.session_timeout_sec+5, + timeout_sec=60, err_msg="Timed out waiting for the consumer to shutdown") conflict_consumer.start() self.await_members(conflict_consumer, num_conflict_consumers) @@ -383,13 +372,13 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me conflict_consumer.start() wait_until(lambda: len(consumer.joined_nodes()) + len(conflict_consumer.joined_nodes()) == len(consumer.nodes), - timeout_sec=self.session_timeout_sec*2, + timeout_sec=60, err_msg="Timed out waiting for consumers to join, expected total %d joined, but only see %d joined from " "normal consumer group and %d from conflict consumer group" % \ (len(consumer.nodes), len(consumer.joined_nodes()), len(conflict_consumer.joined_nodes())) ) wait_until(lambda: len(consumer.dead_nodes()) + len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes), - timeout_sec=self.session_timeout_sec*2, + timeout_sec=60, err_msg="Timed out waiting for fenced consumers to die, expected total %d dead, but only see %d dead in " "normal consumer group and %d dead in conflict consumer group" % \ (len(conflict_consumer.nodes), len(consumer.dead_nodes()), len(conflict_consumer.dead_nodes())) @@ -427,7 +416,7 @@ def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quor # stop the partition owner and await its shutdown consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown) wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and consumer.owner(partition) is not None, - timeout_sec=self.session_timeout_sec*2+5, + timeout_sec=60, err_msg="Timed out waiting for consumer to close") # ensure that the remaining consumer does some work after rebalancing diff --git a/tests/kafkatest/tests/client/pluggable_test.py b/tests/kafkatest/tests/client/pluggable_test.py index b2f726e016303..8f74ec1c8a52d 100644 --- a/tests/kafkatest/tests/client/pluggable_test.py +++ b/tests/kafkatest/tests/client/pluggable_test.py @@ -52,5 +52,5 @@ def test_start_stop(self, metadata_quorum=quorum.zk): self.logger.debug("Waiting for %d nodes to stop" % len(consumer.nodes)) wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes), - timeout_sec=self.session_timeout_sec+5, + timeout_sec=60, err_msg="Timed out waiting for consumers to shutdown") diff --git a/tests/kafkatest/tests/verifiable_consumer_test.py b/tests/kafkatest/tests/verifiable_consumer_test.py index e79c316c3a297..5353a6e82af54 100644 --- a/tests/kafkatest/tests/verifiable_consumer_test.py +++ b/tests/kafkatest/tests/verifiable_consumer_test.py @@ -80,7 +80,7 @@ def await_consumed_messages(self, consumer, min_messages=1): def await_members(self, consumer, num_consumers): # Wait until all members have joined the group wait_until(lambda: len(consumer.joined_nodes()) == num_consumers, - timeout_sec=self.PRODUCER_REQUEST_TIMEOUT_SEC*2, + timeout_sec=60, err_msg="Consumers failed to join in a reasonable amount of time") def await_all_members(self, consumer):