From 50b5fd497ec8cb1fa479da3d29a6f05ce597b682 Mon Sep 17 00:00:00 2001 From: Satish Bhor Date: Sat, 25 Mar 2017 21:15:03 +0530 Subject: [PATCH 1/4] Fix lz4 library incompatibility in kafka-indexing-service extension #3266 --- extensions-core/kafka-indexing-service/pom.xml | 4 ++-- .../src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java | 2 +- .../io/druid/indexing/kafka/supervisor/KafkaSupervisor.java | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/extensions-core/kafka-indexing-service/pom.xml b/extensions-core/kafka-indexing-service/pom.xml index 263d78061b6a..df0b31033376 100644 --- a/extensions-core/kafka-indexing-service/pom.xml +++ b/extensions-core/kafka-indexing-service/pom.xml @@ -55,7 +55,7 @@ org.apache.kafka kafka-clients - 0.9.0.1 + 0.10.0.0 @@ -67,7 +67,7 @@ org.apache.kafka kafka_2.11 - 0.9.0.0 + 0.10.0.0 test diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index b344b8a3e9b5..c1a5fd925ef7 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -1008,7 +1008,7 @@ private void possiblyResetOffsetsOrWait( final TopicPartition topicPartition = outOfRangePartition.getKey(); final long nextOffset = outOfRangePartition.getValue(); // seek to the beginning to get the least available offset - consumer.seekToBeginning(topicPartition); + consumer.seekToBeginning(Lists.newArrayList(topicPartition)); final long leastAvailableOffset = consumer.position(topicPartition); // reset the seek consumer.seek(topicPartition, nextOffset); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java 
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 2ec0a8b1fad1..e20b7c795afb 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -1484,9 +1484,9 @@ private long getOffsetFromKafkaForPartition(int partition, boolean useEarliestOf } if (useEarliestOffset) { - consumer.seekToBeginning(topicPartition); + consumer.seekToBeginning(Lists.newArrayList(topicPartition)); } else { - consumer.seekToEnd(topicPartition); + consumer.seekToEnd(Lists.newArrayList(topicPartition)); } return consumer.position(topicPartition); From b53e38ba701bf21c18872349f112e885d316de88 Mon Sep 17 00:00:00 2001 From: Satish Bhor Date: Sun, 26 Mar 2017 13:00:36 +0530 Subject: [PATCH 2/4] Bumped Kafka version to 0.10.2.0 for : Fix lz4 library incompatibility in kafka-indexing-service extension #3266 --- docs/content/development/extensions-core/kafka-ingestion.md | 6 +++--- extensions-core/kafka-indexing-service/pom.xml | 4 ++-- .../test/java/io/druid/indexing/kafka/test/TestBroker.java | 5 +++-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index 84900617bf7b..5cb3232cf887 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -17,9 +17,9 @@ currently designated as an *experimental feature* and is subject to the usual [experimental caveats](../experimental.html).
-The Kafka indexing service uses the Java consumer that was introduced in Kafka 0.9. As there were protocol changes -made in this version, Kafka 0.9 consumers are not compatible with older brokers. Ensure that your Kafka brokers are -version 0.9 or better before using this service. +The Kafka indexing service uses the Java consumer from Kafka 0.10.x. As there were protocol changes +made in this version, Kafka 0.10.x consumers might not be compatible with older brokers. Ensure that your Kafka brokers are +version 0.10.x or better before using this service. Refer to the Kafka upgrade guide if you are using an older version of Kafka brokers.
## Submitting a Supervisor Spec diff --git a/extensions-core/kafka-indexing-service/pom.xml b/extensions-core/kafka-indexing-service/pom.xml index df0b31033376..e313f1273562 100644 --- a/extensions-core/kafka-indexing-service/pom.xml +++ b/extensions-core/kafka-indexing-service/pom.xml @@ -55,7 +55,7 @@ org.apache.kafka kafka-clients - 0.10.0.0 + 0.10.2.0 @@ -67,7 +67,7 @@ org.apache.kafka kafka_2.11 - 0.10.0.0 + 0.10.2.0 test diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/test/TestBroker.java index d879b4d4739e..f63de29e014d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/test/TestBroker.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/test/TestBroker.java @@ -23,12 +23,13 @@ import com.google.common.collect.Maps; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; -import kafka.utils.SystemTime$; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.utils.SystemTime; import scala.Some; +import scala.collection.immutable.List$; import java.io.Closeable; import java.io.File; @@ -69,7 +70,7 @@ public void start() final KafkaConfig config = new KafkaConfig(props); - server = new KafkaServer(config, SystemTime$.MODULE$, Some.apply(String.format("TestingBroker[%d]-", id))); + server = new KafkaServer(config, SystemTime.SYSTEM, Some.apply(String.format("TestingBroker[%d]-", id)), List$.MODULE$.empty()); server.startup(); } From 0ddbfbed97ef558d37c22543e262a3ad255dd89e Mon Sep 17 00:00:00 2001 From: Satish Bhor Date: Fri, 7 Apr 2017 11:07:29 +0530 Subject: [PATCH 3/4] Replaced Lists.newArrayList() with 
Collections.singletonList() For Fix lz4 library incompatibility in kafka-indexing-service extension #4115 --- .../main/java/io/druid/indexing/kafka/KafkaIndexTask.java | 3 ++- .../druid/indexing/kafka/supervisor/KafkaSupervisor.java | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index c1a5fd925ef7..eef8aac5d714 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -96,6 +96,7 @@ import java.util.Properties; import java.util.Random; import java.util.Set; +import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -1008,7 +1009,7 @@ private void possiblyResetOffsetsOrWait( final TopicPartition topicPartition = outOfRangePartition.getKey(); final long nextOffset = outOfRangePartition.getValue(); // seek to the beginning to get the least available offset - consumer.seekToBeginning(Lists.newArrayList(topicPartition)); + consumer.seekToBeginning(Collections.singletonList(topicPartition)); final long leastAvailableOffset = consumer.position(topicPartition); // reset the seek consumer.seek(topicPartition, nextOffset); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index e20b7c795afb..a3a55e3f792d 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ 
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -77,6 +77,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.HashMap; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -1480,13 +1481,13 @@ private long getOffsetFromKafkaForPartition(int partition, boolean useEarliestOf { TopicPartition topicPartition = new TopicPartition(ioConfig.getTopic(), partition); if (!consumer.assignment().contains(topicPartition)) { - consumer.assign(Lists.newArrayList(topicPartition)); + consumer.assign(Collections.singletonList(topicPartition)); } if (useEarliestOffset) { - consumer.seekToBeginning(Lists.newArrayList(topicPartition)); + consumer.seekToBeginning(Collections.singletonList(topicPartition)); } else { - consumer.seekToEnd(Lists.newArrayList(topicPartition)); + consumer.seekToEnd(Collections.singletonList(topicPartition)); } return consumer.position(topicPartition); From 8b35c35a9367c3c5b3fac399c91ea5f9fc870ff2 Mon Sep 17 00:00:00 2001 From: Satish Bhor Date: Tue, 2 May 2017 21:02:23 +0530 Subject: [PATCH 4/4] Fixed: Average Server Percent Used: NaN% Error when server startup is in progress #4214 --- .../main/resources/static/old-console/js/init-0.0.2.js | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/resources/static/old-console/js/init-0.0.2.js b/server/src/main/resources/static/old-console/js/init-0.0.2.js index c89ac19ce13a..fc6b07f01c1b 100644 --- a/server/src/main/resources/static/old-console/js/init-0.0.2.js +++ b/server/src/main/resources/static/old-console/js/init-0.0.2.js @@ -53,8 +53,12 @@ $(document).ready(function() { return ret; }()); var avg = serverTable.getColTotal('Server percentUsed') / serverTable.getNumRows(); - $('#avg_server_metric').html('Average Server Percent Used: ' + avg + '%'); - + if (!isNaN(avg)) { + $('#avg_server_metric').html('Average Server Percent 
Used: ' + avg + '%'); + }else{ + $('.loading').html('Server is still starting...Please try after few minutes.'); + $('.loading').show() + } serverTable.toHTMLTable($('#servers')); segmentTable.toHTMLTable($('#segments')); }