KAFKA-4794: Add access to OffsetStorageReader from SourceConnector#2604
KAFKA-4794: Add access to OffsetStorageReader from SourceConnector#2604rhauch merged 5 commits intoapache:trunkfrom
Conversation
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Will this path be merged? |
e2bb682 to
9f38088
Compare
rhauch
left a comment
There was a problem hiding this comment.
Looks great, @fhussonnois! I do have a few specific requests, but all are pretty minor.
There was a problem hiding this comment.
Is it possible to trigger infinite loop:
raiseError -> reconfig -> raiseError -> reconfig ...
9f38088 to
a2fdc01
Compare
|
Thank you @rhauch for your comments. I've updated the PR. |
a2fdc01 to
10d7118
Compare
10d7118 to
cfc2b60
Compare
|
@rhauch is there anything I should add or modify for the PR ? |
|
@fhussonnois, I just submitted the KIP for a vote, which will be open for a few days. This can be merged once that is approved. |
cfc2b60 to
417aa04
Compare
|
I also needed this feature. #feelsbad |
|
@fhussonnois, would you mind updating this PR to eliminate the conflicts? Also, while I hope to get this into AK 2.6, the KIP hasn't yet passed. I did make a suggestion on the vote thread, though, to consider making it much easier for connectors that want to also run in older versions of Connect to easily get the OffsetStorageReader when Connect supports it without jumping through a lot of ugly hoops. |
|
ok to test |
rhauch
left a comment
There was a problem hiding this comment.
Thanks, @fhussonnois. I have a few more suggestions to clean up the implementation a bit more, to improve backward compatibility, and to ensure the new code is adequately tested.
There was a problem hiding this comment.
At one point these were anonymous classes, and the delegation perhaps made a bit more sense. Now that they are inner classes, it seems like it would be a lot less confusing and simpler if the DelegateToWorkerConnectorContext class were extended by the DelegateSinkConnectorContext and DelegateSourceConnectorContext classes. Doing this has several advantages:
- removes nesting/delegation and eliminates the chance of getting into an infinite loop
- eliminates duplicate code in these classes
- simplifies the context classes quite a bit
- makes it more obvious that the sink connector context has nothing extra over the base
- makes it more obvious that the source connector context only adds the offset storage reader
- simplifies this code a bit (see below)
By keeping DelegateToWorkerConnectorContext class non-abstract, we're actually able to maintain backward compatibility by calling initialize when the connector class is neither a source or sink connector:
This code becomes simpler, too:
| final ConnectorContext delegateCtx = new DelegateToWorkerConnectorContext(); | |
| if (isSinkConnector()) { | |
| SinkConnectorConfig.validate(config); | |
| connector.initialize(new DelegateSinkConnectorContext(delegateCtx)); | |
| } else { | |
| connector.initialize(new DelegateSourceConnectorContext(delegateCtx, offsetStorageReader)); | |
| } | |
| if (isSinkConnector()) { | |
| SinkConnectorConfig.validate(config); | |
| connector.initialize(new DelegateSinkConnectorContext()); | |
| } else if (isSourceConnector()) { | |
| connector.initialize(new DelegateSourceConnectorContext(offsetStorageReader)); | |
| } else { | |
| connector.initialize(new DelegateToWorkerConnectorContext()); | |
| } |
This seems a little strange, but we're actually not checking whether the Connector instance passed into this WorkerConnector is only an instance of SourceConnector or SinkConnector. If it is neither, prior to this change the framework would still initialize it, and I think we should maintain that arguably strange behavior just to maintain backward compatibility.
There was a problem hiding this comment.
Nit: let's avoid unrelated line additions.
There was a problem hiding this comment.
Nit: let's avoid unrelated line additions.
|
retest this please |
|
ok to test |
rhauch
left a comment
There was a problem hiding this comment.
@fhussonnois, thanks again for doing the brunt of the work on this feature. We're getting close to AK 2.6.0 feature freeze, and it'd be great to merge this feature now that KIP-131 has been adopted. Since my previous review was some time ago, I hope you don't mind that I pushed the remaining changes I suggested in my previous review. I've also dismissed my previous "Request Changes" review.
@kkonstantine would you mind taking a look? I'd like to have an independent review of my changes. Thanks!
|
@rhauch thank you very much for finalizing this PR; I apologize for not finding any free time to work on it these past few weeks. |
|
retest this please |
|
Just one failure in each of the 3 builds, and none were in Connect unit or integration tests. |
kkonstantine
left a comment
There was a problem hiding this comment.
Thanks for the original contribution and the KIP @fhussonnois
@rhauch thanks for completing this PR. Overall looks good. I have a few minor comments and then I think that maybe we shouldn't plan for a connector implementation that is not a sink or a source in the tests. The code would break elsewhere anyways, and this is not supported by they framework today. Wdyt?
Add two interfaces SinkConnectorContext/SourceConnectContext that extend ConnectorContext in order to expose an OffsetStorageReader instance.
…initialization in WorkerConnector
…nector default methods
…ce or sink, and incorporated other feedback
|
Rebased to correct conflicts. |
|
ok to test |
|
retest this please |
|
All 3 builds had test failures on known flaky Streams integration tests, but all Connect-related tests passed. |
kkonstantine
left a comment
There was a problem hiding this comment.
This looks great now. Happy to see this feature being added!
Thanks @rhauch
LGTM
This a first attempt to implement Add access to OffsetStorageReader from Source Connector.
I am not sure if I did it right so I prefer to ask you your feedback. I still need to write some tests.