KAFKA-8058: Fix ConnectClusterStateImpl.connectors() method #6384
rhauch merged 9 commits into apache:trunk
Conversation
@mageshn @wicknicks @rayokota could you guys take a look when you have a chance? 😃
…ception on timeout
kkonstantine left a comment
Nice find @C0urante !
I think using FutureCallback already allows you to do what you implement with the countdown latch.
Also I have a question on whether we need to forward this request to the leader as we do elsewhere.
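To make the latch-versus-future-callback point concrete, here is a minimal sketch. `SketchCallback`, `SketchFutureCallback`, and `listConnectorsAsync` are hypothetical stand-ins written for this illustration; they are not Connect's actual `Callback` or `FutureCallback` classes, and the connector names are made up.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a herder-style callback interface.
interface SketchCallback<T> {
    void onCompletion(Throwable error, T result);
}

// Hypothetical callback that is also a future: firing the callback completes the future.
class SketchFutureCallback<T> extends CompletableFuture<T> implements SketchCallback<T> {
    @Override
    public void onCompletion(Throwable error, T result) {
        if (error != null) {
            completeExceptionally(error);
        } else {
            complete(result);
        }
    }
}

class FutureCallbackSketch {
    // Hypothetical asynchronous API: reports connector names from another thread.
    static void listConnectorsAsync(SketchCallback<Collection<String>> cb) {
        new Thread(() -> cb.onCompletion(null, Arrays.asList("source-a", "sink-b"))).start();
    }

    // Latch approach: needs a latch plus a separate holder for the result
    // (error handling is omitted here for brevity).
    static Collection<String> viaLatch(long timeoutMs) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<Collection<String>> result = new AtomicReference<>();
        listConnectorsAsync((error, names) -> {
            result.set(names);
            done.countDown();
        });
        try {
            if (!done.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Timed out waiting for connector names");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return result.get();
    }

    // Future-callback approach: result, error, and timeout are all handled by get().
    static Collection<String> viaFutureCallback(long timeoutMs) {
        SketchFutureCallback<Collection<String>> cb = new SketchFutureCallback<>();
        listConnectorsAsync(cb);
        try {
            return cb.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Failed to list connectors", e);
        }
    }
}
```

Both methods block until the callback fires, but the future-based one carries the result and any error through a single object, which is roughly why it can replace the latch.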
Forgot to mention that a unit test would be nice (for the whole class even?), since it seems this never worked because it returned immediately.
…StateImpl.connectors()
@kkonstantine I think I've addressed your two inline comments, and I've added a unit test for the As far as the ten-second timeout goes, I'm open to suggestions but it doesn't appear that rebalances or forwarding to master should be a factor. |
kkonstantine left a comment
@C0urante this looks even better now IMO and on par with the rest of the code.
One nit and one more suggested improvement on the tests. Thanks for including those btw. Definitely on the right track.
@kkonstantine thanks for the review! I've taken your comments into account and pushed the next iteration, which consists of changes to how the |
@rhauch yep, this bug has been present since REST extensions were added to Connect. I've added your suggested change to the PR, let me know if there are any other alterations you'd like.
ewencp left a comment
Mostly looks good, one comment. This is the sort of thing that should come with a list of backports -- the bug appears to already be tagged with at least some info about previous versions that are also affected, so we probably don't want to close without backporting.
Regarding timeout, it should really be as conservative as possible without being longer than the minimum of (rebalance timeout, default REST API request timeout).
@ewencp can you shed some light on why we'd want to take rebalances into account for the timeout? I could be wrong, but I don't think a rebalance would affect the latency of a call to |
Calling The thread that processes the requests is also the one managing the |
@ewencp thanks for the clarification; I did not realize that membership and request handling were taken care of with the same thread. I've taken your suggestion literally and just made the timeout the minimum of the timeout for the |
BTW, I think we want to backport this to 2.0, which according to the KIP is when Connect REST extensions were introduced and AFAICT the code for the |
ewencp left a comment
one comment that we could follow up on, but i'm fine merging as is
        config, ConnectRestExtension.class);

    long herderRequestTimeoutMs = ConnectorsResource.REQUEST_TIMEOUT_MS;
    if (config instanceof DistributedConfig) {
nit: i don't love instanceof solutions. might be better to generalize the rebalance timeout to support a small/zero value for StandaloneConfig.
I wouldn't block merging on this, but this is an anti-pattern, so if there is a cleaner solution, it might be worth exploring.
@C0urante, what do you think about doing something like the following instead of the instanceof block?
Integer rebalanceTimeoutMs = config.getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG);
if (rebalanceTimeoutMs != null) {
herderRequestTimeoutMs = Math.min(herderRequestTimeoutMs, rebalanceTimeoutMs.longValue());
}
This still uses the DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG, but otherwise is simply checking for the existence of that property to determine if herderRequestTimeoutMs should be set to a new value.
@rhauch sgtm! I've added your code snippet verbatim to the PR
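One way to picture the alternative to the instanceof check discussed in this thread is a config hierarchy where the base class reports "no rebalance timeout" and only the distributed config overrides it. The sketch below is an illustration under assumptions: `SketchWorkerConfig`, `SketchDistributedConfig`, and `rebalanceTimeoutMs()` are hypothetical names, and the 90-second request timeout is an assumed value rather than one quoted in this conversation.

```java
// Hypothetical base config: a standalone worker has no rebalance timeout.
class SketchWorkerConfig {
    Integer rebalanceTimeoutMs() {
        return null;
    }
}

// Hypothetical distributed config: exposes its configured rebalance timeout.
class SketchDistributedConfig extends SketchWorkerConfig {
    private final int rebalanceTimeoutMs;

    SketchDistributedConfig(int rebalanceTimeoutMs) {
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
    }

    @Override
    Integer rebalanceTimeoutMs() {
        return rebalanceTimeoutMs;
    }
}

class HerderTimeoutSketch {
    // Assumed value for illustration only.
    static final long REQUEST_TIMEOUT_MS = 90_000L;

    // Uses the smaller of the REST request timeout and the rebalance timeout,
    // falling back to the REST request timeout when no rebalance timeout exists.
    static long herderRequestTimeoutMs(SketchWorkerConfig config) {
        long timeoutMs = REQUEST_TIMEOUT_MS;
        Integer rebalanceTimeoutMs = config.rebalanceTimeoutMs();
        if (rebalanceTimeoutMs != null) {
            timeoutMs = Math.min(timeoutMs, rebalanceTimeoutMs.longValue());
        }
        return timeoutMs;
    }
}
```

With these assumed numbers, a standalone config yields 90000 ms, a distributed config with a 60000 ms rebalance timeout yields 60000 ms, and the caller never needs to know which concrete config type it holds.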
Fixed the ConnectClusterStateImpl.connectors() method and throw an exception on timeout. Added unit test.
Author: Chris Egerton <chrise@confluent.io>
Reviewers: Magesh Nandakumar <magesh.n.kumar@gmail.com>, Robert Yokota <rayokota@gmail.com>, Arjun Satish <wicknicks@users.noreply.github.com>, Konstantine Karantasis <konstantine@confluent.io>, Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes #6384 from C0urante:kafka-8058
This makes the ConnectClusterStateImpl.connectors() method synchronous, whereas before it was implicitly asynchronous with no way to tell whether it had completed or not.
More detail can be found in the Jira.
Tested manually. Seems like a light enough change that even unit tests would be overkill, but if reviewers feel differently tests can be added.
A unit test has also been added for the ConnectClusterStateImpl class.
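The difference between the implicitly asynchronous behavior and the synchronous one described above can be sketched roughly as follows. Everything in this block is hypothetical: `requestConnectors` stands in for an asynchronous herder call, the connector names are invented, and `IllegalStateException` is a placeholder for whatever exception the real method throws on timeout.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

class ConnectorsSyncSketch {
    // Hypothetical asynchronous source: delivers names after delayMs on a daemon thread.
    static void requestConnectors(long delayMs, Consumer<Collection<String>> onDone) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            onDone.accept(Arrays.asList("source-a", "sink-b"));
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Implicitly asynchronous: returns right away, so the caller usually sees an
    // empty collection and cannot tell whether the request has finished.
    static Collection<String> connectorsAsync(long delayMs) {
        Collection<String> names = new CopyOnWriteArrayList<>();
        requestConnectors(delayMs, names::addAll);
        return names;
    }

    // Synchronous: waits for the result and fails loudly if it does not arrive in time.
    static Collection<String> connectorsSync(long delayMs, long timeoutMs) {
        CompletableFuture<Collection<String>> future = new CompletableFuture<>();
        requestConnectors(delayMs, future::complete);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("Timed out waiting for connector names", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to list connectors", e);
        }
    }
}
```

In this sketch, calling `connectorsAsync` with any noticeable delay hands back a collection that is still empty at return time, while `connectorsSync` either returns the full list or throws once the timeout elapses.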