KAFKA-18156: VerifiableConsumer should ignore "--session-timeout" when using CONSUMER protocol#18036
KAFKA-18156: VerifiableConsumer should ignore "--session-timeout" when using CONSUMER protocol#18036chia7712 merged 13 commits intoapache:trunkfrom
Conversation
…n using CONSUMER protocol
| consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout"))); | ||
|
|
||
| // session.timeout.ms cannot be set when group.protocol=CONSUMER | ||
| if (!groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) { |
There was a problem hiding this comment.
If we set --session-timeout for consumer protocol, can we add warning for this config?
There was a problem hiding this comment.
Thanks for the suggestion! However, I’m uncertain whether this change is necessary. When group.protocol=CONSUMER is used, setting a session timeout in the consumer prevents it from starting successfully. This makes me feel the change might be redundant. That said, I’d appreciate hearing others’ thoughts on this. Thanks!
There was a problem hiding this comment.
maybe we should call consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, xx) only if users do set the --session-timeout - that means we should remove the default value of --session-timeout
There was a problem hiding this comment.
Sounds good, btw we already set --session-timeout 30sec by default in
There was a problem hiding this comment.
The default value of session.timeout.ms is 45 seconds, so I'm unsure if it's necessary to set it to 30 seconds in the end-to-end tests. Could you please remove this setting from the e2e configuration? Additionally, e2e should not use --session-timeout when running AsyncConsumer.
There was a problem hiding this comment.
Thanks, I already addressed the comment in the latest commit.
| consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy")); | ||
| consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout"))); | ||
|
|
||
| // session.timeout.ms cannot be set when group.protocol=CONSUMER |
There was a problem hiding this comment.
Could you please move this check to line#645?
chia7712
left a comment
There was a problem hiding this comment.
@brandboat I have left some comments to align with the following ideas:
- The default value should not cause an error.
- Honor users' configurations—avoid adding extra if-else statements that swallow errors.
| // This means we're using the old consumer group protocol. | ||
| // This means we're using the CLASSIC consumer group protocol. | ||
| consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, res.getString("assignmentStrategy")); | ||
| Integer sessionTimeout = res.getInt("sessionTimeout"); |
There was a problem hiding this comment.
If this tool does not offer the default value, it is fine to throw exception if user does set the --session-timeout
There was a problem hiding this comment.
it is fine to throw exception if user does set the --session-timeout
Pardon me, did you mean throw exception if user set session timeout when using CONSUMER group.protocol ? Not sure if I understand correctly 🙏
There was a problem hiding this comment.
yes, we should honor users' configs to let him encounter the error.
| (self.reset_policy, self.group_id, self.topic) | ||
|
|
||
| # session timeout is not supported when using CONSUMER group protocol | ||
| if self.session_timeout_sec > 0 and self.is_consumer_group_protocol_enabled(): |
There was a problem hiding this comment.
this will swallow the incorrect configs in e2e. We can just set the session timeout if e2e test case does set it.
There was a problem hiding this comment.
Only VerifiableConsumerTest specifically set session timeout, but I think it's a bit redundant, so I refactor them out.
| self.group_protocol = group_protocol | ||
| self.group_remote_assignor = group_remote_assignor |
There was a problem hiding this comment.
these 2 lines are duplicate code, see L245, L246
| def setup_consumer(self, topic, static_membership=False, enable_autocommit=False, | ||
| assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor", group_remote_assignor="range", **kwargs): | ||
| return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka, | ||
| topic, self.group_id, static_membership=static_membership, session_timeout_sec=self.session_timeout_sec, |
There was a problem hiding this comment.
session_timeout_sec is used by other end-to-end tests, so we need to perform some refactoring for those test cases.
Replace Dynamic Timeouts with Constants
For example, change timeout_sec=self.session_timeout_sec + 5 to timeout_sec=60.
Remove Use Cases of Increasing session_timeout_sec
These cases typically increase session_timeout_sec from 45 seconds to 60 seconds. This adjustment may be unnecessary if the tests can run stably without modifying the timeout.
There was a problem hiding this comment.
I tested e2e client again. TC_PATHS="tests/kafkatest/tests/client" bash tests/docker/run_tests.sh and only one test failed.
test_id: kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_rolling_bounce.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
status: FAIL
run time: 3 minutes 23.970 seconds
AssertionError('Broker rolling bounce caused 7 unexpected group rebalances')
Traceback (most recent call last):
File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 351, in _do_run
data = self.run_test()
File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 411, in run_test
return self.test_context.function(self.test)
File "/usr/local/lib/python3.7/dist-packages/ducktape/mark/_mark.py", line 438, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 121, in test_broker_rolling_bounce
"Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances
AssertionError: Broker rolling bounce caused 7 unexpected group rebalances
And looks like it was tracked by https://issues.apache.org/jira/browse/KAFKA-18194
|
the hanging test is traced by https://issues.apache.org/jira/browse/KAFKA-17554 |
… using CONSUMER protocol (apache#18036) Reviewers: TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
… using CONSUMER protocol (apache#18036) Reviewers: TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
related to KAFKA-18156
The "session.timeout.ms" is unsupported by CONSUMER protocol, so VerifiableConsumer should skip the config when using CONSUMER protocol. Additionally, we should update the docs of "--session-timeout" to highlight this change
Committer Checklist (excluded from commit message)