Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions contrib/hadoop-consumer/README
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,12 @@ your hadoop installation directory.
-1 means no limitation.
hdfs.default.classpath.dir : hdfs location of jars

2) copy jars into hdfs
2) downloads neccessary jars
./import-jars.sh

3) copy jars into hdfs
./copy-jars.sh ${hdfs.default.classpath.dir}

2) Fetch data
4) Fetch data
./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties

28 changes: 0 additions & 28 deletions contrib/hadoop-consumer/copy-jars.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,6 @@ $hadoop fs -rmr $1
echo "$hadoop fs -mkdir $1"
$hadoop fs -mkdir $1

# include kafka jars
for file in $base_dir/contrib/hadoop-consumer/target/scala_2.8.0/*.jar;
do
echo "$hadoop fs -put $file $1/"
$hadoop fs -put $file $1/
done

# include kafka jars
echo "$hadoop fs -put $base_dir/core/target/scala_2.8.0/kafka-*.jar; $1/"
$hadoop fs -put $base_dir/core/target/scala_2.8.0/kafka-*.jar $1/

# include core lib jars
for file in $base_dir/core/lib/*.jar;
do
echo "$hadoop fs -put $file $1/"
$hadoop fs -put $file $1/
done

for file in $base_dir/core/lib_managed/scala_2.8.0/compile/*.jar;
do
echo "$hadoop fs -put $file $1/"
$hadoop fs -put $file $1/
done

# include scala library jar
echo "$hadoop fs -put $base_dir/project/boot/scala-2.8.0/lib/scala-library.jar; $1/"
$hadoop fs -put $base_dir/project/boot/scala-2.8.0/lib/scala-library.jar $1/

local_dir=$(dirname $0)

# include hadoop-consumer jars
Expand Down
19 changes: 19 additions & 0 deletions contrib/hadoop-consumer/import-jars.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/bash

find lib/ -not -name piggybank.jar | xargs rm

wget -P lib/ \
http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-core/1.2.1/hadoop-core-1.2.1.jar \
http://repo1.maven.org/maven2/commons-io/commons-io/2.4/commons-io-2.4.jar \
http://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.6/slf4j-api-1.7.6.jar \
http://repo1.maven.org/maven2/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar \
http://repo1.maven.org/maven2/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar \
http://repo1.maven.org/maven2/commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar \
http://repo1.maven.org/maven2/commons-httpclient/commons-httpclient/3.1/commons-httpclient-3.1.jar \
http://repo1.maven.org/maven2/log4j/log4j/1.2.16/log4j-1.2.16.jar \
http://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar \
http://repo1.maven.org/maven2/commons-lang/commons-lang/2.6/commons-lang-2.6.jar \
http://repo1.maven.org/maven2/commons-configuration/commons-configuration/1.10/commons-configuration-1.10.jar \
http://repo1.maven.org/maven2/org/scala-lang/scala-library/2.8.0/scala-library-2.8.0.jar

cp ../../core/target/scala-2.8.0/kafka_2.8.0-0.8.0.jar lib
2 changes: 1 addition & 1 deletion contrib/hadoop-consumer/run-class.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ done
local_dir=$(dirname $0)

# include hadoop-consumer jars
for file in $base_dir/contrib/hadoop-consumer/target/scala_2.8.0/*.jar;
for file in $base_dir/contrib/hadoop-consumer/target/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import kafka.etl.KafkaETLRequest;
import kafka.etl.Props;
import kafka.javaapi.producer.Producer;
import kafka.message.Message;
import kafka.producer.ProducerConfig;
import kafka.producer.KeyedMessage;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -86,8 +85,7 @@ public void run() throws Exception {
Long timestamp = RANDOM.nextLong();
if (timestamp < 0) timestamp = -timestamp;
byte[] bytes = timestamp.toString().getBytes("UTF8");
Message message = new Message(bytes);
list.add(new KeyedMessage<Integer, Message>(_topic, null, message));
list.add(new KeyedMessage<byte[], byte[]>(_topic, null, bytes));
}
// send events
System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri);
Expand Down
4 changes: 2 additions & 2 deletions project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ object KafkaBuild extends Build {
"commons-logging" % "commons-logging" % "1.0.4",
"org.codehaus.jackson" % "jackson-core-asl" % "1.5.5",
"org.codehaus.jackson" % "jackson-mapper-asl" % "1.5.5",
"org.apache.hadoop" % "hadoop-core" % "0.20.2"
"org.apache.hadoop" % "hadoop-core" % "1.2.1"
),
ivyXML :=
<dependencies>
Expand All @@ -94,7 +94,7 @@ object KafkaBuild extends Build {
<exclude module="jmxtools"/>
<exclude module="mail"/>
<exclude module="jms"/>
<dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2">
<dependency org="org.apache.hadoop" name="hadoop-core" rev="1.2.1">
<exclude org="junit" module="junit"/>
</dependency>
<dependency org="org.apache.pig" name="pig" rev="0.8.0">
Expand Down
6 changes: 3 additions & 3 deletions project/build/KafkaProject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
<exclude module="jmxtools"/>
<exclude module="mail"/>
<exclude module="jms"/>
<dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2">
<dependency org="org.apache.hadoop" name="hadoop-core" rev="1.2.1">
<exclude module="junit"/>
</dependency>
<dependency org="org.apache.pig" name="pig" rev="0.10.0">
Expand All @@ -213,7 +213,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
<exclude module="mail"/>
<exclude module="jms"/>
<exclude module=""/>
<dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2">
<dependency org="org.apache.hadoop" name="hadoop-core" rev="1.2.1">
<exclude module="junit"/>
</dependency>
<dependency org="org.apache.pig" name="pig" rev="0.8.0">
Expand Down Expand Up @@ -242,7 +242,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
val commonsLogging = "commons-logging" % "commons-logging" % "1.0.4"
val jacksonCore = "org.codehaus.jackson" % "jackson-core-asl" % "1.5.5"
val jacksonMapper = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.5.5"
val hadoop = "org.apache.hadoop" % "hadoop-core" % "0.20.2"
val hadoop = "org.apache.hadoop" % "hadoop-core" % "1.2.1"
}

trait CompressionDependencies {
Expand Down