diff --git a/extensions-core/druid-kerberos/pom.xml b/extensions-core/druid-kerberos/pom.xml
index b6fad3510f58..6fbdb5459084 100644
--- a/extensions-core/druid-kerberos/pom.xml
+++ b/extensions-core/druid-kerberos/pom.xml
@@ -71,6 +71,7 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>${hadoop.compile.version}</version>
+      <scope>compile</scope>
       <exclusions>
         <exclusion>
           <groupId>commons-cli</groupId>
diff --git a/extensions-core/hdfs-storage/pom.xml b/extensions-core/hdfs-storage/pom.xml
index c60787717db9..f8eda5d7eca8 100644
--- a/extensions-core/hdfs-storage/pom.xml
+++ b/extensions-core/hdfs-storage/pom.xml
@@ -151,6 +151,130 @@
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.compile.version}</version>
+      <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-cli</groupId>
+          <artifactId>commons-cli</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-httpclient</groupId>
+          <artifactId>commons-httpclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-codec</groupId>
+          <artifactId>commons-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-io</groupId>
+          <artifactId>commons-io</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-lang</groupId>
+          <artifactId>commons-lang</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpcore</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-core-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-mapper-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.ws.rs</groupId>
+          <artifactId>jsr311-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.code.findbugs</groupId>
+          <artifactId>jsr305</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-math3</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.java.dev.jets3t</groupId>
+          <artifactId>jets3t</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.jcraft</groupId>
+          <artifactId>jsch</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-aws</artifactId>
@@ -164,6 +288,13 @@
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.compile.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
@@ -189,13 +320,6 @@
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>${hadoop.compile.version}</version>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 1fd923c99bfe..aecacbf7f9d4 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -83,6 +83,22 @@
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
@@ -130,22 +146,6 @@
       <version>${hadoop.compile.version}</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-core</artifactId>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>servlet-api</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
     <dependency>
       <groupId>org.apache.druid</groupId>
       <artifactId>druid-server</artifactId>
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 398ed96a2f39..62c23e734a53 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -430,21 +430,12 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
@Override
public void stopGracefully(TaskConfig taskConfig)
{
- final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
- File hadoopJobIdFile = new File(getHadoopJobIdFileName());
- String jobId = null;
+ // Avoid issuing a kill command once the ingestion task has actually completed
+ if (!ingestionState.equals(IngestionState.COMPLETED)) {
+ final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+ String hadoopJobIdFile = getHadoopJobIdFileName();
- try {
- if (hadoopJobIdFile.exists()) {
- jobId = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(hadoopJobIdFile, String.class);
- }
- }
- catch (Exception e) {
- log.warn(e, "exeption while reading Hadoop Job ID from: %s", hadoopJobIdFile);
- }
-
- try {
- if (jobId != null) {
+ try {
ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(),
taskConfig.getDefaultHadoopCoordinates());
@@ -452,28 +443,28 @@ public void stopGracefully(TaskConfig taskConfig)
"org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner",
loader
);
+
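+ // Hand the runner the job-id file path; it reads the job ID and issues the kill from inside the Hadoop classloader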
String[] buildKillJobInput = new String[]{
- "-kill",
- jobId
+ hadoopJobIdFile
};
Class<?> buildKillJobRunnerClass = killMRJobInnerProcessingRunner.getClass();
Method innerProcessingRunTask = buildKillJobRunnerClass.getMethod("runTask", buildKillJobInput.getClass());
Thread.currentThread().setContextClassLoader(loader);
- final String killStatusString = (String) innerProcessingRunTask.invoke(
+ final String[] killStatusString = (String[]) innerProcessingRunTask.invoke(
killMRJobInnerProcessingRunner,
new Object[]{buildKillJobInput}
);
- log.info(StringUtils.format("Tried killing job %s , status: %s", jobId, killStatusString));
+ log.info(StringUtils.format("Tried killing job: [%s], status: [%s]", killStatusString[0], killStatusString[1]));
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(oldLoader);
}
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- finally {
- Thread.currentThread().setContextClassLoader(oldLoader);
}
}
@@ -722,10 +713,29 @@ public Map<String, Object> getStats()
@SuppressWarnings("unused")
public static class HadoopKillMRJobIdProcessingRunner
{
- public String runTask(String[] args) throws Exception
+ public String[] runTask(String[] args) throws Exception
{
- int res = ToolRunner.run(new JobClient(), args);
- return res == 0 ? "Success" : "Fail";
+ File hadoopJobIdFile = new File(args[0]);
+ String jobId = null;
+
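+ // Read the Hadoop job ID that the indexing task persisted to the job-id file, if it exists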
+ try {
+ if (hadoopJobIdFile.exists()) {
+ jobId = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(hadoopJobIdFile, String.class);
+ }
+ }
+ catch (Exception e) {
+ log.warn(e, "exeption while reading hadoop job id from: [%s]", hadoopJobIdFile);
+ }
+
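+ // Kill the MR job the same way 'mapred job -kill <jobId>' would, via JobClient's Tool interface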
+ if (jobId != null) {
+ int res = ToolRunner.run(new JobClient(), new String[]{
+ "-kill",
+ jobId
+ });
+
+ return new String[] {jobId, (res == 0 ? "Success" : "Fail")};
+ }
+ return new String[] {jobId, "Fail"};
}
}
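
A minimal, self-contained sketch of the invocation pattern the patch relies on: load a runner class through an isolated classloader, swap the thread's context classloader for the duration of the call so Hadoop classes resolve from that loader, and invoke `runTask` reflectively. The class name `example.IsolatedRunner` and the `runnerJars` parameter are illustrative assumptions, not Druid or Hadoop API.

```java
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

public class ReflectiveRunnerSketch
{
  // Returns the {jobId, status} pair produced by the runner, mirroring
  // HadoopKillMRJobIdProcessingRunner's contract in the patch above.
  public static String[] invokeRunTask(URL[] runnerJars, String jobIdFilePath) throws Exception
  {
    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
    // Isolated loader, analogous to HadoopTask.buildClassLoader(...);
    // a null parent keeps the runner's Hadoop jars from leaking into the caller.
    final URLClassLoader loader = new URLClassLoader(runnerJars, null);
    try {
      // Hypothetical runner class; stands in for HadoopKillMRJobIdProcessingRunner.
      final Object runner = loader.loadClass("example.IsolatedRunner")
                                  .getDeclaredConstructor()
                                  .newInstance();
      final String[] args = new String[]{jobIdFilePath};
      final Method runTask = runner.getClass().getMethod("runTask", String[].class);
      // Swap the context classloader so code inside runTask resolves against the isolated loader.
      Thread.currentThread().setContextClassLoader(loader);
      // Wrap args in an Object[] so reflection passes one String[] argument, not varargs.
      return (String[]) runTask.invoke(runner, new Object[]{args});
    }
    finally {
      // Always restore the original classloader, as the patch does in its finally block.
      Thread.currentThread().setContextClassLoader(oldLoader);
      loader.close();
    }
  }
}
```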