diff --git a/extensions-core/kafka-eight/pom.xml b/extensions-core/kafka-eight/pom.xml deleted file mode 100644 index 155aae49763b..000000000000 --- a/extensions-core/kafka-eight/pom.xml +++ /dev/null @@ -1,73 +0,0 @@ - - - - - 4.0.0 - - io.druid.extensions - druid-kafka-eight - druid-kafka-eight - druid-kafka-eight - - - io.druid - druid - 0.13.0-SNAPSHOT - ../../pom.xml - - - - - io.druid - druid-api - ${project.parent.version} - provided - - - org.apache.kafka - kafka_2.10 - 0.8.2.1 - - - log4j - log4j - - - org.apache.zookeeper - zookeeper - - - org.slf4j - slf4j-api - - - net.jpountz.lz4 - lz4 - - - - - - - junit - junit - test - - - - diff --git a/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightDruidModule.java b/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightDruidModule.java deleted file mode 100644 index 24fda7cf4bd8..000000000000 --- a/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightDruidModule.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.firehose.kafka; - -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.common.collect.ImmutableList; -import com.google.inject.Binder; -import io.druid.initialization.DruidModule; - -import java.util.List; - -/** - */ -public class KafkaEightDruidModule implements DruidModule -{ - @Override - public List getJacksonModules() - { - return ImmutableList.of( - new SimpleModule("KafkaEightFirehoseModule") - .registerSubtypes( - new NamedType(KafkaEightFirehoseFactory.class, "kafka-0.8") - ) - ); - } - - @Override - public void configure(Binder binder) - { - - } -} diff --git a/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java b/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java deleted file mode 100644 index c50badd4c7cd..000000000000 --- a/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.firehose.kafka; - - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Sets; -import io.druid.data.input.Firehose; -import io.druid.data.input.FirehoseFactory; -import io.druid.data.input.InputRow; -import io.druid.data.input.impl.InputRowParser; -import io.druid.java.util.common.logger.Logger; -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.InvalidMessageException; - -import javax.annotation.Nullable; -import java.io.File; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -/** - */ -public class KafkaEightFirehoseFactory implements FirehoseFactory> -{ - private static final Logger log = new Logger(KafkaEightFirehoseFactory.class); - - @JsonProperty - private final Properties consumerProps; - - @JsonProperty - private final String feed; - - @JsonCreator - public KafkaEightFirehoseFactory( - @JsonProperty("consumerProps") Properties consumerProps, - @JsonProperty("feed") String feed - - ) - { - this.consumerProps = consumerProps; - this.feed = feed; - } - - @Override - public Firehose connect(final InputRowParser firehoseParser, File temporaryDirectory) - { - Set newDimExclus = Sets.union( - firehoseParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(), - Sets.newHashSet("feed") - ); - - final InputRowParser theParser = firehoseParser.withParseSpec( - firehoseParser.getParseSpec() - .withDimensionsSpec( - firehoseParser.getParseSpec() - .getDimensionsSpec() - .withDimensionExclusions( - newDimExclus - ) - ) - ); - - final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps)); - - final Map>> streams = connector.createMessageStreams( - ImmutableMap.of( - feed, - 1 - ) - ); - - final List> streamList = streams.get(feed); - if (streamList == null || streamList.size() != 1) { - return null; - } - - final KafkaStream stream = streamList.get(0); - final ConsumerIterator iter = stream.iterator(); - - return new Firehose() - { - Iterator nextIterator = Collections.emptyIterator(); - - @Override - public boolean hasMore() - { - return nextIterator.hasNext() || iter.hasNext(); - } - - @Nullable - @Override - public InputRow nextRow() - { - try { - if (!nextIterator.hasNext()) { - final byte[] message = iter.next().message(); - - if (message == null) { - return null; - } - nextIterator = theParser.parseBatch(ByteBuffer.wrap(message)).iterator(); - } - - return nextIterator.next(); - - } - catch (InvalidMessageException e) { - /* - IF the CRC is caused within the wire transfer, this is not the best way to handel CRC. - Probably it is better to shutdown the fireHose without commit and start it again. - */ - log.error(e, "Message failed its checksum and it is corrupt, will skip it"); - return null; - } - } - - @Override - public Runnable commit() - { - return new Runnable() - { - @Override - public void run() - { - /* - This is actually not going to do exactly what we want, cause it will be called asynchronously - after the persist is complete. So, it's going to commit that it's processed more than was actually - persisted. This is unfortunate, but good enough for now. Should revisit along with an upgrade - of our Kafka version. - */ - - log.info("committing offsets"); - connector.commitOffsets(); - } - }; - } - - @Override - public void close() - { - connector.shutdown(); - } - }; - } - -} diff --git a/extensions-core/kafka-eight/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-core/kafka-eight/src/main/resources/META-INF/services/io.druid.initialization.DruidModule deleted file mode 100644 index f56f6152eba4..000000000000 --- a/extensions-core/kafka-eight/src/main/resources/META-INF/services/io.druid.initialization.DruidModule +++ /dev/null @@ -1 +0,0 @@ -io.druid.firehose.kafka.KafkaEightDruidModule \ No newline at end of file diff --git a/pom.xml b/pom.xml index 4dc14357f053..a2fc486a1d58 100644 --- a/pom.xml +++ b/pom.xml @@ -113,7 +113,6 @@ extensions-core/hdfs-storage extensions-core/histogram extensions-core/stats - extensions-core/kafka-eight extensions-core/kafka-extraction-namespace extensions-core/kafka-indexing-service extensions-core/mysql-metadata-storage