diff --git a/contrib/hadoop-consumer/README b/contrib/hadoop-consumer/README index 5395d38ff9acc..4aec6f3fa6632 100644 --- a/contrib/hadoop-consumer/README +++ b/contrib/hadoop-consumer/README @@ -58,9 +58,12 @@ your hadoop installation directory. -1 means no limitation. hdfs.default.classpath.dir : hdfs location of jars - 2) copy jars into hdfs + 2) download necessary jars + ./import-jars.sh + + 3) copy jars into hdfs ./copy-jars.sh ${hdfs.default.classpath.dir} - 2) Fetch data + 4) Fetch data ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties diff --git a/contrib/hadoop-consumer/copy-jars.sh b/contrib/hadoop-consumer/copy-jars.sh index e5de1dda7b819..0769998f71629 100755 --- a/contrib/hadoop-consumer/copy-jars.sh +++ b/contrib/hadoop-consumer/copy-jars.sh @@ -30,34 +30,6 @@ $hadoop fs -rmr $1 echo "$hadoop fs -mkdir $1" $hadoop fs -mkdir $1 -# include kafka jars -for file in $base_dir/contrib/hadoop-consumer/target/scala_2.8.0/*.jar; -do - echo "$hadoop fs -put $file $1/" - $hadoop fs -put $file $1/ -done - -# include kafka jars -echo "$hadoop fs -put $base_dir/core/target/scala_2.8.0/kafka-*.jar; $1/" -$hadoop fs -put $base_dir/core/target/scala_2.8.0/kafka-*.jar $1/ - -# include core lib jars -for file in $base_dir/core/lib/*.jar; -do - echo "$hadoop fs -put $file $1/" - $hadoop fs -put $file $1/ -done - -for file in $base_dir/core/lib_managed/scala_2.8.0/compile/*.jar; -do - echo "$hadoop fs -put $file $1/" - $hadoop fs -put $file $1/ -done - -# include scala library jar -echo "$hadoop fs -put $base_dir/project/boot/scala-2.8.0/lib/scala-library.jar; $1/" -$hadoop fs -put $base_dir/project/boot/scala-2.8.0/lib/scala-library.jar $1/ - local_dir=$(dirname $0) # include hadoop-consumer jars diff --git a/contrib/hadoop-consumer/import-jars.sh b/contrib/hadoop-consumer/import-jars.sh new file mode 100644 index 0000000000000..dc1ba9dcafc74 --- /dev/null +++ b/contrib/hadoop-consumer/import-jars.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +find lib/ -not 
-name piggybank.jar | xargs rm + +wget -P lib/ \ +http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-core/1.2.1/hadoop-core-1.2.1.jar \ +http://repo1.maven.org/maven2/commons-io/commons-io/2.4/commons-io-2.4.jar \ +http://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.6/slf4j-api-1.7.6.jar \ +http://repo1.maven.org/maven2/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar \ +http://repo1.maven.org/maven2/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar \ +http://repo1.maven.org/maven2/commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar \ +http://repo1.maven.org/maven2/commons-httpclient/commons-httpclient/3.1/commons-httpclient-3.1.jar \ +http://repo1.maven.org/maven2/log4j/log4j/1.2.16/log4j-1.2.16.jar \ +http://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar \ +http://repo1.maven.org/maven2/commons-lang/commons-lang/2.6/commons-lang-2.6.jar \ +http://repo1.maven.org/maven2/commons-configuration/commons-configuration/1.10/commons-configuration-1.10.jar \ +http://repo1.maven.org/maven2/org/scala-lang/scala-library/2.8.0/scala-library-2.8.0.jar + +cp ../../core/target/scala-2.8.0/kafka_2.8.0-0.8.0.jar lib diff --git a/contrib/hadoop-consumer/run-class.sh b/contrib/hadoop-consumer/run-class.sh index bfb4744ceeb49..127537b82c5cd 100755 --- a/contrib/hadoop-consumer/run-class.sh +++ b/contrib/hadoop-consumer/run-class.sh @@ -36,7 +36,7 @@ done local_dir=$(dirname $0) # include hadoop-consumer jars -for file in $base_dir/contrib/hadoop-consumer/target/scala_2.8.0/*.jar; +for file in $base_dir/contrib/hadoop-consumer/target/*.jar; do CLASSPATH=$CLASSPATH:$file done diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java index 4b1d117462d10..d7283e6d6161e 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java +++ 
b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java @@ -27,7 +27,6 @@ import kafka.etl.KafkaETLRequest; import kafka.etl.Props; import kafka.javaapi.producer.Producer; -import kafka.message.Message; import kafka.producer.ProducerConfig; import kafka.producer.KeyedMessage; import org.apache.hadoop.fs.FileSystem; @@ -86,8 +85,7 @@ public void run() throws Exception { Long timestamp = RANDOM.nextLong(); if (timestamp < 0) timestamp = -timestamp; byte[] bytes = timestamp.toString().getBytes("UTF8"); - Message message = new Message(bytes); - list.add(new KeyedMessage(_topic, null, message)); + list.add(new KeyedMessage(_topic, null, bytes)); } // send events System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri); diff --git a/project/Build.scala b/project/Build.scala index 05f54b9f40365..61737117b7b5f 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -84,7 +84,7 @@ object KafkaBuild extends Build { "commons-logging" % "commons-logging" % "1.0.4", "org.codehaus.jackson" % "jackson-core-asl" % "1.5.5", "org.codehaus.jackson" % "jackson-mapper-asl" % "1.5.5", - "org.apache.hadoop" % "hadoop-core" % "0.20.2" + "org.apache.hadoop" % "hadoop-core" % "1.2.1" ), ivyXML := @@ -94,7 +94,7 @@ object KafkaBuild extends Build { - + diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala index cd406c188688f..dcec3b2fedd66 100644 --- a/project/build/KafkaProject.scala +++ b/project/build/KafkaProject.scala @@ -190,7 +190,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje - + @@ -213,7 +213,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje - + @@ -242,7 +242,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje val commonsLogging = "commons-logging" % "commons-logging" % "1.0.4" val jacksonCore = "org.codehaus.jackson" % "jackson-core-asl" % "1.5.5" val jacksonMapper = "org.codehaus.jackson" % 
"jackson-mapper-asl" % "1.5.5" - val hadoop = "org.apache.hadoop" % "hadoop-core" % "0.20.2" + val hadoop = "org.apache.hadoop" % "hadoop-core" % "1.2.1" } trait CompressionDependencies {