KAFKA-6145: Pt 2. Include offset sums in subscription #8246
vvcephei merged 22 commits into apache:trunk from
Conversation
test this please
vvcephei
left a comment
Thanks, @ableegoldman !
Let's add this before we try to read the checkpoint file. In case we do get an IOException, we shouldn't forget that we got the lock.
what should we do if the offset is negative?
Oh, also, should we detect overflow and pin to MAX_VALUE in that case?
Q: Would it be too strict to specify offset >= 0 as an invariant and throw an IllegalStateException?
Regarding the overflow: when computing the lag, the sum of the last committed offsets will always be larger than or equal to the sum of the offsets of a task's state stores, except for the case where the sum of the last committed offsets has already overflowed but the sum of the state offsets has not. So, if both have overflowed, the difference is not affected by the overflows. If only the sum of the last committed offsets has overflowed, we need to compute the difference differently, but we are able to recognize this case. All of this assumes that overflow is well defined in Java, i.e. that MIN_VALUE comes after MAX_VALUE.
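A tiny sketch (not from the PR) of the wrap-around argument above: Java long arithmetic is two's complement, so if both sums have wrapped past MAX_VALUE, their difference is still exact as long as the true lag itself fits in a long. The class and method names are illustrative only.

```java
public class OverflowLag {
    // Hypothetical helper: lag between the sum of the latest committed
    // offsets and the sum of a task's state offsets. Two's-complement
    // subtraction cancels the wrap-around.
    static long lag(final long endOffsetSum, final long taskOffsetSum) {
        return endOffsetSum - taskOffsetSum;
    }

    public static void main(final String[] args) {
        final long base = Long.MAX_VALUE - 10;
        final long endSum = base + 100;  // wraps past MAX_VALUE into negative territory
        final long taskSum = base + 60;  // also wraps
        System.out.println(lag(endSum, taskSum)); // prints 40 despite both overflows
    }
}
```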
Seems like the only negative offset we can get is -1, which indicates the offset is unknown in which case we should skip it. Will add a check for overflow too
Hmm. Skipping would count the "current position" on that store as 0. Should we assume "unknown offset" equates to "fully caught up", or is it safer to set it to MAX_VALUE, or something else?
-1 means that recordMetadata.offset == ProduceResponse.INVALID_OFFSET, which sounds to me like either the topic or producer isn't initialized yet or is corrupted in some way. Both of those make sense (to me) to interpret as an offset of 0. But, I'm no expert in the Producer client and various offset meanings.
w.r.t the matter of overflow, I think we should aim to keep things simple and just pin to MAX_VALUE in the event of overflow, if we think that's likely to be a rare event. Obviously we should at the least make sure we don't crash or seriously harm the operation or results of the app -- pinning to MAX_VALUE will at worst make us potentially switch over to an active task before it's completely caught up.
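One way to realize the "pin to MAX_VALUE" idea above is to detect the overflow explicitly with `Math.addExact`, which throws `ArithmeticException` on long overflow. This is a sketch with illustrative names, not the PR's actual implementation.

```java
public class PinnedSum {
    // Sum a set of changelog offsets, pinning to Long.MAX_VALUE if the
    // running sum would overflow (expected to be a rare event).
    static long sumOrPin(final long... offsets) {
        long sum = 0L;
        for (final long offset : offsets) {
            try {
                sum = Math.addExact(sum, offset);
            } catch (final ArithmeticException overflow) {
                return Long.MAX_VALUE; // pin rather than wrap to a negative value
            }
        }
        return sum;
    }
}
```

At worst, pinning makes the instance look fully caught up a little early, as noted above.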
Should we try to unlock the rest? Also, we should always include the causing exception.
This now looks a little suspicious... In the absence of a value, should we assume standbys are caught up (0L), or that they are not (MAX_VALUE)?
Hm, I guess if we do that then the assignment algorithm should hopefully reduce to the original one during an upgrade while we're pinned to the lower subscription version.
The assignment will be a little screwy on the first VP rebalance while there are mixed subscription versions, but maybe that's not worth worrying about. Probably not worth adding another sentinel value either..
Actually, I reverse my position: I think we should add a (negative) OFFSET_SUM_UNKNOWN sentinel for this case. Either way we have to do some special handling; this way we can still enforce the endOffsetSum >= taskOffsetSum invariant and also don't share this pseudo-sentinel with the overflow case.
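The sentinel scheme being proposed could look roughly like the following. Both constant values are assumptions for illustration (the PR's actual values may differ); the point is that OFFSET_SUM_UNKNOWN is negative and distinct from the overflow pin.

```java
public class TaskOffsetSum {
    static final long OFFSET_SUM_UNKNOWN = -2L; // illustrative value only

    static long sumOffsets(final long... offsets) {
        long sum = 0L;
        for (final long offset : offsets) {
            if (offset < 0L) {
                // -1 (or any other negative offset) means the store's
                // position is unknown, so the whole task sum is unknown.
                return OFFSET_SUM_UNKNOWN;
            }
            sum += offset;
            if (sum < 0L) {
                // Only non-negative offsets are added, so a negative sum
                // can only mean overflow: pin, keeping this case separate
                // from the unknown-offset sentinel.
                return Long.MAX_VALUE;
            }
        }
        return sum;
    }
}
```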
Not taking a hard stance on this spelling, just aiming for consistency across the code base
cadonna
left a comment
@ableegoldman Thank you for the PR.
Here is my feedback.
Q: Wouldn't a log message on DEBUG-level make sense here?
Actually, what about warn?
prop: replace

    long offsetSum = 0;
    for (final long offset : changelogOffsets.values()) {
        offsetSum += offset;
    }
    return offsetSum;

with

    return changelogOffsets.values().stream().reduce(0L, Long::sum);
prop: rename to sumUpChangelogOffsets
How about sumOfChangelogOffsets?
Q: This question is unrelated to your change. Why is firstException an atomic variable? It is a local variable and shutdown() is only called from StreamThread which should be single-threaded. \cc @guozhangwang
I think it's just more convenient than the conditional block to check if it's null.
Fair enough if the performance is similar.
prop: replace

    for (final TaskId task : prevTasks()) {
        taskOffsetSumsCache.put(task, Task.LATEST_OFFSET);
    }
    for (final TaskId task : standbyTasks()) {
        taskOffsetSumsCache.put(task, 0L);
    }

with

    prevTasks().forEach(taskId -> taskOffsetSumsCache.put(taskId, Task.LATEST_OFFSET));
    standbyTasks().forEach(taskId -> taskOffsetSumsCache.put(taskId, 0L));
Not a big fan of this suggestion. Is there something wrong with loops?
I'll let you two fight this one out 😄
Nothing wrong with loops as forEach() is also a loop. If I can write a loop more concisely and I still easily get what they do, I would go for it. This is a proposal, so if @ableegoldman wants to follow it fine, if not I am also fine with it (after a bit of crying).
req: Why do you verify the setup code here?
I bet she copied the idiom from all of my tests. I did it because it makes the tests easier to read... I.e., you can visually see what state everything is in. Otherwise you'd have to reason about what state it would be in, given all the mocks above.
To verify that we're actually testing what we think we're testing, i.e. if due to some bug in TaskManager 0_0 did not actually reach the RUNNING state, we should fail fast. Otherwise, when the test fails it's not clear whether that's due to an unrelated bug rather than a bug in getTaskOffsetSums.
Also what @vvcephei said 🙂 . It's kind of hard to reason about the states based only on calls to TaskManager in this test class
Fair enough given the complexity of the setup. I guess what disturbs me most is the fact that the setup is so complex.
req: In general, I think this unit test is really large. For the sake of readability and modularization, you should split it into multiple tests: one unit test per case, plus one unit test for the composite scenario with all cases and different occurrences of the different cases. If you extract and parametrize the setup, there should not be too much code duplication. Additionally, a test where stateDirectory.listTaskDirectories() returns an empty array and a test with a stateless task are missing.
To clarify, you're suggesting to add smaller tests for each case (and edge cases) but also leave the composite test in as well?
Basically yes. I would extend the composite test with multiple occurrences of each case so that we also cover the scenario where we have, for example, n active running tasks, m active non-running tasks, k standby tasks, etc.
If this makes the test too clumsy, you could cover each of the n active running tasks, m active non-running tasks, k standby tasks, etc. in its own test and make one composite test with one occurrence of each case. Choose whichever is more readable.
My point is that the current test does not cover multiple occurrences of the same case.
4e3ebfe to fe8e2e5
cadonna
left a comment
@ableegoldman I really like how you cleaned up the tests. They are much more readable now.
Here is my feedback.
req: Please add a similar unit test as the one above but where stateDirectory.listTaskDirectories() returns null instead of an empty array.
I think we should actually just make listTaskDirectories() always return an empty File[] instead of null in some cases and new File[0] in others, as we treat both cases the same. WDYT?
Yeah, I agree with you. I checked the code and it should be OK to always return an empty array. However, could you open a second PR for it that we merge before this one, to keep this PR focused?
I'm not sure it's really worth doing as a separate PR, it's about 10 lines of code and is only really motivated by the work in this KIP?
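The normalization under discussion could be sketched as follows (class and method names are illustrative, not the actual StateDirectory code): `File.listFiles()` returns null when the directory does not exist or cannot be read, so always mapping that to an empty array lets callers treat "no task directories" uniformly.

```java
import java.io.File;

public class StateDirSketch {
    // Always return an empty array instead of null so callers don't
    // need to handle the two "nothing there" cases differently.
    static File[] listTaskDirectories(final File stateDir) {
        final File[] taskDirs = stateDir.listFiles(File::isDirectory);
        return taskDirs == null ? new File[0] : taskDirs;
    }
}
```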
req: If the release of the lock throws, we log an error message in releaseTaskDirLock(id) but swallow the exception here. This seems to me a false alarm. Imagine you analyse the log files and find an error that actually isn't one. I think, we should suppress the log message in this case.
How about just a warning in releaseTaskDirLock, and then log as an error if it's actually fatal?
I would prefer to not log anything in releaseTaskDirLock() and check the return value of releaseTaskDirLock() at caller side. That would avoid double log messages due to the same event.
On a different note, do we need to return the exception from releaseTaskDirLock()? We could just throw it and catch it where we call releaseTaskDirLock(). Am I missing something?
new question: why bother with this complexity? The background cleaner can do its thing when we're not rebalancing, right?
We are making some changes to the directory cleanup (unrelated to 441) after which it seems we could end up with a lot of empty directories that have to wait for the cleanup thread to be removed. Since every thread has to go through every task directory and try to lock it, I was thinking we should try to avoid blocking the cleanup thread as much as possible.
Note that currently, the cleanup thread runs every 10 min (by default). If we also choose 10 min as the default for the probing rebalance interval, and leave empty directories locked during a rebalance, we might never delete them (until the probing rebalances end, of course).
Maybe a better approach is to let the cleanup thread run slightly more frequently to remove empty directories only, and skip anything with remaining state & valid checkpoint. WDYT? If that sounds preferable I can make a ticket to follow up later
Nevermind the above -- assuming we get this PR into 2.6 as well we can just leverage the new listNonEmptyTaskDirectories and the problem becomes moot. Removed the unlocking attempt from this method for now
prop: Rename to shouldPinOffsetToLongMaxValueInCaseOfOverflow
Wow yeah that original test name made no sense, thanks for the prop
test this please
vvcephei
left a comment
Thanks @ableegoldman , I think this is just about ready. A few final remarks...
new question: why bother with this complexity? The background cleaner can do its thing when we're not rebalancing, right?
Still unsure if this is the right logic. What if we just return an "unknown sum" sentinel here? Then, if any store's offset is unknown, then the task's offset sum would also be reported as "unknown", which would let the assignor treat it as "not caught up".
In what cases might one store's offsets being invalid mean that every other store with valid offsets should not be taken into account?
That's not rhetorical, I really am asking. It's not clear to me exactly when you'd get this invalid offset response -- but if only one partition was having issues (whatever those may be) and the others all had valid, positive offsets, would we have to wipe out the entire state? Do we even check for negative offsets elsewhere in Streams? It's not clear to me that we do (in fact several places assume they are always positive and I believe would actually crash if not).
As far as I can see from the code, an offset of -1 is returned in the error case. Additionally, offset -1 is used during initialization of the producer response. I guess in the error case we do not write any offset into the checkpoint file and the offset map, so I suppose that the offset cannot become -1 in this code. So, my proposal would be to double-check my observation. In case it is correct, we can specify an invariant that the offset must be >= 0 and throw an IllegalStateException if the invariant is not satisfied.
We skip adding offsets to the map in the error case, but I asked Jason and apparently there may be a non-error case where -1 is returned with an idempotent producer. Just cc'ed you on the thread
you could avoid computing the addition twice by checking after this line if offsetSum < 0
That's what I did originally, Bruno suggested changing it to use else -- but, maybe I misunderstood his actual proposal...I'll set it back
My original proposal was:

    if (offset < 0L) {
        if (offset == -1L) {
            log.debug("Skipping unknown offset for changelog {}", changelog);
        } else {
            log.warn("Unexpected negative offset {} for changelog {}", offset, changelog);
        }
    } else {
        offsetSum += offset;
        if (offsetSum < 0) {
            log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
            return Long.MAX_VALUE;
        }
    }

I find this easier to read.
I see, I think I find it easier to read without the else as it makes it clear that we are just adding the offset, except in these two potential edge cases (overflow and negative). But we still need to come to a consensus about how to handle the negative case anyway
The test failure seems related:

Also, can you run the system tests before we merge this?
eaa31be to 35675a7
Streams system tests: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3830/
System test failures are unrelated to this PR (
Linking the test results here, since the jenkins build will be cleaned up: http://testing.confluent.io/confluent-kafka-branch-builder-system-test-results/?prefix=2020-03-12--001.1584073791--ableegoldman--KIP-441-send-task-offset-sums--298f1e2/
test this please

test this please

test this please

test this please

Unrelated test failure:
vvcephei
left a comment
Did a final pass, and it looks good to me.
@@ -347,8 +349,14 @@ private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
     * @return The list of all the existing local directories for stream tasks
     */
    File[] listTaskDirectories() {
One last thing: Could you open another PR to add unit tests that check that the array is empty for the two edge cases?
Ack, added the tests to the Pt 2.5 PR
Actually, ended up doing some additional cleanup on the side so I split it out into a small PR; please give this a quick review.
#8304
* apache-github/trunk: (39 commits)
  MINOR: cleanup and add tests to StateDirectoryTest (apache#8304)
  HOTFIX: StateDirectoryTest should use Set instead of List (apache#8305)
  MINOR: Fix build and JavaDoc warnings (apache#8291)
  MINOR: Fix kafka.server.RequestQuotaTest missing new ApiKeys. (apache#8302)
  KAFKA-9712: Catch and handle exception thrown by reflections scanner (apache#8289)
  KAFKA-9670; Reduce allocations in Metadata Response preparation (apache#8236)
  MINOR: fix Scala 2.13 build error introduced in apache#8083 (apache#8301)
  MINOR: enforce non-negative invariant for checkpointed offsets (apache#8297)
  MINOR: comment apikey types in generated switch (apache#8201)
  MINOR: Fix typo in CreateTopicsResponse.json (apache#8300)
  KIP-546: Implement describeClientQuotas and alterClientQuotas. (apache#8083)
  KAFKA-6647: Do not delete the lock file while holding the lock (apache#8267)
  KAFKA-9677: Fix consumer fetch with small consume bandwidth quotas (apache#8290)
  KAFKA-9533: Fix JavaDocs of KStream.transformValues (apache#8298)
  MINOR: reuse pseudo-topic in FKJoin (apache#8296)
  KAFKA-6145: Pt 2. Include offset sums in subscription (apache#8246)
  KAFKA-9714; Eliminate unused reference to IBP in `TransactionStateManager` (apache#8293)
  KAFKA-9718; Don't log passwords for AlterConfigs in request logs (apache#8294)
  KAFKA-8768: DeleteRecords request/response automated protocol (apache#7957)
  KAFKA-9685: Solve Set concatenation perf issue in AclAuthorizer
  ...
KIP-441 Pt. 2: Compute sum of offsets across all stores/changelogs in a task and include them in the subscription.
Previously each thread would just encode every task on disk, but we now need to read the changelog file, which is unsafe to do without a lock on the task directory. So, each thread now encodes only its assigned active and standby tasks, and ignores any already-locked tasks.
In some cases there may be unowned and unlocked tasks on disk that were reassigned to another instance and haven't been cleaned up yet by the background thread. Each StreamThread makes a weak effort to lock any such task directories it finds, and if successful is then responsible for computing and reporting that task's offset sum (based on reading the checkpoint file).
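The per-thread flow described above can be sketched roughly as follows. All names here are hypothetical stand-ins, not the PR's actual API: owned tasks report their exact sums, and for unowned, unlocked task directories the thread makes a weak (non-blocking) lock attempt and reports the checkpointed sum on success.

```java
import java.util.HashMap;
import java.util.Map;

public class SubscriptionSketch {
    // Illustrative stand-in for the state directory operations assumed here.
    interface TaskDirs {
        String[] nonEmptyTaskDirectories();          // tasks with state on disk
        boolean tryLock(String taskId);              // weak, non-blocking lock attempt
        long checkpointedOffsetSum(String taskId);   // sum read from the checkpoint file
    }

    static Map<String, Long> taskOffsetSums(final Map<String, Long> ownedTaskSums,
                                            final TaskDirs dirs) {
        // Start with the sums for assigned active and standby tasks.
        final Map<String, Long> sums = new HashMap<>(ownedTaskSums);
        for (final String taskId : dirs.nonEmptyTaskDirectories()) {
            // Skip tasks we already reported, and directories whose lock
            // another thread holds (that thread will report them).
            if (!sums.containsKey(taskId) && dirs.tryLock(taskId)) {
                sums.put(taskId, dirs.checkpointedOffsetSum(taskId));
            }
        }
        return sums;
    }
}
```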
This PR therefore also addresses two orthogonal issues: