KAFKA-10000: Zombie fencing (KIP-618)#11779
Conversation
|
Converting to draft until upstream PRs are reviewed. |
9ec47e3 to
5651509
Compare
5651509 to
733752d
Compare
|
Given that all merge conflicts have been resolved and #11778 has already been approved, marking this as ready for review. |
733752d to
1683603
Compare
tombentley
left a comment
There was a problem hiding this comment.
Thanks @C0urante, made a first pass.
There was a problem hiding this comment.
There's a resource leak if the whenComplete never calls the passed lambda. I think you should be able to call admin.close in a catch(Exception).
There was a problem hiding this comment.
This pattern of swapping out class loaders is pretty common, but also a little verbose. Perhaps Plugins could expose a withClassloader(ClassLoader) method that returned an AutoClosable, so that call sites like this could use try-with-resources and the compiler could warn about leaking resources?
There was a problem hiding this comment.
Ah yeah, been toying with that idea for a while but never got around to trying it out. Works pretty well in this case; the one wrinkle is that the signature for AutoCloseable::close includes a checked exception. I've added a new (internal) LoaderSwap class that implements AutoCloseable and removes that checked exception to address that.
If this looks good, we can retrofit other parts of the code base to leverage it in a follow-up.
There was a problem hiding this comment.
what happens if it's not a source connector?
There was a problem hiding this comment.
The callback is invoked with an error (added to Javadocs)
There was a problem hiding this comment.
I wonder if runOnTickThread might be a better name, since it more explicitly describes what it's doing?
There was a problem hiding this comment.
Surely we should always invoke the callback, even on success, since that's the contract for Callback?
There was a problem hiding this comment.
This follows the same pattern as AbstractHerder::maybeAddConfigErrors, which accepts a Callback but only invokes it on errors. This is useful if you'd like to establish some reusable logic that terminates control flow for a method and reports an error to a callback if something goes wrong, but otherwise allows control flow to continue and possibly fail later.
I'll take a page out of AbstractHerder::maybeAddConfigErrors's book and add Javadocs making note of this fact.
There was a problem hiding this comment.
Thanks for the explanation and the Javadoc.
There was a problem hiding this comment.
So we don't consider 1 minute 'very long'?
There was a problem hiding this comment.
It seemed reasonable considering how vital being able to reach the config topic is to the health of a Connect worker, and that the penalty for failure here is that a task will fail to start. But given the existing workerSyncTimeoutMs field and its use, it seems better to just follow that precedent and use that value to dictate how long we're willing to wait to reach the end of the config topic in most cases.
There was a problem hiding this comment.
There's a lot of repetition of this if (!writeToConfigTopicAsLeader()){ throw new ConnectException} pattern. In fact it look like all invocations of writeToConfigTopicAsLeader are of this form. So what not just put the if/throw within writeToConfigTopicAsLeader?
There was a problem hiding this comment.
I pushed a change to #11778 that basically does this; will rebase and update the new config topic writes introduced in this PR accordingly. One noteworthy difference now is that the exception message is always the same regardless of which operation failed; I tried to make it generic and user-friendly enough to work with that, but if that doesn't work well enough, we can add a message parameter to this method and use it as part of the message for the exception that gets thrown on failure. BTW, it might be more helpful to leave comments about this topic on that PR, but I'll do my best to handle them either way.
There was a problem hiding this comment.
Is it OK to not invoke the callback in the case where we weren't leader?
There was a problem hiding this comment.
Yes, although the internal API for this is a little convoluted:
addRequestaccepts an action (aCallable<Void>) and a callback (aCallback<Void>)- When requests submitted to
addRequestare run, the callback is always invoked after they complete; if they throw an exception, it's invoked with that exception, and if they don't, it's invoked withnullfor both parameters - The callback we pass to
addRequesthere is the result of wrapping the callback given to thedeleteConnectorConfigmethod in theforwardErrorCallbackmethod, which causes it to be invoked if and only if an exception is thrown when the request is run - As a result, if we throw any exceptions from the action that we pass to
addRequest, they're guaranteed to be passed to the callback supplied todeleteConnectorConfig
Although I think it's cleaner to throw exceptions instead of invoking Callback::onCompletion with an exception and then doing a return null, for consistency's sake, it's probably better to do the former, since that's the existing pattern. I'll address this first in #11778 and then add it here in the subsequent rebase.
There was a problem hiding this comment.
On second thought, I think it's probably fine to leave things as they are without adding a manual invocation of Callback::onCompletion and a return null. Yes, writeToConfigTopicAsLeader may throw an exception, but so could writes to the config topic before changes for this KIP were made (such as here, here, and here).
If we were throwing an exception from within the body of the herder request instead of a method that the request invokes, it'd make sense to change that to instead be a manual invocation of the callback with the exception. But just calling a method that might throw an exception is different, and follows existing precedent in the code base without having to jump through special callback-related hoops.
There was a problem hiding this comment.
This could block indefinitely, since KafkaConfigBackingStore calls configLog.readToEnd().get(), which seems at odds with the it should not block for very long requirement.
There was a problem hiding this comment.
Good point, replaced configLog.readToEnd().get() with configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS), which is used everywhere else in the KafkaConfigBackingStore where we read to the end of the log to ensure that writes that we just performed have landed. It comes with the downside that it makes zombie fencing rounds more frail, but that's better than squatting indefinitely on the herder thread.
I also fixed another potential blocking issue around this area by shifting the call to onZombieFencingSuccess (or rather, the registration of it as a follow-up to the future returned by Worker::fenceZombies) into a separate method that can then be invoked after the ZombieFencing object has been constructed and the lock on the DistributedHerder instance has been relinquished.
There was a problem hiding this comment.
I find the method name a bit confusing, because it sends in either case. Perhaps something like sendPossiblyFencibly would be better, wdyt?
There was a problem hiding this comment.
sendPossiblyFencibly (fencably?) does work but it's a bit verbose. Do you think sendPrivileged works? It refers to the concept inherited by the ConfigBackingStore interface and its claimWritePrivileges method, and the write itself is technically privileged in that it should only ever be performed by the leader, even if those privileges are only enforced when the backing store is configured to use a fencable producer.
There was a problem hiding this comment.
Changed to sendPrivileged, can change to something else if desired
1683603 to
2d4b74f
Compare
|
Thanks Tom, some great catches. Going to rebase tomorrow or Thursday which should address the one or two outstanding comments; everything else should be addressed now and ready for another round. |
There was a problem hiding this comment.
What about fenceZombieSourceTasks()? I find fenceZombies() a bit too generic
There was a problem hiding this comment.
Do you already have the PR that clears this?
There was a problem hiding this comment.
No, but I will update #11782 to remove it as soon as this is merged.
There was a problem hiding this comment.
Do we need this? Also does this method need to return Response?
There was a problem hiding this comment.
This is to force a 200 OK response instead of a 204 no content response, which would be returned otherwise. I'd just use a 204 except the KIP specifies that this endpoint should "serve an empty-bodied 200 response" and I wanted to stick to that.
Given that this endpoint is internal and it's a tiny detail, I'd be fine with switch to a 204 response if it's alright with you.
There was a problem hiding this comment.
As far as I can tell the other internal endpoint returns 204 so I'd be in favor of doing the same here
There was a problem hiding this comment.
This type of small cleanups are really appreciated, thanks!
There was a problem hiding this comment.
I was confused for a moment as I remember seeing these methods in another PR. I see this PR has conflicts so this must be the reason and they'll disappear from here once this is rebased on trunk
There was a problem hiding this comment.
Yep, exactly 👍
Going to try to do the rebase today, but may not be able to finish by EOD as it's going to be fairly involved.
There was a problem hiding this comment.
Ah whoops, those changes were made on #11780, which hasn't been merged yet, so a rebase isn't going to automatically draw them in. I'll do the change manually here but there may be other small changes in not-yet-merged PRs that don't get pulled in here. It should be fine as those changes are included in whichever PR gets merged last.
There was a problem hiding this comment.
Rebase complete; should be resolved now.
There was a problem hiding this comment.
Is this going to be called from other places in the remaining PRs? If not we could get rid of it
There was a problem hiding this comment.
There was a problem hiding this comment.
Would task_count or even count (like state) be clearer?
There was a problem hiding this comment.
I think given the key format ("tasks-count-<connector>") this is probably fine, and the name of the field is also specified in the KIP. But similar to the 200 vs. 204 HTTP response for the fencing endpoint, this is internal and a small detail, so I can change it if we agree that this kind of detail doesn't need to precisely match what's in the KIP.
There was a problem hiding this comment.
I wonder whether the key format should assume that a count is involved, or whether it should be named for the purpose to which it's being put (zombie fencing). e.g. maybe tasks-fencing-<connector> is a better key, with task_count at the field name for this V0 schema which happens to use just the count as the implementation?
There was a problem hiding this comment.
Haha yep, caught and fixed this in an upstream PR that's since been merged. Will pick up in the rebase.
There was a problem hiding this comment.
Rebase complete; should be resolved now.
There was a problem hiding this comment.
Isn't ordering still guaranteed with retries when idempotency is enabled?
There was a problem hiding this comment.
Yep, this got fixed in #11778, which just got merged. A rebase should take care of this.
There was a problem hiding this comment.
Rebase complete; should be resolved now.
2d4b74f to
9d7ce0a
Compare
|
Thanks for the quick updates. I'll try to make another pass tomorrow |
9d7ce0a to
6d7d814
Compare
tombentley
left a comment
There was a problem hiding this comment.
Thanks for the fixes @C0urante! I've left a bunch more comments, but these are nits, and assuming you agree with them this now LGTM.
There was a problem hiding this comment.
Trivial point, but swapping the order of these parameters would match the order that they're used in test(), and, at the constructor call site, the method, path ordering matches how these things appear in an actual HTTP request.
There was a problem hiding this comment.
I guess we could also swap there parameter order here too?
There was a problem hiding this comment.
Thanks for the explanation and the Javadoc.
There was a problem hiding this comment.
Can we document that access is protected by this object's monitor.
There was a problem hiding this comment.
This can be final too, I think
There was a problem hiding this comment.
It's initialized in start(), not in the constructor.
There was a problem hiding this comment.
I wonder whether the key format should assume that a count is involved, or whether it should be named for the purpose to which it's being put (zombie fencing). e.g. maybe tasks-fencing-<connector> is a better key, with task_count at the field name for this V0 schema which happens to use just the count as the implementation?
There was a problem hiding this comment.
We should include the connector name, I think.
There was a problem hiding this comment.
It's implicitly included in the record key but that info is redundant. Updated to just use the connector name and make the message clearer
There was a problem hiding this comment.
We're adding this else if clause to a method that's now ~250 lines long. I think we can factor the block of each if and else if into its own method.
There was a problem hiding this comment.
Yeah, we've definitely reached that point 👍
While doing this decomposition I kept the bodies for each new method identical to the if/else if blocks that they were extracted from, with these exceptions:
- Log messages that include the record key are adjusted to use the connector name in its place (this doesn't drop any information)
- Calls to
Object::getClassfor logging messages are all converted to calls to the newly-introduced and null-safeclassNamemethod - The
@SuppressWarnings("unchecked")annotation is removed from method signatures and is instead added only to assignments within the method bodies that require it
6d7d814 to
5a043d6
Compare
7f8f89f to
920a03d
Compare
|
Thanks Mickael 👍 |
…-2022 * apache/trunk: (52 commits) KAFKA-13967: Document guarantees for producer callbacks on transaction commit (apache#12264) [KAFKA-13848] Clients remain connected after SASL re-authentication f… (apache#12179) KAFKA-10000: Zombie fencing logic (apache#11779) KAFKA-13947: Use %d formatting for integers rather than %s (apache#12267) KAFKA-13929: Replace legacy File.createNewFile() with NIO.2 Files.createFile() (apache#12197) KAFKA-13780: Generate OpenAPI file for Connect REST API (apache#12067) KAFKA-13917: Avoid calling lookupCoordinator() in tight loop (apache#12180) KAFKA-10199: Implement removing active and standby tasks from the state updater (apache#12270) MINOR: Update Scala to 2.13.8 in gradle.properties (apache#12273) MINOR: add java 8/scala 2.12 deprecation info in doc (apache#12261) ... Conflicts: gradle.properties
Implements the zombie fencing logic described in KIP-618 (except for the portion already covered by #11778).
Relies on changes from:
Note that none of the logic here actually causes zombie fencing to take place, it only implements the internal API required to perform zombie fencing. Downstream PRs will actually put this logic into play.