Skip to content

KAFKA-5097: guard against unassigned partitions#2876

Closed
enothereska wants to merge 3 commits intoapache:0.10.2from
enothereska:KAFKA-4755-fetcher-only
Closed

KAFKA-5097: guard against unassigned partitions#2876
enothereska wants to merge 3 commits intoapache:0.10.2from
enothereska:KAFKA-4755-fetcher-only

Conversation

@enothereska
Copy link
Copy Markdown
Contributor

No description provided.

@enothereska
Copy link
Copy Markdown
Contributor Author

enothereska commented Apr 20, 2017

@ijuma @hachikuji in trunk this has been fixed, but not in 0.10.2. Results in occasional stream test failure. A separate PR updates the stream tests and triggers the failure all the time #2867 (but can be ignored for now since problem manifests itself even without updated tests, although only occassionally). Note that Fetcher.java has diverged quite a bit in trunk from 0.10.2, so a full cherrypicking is hard. This just fixes the immediate problem.

TopicPartition partition = nextInLineRecords.partition;
fetchedPartition = partition;
fetchedOffsets = subscriptions.position(partition);
if (subscriptions.isAssigned(partition)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to skip all the code in this else block if the partition is no longer assigned.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure. In the trunk code https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L523 we seem to drain the records (but the logic is slightly different, in that we drain them, but don't process them). Any suggestions welcome since I'm not super familiar with this code.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a closer look and it seems like this is a regression from a267316#diff-b45245913eaae46aa847d2615d62cde0R490 so we may need another RC.

Maybe we need to move the position call to inside !records.isEmpty since drainRecords does the check. But not sure, better to wait for @hachikuji to take a look.

cc @becketqin @lindong28 @hachikuji

Copy link
Copy Markdown
Member

@ijuma ijuma Apr 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can go with this solution since it seems pretty safe. Shall we just remove the log.debug since we'll get that from the drainRecords call anyway? And let's run the system tests on this branch after that.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@enothereska Looking at the previous patch, it seems like the scope of the try/catch is unnecessarily large. I think it only needs to cover the call to parseCompletedFetch. If we do that, then there should be no need to access subscriptions.position down this path.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mentioned offline, but a better solution (if Becket is right that the try/catch needs to cover this branch) would be the following:

fetchedOffsets = nextInLineRecords.fetchOffset;

Also, seems this variable shouldn't be plural.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@enothereska The trunk code does not need to access subscription.position, instead it uses PartitionRecords.nextInlineOffsets which should be the same as position because the position is updated to this value every time after a successful fetchRecords().

The big try/catch is to make sure the the exception from fetchRecords will also be caught and not result in loss of non-empty fetched.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so @becketqin to confirm, you are saying @hachikuji's suggestion on fetchedOffsets = nextInLineRecords.fetchOffset; is the way to go?
I ask because it takes 7 hours to run system tests :)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@enothereska Yes, I think that is a better solution. But I think @hachikuji was right that we don't need to cover both if/else branches. We just need to cover the parseCompletedFetch

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, stay tuned, making that change with next commit.

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3051/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3046/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3047/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3050/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3054/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3049/
Test FAILed (JDK 7 and Scala 2.10).

@enothereska
Copy link
Copy Markdown
Contributor Author

@enothereska
Copy link
Copy Markdown
Contributor Author

enothereska commented Apr 20, 2017

I believe unit test failure is known problem unrelated to PR, checking with @mjsax. Indeed it is fixed in trunk and is not related to PR. 92b7d75

@enothereska
Copy link
Copy Markdown
Contributor Author

Retest this please

Copy link
Copy Markdown
Member

@ijuma ijuma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks. Will wait for @hachikuji's review.

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3054/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3055/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3059/
Test PASSed (JDK 8 and Scala 2.11).

@enothereska enothereska changed the title HOTFIX: guard against unassigned partitions KAFKA-5097: guard against unassigned partitions Apr 20, 2017
@becketqin
Copy link
Copy Markdown
Contributor

Sorry for the regression. It was my bad. Will submit a PR next when backporting a patch does not merge cleanly.

@hachikuji The reason to have the big try catch is that some exceptions such as InvalidRecordException can be thrown from drainRecords() as well. So the previous patch was trying to cover that as well.

The fix LGTM.

@hachikuji
Copy link
Copy Markdown
Contributor

@becketqin Are you sure about that? I must be missing it, can you point me to the line?

@becketqin
Copy link
Copy Markdown
Contributor

@hachikuji You are right. This part of the code is different from trunk. We already parsed all the records in parseCompletedFetch. It was different from the trunk code.

@enothereska
Copy link
Copy Markdown
Contributor Author

enothereska commented Apr 20, 2017

@hachikuji @becketqin @ijuma done. Restarting system tests. Btw, with previous change, all tests passed.

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3064/
Test PASSed (JDK 7 and Scala 2.10).

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Good catch and thanks for the fix!

@enothereska
Copy link
Copy Markdown
Contributor Author

@enothereska
Copy link
Copy Markdown
Contributor Author

asfgit pushed a commit that referenced this pull request Apr 20, 2017
…entially unassigned partitions

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jiangjie Qin <becket.qin@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #2876 from enothereska/KAFKA-4755-fetcher-only
@hachikuji
Copy link
Copy Markdown
Contributor

@enothereska I merged this to 0.10.2. Would you mind closing the PR since it looks like the merge doesn't take care of this automatically.

@enothereska enothereska deleted the KAFKA-4755-fetcher-only branch April 20, 2017 21:39
@asfbot
Copy link
Copy Markdown

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3069/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Apr 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3065/
Test FAILed (JDK 8 and Scala 2.12).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants