From b33a03fb37033f5e5307815cba67802e8477ddb2 Mon Sep 17 00:00:00 2001 From: liuboyu Date: Fri, 12 Oct 2018 16:26:18 +0800 Subject: [PATCH 1/7] remove consumer.listTopics() method --- .../kafka/supervisor/KafkaSupervisor.java | 38 ++++++------------- 1 file changed, 12 insertions(+), 26 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index b7845cae2206..e73eaff3a097 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -767,8 +767,8 @@ void resetInternal(DataSourceMetadata dataSourceMetadata) // as well as the case where the metadata store do not have an entry for the reset partitions boolean doReset = false; for (Entry resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions() - .getPartitionOffsetMap() - .entrySet()) { + .getPartitionOffsetMap() + .entrySet()) { final Long partitionOffsetInMetadataStore = currentMetadata == null ? null : currentMetadata.getKafkaPartitions() @@ -1034,24 +1034,9 @@ private KafkaConsumer getKafkaConsumer() private void updatePartitionDataFromKafka() { - Map> topics; - try { - synchronized (consumerLock) { - topics = consumer.listTopics(); // updates the consumer's list of partitions from the brokers - } - } - catch (Exception e) { // calls to the consumer throw NPEs when the broker doesn't respond - log.warn( - e, - "Unable to get partition data from Kafka for brokers [%s], are the brokers up?", - ioConfig.getConsumerProperties().get(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY) - ); - return; - } - - List partitions = topics.get(ioConfig.getTopic()); + List partitions = consumer.partitionsFor(ioConfig.getTopic()); if (partitions == null) { - log.warn("No such topic [%s] found, list of discovered topics [%s]", ioConfig.getTopic(), topics.keySet()); + log.warn("No such topic [%s] found", ioConfig.getTopic()); } int numPartitions = (partitions != null ? partitions.size() : 0); @@ -1101,7 +1086,7 @@ private void discoverTasks() throws ExecutionException, InterruptedException, Ti taskCount++; final KafkaIndexTask kafkaTask = (KafkaIndexTask) task; final String taskId = task.getId(); - + // Determine which task group this task belongs to based on one of the partitions handled by this task. If we // later determine that this task is actively reading, we will make sure that it matches our current partition // allocation (getTaskGroupIdForPartition(partition) should return the same value for every partition being read @@ -2263,16 +2248,17 @@ private Runnable buildRunTask() private void updateLatestOffsetsFromKafka() { synchronized (consumerLock) { - final Map> topics = consumer.listTopics(); + final List partitionInfoList = consumer.partitionsFor(ioConfig.getTopic()); - if (topics == null || !topics.containsKey(ioConfig.getTopic())) { + if (partitionInfoList == null || partitionInfoList.size() == 0) { throw new ISE("Could not retrieve partitions for topic [%s]", ioConfig.getTopic()); } - final Set topicPartitions = topics.get(ioConfig.getTopic()) - .stream() - .map(x -> new TopicPartition(x.topic(), x.partition())) - .collect(Collectors.toSet()); + final Set topicPartitions = partitionInfoList + .stream() + .map(x -> new TopicPartition(x.topic(), x.partition())) + .collect(Collectors.toSet()); + consumer.assign(topicPartitions); consumer.seekToEnd(topicPartitions); From db750e0dcf81ad387593424cccb0e62a0f7a79b5 Mon Sep 17 00:00:00 2001 From: liuboyu Date: Mon, 15 Oct 2018 09:50:22 +0800 Subject: [PATCH 2/7] add consumerLock and exception handling for consumer.partitionFor() and remove some useless checks --- .../kafka/supervisor/KafkaSupervisor.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index e73eaff3a097..0b029d9c0409 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -1034,11 +1034,22 @@ private KafkaConsumer getKafkaConsumer() private void updatePartitionDataFromKafka() { - List partitions = consumer.partitionsFor(ioConfig.getTopic()); - if (partitions == null) { - log.warn("No such topic [%s] found", ioConfig.getTopic()); + List partitions; + try { + synchronized (consumerLock) { + partitions = consumer.partitionsFor(ioConfig.getTopic()); + } } - int numPartitions = (partitions != null ? partitions.size() : 0); + catch (Exception e) { + log.warn( + e, + "Unable to get partition data from Kafka for brokers [%s], are the brokers up?", + ioConfig.getConsumerProperties().get(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY) + ); + return; + } + + int numPartitions = partitions.size(); log.debug("Found [%d] Kafka partitions for topic [%s]", numPartitions, ioConfig.getTopic()); From 417834f6630caf314c0fb8019c8e883582cb43b7 Mon Sep 17 00:00:00 2001 From: liuboyu Date: Mon, 15 Oct 2018 11:55:28 +0800 Subject: [PATCH 3/7] add check in case consumer.partitionsFor() returns null --- .../apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 0b029d9c0409..57d6feb27c9f 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -1049,7 +1049,7 @@ private void updatePartitionDataFromKafka() return; } - int numPartitions = partitions.size(); + int numPartitions = (partitions != null ? partitions.size() : 0); log.debug("Found [%d] Kafka partitions for topic [%s]", numPartitions, ioConfig.getTopic()); From f839d09e1ed6a3c73bb5ba524b26ee83c0e79c41 Mon Sep 17 00:00:00 2001 From: liuboyu Date: Wed, 17 Oct 2018 13:37:34 +0800 Subject: [PATCH 4/7] fix CI failure --- .idea/inspectionProfiles/Druid.xml | 22 ++++++------------- .../inspectionProfiles/profiles_settings.xml | 1 + .../kafka/supervisor/KafkaSupervisorTest.java | 10 +++++++-- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml index 38d6033a81a1..cbecba765046 100644 --- a/.idea/inspectionProfiles/Druid.xml +++ b/.idea/inspectionProfiles/Druid.xml @@ -63,7 +63,6 @@ - @@ -156,26 +155,19 @@ - - - - + @@ -187,8 +179,8 @@ - - + +