extensions-core/druid-kerberos/pom.xml (1 addition, 0 deletions)

The extension's hadoop-common dependency gets an explicit compile scope:

@@ -71,6 +71,7 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>${hadoop.compile.version}</version>
+            <scope>compile</scope>
             <exclusions>
                 <exclusion>
                     <groupId>commons-cli</groupId>
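Context for the scope change: druid-kerberos compiles directly against Hadoop security classes such as org.apache.hadoop.security.UserGroupInformation, so hadoop-common has to sit on its compile classpath; the explicit <scope>compile</scope> states that intent instead of relying on the default. A minimal sketch of the kind of call that creates this requirement (the principal and keytab below are placeholder values, not taken from this PR):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;

    public class KerberosLoginSketch
    {
      public static void main(String[] args) throws Exception
      {
        // UserGroupInformation lives in hadoop-common; without that artifact
        // at compile scope this class would not even build.
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);

        // Placeholder principal and keytab path, for illustration only.
        UserGroupInformation.loginUserFromKeytab("druid@EXAMPLE.COM", "/etc/security/keytabs/druid.keytab");
      }
    }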
extensions-core/hdfs-storage/pom.xml (131 additions, 7 deletions)

The HDFS extension now declares hadoop-common directly at compile scope, with an extensive exclusion list, and the tests-classifier copy of hadoop-common moves to the top of the test-dependency block:

@@ -151,6 +151,130 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.compile.version}</version>
+            <scope>compile</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-cli</groupId>
+                    <artifactId>commons-cli</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-httpclient</groupId>
+                    <artifactId>commons-httpclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-lang</groupId>
+                    <artifactId>commons-lang</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpcore</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-core-asl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-mapper-asl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.ws.rs</groupId>
+                    <artifactId>jsr311-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.code.findbugs</groupId>
+                    <artifactId>jsr305</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty-util</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-math3</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.java.dev.jets3t</groupId>
+                    <artifactId>jets3t</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-json</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.jcraft</groupId>
+                    <artifactId>jsch</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-server</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-aws</artifactId>
@@ -164,6 +288,13 @@
         </dependency>

         <!-- Tests -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.compile.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
@@ -189,13 +320,6 @@
             <classifier>tests</classifier>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.compile.version}</version>
-            <classifier>tests</classifier>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-hdfs</artifactId>
indexing-hadoop/pom.xml (16 additions, 16 deletions)

The provided-scope hadoop-common and hadoop-mapreduce-client-core dependencies move out of the test-dependency block and into the main dependency list:

@@ -83,6 +83,22 @@
             <groupId>com.google.code.findbugs</groupId>
             <artifactId>jsr305</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>

         <!-- Tests -->
         <dependency>
@@ -130,22 +146,6 @@
             <version>${hadoop.compile.version}</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>servlet-api</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
         <dependency>
             <groupId>org.apache.druid</groupId>
             <artifactId>druid-server</artifactId>
.../org/apache/druid/indexing/common/task/HadoopIndexTask.java

Reading the Hadoop job ID moves out of stopGracefully() and into HadoopKillMRJobIdProcessingRunner, so the job-ID file is parsed inside the isolated Hadoop classloader, and the runner now returns both the job ID and the kill status:

@@ -430,50 +430,41 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
   @Override
   public void stopGracefully(TaskConfig taskConfig)
   {
-    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
-    File hadoopJobIdFile = new File(getHadoopJobIdFileName());
-    String jobId = null;
     // To avoid issue of kill command once the ingestion task is actually completed
     if (!ingestionState.equals(IngestionState.COMPLETED)) {
+      final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+      String hadoopJobIdFile = getHadoopJobIdFileName();
+
-      try {
-        if (hadoopJobIdFile.exists()) {
-          jobId = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(hadoopJobIdFile, String.class);
-        }
-      }
-      catch (Exception e) {
-        log.warn(e, "exeption while reading Hadoop Job ID from: %s", hadoopJobIdFile);
-      }
-
-      try {
-        if (jobId != null) {
+      try {
         ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(),
                                                          taskConfig.getDefaultHadoopCoordinates());

         Object killMRJobInnerProcessingRunner = getForeignClassloaderObject(
             "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner",
             loader
         );

         String[] buildKillJobInput = new String[]{
-            "-kill",
-            jobId
+            hadoopJobIdFile
         };

         Class<?> buildKillJobRunnerClass = killMRJobInnerProcessingRunner.getClass();
         Method innerProcessingRunTask = buildKillJobRunnerClass.getMethod("runTask", buildKillJobInput.getClass());

         Thread.currentThread().setContextClassLoader(loader);
-        final String killStatusString = (String) innerProcessingRunTask.invoke(
+        final String killStatusString[] = (String[]) innerProcessingRunTask.invoke(
             killMRJobInnerProcessingRunner,
             new Object[]{buildKillJobInput}
         );

-        log.info(StringUtils.format("Tried killing job %s , status: %s", jobId, killStatusString));
-      }
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    finally {
-      Thread.currentThread().setContextClassLoader(oldLoader);
-    }
+        log.info(StringUtils.format("Tried killing job: [%s], status: [%s]", killStatusString[0], killStatusString[1]));
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      finally {
+        Thread.currentThread().setContextClassLoader(oldLoader);
+      }

     }
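The pattern above is easy to lose in the diff, so here is its shape in isolation: the task builds a classloader containing the Hadoop dependencies, instantiates the runner class inside it, and interacts with it purely through reflection, passing and returning only JDK types (String[]) so that no Hadoop or Druid classes cross the boundary; the thread's context classloader is swapped in for the duration of the call and restored in a finally block, exactly as stopGracefully() does. A self-contained sketch under assumed names (com.example.IsolatedRunner and the jar path are hypothetical):

    import java.lang.reflect.Method;
    import java.net.URL;
    import java.net.URLClassLoader;

    public class ForeignClassLoaderSketch
    {
      public static void main(String[] args) throws Exception
      {
        // Hypothetical jar holding com.example.IsolatedRunner plus its dependencies.
        URL[] jars = new URL[]{new URL("file:///tmp/isolated-runner.jar")};
        ClassLoader isolated = new URLClassLoader(jars, null); // null parent: isolated from the app classpath

        final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
        try {
          // Instantiate inside the isolated loader, then call runTask reflectively;
          // only String[] values cross the classloader boundary.
          Object runner = isolated.loadClass("com.example.IsolatedRunner").getDeclaredConstructor().newInstance();
          Method runTask = runner.getClass().getMethod("runTask", String[].class);

          Thread.currentThread().setContextClassLoader(isolated);
          String[] result = (String[]) runTask.invoke(runner, new Object[]{new String[]{"some-arg"}});
          System.out.println(String.join(", ", result));
        }
        finally {
          // Always restore the caller's classloader.
          Thread.currentThread().setContextClassLoader(oldLoader);
        }
      }
    }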
@@ -722,10 +713,29 @@ public Map<String, Object> getStats()
   @SuppressWarnings("unused")
   public static class HadoopKillMRJobIdProcessingRunner
   {
-    public String runTask(String[] args) throws Exception
+    public String[] runTask(String[] args) throws Exception
     {
-      int res = ToolRunner.run(new JobClient(), args);
-      return res == 0 ? "Success" : "Fail";
+      File hadoopJobIdFile = new File(args[0]);
+      String jobId = null;
+
+      try {
+        if (hadoopJobIdFile.exists()) {
+          jobId = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(hadoopJobIdFile, String.class);
+        }
+      }
+      catch (Exception e) {
+        log.warn(e, "exeption while reading hadoop job id from: [%s]", hadoopJobIdFile);
+      }
+
+      if (jobId != null) {
+        int res = ToolRunner.run(new JobClient(), new String[]{
+            "-kill",
+            jobId
+        });
+
+        return new String[] {jobId, (res == 0 ? "Success" : "Fail")};
+      }
+      return new String[] {jobId, "Fail"};
     }
   }

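Two details make this handoff work. First, the return type changes from String to String[] so the runner can report both the job ID (now known only inside the runner) and the kill status back to stopGracefully() for logging. Second, the job-ID file evidently holds the MapReduce job ID as a JSON-serialized string, which is what readValue(file, String.class) implies. A small sketch of that round trip with Jackson (the file path and job ID below are invented for illustration):

    import java.io.File;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JobIdFileSketch
    {
      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();
        File jobIdFile = new File("/tmp/hadoop-job-id.json"); // hypothetical path

        // The indexer side serializes the MapReduce job ID as a JSON string...
        mapper.writeValue(jobIdFile, "job_1581021600000_0042"); // made-up job ID

        // ...so the kill side can read it back the same way the runner does.
        String jobId = mapper.readValue(jobIdFile, String.class);
        System.out.println(jobId); // prints: job_1581021600000_0042
      }
    }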