MINOR: Fix join group request timeout lower bound #8702
hachikuji merged 3 commits into apache:trunk
    // maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays.
-   int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, rebalanceConfig.rebalanceTimeoutMs + 5000);
+   int joinGroupTimeoutMs = Math.max(client.defaultRequestTimeoutMs(),
The previous max check was wrong, but an alternative here is to use rebalanceTimeout + 5s in all cases regardless of the request timeout.
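To make the problem with the previous check concrete, here is a minimal sketch, not the merged code. The class and method names are hypothetical, and the fixed form assumes the second argument is the rebalance timeout plus the 5-second lapse, with the default request timeout as the lower bound, as the PR description suggests.

```java
// Hypothetical helper for illustration only; these names are not Kafka's API.
final class JoinGroupTimeoutSketch {
    // Extra slack on top of the rebalance timeout (5 seconds, per the diff comment).
    static final int JOIN_GROUP_TIMEOUT_LAPSE_MS = 5000;

    // Previous form: the second argument always exceeds the first,
    // so Math.max never returns the bare rebalance timeout.
    static int oldTimeoutMs(int rebalanceTimeoutMs) {
        return Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE_MS);
    }

    // Assumed fixed form: the request timeout acts as a lower bound.
    static int newTimeoutMs(int requestTimeoutMs, int rebalanceTimeoutMs) {
        return Math.max(requestTimeoutMs, rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE_MS);
    }

    public static void main(String[] args) {
        System.out.println(oldTimeoutMs(10_000));          // 15000
        System.out.println(newTimeoutMs(30_000, 10_000));  // 30000
        System.out.println(newTimeoutMs(30_000, 60_000));  // 65000
    }
}
```

With a 30-second request timeout and a 10-second rebalance timeout, the old form would have yielded 15 seconds, which is below the request timeout. The sketch ignores integer overflow for very large rebalance timeouts.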
Could we add a debug log here for the timeout we used?
I added the request timeout to the send message in NetworkClient. Also made some tweaks for more consistent logging.
retest this please
    mockTime.sleep(rebalanceTimeoutMs + AbstractCoordinator.JOIN_GROUP_TIMEOUT_LAPSE + 1);
    assertFalse(consumerClient.poll(future, mockTime.timer(0)));

    mockTime.sleep(expectedRequestDeadline - mockTime.milliseconds() + 1);
We could just test mockTime.sleep(REQUEST_TIMEOUT_MS + 1) for this case and get rid of expectedRequestDeadline
We need to take into account the time that has already passed. I was a little annoyed at having to write REQUEST_TIMEOUT - rebalanceTimeoutMs - AbstractCoordinator.JOIN_GROUP_TIMEOUT_LAPSE. A bit annoying either way I guess.
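The arithmetic behind this reply can be sketched as follows. `FakeClock` is a hypothetical stand-in written for this example (it is not Kafka's `MockTime`), and the timeout values are made up.

```java
final class RequestDeadlineSketch {
    // Minimal manual clock; hypothetical, for illustration only.
    static final class FakeClock {
        private long nowMs;
        FakeClock(long startMs) { this.nowMs = startMs; }
        long milliseconds() { return nowMs; }
        void sleep(long ms) { nowMs += ms; }
    }

    // How long to sleep so the clock lands exactly 1 ms past the deadline,
    // accounting for whatever time has already elapsed.
    static long sleepPastDeadline(FakeClock clock, long deadlineMs) {
        return deadlineMs - clock.milliseconds() + 1;
    }

    public static void main(String[] args) {
        // Illustrative values, not taken from the test.
        long requestTimeoutMs = 30_000, rebalanceTimeoutMs = 10_000, lapseMs = 5_000;

        FakeClock clock = new FakeClock(1_000);
        long expectedRequestDeadline = clock.milliseconds() + requestTimeoutMs;

        // First step of the test: move just past rebalance timeout plus lapse.
        clock.sleep(rebalanceTimeoutMs + lapseMs + 1);

        // Second step: sleep only the remainder, not the full request timeout.
        clock.sleep(sleepPastDeadline(clock, expectedRequestDeadline));
        System.out.println(clock.milliseconds() - expectedRequestDeadline); // 1
    }
}
```

Sleeping a full request timeout plus one at the second step would also pass the deadline, but would overshoot it by the time already spent in the first step.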
    if (log.isDebugEnabled()) {
        int latestClientVersion = clientRequest.apiKey().latestVersion();
        if (header.apiVersion() == latestClientVersion) {
            log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
Given that we have to work with all client versions, it's just as common for the client not to match the broker version, so it's not really useful for the behavior to be different when they do match.
    InFlightRequest req = inFlightRequests.completeNext(source);
    Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
        throttleTimeSensor, now);
    if (log.isTraceEnabled()) {
Letting some requests be debug level, but making the responses be trace often means we are left with only half of the picture.
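One way to picture the point about matching levels is the sketch below. It uses `java.util.logging` only to stay self-contained (Kafka itself logs through SLF4J), with `Level.FINE` standing in for debug. All class and method names are hypothetical, and the message formats are illustrative rather than the ones in `NetworkClient`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

final class SymmetricWireLoggingSketch {
    // A single level shared by both directions; FINE plays the role of "debug".
    static final Level WIRE_LEVEL = Level.FINE;

    // Collects messages in memory so the example needs no console configuration.
    static final class CapturingHandler extends Handler {
        final List<String> messages = new ArrayList<>();
        @Override public void publish(LogRecord record) { messages.add(record.getMessage()); }
        @Override public void flush() { }
        @Override public void close() { }
    }

    static void logSend(Logger log, String apiKey, int correlationId, int timeoutMs) {
        if (log.isLoggable(WIRE_LEVEL)) {
            log.log(WIRE_LEVEL, "Sending " + apiKey + " with correlation id " + correlationId
                + " and timeout " + timeoutMs);
        }
    }

    static void logReceive(Logger log, String apiKey, int correlationId) {
        if (log.isLoggable(WIRE_LEVEL)) {
            log.log(WIRE_LEVEL, "Received " + apiKey + " response with correlation id " + correlationId);
        }
    }

    // Logs one request/response pair at the configured level and returns what was captured.
    static List<String> demo(Level configured) {
        Logger log = Logger.getAnonymousLogger();
        log.setUseParentHandlers(false);
        CapturingHandler handler = new CapturingHandler();
        log.addHandler(handler);
        log.setLevel(configured);
        logSend(log, "JOIN_GROUP", 7, 65_000);
        logReceive(log, "JOIN_GROUP", 7);
        return handler.messages;
    }
}
```

Because both helpers share `WIRE_LEVEL`, any configured level either captures the request and its response together or captures neither, so the log never holds just one side of an exchange.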
* 'trunk' of github.com:apache/kafka:
  KAFKA-9888: Copy connector configs before passing to REST extensions (apache#8511)
  KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations (apache#8654)
  KAFKA-6145: Add unit tests for assignments of only stateless tasks (apache#8713)
  MINOR: Fix join group request timeout lower bound (apache#8702)
  MINOR: Improve security documentation for Kafka Streams apache#8710
  KAFKA-6145: KIP-441: Enforce Standby Task Stickiness (apache#8696)
  KAFKA-10003: Mark KStream.through() as deprecated and update Scala API (apache#8679)
If the request timeout is larger than the rebalance timeout, we should use the former as the JoinGroup request timeout.