From 51d16cc6e9c013aaf68561821ca6834750cd9dcb Mon Sep 17 00:00:00 2001 From: Ankit Kothari Date: Tue, 5 Feb 2019 17:07:48 -0800 Subject: [PATCH 1/3] Fix: 1. hadoop-common dependency for druid-hdfs and druid-kerberos extensions Refactoring: 2. Hadoop config call in the inner static class to avoid class path conflicts for stopGracefully kill --- extensions-core/druid-kerberos/pom.xml | 1 + extensions-core/hdfs-storage/pom.xml | 147 +++++++++++++++++- indexing-hadoop/pom.xml | 32 ++-- .../indexing/common/task/HadoopIndexTask.java | 73 +++++---- 4 files changed, 197 insertions(+), 56 deletions(-) diff --git a/extensions-core/druid-kerberos/pom.xml b/extensions-core/druid-kerberos/pom.xml index b6fad3510f58..6fbdb5459084 100644 --- a/extensions-core/druid-kerberos/pom.xml +++ b/extensions-core/druid-kerberos/pom.xml @@ -71,6 +71,7 @@ org.apache.hadoop hadoop-common ${hadoop.compile.version} + compile commons-cli diff --git a/extensions-core/hdfs-storage/pom.xml b/extensions-core/hdfs-storage/pom.xml index c60787717db9..05cd727e3268 100644 --- a/extensions-core/hdfs-storage/pom.xml +++ b/extensions-core/hdfs-storage/pom.xml @@ -151,6 +151,146 @@ + + org.apache.hadoop + hadoop-common + ${hadoop.compile.version} + compile + + + commons-cli + commons-cli + + + commons-httpclient + commons-httpclient + + + log4j + log4j + + + commons-codec + commons-codec + + + commons-logging + commons-logging + + + commons-io + commons-io + + + commons-lang + commons-lang + + + org.apache.httpcomponents + httpclient + + + org.apache.httpcomponents + httpcore + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + org.apache.zookeeper + zookeeper + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 + + + javax.ws.rs + jsr311-api + + + com.google.code.findbugs + jsr305 + + + org.mortbay.jetty + jetty-util + + + org.apache.hadoop + hadoop-annotations + + + com.google.protobuf + protobuf-java + + + com.sun.jersey + jersey-core + + + org.apache.curator + curator-client + + + org.apache.commons + commons-math3 + + + com.google.guava + guava + + + com.google.code.gson + gson + + + org.apache.avro + avro + + + net.java.dev.jets3t + jets3t + + + com.sun.jersey + jersey-json + + + com.jcraft + jsch + + + org.mortbay.jetty + jetty + + + com.sun.jersey + jersey-server + + + + + org.apache.hadoop + hadoop-mapreduce-client-core + 2.8.3 + compile + + + servlet-api + javax.servlet + + + org.apache.hadoop hadoop-aws @@ -189,13 +329,6 @@ tests test - - org.apache.hadoop - hadoop-common - ${hadoop.compile.version} - tests - test - org.apache.hadoop hadoop-hdfs diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml index 1fd923c99bfe..aecacbf7f9d4 100644 --- a/indexing-hadoop/pom.xml +++ b/indexing-hadoop/pom.xml @@ -83,6 +83,22 @@ com.google.code.findbugs jsr305 + + org.apache.hadoop + hadoop-common + provided + + + org.apache.hadoop + hadoop-mapreduce-client-core + provided + + + javax.servlet + servlet-api + + + @@ -130,22 +146,6 @@ ${hadoop.compile.version} test - - org.apache.hadoop - hadoop-common - provided - - - org.apache.hadoop - hadoop-mapreduce-client-core - provided - - - javax.servlet - servlet-api - - - org.apache.druid druid-server diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 398ed96a2f39..3ab7a5d713ef 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -431,43 +431,31 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception public void stopGracefully(TaskConfig taskConfig) { final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader(); - File hadoopJobIdFile = new File(getHadoopJobIdFileName()); - String jobId = null; + String hadoopJobIdFile = getHadoopJobIdFileName(); try { - if (hadoopJobIdFile.exists()) { - jobId = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(hadoopJobIdFile, String.class); - } - } - catch (Exception e) { - log.warn(e, "exeption while reading Hadoop Job ID from: %s", hadoopJobIdFile); - } + ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(), + taskConfig.getDefaultHadoopCoordinates()); - try { - if (jobId != null) { - ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(), - taskConfig.getDefaultHadoopCoordinates()); + Object killMRJobInnerProcessingRunner = getForeignClassloaderObject( + "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner", + loader + ); - Object killMRJobInnerProcessingRunner = getForeignClassloaderObject( - "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner", - loader - ); - String[] buildKillJobInput = new String[]{ - "-kill", - jobId - }; + String[] buildKillJobInput = new String[]{ + hadoopJobIdFile + }; - Class buildKillJobRunnerClass = killMRJobInnerProcessingRunner.getClass(); - Method innerProcessingRunTask = buildKillJobRunnerClass.getMethod("runTask", buildKillJobInput.getClass()); + Class buildKillJobRunnerClass = killMRJobInnerProcessingRunner.getClass(); + Method innerProcessingRunTask = buildKillJobRunnerClass.getMethod("runTask", buildKillJobInput.getClass()); - Thread.currentThread().setContextClassLoader(loader); - final String killStatusString = (String) innerProcessingRunTask.invoke( - killMRJobInnerProcessingRunner, - new Object[]{buildKillJobInput} - ); + Thread.currentThread().setContextClassLoader(loader); + final String killStatusString[] = (String[]) innerProcessingRunTask.invoke( + killMRJobInnerProcessingRunner, + new Object[]{buildKillJobInput} + ); - log.info(StringUtils.format("Tried killing job %s , status: %s", jobId, killStatusString)); - } + log.info(StringUtils.format("Tried killing job: [%s], status: [%s]", killStatusString[0], killStatusString[1])); } catch (Exception e) { throw new RuntimeException(e); @@ -722,10 +710,29 @@ public Map getStats() @SuppressWarnings("unused") public static class HadoopKillMRJobIdProcessingRunner { - public String runTask(String[] args) throws Exception + public String[] runTask(String[] args) throws Exception { - int res = ToolRunner.run(new JobClient(), args); - return res == 0 ? "Success" : "Fail"; + File hadoopJobIdFile = new File(args[0]); + String jobId = null; + + try { + if (hadoopJobIdFile.exists()) { + jobId = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(hadoopJobIdFile, String.class); + } + } + catch (Exception e) { + log.warn(e, "exeption while reading hadoop job id from: [%s]", hadoopJobIdFile); + } + + if (jobId != null) { + int res = ToolRunner.run(new JobClient(), new String[]{ + "-kill", + jobId + }); + + return new String[] {jobId, (res == 0 ? "Success" : "Fail")}; + } + return new String[] {jobId, "Fail"}; } } From f0ad3cbcd3569d48daf13941f322e0db4919ad7d Mon Sep 17 00:00:00 2001 From: Ankit Kothari Date: Wed, 6 Feb 2019 16:56:37 -0800 Subject: [PATCH 2/3] Fix: 1. hadoop-common test dependency --- extensions-core/hdfs-storage/pom.xml | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/extensions-core/hdfs-storage/pom.xml b/extensions-core/hdfs-storage/pom.xml index 05cd727e3268..f8eda5d7eca8 100644 --- a/extensions-core/hdfs-storage/pom.xml +++ b/extensions-core/hdfs-storage/pom.xml @@ -249,10 +249,6 @@ com.google.guava guava - - com.google.code.gson - gson - org.apache.avro avro @@ -279,18 +275,6 @@ - - org.apache.hadoop - hadoop-mapreduce-client-core - 2.8.3 - compile - - - servlet-api - javax.servlet - - - org.apache.hadoop hadoop-aws @@ -304,6 +288,13 @@ + + org.apache.hadoop + hadoop-common + ${hadoop.compile.version} + tests + test + junit junit From 84a03b7325b9723e39104f777d7dcb0bf74ad91e Mon Sep 17 00:00:00 2001 From: Ankit Kothari Date: Wed, 6 Feb 2019 21:44:53 -0800 Subject: [PATCH 3/3] Fix: 1. Avoid issue of kill command once the job is actually completed --- .../indexing/common/task/HadoopIndexTask.java | 55 ++++++++++--------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 3ab7a5d713ef..62c23e734a53 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -430,38 +430,41 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception @Override public void stopGracefully(TaskConfig taskConfig) { - final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader(); - String hadoopJobIdFile = getHadoopJobIdFileName(); + // To avoid issue of kill command once the ingestion task is actually completed + if (!ingestionState.equals(IngestionState.COMPLETED)) { + final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader(); + String hadoopJobIdFile = getHadoopJobIdFileName(); - try { - ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(), - taskConfig.getDefaultHadoopCoordinates()); + try { + ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(), + taskConfig.getDefaultHadoopCoordinates()); - Object killMRJobInnerProcessingRunner = getForeignClassloaderObject( - "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner", - loader - ); + Object killMRJobInnerProcessingRunner = getForeignClassloaderObject( + "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner", + loader + ); - String[] buildKillJobInput = new String[]{ - hadoopJobIdFile - }; + String[] buildKillJobInput = new String[]{ + hadoopJobIdFile + }; - Class buildKillJobRunnerClass = killMRJobInnerProcessingRunner.getClass(); - Method innerProcessingRunTask = buildKillJobRunnerClass.getMethod("runTask", buildKillJobInput.getClass()); + Class buildKillJobRunnerClass = killMRJobInnerProcessingRunner.getClass(); + Method innerProcessingRunTask = buildKillJobRunnerClass.getMethod("runTask", buildKillJobInput.getClass()); - Thread.currentThread().setContextClassLoader(loader); - final String killStatusString[] = (String[]) innerProcessingRunTask.invoke( - killMRJobInnerProcessingRunner, - new Object[]{buildKillJobInput} - ); + Thread.currentThread().setContextClassLoader(loader); + final String killStatusString[] = (String[]) innerProcessingRunTask.invoke( + killMRJobInnerProcessingRunner, + new Object[]{buildKillJobInput} + ); - log.info(StringUtils.format("Tried killing job: [%s], status: [%s]", killStatusString[0], killStatusString[1])); - } - catch (Exception e) { - throw new RuntimeException(e); - } - finally { - Thread.currentThread().setContextClassLoader(oldLoader); + log.info(StringUtils.format("Tried killing job: [%s], status: [%s]", killStatusString[0], killStatusString[1])); + } + catch (Exception e) { + throw new RuntimeException(e); + } + finally { + Thread.currentThread().setContextClassLoader(oldLoader); + } } }