KAFKA-8755: Fix state restore for standby tasks with optimized topology #7238
guozhangwang merged 6 commits into apache:trunk from
Conversation
If this all looks good from a design / implementation perspective I'd like to get a few additional unit tests in on the changes too. I'll optimistically start working on those, but I'm happy to dump/rework them if we need to change the implementation.

I've added additional test cases and comments. I'm not anticipating any more changes to this code unless I get review feedback requesting changes.

The RebalanceSourceConnectorsIntegrationTest seems to be flaky - I can sometimes get it to fail with or without this patch locally. The Kafka test failures are below this change; not sure what happened there. The failure of the standby task test in one instance is concerning, though. I want to spend some time digging through the logs to see whether it is related to the Kafka issues, inherent to the test, or, worse, the code. For now, optimistically running another pass to see if the Kafka issues get sorted out while I investigate: retest this please

The failure for JDK 8 / Scala 2.11 looks legit. For some reason we're still not getting the standby task to checkpoint. Possibly another bug lurking that I have not seen locally. Will do more investigation.
|
It turns out there is no standby. Both partitions are getting assigned to the same instance. When the other task joins between PARTITIONS_ASSIGNED and RUNNING the partitions are not reassigned. Something is up in this phase and consequently the standby code is not even being used. |
cadonna
left a comment
Thanks for the PR @cpettitt-confluent.
According to the comment on line 158, the consumer API does not guarantee that endOffset is not null. The javadoc of KafkaConsumer#endOffsets() says that there will be a non-null endOffset for each given partition (at least in my interpretation). @guozhangwang, WDYT?
Please remove comment once this issue is solved.
EndOffsets here should not be null, since if consumer.position timed out it would return early and not arrive here.
Please see my other comment about the awkwardness of remembering offset limit during restoration phase, which I think is avoidable.
The following two tests have a lot of code in common. Please try to extract the common parts into methods.
Is the question:
- Why AtomicInteger instead of a larger type? I'm not too worried about rollover here, but happy to make it long if you prefer.
- Why AtomicInteger instead of AtomicBoolean? I want to make sure we don't make a second consumer.committed() call.
- Why AtomicInteger instead of a primitive type? I want to be able to set the value from a closure. I haven't done a lot of Java coding in the last year or two, so if you know a way to do it, let me know and I will be happy to adopt it.
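The last bullet is the usual Java constraint: local variables captured by a lambda must be effectively final, so a mutable holder such as AtomicInteger is needed to set a value from inside a closure. A minimal, self-contained sketch (all names here are illustrative, not from the PR):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ClosureCounter {
    public static void main(final String[] args) {
        // A plain `int` could not be mutated from inside the lambda below,
        // because captured locals must be effectively final.
        final AtomicInteger committedCalls = new AtomicInteger(0);

        // Stand-in for a callback that should invoke consumer.committed()
        // at most a bounded number of times.
        final Runnable fetchCommitted = committedCalls::incrementAndGet;

        fetchCommitted.run();
        fetchCommitted.run();

        System.out.println("calls = " + committedCalls.get());

        // Alternative without AtomicInteger: a one-element array,
        // e.g. `final int[] calls = {0}; calls[0]++;` -- it also works,
        // but is generally considered less readable.
    }
}
```

This is also why AtomicInteger (rather than AtomicBoolean) lets the code both guard against a second `consumer.committed()` call and count how many calls actually happened.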
Thanks for the review @cadonna! Since my other PR went in it looks like I will need to rebase this one. I will also adopt the changes above where mentioned - I think there was at least one comment I was not sure how to interpret. I also found at least one more failure case for standby state restore that I want to get into a follow-up patch to this PR.
guozhangwang
left a comment
Made a pass over the non-testing code.
I think we only need to refresh the offset limit if !remainingRecords.isEmpty(), meaning that some records could not be applied, to replace the first two conditions.
Also, since this should only happen when the partition is part of sourceChangelogPartitions (otherwise the offset limit should always be MAX_VALUE and the remaining records should always be empty), instead of including it as a condition, we can just check if that is not the case and throw, indicating a bug.
I think we only need to refresh the offset limit if !remainingRecords.isEmpty(), meaning that some records could not be applied, to replace the first two conditions.
@guozhangwang Can do. This says something a little different than the current code. This says if there were any records that could not be played then refresh the consumer committed offset. The current code says refresh the consumer committed offset only if we could not play any of the records. In other words, we may refresh more frequently. Sound good?
Also, since this should only happen when the partition is part of sourceChangelogPartitions (otherwise the offset limit should always be MAX_VALUE and the remaining records should always be empty), instead of including it as a condition, we can just check if that is not the case and throw, indicating a bug.
Makes sense, I will treat this case as a bug and throw.
I ended up not using remainingRecords.isEmpty(), but am open to switching to that. Let me explain why I didn't pick this up. IIUC, the behavior you're looking for is to make the consumer.committed query if any records cannot be applied because the offset limit is too low. The new code does this. However, if I use remainingRecords.isEmpty() I have to do up to two full passes on the records: one to determine that all records are blocked and a second to replay what is no longer blocked. Assuming records are ordered by offset, we can achieve this in one pass as in the updated code. Please let me know if my assumption is incorrect, I misunderstood your goal, or you'd still prefer the two-pass approach.
To simplify this, should we just check if records.isEmpty() in the very beginning and return immediately?
I also think this code may still do two passes, as we have no guarantee that get() will not need to traverse the list anyway. To get a single pass, I would rather piggyback updating the offset limit onto the for loop below (we would change the loop to use a proper Iterator that we advance manually: if we hit the else branch, we try to update the offset limit and retry the current record without advancing the iterator).
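The manually-advanced-iterator idea can be sketched in isolation. This is a hedged toy model, not the Streams code: records are modeled as a list of offsets, and `refreshLimit` stands in for re-reading the consumer committed offset. The limit is refreshed at most once, and the blocked record is retried without advancing the iterator:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongSupplier;

public class SinglePassApply {
    // Apply records below the offset limit in a single pass; on the first
    // blocked record, refresh the limit once and retry that same record.
    static List<Long> applyRecords(final List<Long> recordOffsets,
                                   long offsetLimit,
                                   final LongSupplier refreshLimit) {
        final List<Long> applied = new ArrayList<>();
        final Iterator<Long> iter = recordOffsets.iterator();
        Long current = iter.hasNext() ? iter.next() : null;
        boolean refreshed = false;
        while (current != null) {
            if (current < offsetLimit) {
                applied.add(current);                    // below the limit: apply it
                current = iter.hasNext() ? iter.next() : null;
            } else if (!refreshed) {
                offsetLimit = refreshLimit.getAsLong();  // blocked: refresh the limit once
                refreshed = true;                        // ... and retry the same record
            } else {
                break;                                   // still blocked after refresh: stop
            }
        }
        return applied;
    }

    public static void main(final String[] args) {
        // Offsets 0 and 1 apply under limit 2; offset 2 blocks, the limit is
        // refreshed to 3, offset 2 applies, and offset 3 remains blocked.
        final List<Long> applied = applyRecords(List.of(0L, 1L, 2L, 3L), 2L, () -> 3L);
        System.out.println(applied);
    }
}
```

Because records arrive ordered by offset, everything after the first blocked record is also blocked, which is what makes the single pass safe.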
Why do we need this for active stream task? Only standby task need to maintain the offset limits right?
With the other PR merged in, this seems redundant.
Right, I can rebase this away now. I needed it to get the test to pass previously.
This is a response to your previous question (can't comment there for some reason):
Why do we need this for active stream task? Only standby task need to maintain the offset limits right?
This limit was the easiest way I could find to prevent the changelog restorer from restoring beyond the committed offset. IIRC, without this change we end up playing all of the input topic messages to load up the store before even starting to process new records, which gave us that KTable out-of-order error. I believe we were playing all messages again after failover, but I would have to revert to verify.
If there is a better way to prevent the above, I would be happy to adopt it. It was definitely my intention to get rid of end offsets from stream task altogether.
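The capping described above reduces to a one-liner. A hedged sketch, with illustrative names (`changelogEndOffset`, `committedOffsetLimit` are not the actual Streams identifiers): for a plain changelog the limit is `Long.MAX_VALUE`, so the cap is a no-op; for a source topic reused as a changelog under an optimized topology, the committed offset wins:

```java
public class RestoreBound {
    // Restore end point = min(changelog end, consumer committed offset limit).
    static long restoreEndOffset(final long changelogEndOffset,
                                 final long committedOffsetLimit) {
        return Math.min(changelogEndOffset, committedOffsetLimit);
    }

    public static void main(final String[] args) {
        // Plain changelog: no effective limit, restore to the end offset.
        System.out.println(restoreEndOffset(500L, Long.MAX_VALUE));
        // Optimized topology, source topic as changelog: stop at committed offset.
        System.out.println(restoreEndOffset(500L, 100L));
    }
}
```

This is why replaying past the committed offset (bug 3 in the PR description) only bites when the changelog is also an input topic.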
I think this is part of the reason why I'd advocate for removing the standby task's restoration phase altogether, because:
- only standby task needs to maintain offset limit, for stream task the offset limit should always be MAX_VALUE.
- since we have a restoration phase before the running phase of the standby task, we need to keep this offset limit as part of the changelog-reader as well to make sure that we do not restore beyond the limit (in fact, the checkpoint offset should never be larger than the offset limit, at most be equal to it).
@guozhangwang this is actually on the active task (StreamTask). This is probably the most minor of the list of issues, but this branch fixes calling restore at all on the active task if the standby was all caught up immediately before it became active.
As we discussed earlier, the standby doesn't do restore at all. Unfortunately, a lot of these hoops seem necessary given the current design. I do have some thoughts on how to simplify it if you have time to discuss at some point.
BTW, to be clear, I mean the restore at task startup, which is what we discussed yesterday. We seem to have overloaded the term "restore" as we also call it "restore" when we replay consumed events during normal operation...
I think this logic is still buggy that we need to fix: we should check if the partition is part of sourceChangelogPartitions here, and:
- if not, then we do not need to read the limit at all since there should be none.
- if yes, then we should initialize the limit to zero to block any updates since there should always be some value, and if the committed values are not read yet then initializing it to MAX_VALUE has a risk of going beyond the limit still.
@guozhangwang I pre-initialized the source topics that are changelogs in initializeStateStores. Since the other partitions are not registered, we get back MAX_VALUE (the default for a missing value in stateMgr). I think the only cases where we end up using the offset limit right now are during the standby -> active restore and to prevent the standby from getting ahead of the active consumer. If that's also your understanding, then my inclination would be to move the offset limit entirely into StandbyTask and to explicitly handle the standby -> active transition in StreamTask. The benefits are less inter-class collaboration, more explicit handling of the various limits, and avoiding overloading the offset limit for yet something else :). WDYT?
I have a meta comment: I think remembering the offset limit during the restoration phase is a bit awkward (not introduced by your PR, but just how it's written today). I'd still suggest we skip the restoration phase of the standby task as a whole (we can do the refactoring later if you like, and for now just let StandbyTask#initializeStateStores return false so that the task moves to running immediately). Doing this, we can actually remove the offset limit from the restorer altogether, since it is only needed for standby tasks during restoration, and if we enforce that restoration never happens then it is useless.
ping @mjsax to have another look as well.
Thanks @guozhangwang! Per our discussion I will drop restore from standby task as it is only complicating things. I may also break this up into a few different commits to make it easier to consume. Lastly, there is still a way to get the standby task wedged with the current state of this commit. I have already identified the issue and I'm working through the fix. For now it is probably best to hold off on further reviews as things are about to shift a fair amount.
Force-pushed bbdc598 to 82b378a.
I've pushed a different version of this patch to hopefully reduce confusion. I'm attempting to push offset limits as low as I can to make their use in both tasks clearer. Things that could make this cleaner:

I have a few block comments in this commit. I received feedback last time that "we don't often do comments." Seeing that this is complicated enough that it has required a lot of back and forth, I am ready to debate the worthiness of comments in the code :).
JDK 8 / Scala 2.11 timed out, which is concerning. JDK 11 / Scala 2.12 failed with an error which seems unrelated to my change. I'll check the flaky tests list and try to repro locally. On the positive side, my new tests seem more robust - they passed on JDK 8 / Scala 2.11. The new design may be useful elsewhere.
Confirmed SaslOAuthBearerSslEndToEndAuthorizationTest is flaky (the ticket was marked as a duplicate of KAFKA-8800). Retest this please.
Prior to this change, there were a few bugs:
1. Records were never applied to the standby store if the topology was optimized.
2. Even if they had been, during failover the standby store would checkpoint with offset 0, effectively requiring a full rebuild of the state store.
3. After failover, the now-active task would replay all records from the source topic, which could leave it ahead of the consumer committed offset.
All of the above are addressed in this patch. In an attempt to keep things simpler for review, this patch does not address another issue uncovered during investigation: when the standby transitions to active, the restore operation is started even when the store is fully caught up. For example, if the last consumer committed offset was 100 and the store was at 100, we still see a restore operation with start and end at 100.
Force-pushed 80d5c2e to 3d92400.
Only seems to be trying to run tests on one of the Jenkins boxes? Retest this please.
} else {
    log.info("End offset cannot be found form the returned metadata; removing this partition from the current run loop");
    final StateRestorer restorer = stateRestorers.get(topicPartition);
    if (restorer.checkpoint() >= endOffset) {
Thinking about this, the checkpoint should never be larger than the endOffset? Ie, is restorer.checkpoint() > endOffset actually an error condition and we should wipe out the state and recreate from scratch?
Totally agree from a correctness perspective. Do you think we have enough test coverage to ensure this change is safe?
Looking at this one a bit more, I think it might be a good idea to defer this change. It's pretty orthogonal to the changes I'm doing (this code was just moved in this patch). I can create a ticket if you like?
Maybe. Would be good to get input from @guozhangwang about it -- maybe >= is actually correct and I am just missing something...
    restorer.setRestoredOffset(restorer.checkpoint());
    iter.remove();
    completedRestorers.add(topicPartition);
} else if (restorer.offsetLimit() == 0 || endOffset == 0) {
This condition seems to be redundant as we compute Math.min() above anyway. I think we can simplify to endOffset == 0.
Actually, I am wondering if we can unify the first and second branch:
final long currentOffset = Math.max(restorer.checkpoint(), 0);
if (currentOffset == endOffset) {
    restorer.setRestoredOffset(currentOffset);
    iter.remove();
    completedRestorers.add(topicPartition);
} else {
    restorer.setEndingOffset(endOffset);
}
Sounds great!
Thanks @mjsax!!! Lots of great feedback. I'll get the patch back up with fixes today.
guozhangwang
left a comment
Made another pass on the PR, left some comments (one of them seems affecting correctness).
@cpettitt-confluent could you also trigger a system test to double check?
// stream task to only play records up to the last consumer committed offset. Here we find
// partitions of topics that are both sources and changelogs and set the consumer committed
// offset via stateMgr as there is not a more direct route.
final Set<String> changelogTopicNames = new HashSet<>(topology.storeToChangelogTopic().values());
This code should be functionally equivalent to the previous code. Previously we initialized the offset limits in registerStateStores, which is called by initializeStateStores here. The biggest difference is that we no longer do this immediately for standby tasks - we defer to the first time we need a new offset limit to apply a record.
The only exception that is non-fatal should be TimeoutException?
Yeah this is not a comment for this PR but more or less an FYI for folks who are working on KIP-447 :) I also left some thoughts on the voting thread regarding this purpose.
@guozhangwang, which comment do you think affects correctness? My guess is your question about whether we default to Long.MAX_VALUE for the offset limit the first time. Hopefully my response there convinces you it works correctly. If something else, please let me know. W.r.t. the system test, is that something beyond what we're doing on Jenkins? If so, is it basically following this doc: https://github.com/apache/kafka/blob/trunk/tests/README.md? And is there a subset you want me to run, or the whole suite?
long committedOffsetForPartition(final TopicPartition partition) {
    try {
        final OffsetAndMetadata metadata = consumer.committed(partition);
@guozhangwang With regard to your comment about exceptions for EOS -- it seems only relevant to catch and swallow a TimeoutException here? Or should the caller be responsible for deciding if a timeout should be swallowed? Or do you have anything else in mind?
I think TimeoutException is the one, yes, but more importantly we should see if our timeout values are set reasonably in Streams :)
Yes that's the one and yours makes sense!
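The agreed behavior can be sketched without Kafka on the classpath. This is an illustrative model only: `KafkaTimeoutException` below stands in for Kafka's unchecked `org.apache.kafka.common.errors.TimeoutException`, and the supplier stands in for the `consumer.committed()` lookup. Only a timeout is swallowed (falling back to an unbounded limit); everything else propagates:

```java
import java.util.function.LongSupplier;

public class CommittedOffsetFetch {
    // Stand-in for Kafka's unchecked TimeoutException.
    static class KafkaTimeoutException extends RuntimeException { }

    static long committedOffsetOrMax(final LongSupplier committedLookup) {
        try {
            return committedLookup.getAsLong();
        } catch (final KafkaTimeoutException e) {
            // Timed out talking to the broker: keep an unbounded limit for now.
            return Long.MAX_VALUE;
        }
        // Anything else (authorization failures, fatal errors, ...) propagates.
    }

    public static void main(final String[] args) {
        System.out.println(committedOffsetOrMax(() -> 42L));
        System.out.println(committedOffsetOrMax(() -> {
            throw new KafkaTimeoutException();
        }) == Long.MAX_VALUE);
    }
}
```

Whether the fallback should be "no limit" or "retain the previous limit" is exactly the correctness question debated elsewhere in this thread, so treat the MAX_VALUE fallback as one possible policy, not the PR's final answer.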
You can trigger a system test out of any branch at https://jenkins.confluent.io/job/system-test-kafka-branch-builder/ where you can set the repo and branch name to yours, and then we can just run all the system tests by having
mjsax
left a comment
I was wondering if we could add an integration test that verifies that a standby does not "overtake" the active task, i.e., is blocked by the committed offsets -- not sure how such a test could be written, though, atm.
    .toStream()
    .peek((k, v) -> semaphore.release());

final KafkaStreams client1 = createClient(builder, streamsConfiguration());
nit: client1 -> kafkaStreams1
nit createClient -> createKafkaStreams
(similar for all other variables eg clientStates etc)
It's technically a client, but consumer and producer are too, and we also use consumer/producer as variable names there -- it makes things easier to read -- otherwise it's unclear which client it is.
Will do. Thanks!
Any objection to calling it an instance?
While I think it would be fine, it would be "new" -- if we think instance is better, we might want to migrate all other code lazily to use instance, too. It's always best to have the same naming conventions throughout the whole codebase IMHO.
+1
I liked the name instance because it matched the documentation and was unambiguous. On the other hand, we're actually creating an object of type KafkaStreams, so that is a pretty strong argument to use the convention you originally suggested.
I'll switch instanceXXX to kafkaStreamsXXX and submit a follow up patch.
produceValueRange(key, 0, 10);

assertThat("all messages in the first batch were processed in a timely manner",
Should the error message not point out what went wrong, ie, "messages in the first batch were [not] processed in a timely manner" -- same below
I can change that. It reads a little funny "assert that messages in the first batch were not processed in a timely manner", but this should follow project convention.
Looking at this again, it is going to confuse the heck out of me if I try to reason about these messages done in the negated form. I'm inclined to pull the messages and perhaps just put a comment above them instead.
In most cases we don't have any message, so it should be fine to remove. I see your point about "assert that bla" -- however, I think if the assertion hits, the error message reads differently (i.e., with reversed logic) and hence rephrasing would make it easier to read the error message if it fails (please correct me if I am wrong).
I figured I would test it for my own knowledge.
Test case:

public class QuickTest {
    @Test
    public void testAssertThatTrueIsEqualToFalse() {
        boolean result = false;
        MatcherAssert.assertThat("the result is true", result, is(equalTo(true)));
    }
}

Result:

java.lang.AssertionError: the result is true
Expected: is <true>
     but: was <false>
I think this actually works out. It is saying the assertion did not hold (AssertionError) and then provides the assertion in the positive form. It also spells out the expected and actual values.
That said, I'm not going to go change all of those back now :).
final int totalNumMessages = batch1NumMessages + batch2NumMessages;

produceValueRange(key, batch1NumMessages, totalNumMessages);
Before we produce new messages, should we verify that the store was migrated correctly and contains store.get(key), is(equalTo(batch1NumMessages - 1)) ?
Yes, we can do that with a time limited loop that queries for that state.
@mjsax You may have uncovered yet another bug (BUG 6?) in optimized graph failover! I cannot seem to get any value for the key after failover. So, looks like this may involve yet more significant investigation. Thanks for the idea!
This test fails with non-optimized graphs too. I hope I'm doing something wrong...
whew... Apparently 10 seconds was not long enough to wait for the store to become available after failure. If I wait a bit longer it finally becomes available. Seems long though? This is the case for both non-optimized and optimized...
In case of failure, we detect the failure only after session.timeout.ms (default 10 seconds) hits -- to speed up the test, we could decrease the session timeout via StreamsConfig.
I had the same exact thought. My only reservation was that tweaking config too much might start to deviate too much from expected use. In other words, I'm totally for this if we have a reasonable value in mind. I was thinking about 1s?
I'm going to try with 1s. If you have a better value please let me know as soon as you get a chance :)
We can try that -- but we need to reduce HEARTBEAT_INTERVAL_MS_CONFIG, too, for this case -- its default is 3 seconds and it must be smaller than session timeout.
Checking some existing tests, we have some that set them to 500ms / 1sec. Some other test reduce session timeout to 2 or 5 seconds...
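The constraint discussed above (heartbeat must stay well below the session timeout, roughly a third of it by the usual rule of thumb) can be written out as a small config sketch. The property keys are the standard consumer config names; the concrete values (1 s / 333 ms) are one possible choice for a fast-failing test, not a recommendation from this PR:

```java
import java.util.Properties;

public class TestTimeouts {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // session.timeout.ms: how long before a silent group member is evicted.
        props.put("session.timeout.ms", "1000");
        // heartbeat.interval.ms: must be well below the session timeout,
        // otherwise a single missed heartbeat can trigger a rebalance.
        props.put("heartbeat.interval.ms", "333");
        System.out.println(props.getProperty("session.timeout.ms"));
        System.out.println(props.getProperty("heartbeat.interval.ms"));
    }
}
```

In a Streams test these would typically be passed through to the consumer (e.g. via the consumer-prefixed config), and lowering them trades detection latency against the risk of spurious rebalances under load.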
// task.commitNeeded and task.commit and instead just had task.commitIfNeeded. Currently
// we only call commit if commitNeeded is true, which means that we need a way to indicate
// that we are eligible for updating the offset limit outside of commit.
running.forEach((id, task) -> task.allowUpdateOfOffsetLimit());
Seems we should add AssignedStandbyTasksTest.java to unit test the new commit() behavior.
We could add a test here, but it is super implementation specific. This is not really the best home for the code, just the most convenient at the moment. All we would test, I think, is that we called allowUpdateOfOffsetLimit? The integration test covers this in a more comprehensive way and doesn't break when we move this code. What do you think?
All we would test, I think, is that we called allowUpdateOfOffsetLimit?
Yes, the test should be shouldUpdateOffsetLimitOnCommit().
The integration test covers this in a more comprehensive way and doesn't break when we move this code.
"and doesn't break" -- I guess it should break? The main purpose of the unit test is, that is would be hard to debug the integration test and figure out the root cause why it fails.
I added back the StandbyTask tests that will cover this with a much more narrow test - specifically to cover when to update the offset limit. If this gets moved into StandbyTask that test should not fail, provided it was done right. If it's broken in any way that test will flag it and it should be easy to look at the test failure and the changed code to determine the cause.
Do you think StandbyTaskTest sufficiently addresses your concern?
The PR lgtm now, thanks @cpettitt-confluent! Leaving it to @mjsax to merge when he feels comfortable and the system test has passed.
Force-pushed 56d71ac to 7a18866.
@mjsax I believe the latest patch addresses all of your feedback except for the one open question about testing that was left unresolved. Whenever you have time, this patch is ready for review. Thanks!

The system test had one failure. This doesn't seem related to my change, as it is in core. I did find a closed flaky test ticket for this test: https://issues.apache.org/jira/browse/KAFKA-8044. I will see if I can re-run just this test, since the whole suite took 16 hours!

I'm seeing the same failure for other people's system tests on Jenkins, suggesting that the ReassignPartitionsTest in Kafka core is in fact flaky again.

retest this please.

@mjsax new patch is up

System test for

Well that blew up spectacularly :). Log download is slow so I'll probably get to reviewing it tomorrow morning. If it is related to the change, my guess would be that it's the lowered session and heartbeat timeouts.

I'm going to give this one more "retest this please" and if it blows up as badly I will revert the shorter timeouts (session / heartbeat). I didn't see anything about this test in the log I looked at.

Test timed out on JDK 11 / Scala 2.12. We're green on JDK 11 / 2.13 and JDK 8 / 2.11. I looked at the three failures earlier and they were all timeouts. Going to try this one more time... retest this please!

Sigh... It seems timeouts are a thing now. Same timeout on 11/2.12. retest this please

I realized I forgot to explicitly mention that the other two Jenkins tests, 11/2.13 and 8/2.11, came back green on the last run. It was just 11/2.12 that timed out.

Per request, the following passed for all 20 runs:

LGTM (the failed test is due to the 240-min timeout again, while the other two take 230 min and 190 min respectively).
Key changes include:
- ... we need an updated offset limit to make progress.
- ... we cannot apply any of the records.
- ... checkpoint is greater than or equal to the offset limit (consumer committed offset). This needs special attention please. Code is in StoreChangelogReader.
- ... provides a way to prevent playing too many records from the changelog (also the input topic with optimized topology).
NOTE: this PR depends on KAFKA-8816, which is under review separately. Fortunately the changes involved are few. You can focus just on the KAFKA-8755 commit if you prefer.
@guozhangwang @mjsax @cadonna