Skip to content

KAFKA-13126: guard against overflow when computing joinGroupTimeoutMs#11111

Merged
ableegoldman merged 4 commits intoapache:trunkfrom
ableegoldman:HOTFIX-guard-against-joinGroupTimeoutMs-overflow
Jul 23, 2021
Merged

KAFKA-13126: guard against overflow when computing joinGroupTimeoutMs#11111
ableegoldman merged 4 commits intoapache:trunkfrom
ableegoldman:HOTFIX-guard-against-joinGroupTimeoutMs-overflow

Conversation

@ableegoldman
Copy link
Copy Markdown
Member

@ableegoldman ableegoldman commented Jul 22, 2021

In older versions of Kafka Streams, the max.poll.interval.ms config was overridden by default to Integer.MAX_VALUE. Even after we removed this override, users of both the plain consumer client and kafka streams still set the poll interval to MAX_VALUE somewhat often. Unfortunately, this causes an overflow when computing the joinGroupTimeoutMs and results in it being set to the request.timeout.ms instead, which is much lower.

This can easily make consumers drop out of the group, since they must rejoin now within 30s (by default) yet have no obligation to almost ever call poll() given the high max.poll.interval.ms. We just need to check for overflow and fix it to Integer.MAX_VALUE when it occurs.

Also fixes a few other misc. possible overflows on the side (from a ticket I came across while searching for existing tickets on the joinGroupTimeout bug: KAFKA-6948)

@ableegoldman ableegoldman requested a review from mjsax July 22, 2021 22:17
@ableegoldman ableegoldman changed the title HOTFIX: guard against overflow when computing joinGroupTimeoutMs KAFKA-13126: guard against overflow when computing joinGroupTimeoutMs Jul 22, 2021
Copy link
Copy Markdown
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @ableegoldman ! These all look good except the one I marked.

throw new IOException("Connection to " + node + " failed.");
}
long pollTimeout = expiryTime - attemptStartTime;
long pollTimeout = (startTime - attemptStartTime) + timeoutMs;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would still overflow if timeoutMs is MAX_VALUE, right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The startTime is set once at the beginning of the method while the attemptStartTime is initialized just before the first attempt and then updated again after every iteration. So the attemptStartTime is always greater than the startTime and therefore the quantity being added to the timeoutMs here is actually negative.

But I see how that's confusing, I'll refactor the expression to make this more clear

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, thanks!

Copy link
Copy Markdown
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

Copy link
Copy Markdown
Member

@showuon showuon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@showuon
Copy link
Copy Markdown
Member

showuon commented Jul 23, 2021

Failed tests are unrelated.

    Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsExpirationTest.testBumpTransactionalEpochAfterInvalidProducerIdMapping()
    Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
    Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
    Build / JDK 16 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
    Build / JDK 16 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()

@ableegoldman ableegoldman merged commit 8b1eca1 into apache:trunk Jul 23, 2021
@ableegoldman
Copy link
Copy Markdown
Member Author

Merged to trunk

xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…s` (apache#11111)

Setting the max.poll.interval.ms to MAX_VALUE causes overflow when computing the joinGroupTimeoutMs and results in the JoinGroup timeout being set to the request.timeout.ms instead, which is much lower.

This can easily make consumers drop out of the group, since they must rejoin now within 30s (by default) yet have no obligation to almost ever call poll() given the high max.poll.interval.ms, especially when each record takes a long time to process or the `max.poll.records` is also very large. We just need to check for overflow and fix it to Integer.MAX_VALUE when it occurs.

Reviewers: Luke Chen <showuon@gmail.com>, John Roesler <vvcephei@apache.org>
MaximGonnissen added a commit to MaximGonnissen/kafka that referenced this pull request May 29, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants