Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ public class CommonClientConfigs {
+ "to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, "
+ "then the broker will remove this client from the group and initiate a rebalance. Note that the value "
+ "must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> "
+ "and <code>group.max.session.timeout.ms</code>.";
+ "and <code>group.max.session.timeout.ms</code>. Note that this configuration is not supported when <code>group.protocol</code> "
+ "is set to \"consumer\".";

public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
public static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer "
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/services/verifiable_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
* `--group-id <group-id>`
* `--topic <topic>`
* `--broker-list <brokers>`
* `--session-timeout <n>`
* `--session-timeout <n>` - note that this configuration is not supported when group protocol is consumer
* `--enable-autocommit`
* `--max-messages <n>`
* `--assignment-strategy <s>`
Expand Down
14 changes: 7 additions & 7 deletions tests/kafkatest/services/verifiable_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
}

def __init__(self, context, num_nodes, kafka, topic, group_id,
static_membership=False, max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
static_membership=False, max_messages=-1, session_timeout_sec=0, enable_autocommit=False,
assignment_strategy=None, group_protocol=None, group_remote_assignor=None,
version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO", jaas_override_variables=None,
on_record_consumed=None, reset_policy="earliest", verify_offsets=True):
Expand All @@ -251,8 +251,6 @@ def __init__(self, context, num_nodes, kafka, topic, group_id,
self.session_timeout_sec = session_timeout_sec
self.enable_autocommit = enable_autocommit
self.assignment_strategy = assignment_strategy
self.group_protocol = group_protocol
self.group_remote_assignor = group_remote_assignor
Comment on lines -254 to -255
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these 2 lines are duplicate code, see L245, L246

self.prop_file = ""
self.stop_timeout_sec = stop_timeout_sec
self.on_record_consumed = on_record_consumed
Expand Down Expand Up @@ -417,10 +415,12 @@ def start_cmd(self, node):
else:
cmd += " --bootstrap-server %s" % self.kafka.bootstrap_servers(self.security_config.security_protocol)

cmd += " --reset-policy %s --group-id %s --topic %s --session-timeout %s" % \
(self.reset_policy, self.group_id, self.topic,
self.session_timeout_sec*1000)

cmd += " --reset-policy %s --group-id %s --topic %s" % \
(self.reset_policy, self.group_id, self.topic)

if self.session_timeout_sec > 0:
cmd += " --session-timeout %s" % self.session_timeout_sec*1000

if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def rolling_bounce_consumers(self, consumer, clean_shutdown=True):
consumer.stop_node(node, clean_shutdown)

wait_until(lambda: len(consumer.dead_nodes()) == 1,
timeout_sec=self.session_timeout_sec+5,
timeout_sec=60,
err_msg="Timed out waiting for the consumer to shutdown")

consumer.start_node(node)
Expand Down
21 changes: 5 additions & 16 deletions tests/kafkatest/tests/client/consumer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def rolling_bounce_consumers(self, consumer, keep_alive=0, num_bounces=5, clean_
consumer.stop_node(node, clean_shutdown)

wait_until(lambda: len(consumer.dead_nodes()) == 1,
timeout_sec=self.session_timeout_sec+5,
timeout_sec=60,
err_msg="Timed out waiting for the consumer to shutdown")

consumer.start_node(node)
Expand Down Expand Up @@ -101,14 +101,6 @@ def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, use_new_coordina
partition = TopicPartition(self.TOPIC, 0)

producer = self.setup_producer(self.TOPIC)
# The consumers' session timeouts must exceed the time it takes for a broker to roll. Consumers are likely
# to see cluster metadata consisting of just a single alive broker in the case where the cluster has just 2
# brokers and the cluster is rolling (which is what is happening here). When the consumer sees a single alive
# broker, and then that broker rolls, the consumer will be unable to connect to the cluster until that broker
# completes its roll. In the meantime, the consumer group will move to the group coordinator on the other
# broker, and that coordinator will fail the consumer and trigger a group rebalance if its session times out.
# This test is asserting that no rebalances occur, so we increase the session timeout for this to be the case.
self.session_timeout_sec = 30
consumer = self.setup_consumer(self.TOPIC, group_protocol=group_protocol)

producer.start()
Expand Down Expand Up @@ -229,7 +221,6 @@ def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, stat
producer.start()
self.await_produced_messages(producer)

self.session_timeout_sec = 60
consumer = self.setup_consumer(self.TOPIC, static_membership=static_membership, group_protocol=group_protocol,
assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor")

Expand Down Expand Up @@ -295,7 +286,6 @@ def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quor
producer = self.setup_producer(self.TOPIC)
producer.start()
self.await_produced_messages(producer)
self.session_timeout_sec = 60
consumer = self.setup_consumer(self.TOPIC, static_membership=True, group_protocol=group_protocol)
consumer.start()
self.await_all_members(consumer)
Expand Down Expand Up @@ -340,7 +330,6 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
producer.start()
self.await_produced_messages(producer)

self.session_timeout_sec = 60
consumer = self.setup_consumer(self.TOPIC, static_membership=True, group_protocol=group_protocol)

self.num_consumers = num_conflict_consumers
Expand Down Expand Up @@ -372,7 +361,7 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
# Stop existing nodes, so conflicting ones should be able to join.
consumer.stop_all()
wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
timeout_sec=self.session_timeout_sec+5,
timeout_sec=60,
err_msg="Timed out waiting for the consumer to shutdown")
conflict_consumer.start()
self.await_members(conflict_consumer, num_conflict_consumers)
Expand All @@ -383,13 +372,13 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
conflict_consumer.start()

wait_until(lambda: len(consumer.joined_nodes()) + len(conflict_consumer.joined_nodes()) == len(consumer.nodes),
timeout_sec=self.session_timeout_sec*2,
timeout_sec=60,
err_msg="Timed out waiting for consumers to join, expected total %d joined, but only see %d joined from "
"normal consumer group and %d from conflict consumer group" % \
(len(consumer.nodes), len(consumer.joined_nodes()), len(conflict_consumer.joined_nodes()))
)
wait_until(lambda: len(consumer.dead_nodes()) + len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes),
timeout_sec=self.session_timeout_sec*2,
timeout_sec=60,
err_msg="Timed out waiting for fenced consumers to die, expected total %d dead, but only see %d dead in "
"normal consumer group and %d dead in conflict consumer group" % \
(len(conflict_consumer.nodes), len(consumer.dead_nodes()), len(conflict_consumer.dead_nodes()))
Expand Down Expand Up @@ -427,7 +416,7 @@ def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quor
# stop the partition owner and await its shutdown
consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)
wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and consumer.owner(partition) is not None,
timeout_sec=self.session_timeout_sec*2+5,
timeout_sec=60,
err_msg="Timed out waiting for consumer to close")

