From 3fce8dc74faee4921d5a57a2ce677c81380147cb Mon Sep 17 00:00:00 2001
From: Ankit Kothari
Date: Fri, 8 Feb 2019 18:26:37 -0800
Subject: [PATCH] [Issue #6967] NoClassDefFoundError when using druid-hdfs-storage (#7015)

* Fix:
  1. hadoop-common dependency for druid-hdfs and druid-kerberos extensions

  Refactoring:
  2. Hadoop config call in the inner static class to avoid class path
     conflicts for stopGracefully kill

* Fix:
  1. hadoop-common test dependency

* Fix:
  1. Avoid issue of kill command once the job is actually completed
---
 extensions-core/druid-kerberos/pom.xml        |   1 +
 extensions-core/hdfs-storage/pom.xml          | 138 +++++++++++++++++-
 indexing-hadoop/pom.xml                       |  32 ++--
 .../indexing/common/task/HadoopIndexTask.java |  64 ++++----
 4 files changed, 185 insertions(+), 50 deletions(-)

diff --git a/extensions-core/druid-kerberos/pom.xml b/extensions-core/druid-kerberos/pom.xml
index b6fad3510f58..6fbdb5459084 100644
--- a/extensions-core/druid-kerberos/pom.xml
+++ b/extensions-core/druid-kerberos/pom.xml
@@ -71,6 +71,7 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>${hadoop.compile.version}</version>
+      <scope>compile</scope>
       <exclusions>
         <exclusion>
           <groupId>commons-cli</groupId>
diff --git a/extensions-core/hdfs-storage/pom.xml b/extensions-core/hdfs-storage/pom.xml
index c60787717db9..f8eda5d7eca8 100644
--- a/extensions-core/hdfs-storage/pom.xml
+++ b/extensions-core/hdfs-storage/pom.xml
@@ -151,6 +151,130 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.compile.version}</version>
+      <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-cli</groupId>
+          <artifactId>commons-cli</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-httpclient</groupId>
+          <artifactId>commons-httpclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-codec</groupId>
+          <artifactId>commons-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-io</groupId>
+          <artifactId>commons-io</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-lang</groupId>
+          <artifactId>commons-lang</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpcore</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-core-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-mapper-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.ws.rs</groupId>
+          <artifactId>jsr311-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.code.findbugs</groupId>
+          <artifactId>jsr305</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-math3</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.java.dev.jets3t</groupId>
+          <artifactId>jets3t</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.jcraft</groupId>
+          <artifactId>jsch</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-aws</artifactId>
@@ -164,6 +288,13 @@
       <version>${hadoop.compile.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.compile.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
@@ -189,13 +320,6 @@
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>${hadoop.compile.version}</version>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 1fd923c99bfe..aecacbf7f9d4 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -83,6 +83,22 @@
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
@@ -130,22 +146,6 @@
       <version>${hadoop.compile.version}</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-core</artifactId>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>servlet-api</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
     <dependency>
       <groupId>org.apache.druid</groupId>
       <artifactId>druid-server</artifactId>
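Background for the HadoopIndexTask change below: the patch moves all Hadoop-touching work (reading the job-id file, building the Hadoop config, issuing the kill) into the inner static runner class, which is loaded through an isolated classloader and invoked via reflection with the thread's context classloader swapped. A minimal standalone sketch of that pattern, using only the JDK; the IsolatedRunnerDemo class and invokeInIsolation helper are illustrative names, not part of this patch:

    import java.lang.reflect.Method;

    public class IsolatedRunnerDemo
    {
      /**
       * Loads runnerClassName from the given isolated classloader and invokes its
       * runTask(String[]) method reflectively, swapping the thread's context
       * classloader for the duration of the call. Hadoop's Configuration resolves
       * classes and resource files through the context classloader, so this keeps
       * everything Hadoop-related inside the isolated loader.
       */
      public static String[] invokeInIsolation(ClassLoader loader, String runnerClassName, String[] args)
          throws Exception
      {
        final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
        try {
          Object runner = loader.loadClass(runnerClassName).getConstructor().newInstance();
          Method runTask = runner.getClass().getMethod("runTask", String[].class);
          Thread.currentThread().setContextClassLoader(loader);
          // Wrap args in an Object[] so reflection passes one String[] argument
          // rather than spreading its elements across parameters.
          return (String[]) runTask.invoke(runner, new Object[]{args});
        }
        finally {
          Thread.currentThread().setContextClassLoader(oldLoader);
        }
      }
    }

The restore in the finally block mirrors the patch's own structure: whatever happens inside the isolated call, the caller gets its original context classloader back.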
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 398ed96a2f39..62c23e734a53 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -430,21 +430,12 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
   @Override
   public void stopGracefully(TaskConfig taskConfig)
   {
-    final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
-    File hadoopJobIdFile = new File(getHadoopJobIdFileName());
-    String jobId = null;
+    // Avoid issuing the kill command once the ingestion task has actually completed.
+    if (!ingestionState.equals(IngestionState.COMPLETED)) {
+      final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+      String hadoopJobIdFile = getHadoopJobIdFileName();

-    try {
-      if (hadoopJobIdFile.exists()) {
-        jobId = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(hadoopJobIdFile, String.class);
-      }
-    }
-    catch (Exception e) {
-      log.warn(e, "exeption while reading Hadoop Job ID from: %s", hadoopJobIdFile);
-    }
-
-    try {
-      if (jobId != null) {
+      try {
         ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(),
                                                          taskConfig.getDefaultHadoopCoordinates());
@@ -452,28 +443,28 @@ public void stopGracefully(TaskConfig taskConfig)
             "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner",
             loader
         );
+
         String[] buildKillJobInput = new String[]{
-            "-kill",
-            jobId
+            hadoopJobIdFile
        };

        Class buildKillJobRunnerClass = killMRJobInnerProcessingRunner.getClass();
        Method innerProcessingRunTask = buildKillJobRunnerClass.getMethod("runTask", buildKillJobInput.getClass());

        Thread.currentThread().setContextClassLoader(loader);
-        final String killStatusString = (String) innerProcessingRunTask.invoke(
+        final String[] killStatusString = (String[]) innerProcessingRunTask.invoke(
            killMRJobInnerProcessingRunner,
            new Object[]{buildKillJobInput}
        );

-        log.info(StringUtils.format("Tried killing job %s , status: %s", jobId, killStatusString));
+        log.info(StringUtils.format("Tried killing job: [%s], status: [%s]", killStatusString[0], killStatusString[1]));
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      finally {
+        Thread.currentThread().setContextClassLoader(oldLoader);
+      }
+    }
-      }
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    finally {
-      Thread.currentThread().setContextClassLoader(oldLoader);
-    }
   }
@@ -722,10 +713,29 @@ public Map getStats()
   @SuppressWarnings("unused")
   public static class HadoopKillMRJobIdProcessingRunner
   {
-    public String runTask(String[] args) throws Exception
+    public String[] runTask(String[] args) throws Exception
     {
-      int res = ToolRunner.run(new JobClient(), args);
-      return res == 0 ? "Success" : "Fail";
+      File hadoopJobIdFile = new File(args[0]);
+      String jobId = null;
+
+      try {
+        if (hadoopJobIdFile.exists()) {
+          jobId = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(hadoopJobIdFile, String.class);
+        }
+      }
+      catch (Exception e) {
+        log.warn(e, "exception while reading hadoop job id from: [%s]", hadoopJobIdFile);
+      }
+
+      if (jobId != null) {
+        int res = ToolRunner.run(new JobClient(), new String[]{
+            "-kill",
+            jobId
+        });
+
+        return new String[] {jobId, (res == 0 ? "Success" : "Fail")};
+      }
+      return new String[] {jobId, "Fail"};
     }
   }
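For reference, the new runner contract is: args[0] is the path to the job-id file, and the return value is a two-element array {jobId, "Success"|"Fail"}. Once the runner has a job id, ToolRunner.run(new JobClient(), new String[]{"-kill", jobId}) is the programmatic equivalent of `mapred job -kill <jobId>`, since JobClient implements Hadoop's Tool interface. A standalone sketch of that step, assuming Hadoop MapReduce client libraries on the classpath; the KillJobDemo class and the sample job id are illustrative, not from the patch:

    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.util.ToolRunner;

    public class KillJobDemo
    {
      public static void main(String[] args) throws Exception
      {
        // Illustrative id; in the patch this comes from the job-id file written by the task.
        String jobId = args.length > 0 ? args[0] : "job_1549600000000_0001";

        // ToolRunner parses generic Hadoop options and delegates to JobClient's
        // run(), which handles the -kill action and returns 0 on success.
        int res = ToolRunner.run(new JobClient(), new String[]{"-kill", jobId});
        System.out.println(res == 0 ? "Success" : "Fail");
      }
    }

Note that when the job-id file is missing or unreadable, the runner returns {null, "Fail"}, so callers reading killStatusString[0] should expect a possible null job id in the log line.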