KAFKA-10000: Exactly-once support for source connectors (KIP-618)#10907
C0urante wants to merge 1 commit into apache:trunk
@gharris1727 you've been tremendously helpful so far in the design process; if you have time would you like to take a look at the implementation as well?
There was a small misunderstanding of the rebalance logic for distributed workers. It turns out that they already preemptively stop reconfigured tasks before (re)joining the group, so no additional logic is necessary to stop reconfigured source tasks before they are naturally fenced out by the leader. The KIP has been updated with this note and the changes in the PR related to it have been reverted.
I think this JavaDoc should more clearly specify when an implementation may expect this method to be called relative to other methods. We don't want to box ourselves in with respect to the implementation, but we also need a clear contract that Connector developers can rely upon.
For example, maybe add something like:
This method may be called by the runtime before the {@link #start} method when the connector is being run with exactly-once support and when the connector is to determine the transaction boundaries.
Can do. I think a reference to the existing validate method may help clarify things (especially since this method will be used in almost exactly the same way); LMK what you think.
WDYT about something like this:
/**
 * Signals whether the connector implementation is capable of defining the transaction boundaries for a
 * connector with the given configuration. This method is called before {@link #start(Map)}, only when the
 * runtime supports exactly-once and the connector configuration includes {@code transaction.boundary=connector}.
 *
 * <p>This method need not be implemented if the connector implementation does not support defining
 * transaction boundaries.
 *
 * @param connectorConfig the configuration that will be used for the connector
 * @return {@link ConnectorTransactionBoundaries#SUPPORTED} if the connector will define its own transaction boundaries,
 * or {@link ConnectorTransactionBoundaries#UNSUPPORTED} otherwise.
 * @see TransactionContext
 */
LGTM. I added a small note on not returning null with the same language as in SourceConnector::exactlyOnceSupport but otherwise added this verbatim. LMKWYT
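As a rough illustration of the contract discussed in this thread, a connector might base its answer on its own configuration and never return null. This is only a sketch: the enum is a local stand-in so the snippet compiles without Connect on the classpath, and `FileLikeConnector` and its `mode` property are hypothetical names, not part of the PR.

```java
import java.util.Map;

// Local stand-in for the enum discussed above (assumption: two literals, as in the proposed JavaDoc).
enum ConnectorTransactionBoundaries { SUPPORTED, UNSUPPORTED }

// Hypothetical connector; a real implementation would extend SourceConnector.
class FileLikeConnector {
    // Reports SUPPORTED only when the hypothetical "mode" property is "batch"; never returns null.
    ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
        String mode = connectorConfig.getOrDefault("mode", "stream");
        return "batch".equals(mode)
            ? ConnectorTransactionBoundaries.SUPPORTED
            : ConnectorTransactionBoundaries.UNSUPPORTED;
    }
}
```

Because the method is described as being called before start, the sketch reads only from the supplied configuration and touches no connector state.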
The SourceTask class is part of the public API for Connect, and so we should have JavaDoc on this enum and its literals and methods.
Just making sure: is there anything left to address for this comment?
Resolved merge conflicts. Will try to address review comments soon.
@rhauch Is that second pass coming any time soon? It'd be nice if we could get this merged in time for the upcoming 3.1 release. I plan to address the existing comments next week.
Rebased onto #11046 and cleaned up the
Removed from the upcoming 3.1 release plan due to lack of review. @rhauch do you think we can get this merged in time for the 3.2/4.0 release? I'd rather not have to continue cleaning up merge conflicts for this PR, and the KIP was accepted several months ago with only one round of review since then. If there just isn't time to look at this please let me know and I can close the PR.
Thanks, @C0urante. I have been trying to get through this large PR, but that takes time and I often get sidetracked by other things, such as AK 3.1 release issues. Please keep this open. I do plan to complete my latest pass of this PR soon. Thanks for your continued patience.
@rhauch gentle reminder, it's been another month and we're still waiting on that next pass.
Hey @rhauch, @C0urante, I've kept an eye on this PR and its associated KIP for a while. Is there anything I can do to help? Happy to take guidance to split the PR if that would make reviewing easier. P.S. I appreciate I haven't committed to this repository, but all my Kafka work was internal while I was at AWS.
Thanks for the updates, @C0urante. And I apologize for the delay, which was due to other commitments.
I've reviewed most of the public API portions, and a fair portion of the Worker and other similar classes. I've not yet made it through the refactoring of the WorkerSourceTask. I've slotted time next week to keep going.
(Also, I responded on a few threads from my previous review. I resolved most of that review's threads where I was happy with your fixes. Hopefully that makes it easier to see the outstanding threads.)
Please change all of these @since 3.0 to @since 3.1, plus any other Kafka 3.0 references in JavaDoc.
I believe 3.2 is the correct version now, right? Will update to that.
Nit: using a new paragraph makes this stand out more.
- * For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+ * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
Ack, done. (And added a few more paragraph breaks in other places too).
IIUC, the WorkerTransactionContext is the only implementation of this. That means that if a connector is configured with transaction.boundary=poll or transaction.boundary=interval, a poorly-implemented connector could still call these methods and they'd unnecessarily accumulate records.
WDYT about in such cases the SourceTaskContext#transactionContext() method returning a no-op implementation of this interface, so no harm is done if a connector implementation still calls these methods when transaction.boundary is not set to connector?
Maybe we could consider a warning log message if these methods are called by a connector inappropriately. But we have to be careful. While such log messages might be useful for the developer of a connector plugin, I would argue that prolific warnings are actually harmful for a user trying to use a connector plugin they didn't develop with a connector configuration that includes transaction.boundary=poll or transaction.boundary=interval. So maybe it's worthwhile for the "no-op" implementation to only log each warning once per method per instance.
The behavior specified in the KIP is for SourceTaskContext::transactionContext to return "the transaction context, or null if the connector was not configured to specify transaction boundaries" (see the Javadocs for that method in the code snippets in the relevant KIP section).
This is implemented in the ExactlyOnceWorkerSourceTask class and should prevent connectors from invoking TransactionContext methods and accumulating records when the user has not configured the connector to define its own transaction boundaries.
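For reference, the no-op alternative floated earlier in this thread (warn only once per method per instance) could look roughly like the sketch below. It is not what the PR does, since the PR returns null as described above. The interface is a simplified local stand-in for TransactionContext with the record-taking overloads omitted, and `NoOpTransactionContext` and the warning sink are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Simplified local stand-in for Connect's TransactionContext (assumption: overloads omitted).
interface SimpleTransactionContext {
    void commitTransaction();
    void abortTransaction();
}

// Hypothetical no-op context: ignores every request and warns at most once per method per instance.
class NoOpTransactionContext implements SimpleTransactionContext {
    private final AtomicBoolean warnedCommit = new AtomicBoolean(false);
    private final AtomicBoolean warnedAbort = new AtomicBoolean(false);
    private final Consumer<String> warningSink; // in a worker this might delegate to a logger

    NoOpTransactionContext(Consumer<String> warningSink) {
        this.warningSink = warningSink;
    }

    @Override
    public void commitTransaction() {
        warnOnce(warnedCommit, "commitTransaction");
    }

    @Override
    public void abortTransaction() {
        warnOnce(warnedAbort, "abortTransaction");
    }

    private void warnOnce(AtomicBoolean alreadyWarned, String method) {
        // compareAndSet flips the flag exactly once, even if tasks call in from several threads.
        if (alreadyWarned.compareAndSet(false, true)) {
            warningSink.accept(method + "() ignored: transaction.boundary is not set to connector");
        }
    }
}
```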
I think the risk of introducing options() is that some developers might accidentally use values().
The pattern used in ConnectorType is far better, as it overrides the toString() method. That doesn't handle case-independent parsing, though; ConverterType is a better pattern to follow if that's required.
Let's be consistent with the new enums, and have each follow one of those two patterns depending upon whether parsing case-independently is required.
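A minimal sketch of the second pattern (a toString() override plus case-independent parsing) might look like this. `BoundaryMode` and `fromProperty` are hypothetical names chosen for illustration; they are not copied from ConnectorType, ConverterType, or the PR.

```java
import java.util.Locale;

// Hypothetical enum illustrating the pattern; the literals mirror the three transaction.boundary values.
enum BoundaryMode {
    POLL, INTERVAL, CONNECTOR;

    // Case-independent lookup; throws IllegalArgumentException for unknown names.
    static BoundaryMode fromProperty(String property) {
        return valueOf(property.trim().toUpperCase(Locale.ROOT));
    }

    // Lower-case form, matching how the value would be written in a connector config.
    @Override
    public String toString() {
        return name().toLowerCase(Locale.ROOT);
    }
}
```

With this shape there is no separate options() helper to confuse with values(); callers parse with fromProperty and print with toString.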
Ah, so here's one example of how these properties cannot be overridden by the connector config. But that's not quite so obvious for some other calls to ConnectUtils.warnOnOverriddenProperty.
Is it not feasible to reset the expected value on the props within the ConnectUtils#warnOnOverriddenProperty method?
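One way to picture the suggestion is a helper that both warns and resets the value in a single call. The sketch below is an assumption about what that could look like; `OverrideGuard.enforceExpectedValue` is a hypothetical helper and does not reflect the actual signature of ConnectUtils#warnOnOverriddenProperty.

```java
import java.util.Map;

// Hypothetical helper, not the real ConnectUtils API.
class OverrideGuard {
    // Forces props[key] to the expected value and reports whether a different user value was replaced.
    static boolean enforceExpectedValue(Map<String, Object> props, String key, String expected) {
        Object previous = props.put(key, expected);
        boolean overridden = previous != null && !expected.equals(previous.toString());
        if (overridden) {
            // Stand-in for a log.warn call in the worker.
            System.err.println("Ignoring user-provided value '" + previous + "' for '" + key
                + "'; using '" + expected + "' instead");
        }
        return overridden;
    }
}
```

Folding the reset into the helper would mean no call site can warn about an override and then forget to undo it.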
This is more of a nit-type comment.
It seems like this comment tries to describe why we don't need to create a connector-specific offset store AND then explain that the check might not always be accurate. But I found the comment a bit hard to follow.
Also, the ConnectorOffsetBackingStore offsetStore; line should move under the multi-line comment.
In such cases, it might be worth changing the code to help make things more clear. For example:
// We can simply reuse the worker's offset store when the connector-specific offset topic
// is the same as the worker's. We can check the offset topic name and the Kafka cluster's
// bootstrap servers, although this isn't exact and can lead to some false positives if the user
// provides an overridden bootstrap servers value for their producer that is different than
// the worker's but still resolves to the same Kafka cluster used by the worker.
// At the moment this is probably adequate, especially since we probably don't want to put
// a network ping to a remote Kafka cluster inside the herder's tick thread (which is where this
// logic takes place right now) in case that takes a while.
ConnectorOffsetBackingStore offsetStore;
final boolean sameOffsetTopicAsWorker = offsetsTopic.equals(config.offsetsTopic())
        && producerProps.get(BOOTSTRAP_SERVERS_CONFIG).equals(config.bootstrapServers());
if (sameOffsetTopicAsWorker) {
    offsetStore = ConnectorOffsetBackingStore.withoutConnectorOffsetStore(
            ...
} else {
    offsetStore = ConnectorOffsetBackingStore.withConnectorOffsetStore(
            ...
}
Ack, done. This logic has been moved into an isolated method and I've moved the relevant portions of the comment with it.
Very nice. For this and the other new getter methods, can you add some JavaDoc just so that it's easier to follow in an IDE where these methods are used? I wanted that several times as I was reviewing this code.
Sorry, I'm a little unclear on what the benefits of Javadocs here are. I've added tags for each of the relevant methods but would you mind elaborating? In IntelliJ, I can see where this method is invoked by cmd+clicking the bootstrapServers() declaration, and wherever it's invoked, I can go to the declaration by cmd+clicking the invocation.
I'm aware that not everyone uses my IDE and certainly don't expect them to start doing so; just wondering if there's a different IDE out there with behavior that makes Javadocs like these significantly more powerful.
As mentioned above, IIUC this PR will sometimes pass a null offsetStorageReader (IIRC for sink connectors), but this class currently expects that to be non-null. Might be worth adding an Objects.requireNonNull(...) call here to help catch that situation.
I opted for an alternative approach where the offset reader is allowed to be null, but I can also see the benefit of a conditional call to Objects::requireNonNull. I've added one in WorkerConnector::initialize; let me know what you think.
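A conditional null check of the kind described here could be sketched as follows. The class and method names are hypothetical, and the rule that only source connectors require a reader is an assumption drawn from this thread rather than from the actual WorkerConnector code.

```java
import java.util.Objects;

// Hypothetical helper illustrating a conditional Objects.requireNonNull call.
class ConnectorInitCheck {
    // Assumption: source connectors must be given an offset reader, while sink connectors may pass null.
    static Object validateOffsetReader(boolean isSourceConnector, Object offsetStorageReader) {
        if (isSourceConnector) {
            Objects.requireNonNull(offsetStorageReader, "Offset reader may not be null for source connectors");
        }
        return offsetStorageReader;
    }
}
```

Failing fast at initialization would surface a missing reader immediately instead of as a later NullPointerException inside connector code.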
Note to future me: I didn't get this far in the PR.
May be best to hold off on this section for a bit; there's a fair amount of rebasing that needs to be done now that #11323 has been merged.
Thanks Randall, I've addressed all of your comments either with code changes or an inline response in the appropriate thread. I've noted this elsewhere but it bears repeating: the The The new admin client API for fencing producer transactions should also be safe to review (although there are some conflicts in the
This made it very easy to follow along, thanks 👍
I've filed #11508 as a fix for KAFKA-9279. The workaround here isn't terribly awkward so it probably isn't necessary for us to block on that fix PR, but it'd be nice if someone could find the time to review it.
This will have to be tweaked slightly in order to invoke SourceTask::commitRecord and correctly track the number of in-flight records for metrics. Leaving as-is for now since this part of the PR will have to be updated anyways once rebased onto trunk.
@C0urante Thanks for the PR! It's really massive! Can we try to cut it into smaller chunks? That would make it a lot easier to review and allow merging bit by bit.
Thanks Mickael, I can take a stab at that. I don't think I can get each PR to be buildable on its own without a lot of work but I can definitely isolate high-level sections into their own PRs, probably following the breakdown in the description for this one. I'm taking this week off but will hopefully be able to get a rebase done and split things up by Wednesday the 16th.
Implements KIP-618.
Depends on changes from #11046
There are several changes here that can be reviewed fairly independently of each other:
- ExactlyOnceWorkerSourceTask class, its newly-introduced AbstractWorkerSourceTask superclass, the Worker class (whose API for task starts has been split up from the existing startTask method into separate startSinkTask, startSourceTask, and startExactlyOnceSourceTask methods), and the WorkerTransactionContext class (which is used to allow connectors to define their own transaction boundaries)
- DistributedHerder, ConfigBackingStore, KafkaConfigBackingStore, and ClusterConfigState classes
- Admin API for fencing out transactional producers by ID, which is done with changes to the Admin interface (unsurprisingly) and the KafkaAdminClient class
- Worker, OffsetStorageReaderImpl, and OffsetStorageWriter classes
- SourceConnector methods for communicating support for exactly-once guarantees and connector-defined transactions; these take place in the SourceConnector class (also unsurprisingly) and the AbstractHerder class
Existing unit tests are expanded where applicable, and new ones have been introduced where necessary.
Eight new integration tests are added, which cover scenarios including preflight validation checks, all three types of transaction boundary, graceful recovery of the leader when fenced out from the config topic by another worker, ensuring that the correct number of task producers are fenced out across generations, accurate reporting of failure to bring up tasks when fencing does not succeed (includes an ACL-secured embedded Kafka cluster to simulate one of the most likely potential causes of this issue--insufficient permissions on the targeted Kafka cluster), and the use of a custom offsets topic.
Many but not all existing system tests are modified to add cases involving exactly-once source support, which helps give us reasonable confidence that the feature is agnostic with regard to rebalance protocol. A new test is added that is based on the existing bounce test, but with no sink connector and with stricter expectations for delivery guarantees (no duplicates are permitted).