From 5591ae10d96351d4b0f3ccb74fcef3aa518a4d74 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 23 Jul 2019 11:47:23 -0700 Subject: [PATCH] fix forking task runner task shutdown to be more graceful (#8085) * fix forking task runner shutdown to be more graceful * javadoc --- .../indexing/overlord/ForkingTaskRunner.java | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 05f2f52c3897..e0b9291c9781 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -531,16 +531,7 @@ public void stop() synchronized (tasks) { for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) { - if (taskWorkItem.processHolder != null) { - log.info("Closing output stream to task[%s].", taskWorkItem.getTask().getId()); - try { - taskWorkItem.processHolder.process.getOutputStream().close(); - } - catch (Exception e) { - log.warn(e, "Failed to close stdout to task[%s]. Destroying task.", taskWorkItem.getTask().getId()); - taskWorkItem.processHolder.process.destroy(); - } - } + shutdownTaskProcess(taskWorkItem); } } @@ -597,12 +588,8 @@ public void shutdown(final String taskid, String reason) } taskInfo.shutdown = true; - } - if (taskInfo.processHolder != null) { - // Will trigger normal failure mechanisms due to process exit - log.info("Killing process for task: %s", taskid); - taskInfo.processHolder.process.destroy(); + shutdownTaskProcess(taskInfo); } } @@ -698,8 +685,10 @@ public InputStream openStream() throws IOException ); } - // Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that - // occur while saving. + /** + * Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that occur + * while saving. + */ @GuardedBy("tasks") private void saveRunningTasks() { @@ -718,6 +707,25 @@ private void saveRunningTasks() } } + /** + * Close task output stream (input stream of process) sending EOF telling process to terminate, destroying the process + * if an exception is encountered. + */ + private void shutdownTaskProcess(ForkingTaskRunnerWorkItem taskInfo) + { + if (taskInfo.processHolder != null) { + // Will trigger normal failure mechanisms due to process exit + log.info("Closing output stream to task[%s].", taskInfo.getTask().getId()); + try { + taskInfo.processHolder.process.getOutputStream().close(); + } + catch (Exception e) { + log.warn(e, "Failed to close stdout to task[%s]. Destroying task.", taskInfo.getTask().getId()); + taskInfo.processHolder.process.destroy(); + } + } + } + private File getRestoreFile() { return new File(taskConfig.getBaseTaskDir(), TASK_RESTORE_FILENAME);