Skip to content

KAFKA-10113; Specify fetch offsets correctly in LogTruncationException#8822

Merged
hachikuji merged 3 commits intoapache:trunkfrom
hachikuji:KAFKA-10113
Jun 19, 2020
Merged

KAFKA-10113; Specify fetch offsets correctly in LogTruncationException#8822
hachikuji merged 3 commits intoapache:trunkfrom
hachikuji:KAFKA-10113

Conversation

@hachikuji
Copy link
Copy Markdown
Contributor

This patch fixes a bug in the constructor of LogTruncationException. We were passing the divergent offsets to the super constructor as the fetch offsets. There is no way to fix this without breaking compatibility, but the harm is probably minimal since this exception was not getting raised properly until KAFKA-9840 anyway.

Note that I have also moved the check for unknown offset and epoch into SubscriptionState, which ensures that the partition is still awaiting validation and that the fetch offset hasn't changed. Finally, I made some minor improvements to the logging and exception messages to ensure that we always have the fetch offset and epoch as well as the divergent offset and epoch included.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Copy Markdown

@abbccdda abbccdda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, left a couple of comments

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we move this check out of setFatalOffsetForLeaderException?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seemed inconsistent to have a method named setFatal which checks for retriable exceptions.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should require non-null for fetchPosition

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, why fetchPosition specifically and not the other fields?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a second thought, I don't feel strong about it.

Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we could still share handleOffsetOutOfRange in two places by letting it return a struct of Optional<LogTruncation> and decide when to throw it by the caller.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking about it, it seemed simpler to always use LogTruncationException for validation failures, even if the divergent offset is not known. Then direct OffsetOutOfRange errors are reserved for fetch responses which indicate the OFFSET_OUT_OF_RANGE error.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking where is the best to put the check, since previously it was before maybeCompleteValidation. If the partition is not awaiting validation or the returned result doesn't match our current position, should we still trigger undefined epoch offset logic here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the fetch position has changed or we are no longer awaiting validation, we want to ignore the result. This was a bug in the previous patch which we didn't catch.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so before this change, we were raising OffsetOutOfRangeException regardless of the state of the subscription which meant that a regular truncation case was being masked as a failed offset validation?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was what @abbccdda and I had agreed in the previous PR. The problem was that we didn't have divergent offsets to include in the exception, so we just raised it as OffsetOutOfRange. After I noticed the problem with LogTruncationException here, I decided to just simplify the logic here and return the truncation exception with the divergent offsets undefined.

Copy link
Copy Markdown

@abbccdda abbccdda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for the patch!

Copy link
Copy Markdown
Member

@mumrah mumrah left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Fetcher#initializeCompletedFetch, we ignore all errors if the subscription isn't in the FETCHING state. Do we need to add similar logic to perform an offset reset if we're in AWAITING_VALIDATION?

  • In FETCHING state
  • Fetch request goes out
  • Enter AWAIT_VALIDATION due to leader change
  • Handle Fetch response, get OffsetOutOfRange
  • ???

I guess validation would fail in this case anyways, so maybe it's fine if we just ignore the error in the completed fetch code path. WDYT @hachikuji ?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so before this change, we were raising OffsetOutOfRangeException regardless of the state of the subscription which meant that a regular truncation case was being masked as a failed offset validation?

@hachikuji
Copy link
Copy Markdown
Contributor Author

@mumrah Hmm, we have the following check in initializeCompletedFetch:

            if (!subscriptions.hasValidPosition(tp)) {
                // this can happen when a rebalance happened while fetch is still in-flight
                log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp);
            } else if (error == Errors.NONE) {

Are you suggesting we may want to remove this check?

@hachikuji
Copy link
Copy Markdown
Contributor Author

retest this please

@mumrah
Copy link
Copy Markdown
Member

mumrah commented Jun 18, 2020

@hachikuji yea that's the check I was referring to (where we disregard the fetch response, errors included). Do you think any of the errors we handle besides OOOR are worth handling in the case that we're no longer in the FETCHING state? Like maybe one of the errors that triggers a metadata update?

However, that might be adding complexity for little gain. I'm fine with it either way.

@hachikuji
Copy link
Copy Markdown
Contributor Author

@mumrah Hmm, I think I like the current approach of discarding the response if we're no longer in the same state in which the fetch state was sent. Mainly because it's simple. Arguably we could do something more refined. For example, a topic authorization error is still going to be relevant even if the partition is being reset. However, since we're talking about rare cases, it doesn't seem too worthwhile to try and optimize; worst case, we'll send the request again and get the same error.

@mumrah
Copy link
Copy Markdown
Member

mumrah commented Jun 18, 2020

Sounds good to me

Copy link
Copy Markdown
Member

@mumrah mumrah left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM :shipit:

@hachikuji hachikuji merged commit f3c00ae into apache:trunk Jun 19, 2020
hachikuji added a commit that referenced this pull request Jun 19, 2020
…on` (#8822)

This patch fixes a bug in the constructor of `LogTruncationException`. We were passing the divergent offsets to the super constructor as the fetch offsets. There is no way to fix this without breaking compatibility, but the harm is probably minimal since this exception was not getting raised properly until KAFKA-9840 anyway.

Note that I have also moved the check for unknown offset and epoch into `SubscriptionState`, which ensures that the partition is still awaiting validation and that the fetch offset hasn't changed. Finally, I made some minor improvements to the logging and exception messages to ensure that we always have the fetch offset and epoch as well as the divergent offset and epoch included.

Reviewers: Boyang Chen <boyang@confluent.io>, David Arthur <mumrah@gmail.com>
Kvicii pushed a commit to Kvicii/kafka that referenced this pull request Jun 21, 2020
* 'trunk' of github.com:apache/kafka:
  KAFKA-10168: fix StreamsConfig parameter name variable (apache#8865)
  MINOR: code cleanup for inconsistent naming (apache#8871)
  KAFKA-10138: Prefer --bootstrap-server for reassign_partitions command in ducktape tests (apache#8898)
  KAFKA-10185: Restoration info logging (apache#8896)
  KAFKA-9891: add integration tests for EOS and StandbyTask (apache#8890)
  MINOR: Reduce build time by gating test coverage plugins behind a flag (apache#8899)
  KAFKA-10141; Add more detail to log segment delete messages (apache#8850)
  KAFKA-10113; Specify fetch offsets correctly in `LogTruncationException` (apache#8822)
  KAFKA-10167: use the admin client to read end-offset (apache#8876)
  MINOR: Upgrade ducktape to 0.7.8 (apache#8879)
  KAFKA-10123; Fix incorrect value for AWAIT_RESET#hasPosition (apache#8841)
  KAFKA-9896: fix flaky StandbyTaskEOSIntegrationTest (apache#8883)
  MINOR: clean up unused checkstyle suppressions for Streams (apache#8861)
  MINOR: reuse toConfigObject(Map) to generate Config (apache#8889)
  MINOR: Upgrade jetty to 9.4.27.v20200227 and jersey to 2.31 (apache#8859)
  MINOR: Fix flaky HighAvailabilityTaskAssignorIntegrationTest (apache#8884)
  KAFKA-10147 MockAdminClient#describeConfigs(Collection<ConfigResource>) is unable to handle broker resource (apache#8853)
  KAFKA-10165: Remove Percentiles from e2e metrics (apache#8882)

# Conflicts:
#	core/src/main/scala/kafka/log/Log.scala
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants