diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index 84900617bf7b..5cb3232cf887 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -17,9 +17,9 @@ currently designated as an *experimental feature* and is subject to the usual [experimental caveats](../experimental.html).
-The Kafka indexing service uses the Java consumer that was introduced in Kafka 0.9. As there were protocol changes -made in this version, Kafka 0.9 consumers are not compatible with older brokers. Ensure that your Kafka brokers are -version 0.9 or better before using this service. +The Kafka indexing service uses the Java consumer that was introduced in Kafka 0.10.x. As there were protocol changes +made in this version, Kafka 0.10.x consumers might not be compatible with older brokers. Ensure that your Kafka brokers are +version 0.10.x or better before using this service. Refer Kafka upgrade guide if you are using older version of kafka brokers.
## Submitting a Supervisor Spec diff --git a/extensions-core/kafka-indexing-service/pom.xml b/extensions-core/kafka-indexing-service/pom.xml index 263d78061b6a..e313f1273562 100644 --- a/extensions-core/kafka-indexing-service/pom.xml +++ b/extensions-core/kafka-indexing-service/pom.xml @@ -55,7 +55,7 @@ org.apache.kafka kafka-clients - 0.9.0.1 + 0.10.2.0 @@ -67,7 +67,7 @@ org.apache.kafka kafka_2.11 - 0.9.0.0 + 0.10.2.0 test diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index b344b8a3e9b5..eef8aac5d714 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -96,6 +96,7 @@ import java.util.Properties; import java.util.Random; import java.util.Set; +import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -1008,7 +1009,7 @@ private void possiblyResetOffsetsOrWait( final TopicPartition topicPartition = outOfRangePartition.getKey(); final long nextOffset = outOfRangePartition.getValue(); // seek to the beginning to get the least available offset - consumer.seekToBeginning(topicPartition); + consumer.seekToBeginning(Collections.singletonList(topicPartition)); final long leastAvailableOffset = consumer.position(topicPartition); // reset the seek consumer.seek(topicPartition, nextOffset); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 2ec0a8b1fad1..a3a55e3f792d 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -77,6 +77,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.HashMap; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -1480,13 +1481,13 @@ private long getOffsetFromKafkaForPartition(int partition, boolean useEarliestOf { TopicPartition topicPartition = new TopicPartition(ioConfig.getTopic(), partition); if (!consumer.assignment().contains(topicPartition)) { - consumer.assign(Lists.newArrayList(topicPartition)); + consumer.assign(Collections.singletonList(topicPartition)); } if (useEarliestOffset) { - consumer.seekToBeginning(topicPartition); + consumer.seekToBeginning(Collections.singletonList(topicPartition)); } else { - consumer.seekToEnd(topicPartition); + consumer.seekToEnd(Collections.singletonList(topicPartition)); } return consumer.position(topicPartition); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/test/TestBroker.java index d879b4d4739e..f63de29e014d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/test/TestBroker.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/test/TestBroker.java @@ -23,12 +23,13 @@ import com.google.common.collect.Maps; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; -import kafka.utils.SystemTime$; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.utils.SystemTime; import scala.Some; +import scala.collection.immutable.List$; import java.io.Closeable; import java.io.File; @@ -69,7 +70,7 @@ public void start() final KafkaConfig config = new KafkaConfig(props); - server = new KafkaServer(config, SystemTime$.MODULE$, Some.apply(String.format("TestingBroker[%d]-", id))); + server = new KafkaServer(config, SystemTime.SYSTEM, Some.apply(String.format("TestingBroker[%d]-", id)), List$.MODULE$.empty()); server.startup(); }