KAFKA-7443: OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic#5946
Conversation
…m changelog topic
…hangelog topic when start offset of local checkpoint is smaller than that of changelog topic
|
Thanks for the PR @linyli001. Can we add a unit (or integration) test for this case? |
|
There is a checkstyle error: |
|
The test results are gone already, for some reason... |
|
Retest this, please. |
|
@linyli001 Can we add a test for this case? |
|
@mjsax Ok, I'll add an integration test. But the scenario only triggers under certain conditions: the changelog topic deleted the expired segments and updated the new starting offset, which is greater than the offset recorded in local checkpoint file. I'm confused how to simulate this scenario in my test case, so could you give me some suggestions? |
|
@linyli001 I see your point, just off the top of my head without looking at the code, could you simulate the condition better using mocks? In that case, you'd only need a unit test. |
|
I agree that it's a little tricky... A mocked unit test should be possible for sure as suggested by Bill. For an integration test, you could create a topic with very small segment size, write data into it, and user purgeDataAPI to delete old records. Then, a checkpoint file with old offsets is written and afterwards Streams is started and we check if it recovers correctly. Does this help? Thoughts? |
|
I think we'd need to see the unit test to decide if it's complete enough, but it feels like enough to me... The fundamental question is whether you trust the client to throw that exception when the offset is ... out of range. If that's the specified behavior of the client, then we don't need an integration test, we just need to make sure that Streams properly clears the offset when it gets the exception. In other words, we should be able to test it satisfactorily with a mock. |
|
Ok. I will add the test case according to @mjsax. Thanks! |
|
The unit test has some problems, and I'll check and push a new commit latter. |
mjsax
left a comment
There was a problem hiding this comment.
Thanks for adding the test. Some nits. Overall LGTM.
vvcephei
left a comment
There was a problem hiding this comment.
This looks good to me, @linyli001 .
Thanks!
|
@mjsax , is this ready to merge? |
…hangelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946) Reviewer: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>
…hangelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946) Reviewer: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>
…hangelog topic when start offset of local checkpoint is smaller than that of changelog topic (#5946) Reviewer: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>
|
Thanks for the PR @linyli001! Merged to |
|
@mjsax I have some problems when sync with the latest change. I can see my fix has been merged into the trunk branch, but when I folk the apache/kafka to my GitHub, I can't found the latest change. Any steps am I missing? |
|
Hi @linyli001 , IIRC, you have to check out your trunk branch, then pull from apache/kafka's trunk, then push it to your fork's trunk. Is that what you're already trying to do? |
|
@vvcephei Sorry I don't quite understand how to do it. I have cloned the git from my fork to local, but run "git pull origin trunk" not includes the latest change. I think it just pull from my fork's trunk, but I don't know how to pull from apache/kafka's trunk locally? Can you show me the git command? Thanks! |
|
If you fork the repo, your copy on github won't be updated automatically. You need to pull the latest changes from kafka repo into your local one (and than you can push those back to you copy on github). Assume you have an alias called |
|
@mjsax Thanks for the guide! I can update my folk now. |
…hangelog topic when start offset of local checkpoint is smaller than that of changelog topic (apache#5946) Reviewer: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>
See https://issues.apache.org/jira/browse/KAFKA-7443 for details.
The fix set this state partition to "NO_CHECKPOINT" when the offset in local checkpoint file has expired and older than the current start offset of changelog topic, thus making this task to restore local state from the current beginning offset of changelog topic, avoiding falling into the infinite loop caused by this exception.