# ensure that the remaining consumer does some work after rebalancing
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/tests/client/pluggable_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,5 @@ def test_start_stop(self, metadata_quorum=quorum.zk):

self.logger.debug("Waiting for %d nodes to stop" % len(consumer.nodes))
wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
timeout_sec=self.session_timeout_sec+5,
timeout_sec=60,
err_msg="Timed out waiting for consumers to shutdown")
11 changes: 5 additions & 6 deletions tests/kafkatest/tests/verifiable_consumer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ class VerifiableConsumerTest(KafkaTest):
PRODUCER_REQUEST_TIMEOUT_SEC = 30

def __init__(self, test_context, num_consumers=1, num_producers=0,
group_id="test_group_id", session_timeout_sec=10, **kwargs):
group_id="test_group_id", **kwargs):
super(VerifiableConsumerTest, self).__init__(test_context, **kwargs)
self.num_consumers = num_consumers
self.num_producers = num_producers
self.group_id = group_id
self.session_timeout_sec = session_timeout_sec
self.consumption_timeout_sec = max(self.PRODUCER_REQUEST_TIMEOUT_SEC + 5, 2 * session_timeout_sec)
self.consumption_timeout_sec = self.PRODUCER_REQUEST_TIMEOUT_SEC + 5

def _all_partitions(self, topic, num_partitions):
partitions = set()
Expand All @@ -56,7 +55,7 @@ def min_cluster_size(self):
def setup_consumer(self, topic, static_membership=False, enable_autocommit=False,
assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor", group_remote_assignor="range", **kwargs):
return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka,
topic, self.group_id, static_membership=static_membership, session_timeout_sec=self.session_timeout_sec,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

session_timeout_sec is used by other end-to-end tests, so we need to perform some refactoring for those test cases.

Replace Dynamic Timeouts with Constants
For example, change timeout_sec=self.session_timeout_sec + 5 to timeout_sec=60.

Remove Use Cases of Increasing session_timeout_sec
These cases typically increase session_timeout_sec from 45 seconds to 60 seconds. This adjustment may be unnecessary if the tests can run stably without modifying the timeout.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested e2e client again. TC_PATHS="tests/kafkatest/tests/client" bash tests/docker/run_tests.sh and only one test failed.

test_id:    kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_rolling_bounce.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
status:     FAIL
run time:   3 minutes 23.970 seconds


    AssertionError('Broker rolling bounce caused 7 unexpected group rebalances')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 351, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 411, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.7/dist-packages/ducktape/mark/_mark.py", line 438, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 121, in test_broker_rolling_bounce
    "Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances
AssertionError: Broker rolling bounce caused 7 unexpected group rebalances

And looks like it was tracked by https://issues.apache.org/jira/browse/KAFKA-18194

topic, self.group_id, static_membership=static_membership,
assignment_strategy=assignment_strategy, enable_autocommit=enable_autocommit,
group_remote_assignor=group_remote_assignor,
log_level="TRACE", **kwargs)
Expand All @@ -81,9 +80,9 @@ def await_consumed_messages(self, consumer, min_messages=1):
def await_members(self, consumer, num_consumers):
# Wait until all members have joined the group
wait_until(lambda: len(consumer.joined_nodes()) == num_consumers,
timeout_sec=self.session_timeout_sec*2,
timeout_sec=60,
err_msg="Consumers failed to join in a reasonable amount of time")

def await_all_members(self, consumer):
self.await_members(consumer, self.num_consumers)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,11 +574,10 @@ private static ArgumentParser argParser() {
parser.addArgument("--session-timeout")
.action(store())
.required(false)
.setDefault(30000)
.type(Integer.class)
.metavar("TIMEOUT_MS")
.dest("sessionTimeout")
.help("Set the consumer's session timeout");
.help("Set the consumer's session timeout, note that this configuration is not supported when group protocol is consumer");

parser.addArgument("--verbose")
.action(storeTrue())
Expand Down Expand Up @@ -649,10 +648,15 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[]
if (groupRemoteAssignor != null)
consumerProps.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, groupRemoteAssignor);
} else {
// This means we're using the old consumer group protocol.
// This means we're using the CLASSIC consumer group protocol.
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, res.getString("assignmentStrategy"));
}

Integer sessionTimeout = res.getInt("sessionTimeout");
if (sessionTimeout != null) {
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(sessionTimeout));
}

consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId"));

String groupInstanceId = res.getString("groupInstanceId");
Expand All @@ -664,7 +668,6 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[]

consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy"));
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout")));

StringDeserializer deserializer = new StringDeserializer();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer);
Expand Down