KAFKA-3645: Fix ConsumerGroupCommand and ConsumerOffsetChecker to correctly read endpoint info from ZK #1301
arunmahadevan wants to merge 2 commits into apache:trunk
The host and port entries under /brokers/ids/<bid> get filled only for the PLAINTEXT security protocol. For other protocols the host is null and the actual endpoint is under "endpoints". This causes an NPE when running the consumer group and offset checker scripts in a kerberized env. By always reading the host and port values from the "endpoints" entry, a more meaningful exception is thrown rather than an NPE.
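To illustrate the difference, here is a minimal sketch built on a simplified, hypothetical model of a broker registration. The `Registration` and `EndPoint` types and the `endPointFor` helper are illustrative stand-ins, not Kafka's classes; the sketch only assumes what the description states, namely that the legacy `host` field may be null while the per-protocol entries live under "endpoints".

```scala
// Hypothetical, simplified model of a broker registration; not Kafka's API.
final case class EndPoint(host: String, port: Int)

final case class Registration(
  id: Int,
  host: String,                     // legacy field; null unless a PLAINTEXT listener exists
  port: Int,                        // legacy field; only meaningful for PLAINTEXT
  endpoints: Map[String, EndPoint]  // keyed by security protocol name
)

object EndpointLookup {
  // Look the endpoint up by protocol and fail with a descriptive message
  // instead of handing a null host to the caller.
  def endPointFor(reg: Registration, protocol: String): EndPoint =
    reg.endpoints.getOrElse(
      protocol,
      throw new IllegalStateException(
        s"End point $protocol not found for broker ${reg.id}")
    )
}
```

With a registration that only has a SASL_PLAINTEXT entry, asking for PLAINTEXT through `endPointFor` raises an exception that names the missing protocol and broker, whereas reading `reg.host` directly would hand back null and defer the failure to whichever code dereferences it.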
|
+1 |
brokerInfo.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port,
10000, 100000, "ConsumerGroupCommand"))
case None =>
throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
This can be written a bit more nicely along these lines:
zkUtils.getBrokerInfo(brokerId).map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)).map { endPoint =>
  new SimpleConsumer(...)
}.getOrElse(throw new ....)
Same applies to the other class.
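As a self-contained sketch of the suggested `map` / `getOrElse` shape: the types below (`BrokerInfo`, `EndPoint`, `Connection`) and the `lookup` function are hypothetical stand-ins for the Kafka classes named in the review, chosen so the snippet compiles on its own. It is not the code that was merged.

```scala
// Stand-in types for illustration only; these are not Kafka's classes.
final case class EndPoint(host: String, port: Int)

final case class BrokerInfo(id: Int, endPoints: Map[String, EndPoint]) {
  // Throws a descriptive exception when the protocol has no endpoint.
  def endPoint(protocol: String): EndPoint =
    endPoints.getOrElse(
      protocol,
      throw new IllegalStateException(s"End point $protocol not found for broker $id"))
}

final case class Connection(host: String, port: Int, clientId: String)

object ConnectSketch {
  // `lookup` plays the role of a registry query that returns an Option.
  def connect(lookup: Int => Option[BrokerInfo], brokerId: Int): Connection =
    lookup(brokerId)
      .map(_.endPoint("PLAINTEXT"))
      .map(ep => Connection(ep.host, ep.port, "ConsumerGroupCommand"))
      .getOrElse(throw new NoSuchElementException(s"Broker id $brokerId does not exist"))
}
```

Chaining on the `Option` keeps the "broker missing" case in one place at the end of the expression, rather than spreading it across a `match` with `Some` and `None` branches.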
|
Thanks for the PR, I left a minor comment, looks good otherwise. Can you please update the PR description to mention that this only happens with the old consumer (the PR description becomes the commit message)? With the new consumer (which only |
|
@ijuma thanks for reviewing. Changed PR title and refactored to use map and orElse. |
|
LGTM |
|
@arunmahadevan, because our test coverage for these tools is not good, we need to manually test changes like this. Can you please describe the steps to reproduce the NPE? |
|
@ijuma sorry for not responding earlier, somehow I missed the email notification. I was testing with a repo which had diverged from trunk and had some other fixes. The NPE does not seem to occur with the current code in trunk, since the channel to the offset manager is opened first, and that code specifically requests a PLAINTEXT endpoint (link). It then exits with the message "Error while executing consumer group command End point PLAINTEXT not found for broker 0". To test this I ran the below command after enabling Kerberos with a SASL_PLAINTEXT listener. The proposed patch fixes getZkConsumer to read the endpoint info correctly. Reading it incorrectly could potentially cause an NPE if the method is invoked from a different context in the future, but right now there is no NPE since the code that invokes this method fails fast. |
|
@arunmahadevan, thanks for the explanation. Can you update the PR description to be accurate then? The PR description becomes the commit message in the merged commit, so its contents are important. |
|
@ijuma Updated the PR title. Let me know if it makes sense. |
|
Thanks for the PR, LGTM. Merged to trunk. |
Conflicts:
GroupCoordinatorIntegrationTest.scala - line 50
SocketServerTest.scala - TestableSocketServer class, removed verifyAcceptorIdlePercent() method, apache#1301 removed pollBlockMs var used in apache#1344 poll() method; testConnectionRateLimit() method changed
SocketServer.scala - apache#82, removed SocketServerMetricsGroup var, apache#455 removed idlePercentMeter
KafkaApis.scala - apache#50 imports
RequestQuotaTest.scala - apache#28 imports
* Override doWork to poll requests every 10 ms
* Needs more analysis for large scale deployment