From cae49980c1e6b0c0ba250bdfcab7cc1e1dca83e9 Mon Sep 17 00:00:00 2001 From: Ankit Kothari Date: Fri, 12 Jul 2019 13:51:16 -0700 Subject: [PATCH] Issue task.stopGracefully() from ForkingTaskRunner as when hadoop ingestion task is killed, the control goes to the ForkingTaskRunner::shutdown --- .../druid/indexing/common/task/HadoopIndexTask.java | 8 +++----- .../apache/druid/indexing/overlord/ForkingTaskRunner.java | 3 ++- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 2571c576d2aa..f3ef79318e4c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -84,7 +84,6 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler { private static final Logger log = new Logger(HadoopIndexTask.class); private static final String HADOOP_JOB_ID_FILENAME = "mapReduceJobId.json"; - private TaskConfig taskConfig = null; private static String getTheDataSource(HadoopIngestionSpec spec) { @@ -223,7 +222,7 @@ public String getClasspathPrefix() return classpathPrefix; } - public String getHadoopJobIdFileName() + private String getHadoopJobIdFileName(TaskConfig taskConfig) { return new File(taskConfig.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME).getAbsolutePath(); } @@ -232,7 +231,6 @@ public String getHadoopJobIdFileName() public TaskStatus run(TaskToolbox toolbox) { try { - taskConfig = toolbox.getConfig(); if (chatHandlerProvider.isPresent()) { log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName()); chatHandlerProvider.get().register(getId(), this, false); @@ -270,7 +268,7 @@ public TaskStatus run(TaskToolbox toolbox) @SuppressWarnings("unchecked") private TaskStatus runInternal(TaskToolbox 
toolbox) throws Exception { - String hadoopJobIdFile = getHadoopJobIdFileName(); + String hadoopJobIdFile = getHadoopJobIdFileName(toolbox.getConfig()); final ClassLoader loader = buildClassLoader(toolbox); boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent(); @@ -432,7 +430,7 @@ public void stopGracefully(TaskConfig taskConfig) // To avoid issue of kill command once the ingestion task is actually completed if (!ingestionState.equals(IngestionState.COMPLETED)) { final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader(); - String hadoopJobIdFile = getHadoopJobIdFileName(); + String hadoopJobIdFile = getHadoopJobIdFileName(taskConfig); try { ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 53db87d93e01..7d9a326a0572 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -596,8 +596,9 @@ public void shutdown(final String taskid, String reason) } if (taskInfo.processHolder != null) { - // Will trigger normal failure mechanisms due to process exit + // Will cleanup the underlying running task if any and trigger normal failure mechanisms due to process exit log.info("Killing process for task: %s", taskid); + taskInfo.getTask().stopGracefully(taskConfig); taskInfo.processHolder.process.destroy(); } }