From bceefdcd587054245710bf1505cce490c6d0abee Mon Sep 17 00:00:00 2001 From: Andrew Ho Date: Mon, 17 Nov 2025 13:22:31 -0800 Subject: [PATCH 1/3] Ignore all exceptions when pushing task logs --- .../org/apache/druid/indexing/overlord/ForkingTaskRunner.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 9a7f2905b110..5d87ab602fba 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -524,7 +524,7 @@ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File lo try { taskLogPusher.pushTaskLog(task.getId(), logFile); } - catch (IOException e) { + catch (Exception e) { LOGGER.error("Task[%s] failed to push task logs to [%s]: Exception[%s]", task.getId(), logFile.getName(), e.getMessage()); } @@ -532,7 +532,7 @@ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File lo try { taskLogPusher.pushTaskReports(task.getId(), reportsFile); } - catch (IOException e) { + catch (Exception e) { LOGGER.error("Task[%s] failed to push task reports to [%s]: Exception[%s]", task.getId(), reportsFile.getName(), e.getMessage()); } From a15ff10037efa613653d7d9b1f12e304dfe2b921 Mon Sep 17 00:00:00 2001 From: Andrew Ho Date: Thu, 20 Nov 2025 14:09:43 -0800 Subject: [PATCH 2/3] Add unit test to ensure that task succeeds even if log upload fails --- .../overlord/ForkingTaskRunnerTest.java | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java index 3664af6fd82c..05a00e21adbe 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java @@ -535,6 +535,59 @@ ProcessHolder runTaskProcess(List command, File logFile, TaskLocation ta Assert.assertTrue(forkingTaskRunner.restore().isEmpty()); } + @Test + public void testTaskStatusWhenTaskLogUploadFails() throws Exception + { + class ExceptionTaskLogs extends NoopTaskLogs { + @Override + public void pushTaskLog(String taskid, File logFile) { + throw new RuntimeException("Exception occurred while pushing task logs"); + } + } + TaskConfig taskConfig = makeDefaultTaskConfigBuilder() + .build(); + final WorkerConfig workerConfig = new WorkerConfig(); + ExceptionTaskLogs exceptionTaskLogs = new ExceptionTaskLogs(); + ObjectMapper mapper = new DefaultObjectMapper(); + Task task = NoopTask.create(); + ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner( + new ForkingTaskRunnerConfig(), + taskConfig, + workerConfig, + new Properties(), + exceptionTaskLogs, + mapper, + new DruidNode("middleManager", "host", false, 8091, null, true, false), + new StartupLoggingConfig(), + TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig) + ) + { + @Override + ProcessHolder runTaskProcess(List command, File logFile, TaskLocation taskLocation) throws IOException { + for (String param : command) { + if (param.endsWith(task.getId())) { + final String basePath = getTracker().pickStorageSlot(task.getId()).getDirectory().getAbsolutePath(); + File resultFile = Paths.get(basePath, task.getId(), "attempt", "1", "status.json").toFile(); + mapper.writeValue(resultFile, TaskStatus.success(task.getId())); + break; + } + } + MockTestProcess mockTestProcess = new MockTestProcess() + { + @Override + public int waitFor() + { + return 0; + } + }; + return new ForkingTaskRunner.ProcessHolder(mockTestProcess, logFile, taskLocation); + } + }; + forkingTaskRunner.setNumProcessorsPerTask(); + final TaskStatus status = forkingTaskRunner.run(task).get(); + assertEquals(TaskState.SUCCESS, status.getStatusCode()); + } + public static TaskConfigBuilder makeDefaultTaskConfigBuilder() { return new TaskConfigBuilder() From e0f072ab08a8aab3ad4ba8871a33a5507aacb3a0 Mon Sep 17 00:00:00 2001 From: Andrew Ho Date: Thu, 20 Nov 2025 14:24:28 -0800 Subject: [PATCH 3/3] style fixes --- .../druid/indexing/overlord/ForkingTaskRunnerTest.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java index 05a00e21adbe..4b722be02e5f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java @@ -538,9 +538,11 @@ ProcessHolder runTaskProcess(List command, File logFile, TaskLocation ta @Test public void testTaskStatusWhenTaskLogUploadFails() throws Exception { - class ExceptionTaskLogs extends NoopTaskLogs { + class ExceptionTaskLogs extends NoopTaskLogs + { @Override - public void pushTaskLog(String taskid, File logFile) { + public void pushTaskLog(String taskid, File logFile) + { throw new RuntimeException("Exception occurred while pushing task logs"); } } @@ -563,7 +565,8 @@ public void pushTaskLog(String taskid, File logFile) { ) { @Override - ProcessHolder runTaskProcess(List command, File logFile, TaskLocation taskLocation) throws IOException { + ProcessHolder runTaskProcess(List command, File logFile, TaskLocation taskLocation) throws IOException + { for (String param : command) { if (param.endsWith(task.getId())) { final String basePath = getTracker().pickStorageSlot(task.getId()).getDirectory().getAbsolutePath();