diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 53db87d93e01..88da445d61d7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -527,16 +527,7 @@ public void stop() synchronized (tasks) { for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) { - if (taskWorkItem.processHolder != null) { - log.info("Closing output stream to task[%s].", taskWorkItem.getTask().getId()); - try { - taskWorkItem.processHolder.process.getOutputStream().close(); - } - catch (Exception e) { - log.warn(e, "Failed to close stdout to task[%s]. Destroying task.", taskWorkItem.getTask().getId()); - taskWorkItem.processHolder.process.destroy(); - } - } + shutdownTaskProcess(taskWorkItem); } } @@ -593,12 +584,8 @@ public void shutdown(final String taskid, String reason) } taskInfo.shutdown = true; - } - if (taskInfo.processHolder != null) { - // Will trigger normal failure mechanisms due to process exit - log.info("Killing process for task: %s", taskid); - taskInfo.processHolder.process.destroy(); + shutdownTaskProcess(taskInfo); } } @@ -694,8 +681,10 @@ public InputStream openStream() throws IOException ); } - // Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that - // occur while saving. + /** + * Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that occur + * while saving. + */ @GuardedBy("tasks") private void saveRunningTasks() { @@ -714,6 +703,25 @@ private void saveRunningTasks() } } + /** + * Close task output stream (input stream of process) sending EOF telling process to terminate, destroying the process + * if an exception is encountered. + */ + private void shutdownTaskProcess(ForkingTaskRunnerWorkItem taskInfo) + { + if (taskInfo.processHolder != null) { + // Will trigger normal failure mechanisms due to process exit + log.info("Closing output stream to task[%s].", taskInfo.getTask().getId()); + try { + taskInfo.processHolder.process.getOutputStream().close(); + } + catch (Exception e) { + log.warn(e, "Failed to close stdout to task[%s]. Destroying task.", taskInfo.getTask().getId()); + taskInfo.processHolder.process.destroy(); + } + } + } + private File getRestoreFile() { return new File(taskConfig.getBaseTaskDir(), TASK_RESTORE_FILENAME);