KAFKA-9742: Fix broken StandbyTaskEOSIntegrationTest#8330
KAFKA-9742: Fix broken StandbyTaskEOSIntegrationTest#8330vvcephei merged 2 commits intoapache:trunkfrom vvcephei:kafka-9742-fix-test
Conversation
| try ( | ||
| final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(appId, stateDirPath + "/" + appId + "-1/", instanceLatch); | ||
| final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(appId, stateDirPath + "/" + appId + "-2/", instanceLatch); | ||
| ) { |
There was a problem hiding this comment.
I moved this into a try-with-resources so that the instances would get closed even if the assertions fail.
| streamInstanceOne.close(Duration.ZERO); | ||
| streamInstanceTwo.close(Duration.ZERO); |
There was a problem hiding this comment.
Close them both asynchronously so we can tear them down in parallel. The try-with-resources will block until they are really closed.
| final String stateDirPathOne = stateDirPath + "/" + appId + "-1/"; | ||
| final KafkaStreams streamInstanceOne = | ||
| buildStreamWithDirtyStateDir(appId, stateDirPathOne, instanceLatch); | ||
| try ( |
There was a problem hiding this comment.
This test isn't logically changed, but I noticed a couple of problems with it that I fixed on the side.
| " smaller than offsetSum=" + offsetSum + ". This probably means the task is corrupted," + | ||
| " which in turn indicates that it will need to restore from scratch, so we pin the lag" + | ||
| " to the end offset of the log."); | ||
| taskLagTotals.put(task, endOffsetSum); |
There was a problem hiding this comment.
This is the fix, to return an accurate estimate instead of throwing an exception. I felt a warning was appropriate, given that this does indicate task corruption, or some other unexpected situation.
There was a problem hiding this comment.
Should we report the lag as the whole log in this case? Even if the log is truncated it is not guaranteed to throw the invalid offset exception and hence task-corruption logic would not necessarily triggered.
There was a problem hiding this comment.
Do we know the exact reason, why the offset sum on the client is larger than the end offset sum on the broker? Before we change this invariant we should make sure we understand why it is not satisfied. Is it our code that breaks it or some external influences that we cannot control?
There was a problem hiding this comment.
@guozhangwang if the end offset is less than the checkpointed offset, how is it possible to not throw a TaskCorruptedException? I thought that was thrown after checking this exact condition?
edit: what I mean is, do we think this is a possible state? If so, we should explicitly check for it and throw TaskCorrupted if detected. (If not, it's an illegal state and thus the check here is appropriate)
There was a problem hiding this comment.
Also, @vvcephei , if a single task is corrupted and we do hit the TaskCorruptedException don't we close, wipe, and recreate stores of all tasks in the thread? Seems we would need to keep track of the thread ownership, and if we detect this condition then set the lag for all tasks owned by that thread to the endOffset
edit: nevermind, I checked the TaskCorruptedExcepion PR, looks like we ended up just setting the invalid offsets to 0 instead of wiping everything. So, disregard this comment 🙂
There was a problem hiding this comment.
Whoops, edited my previous comment before refreshing & seeing your response. Makes sense, although I still believe we should be checking for this and throwing TaskCorrupted if we detect this case (in the owning StreamThread, not in the leader during assignment)
There was a problem hiding this comment.
I think it's could easily happen that any past, present, or future versioned member reports a task sum that's beyond the end-offset sum of what the leader thinks the task contains.
If the corruption comes from our code, we should throw. The leader fetches the end offset shortly before that code, so they are most probably up-to-date. If the corruption is caused by failure we should handle it gracefully.
There was a problem hiding this comment.
Let's think about cases where this could possibly go wrong. We know it must reach or already be in standby/restoring phase. I will use logEndOffset for the actual endOffset of the partition, and endOffset for the offset a standby/restoring task is allowed to restore up to.
Newly-assigned: we read the invalid offset from the checkpoint, and determine the endOffset as either logEndOffset (normal) or min(logEndOffset, LCO) (optimized-source-changelog). In both cases, the endOffset <= logEndOffset so the invalid offset should also satisfy offset > endOffset which we can detect and throw as TaskCorrupted.
Already-assigned: in this case, we must have been the ones who wrote the invalid offset and/or have it in our checkpointableOffsets map. But we still need to fetch the endOffset to know when to stop, so the above applies here as well and we can detect the corruption.
In either case, I'd agree the leader should not throw and should just treat that task's offset sum as 0. But, we should make sure that on the other end we actually do detect this case and handle it the way the assignor expects.
There was a problem hiding this comment.
@ableegoldman If the log is truncated and then immediately more records are appended to go beyond the original log-end-offset, in old versions we would not throw the exception.
After some thoughts I think it makes sense to report the lag as the whole log.
There was a problem hiding this comment.
Thanks for the conversation, all. I think where it nets out for me is that we don't really expect this to happen for non-bugged code, but there are a few edge cases where it could. Thus, logging a warning actually seems reasonable.
Since the graceful handling option is still safe in any case, and since there do exist edge cases where this condition doesn't indicate a bug in Streams, we'll just keep the proposed graceful handling of reporting the lag as the whole log. Either the member in question won't get assigned the task (because it's de-prioritized wrt to other members who report valid positions), and it winds up cleaning up its state dir on its own, or it gets assigned the task, detects the corruption on its side, and recovers appropriately.
| public void shouldReturnEndOffsetSumIfOffsetSumIsGreaterThanEndOffsetSum() { | ||
| final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(taskId01, 5L); | ||
| final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(taskId01, 1L); | ||
| client.addPreviousTasksAndOffsetSums(taskOffsetSums); | ||
| assertThrows(IllegalStateException.class, () -> client.computeTaskLags(allTaskEndOffsetSums)); | ||
| client.computeTaskLags(allTaskEndOffsetSums); | ||
| assertThat(client.lagFor(taskId01), equalTo(1L)); |
There was a problem hiding this comment.
The test for the affected code of course has to change.
| waitForCondition(() -> streamInstanceOne.state().equals(KafkaStreams.State.RUNNING), | ||
| "Stream instance one should be up and running by now"); | ||
| waitForCondition(() -> streamInstanceTwo.state().equals(KafkaStreams.State.RUNNING), | ||
| "Stream instance one should be up and running by now"); |
There was a problem hiding this comment.
Stream instance "one" -> "two"
| LOG.warn("Task " + task + " had endOffsetSum=" + endOffsetSum + | ||
| " smaller than offsetSum=" + offsetSum + ". This probably means the task is corrupted," + | ||
| " which in turn indicates that it will need to restore from scratch, so we pin the lag" + | ||
| " to the end offset of the log."); | ||
| taskLagTotals.put(task, endOffsetSum); |
There was a problem hiding this comment.
prop: Could you explain a bit better what the warning is about? If somebody does not know the code, it is hard to understand what is going on.
|
@guozhangwang @vvcephei could we merge this one? |
|
test this please |
|
LGTM. @vvcephei please feel free to merge. |
|
Thanks, all. I'll make a trivial change to fix some strings in this PR and then merge. |
|
Just got a successful build with |
Relax the requirement that tasks' reported offsetSum is less than the endOffsetSum for those
tasks. This was surfaced by a test for corrupted tasks, but it can happen with real corrupted
tasks. Rather than throw an exception on the leader, we now de-prioritize the corrupted task.
Ideally, that instance will not get assigned the task and the stateDirCleaner will make
the problem "go away". If it does get assigned the task, then it will detect the corruption and
delete the task directory before recovering the entire changelog. Thus, the estimate we provide
accurately reflects the amount of lag such a corrupted task would have to recover (the whole log).
Committer Checklist (excluded from commit message)