Remove consumer.listTopics() method, since too many topics in Kafka can cause a Full GC in the Overlord #6455
clintropolis
left a comment
This does seem like it would be more efficient than pulling the list of all topics 👍, but the Travis failures look like they might be related to this PR. Maybe something to do with the removal of the lock?
  }

- List<PartitionInfo> partitions = topics.get(ioConfig.getTopic());
+ List<PartitionInfo> partitions = consumer.partitionsFor(ioConfig.getTopic());
Why did the lock and exception handling get removed? The lock seems needed, and according to the docs, partitionsFor can throw a handful of exceptions too:
Throws:
WakeupException - if wakeup() is called before or while this function is called
InterruptException - if the calling thread is interrupted before or while this function is called
AuthorizationException - if not authorized to the specified topic
TimeoutException - if the topic metadata could not be fetched before expiration of the configured request timeout
KafkaException - for any other unrecoverable errors
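A minimal sketch of the pattern the reviewer is asking for, assuming the consumer must not be called concurrently: one lock around the consumer call, the unchecked exceptions listed above caught so the caller can bail out, and a null-safe count. PartitionLookup and PartitionCounter are hypothetical names; the interface stands in for KafkaConsumer (whose real partitionsFor returns List<PartitionInfo>) so the sketch runs without kafka-clients.

```java
import java.util.List;
import java.util.OptionalInt;

// Hypothetical stand-in for the one KafkaConsumer method used here; the real
// method returns List<PartitionInfo>, simplified to List<String> for the sketch.
interface PartitionLookup {
  List<String> partitionsFor(String topic);
}

final class PartitionCounter {
  // KafkaConsumer is not thread-safe, so every call goes through one lock.
  private final Object consumerLock = new Object();
  private final PartitionLookup consumer;

  PartitionCounter(PartitionLookup consumer) {
    this.consumer = consumer;
  }

  /**
   * Returns the partition count (0 if the lookup returned null), or an empty
   * OptionalInt if the lookup threw, so the caller can return early.
   */
  OptionalInt countPartitions(String topic) {
    List<String> partitions;
    try {
      synchronized (consumerLock) {
        partitions = consumer.partitionsFor(topic);
      }
    }
    catch (RuntimeException e) {
      // WakeupException, InterruptException, AuthorizationException,
      // TimeoutException and KafkaException are all unchecked exceptions.
      return OptionalInt.empty();
    }
    return OptionalInt.of(partitions != null ? partitions.size() : 0);
  }
}
```

Returning an empty OptionalInt on failure, rather than 0, keeps "the lookup failed" distinguishable from "the topic has no partitions".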
Thank you for reviewing this. I agree that the lock and exception handling should not have been removed. I will fix it later.
Dumb question: why do we even need to list all topics if the user supplies the input topic? Can this operation be removed?
@elloooooo I am wondering how big the topic list was when you saw this issue in practice. Thanks!
@b-slim Someone once created 20,000+ topics on one of our Kafka clusters, which creates topics automatically when needed. His code actually had bugs.
…nd remove some useless checks
-   log.warn("No such topic [%s] found, list of discovered topics [%s]", ioConfig.getTopic(), topics.keySet());
- }
- int numPartitions = (partitions != null ? partitions.size() : 0);
+ int numPartitions = partitions.size();
Looks like partitions can be null. Should be int numPartitions = (partitions != null ? partitions.size() : 0);.
I think partitions can be null only when consumer.partitionsFor(ioConfig.getTopic()) throws an exception. That exception is caught and the function returns, so when partitions is null, int numPartitions = partitions.size(); never runs, right?
Here is the code of KafkaConsumer.partitionsFor():

    public List<PartitionInfo> partitionsFor(String topic) {
        acquire();
        try {
            Cluster cluster = this.metadata.fetch();
            List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
            if (parts != null)
                return parts;

            Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
                    new MetadataRequest.Builder(Collections.singletonList(topic)), requestTimeoutMs);
            return topicMetadata.get(topic);
        } finally {
            release();
        }
    }

Since topicMetadata is a map, topicMetadata.get(topic) can return null.
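To make the null path concrete, here is a hypothetical, simplified model of the lookup order in the method quoted above: cached cluster metadata first, then a metadata fetch. Plain maps stand in for Kafka's Cluster and fetcher, and PartitionsForModel is an illustrative name, not Kafka code. It shows that a topic missing from both maps yields null, which the reviewer's null-safe count turns into 0.

```java
import java.util.List;
import java.util.Map;

// Hypothetical simplified model of the quoted partitionsFor() lookup order.
// Plain maps replace Kafka's Cluster and fetcher; this is NOT Kafka code.
final class PartitionsForModel {
  static List<Integer> partitionsFor(
      Map<String, List<Integer>> cachedMetadata,
      Map<String, List<Integer>> fetchedMetadata,
      String topic) {
    // First try the locally cached metadata.
    List<Integer> parts = cachedMetadata.get(topic);
    if (parts != null) {
      return parts;
    }
    // Map.get returns null for an absent key, so this can return null too.
    return fetchedMetadata.get(topic);
  }

  // The null-safe count suggested in the review.
  static int numPartitions(List<?> partitions) {
    return partitions != null ? partitions.size() : 0;
  }
}
```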
Ah~ I didn't notice that. Thank you. I will fix it.
@elloooooo thanks for the quick fix! The Travis failure looks legit. Would you please fix it?
@elloooooo NVM. It looks like it's caused by the bug that #6466 fixes.
Would you please check the CI failure?
@jihoonson I can't figure out why Travis CI failed. Could you give me some suggestions?
Can you try running the druid-server tests on your local machine? It seems to be a locking issue, but I'm not sure. I have restarted the build anyway.
Hmm, they got stuck again, but I restarted them once more.
  private void addSomeEvents(int numEventsPerPartition) throws Exception
  {
    //create topic manually
    AdminUtils.createTopic(zkUtils, topic, NUM_PARTITIONS, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
Hmm, would you tell me why this is needed?
When the auto.create.topics.enable setting is true (the default), consumer.partitionsFor(SOME_TOPIC) creates SOME_TOPIC if it doesn't exist. This caused some unit tests, such as testXXXNoTasks(), to fail. So I disabled the setting, but that made other unit tests that need to send events to Kafka fail. So I now create the topic in Kafka manually before sending data.
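To make that reasoning concrete, here is a hypothetical in-memory model of a broker. FakeBroker is an illustrative name, not Kafka code; its partitionsFor only mimics the effect described above (a metadata lookup creating a missing topic when auto-creation is on), defaultPartitions stands in for the broker's num.partitions setting, and createTopic plays the role of the test's manual AdminUtils.createTopic call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory broker model; NOT Kafka code.
final class FakeBroker {
  private final boolean autoCreateTopicsEnable;
  private final int defaultPartitions; // stands in for num.partitions
  private final Map<String, List<Integer>> topics = new HashMap<>();

  FakeBroker(boolean autoCreateTopicsEnable, int defaultPartitions) {
    this.autoCreateTopicsEnable = autoCreateTopicsEnable;
    this.defaultPartitions = defaultPartitions;
  }

  // Explicit creation, playing the role of AdminUtils.createTopic in the test.
  void createTopic(String topic, int numPartitions) {
    List<Integer> parts = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      parts.add(i);
    }
    topics.put(topic, parts);
  }

  // A metadata lookup, playing the role of consumer.partitionsFor(topic).
  List<Integer> partitionsFor(String topic) {
    if (!topics.containsKey(topic) && autoCreateTopicsEnable) {
      // Side effect: the lookup itself creates the topic.
      createTopic(topic, defaultPartitions);
    }
    return topics.get(topic); // null if absent and auto-creation is off
  }

  boolean hasTopic(String topic) {
    return topics.containsKey(topic);
  }
}
```

With auto-creation on, a test that expects "no such topic" sees a freshly created one; with it off, tests that produce data must create the topic first.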
Haha, it looks like something went wrong on GitHub. Got it. Thanks.
Sorry about that! I will try to delete the duplicate comment.
I suspect a race condition in KafkaSupervisorTest. All tests in KafkaSupervisorTest basically execute the code below. However, I left one question. Please take a look.
…afka causes the FullGC in Overlord (apache#6455)
* remove consumer.listTopics() method
* add consumerLock and exception handling for consumer.partitionFor() and remove some useless checks
* add check in case consumer.partitionsFor() returns null
* fix CI failure
* fix failed UT
* Revert "fix CI failure"
  This reverts commit f839d09.
* revert unless commit and re-commit the useful part to fix failed UT
When there are too many topics in the Kafka cluster, calling consumer.listTopics() can lead to a Full GC in the Overlord.