diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 30bea70416bb..12c5bb602977 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -134,16 +134,18 @@ public Optional streamTaskLog(String taskid, long offset) @Override public ListenableFuture run(Task task) { - return tasks.computeIfAbsent( - task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))) - ).getResult(); + synchronized (tasks) { + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))) + .getResult(); + } } protected ListenableFuture joinAsync(Task task) { - return tasks.computeIfAbsent( - task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))) - ).getResult(); + synchronized (tasks) { + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))) + .getResult(); + } } private TaskStatus runTask(Task task) @@ -159,21 +161,23 @@ private TaskStatus joinTask(Task task) @VisibleForTesting protected TaskStatus doTask(Task task, boolean run) { - KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task); + try { + KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task); - KubernetesWorkItem workItem = tasks.get(task.getId()); + synchronized (tasks) { + KubernetesWorkItem workItem = tasks.get(task.getId()); - if (workItem == null) { - throw new ISE("Task [%s] disappeared", task.getId()); - } + if (workItem == null) { + throw new ISE("Task [%s] disappeared", task.getId()); + } - if (workItem.isShutdownRequested()) { - throw new ISE("Task [%s] has been shut down", task.getId()); - } + if (workItem.isShutdownRequested()) { + throw new ISE("Task [%s] has been shut down", task.getId()); + } - workItem.setKubernetesPeonLifecycle(peonLifecycle); + workItem.setKubernetesPeonLifecycle(peonLifecycle); + } - try { TaskStatus taskStatus; if (run) { taskStatus = peonLifecycle.run( @@ -191,14 +195,14 @@ protected TaskStatus doTask(Task task, boolean run) return taskStatus; } - catch (Exception e) { log.error(e, "Task [%s] execution caught an exception", task.getId()); throw new RuntimeException(e); } - finally { - tasks.remove(task.getId()); + synchronized (tasks) { + tasks.remove(task.getId()); + } } } @@ -269,17 +273,17 @@ public Optional streamTaskReports(String taskid) throws IOException @Override public List>> restore() { - List>> tasks = new ArrayList<>(); + List>> restoredTasks = new ArrayList<>(); for (Job job : client.getPeonJobs()) { try { Task task = adapter.toTask(job); - tasks.add(Pair.of(task, joinAsync(task))); + restoredTasks.add(Pair.of(task, joinAsync(task))); } catch (IOException e) { log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); } } - return tasks; + return restoredTasks; } @Override @@ -319,7 +323,6 @@ public Collection getKnownTasks() return Lists.newArrayList(tasks.values()); } - @Override public Optional getScalingStats() { @@ -386,18 +389,18 @@ public void registerListener(TaskRunnerListener listener, Executor executor) public Collection getRunningTasks() { return tasks.values() - .stream() - .filter(KubernetesWorkItem::isRunning) - .collect(Collectors.toList()); + .stream() + .filter(KubernetesWorkItem::isRunning) + .collect(Collectors.toList()); } @Override public Collection getPendingTasks() { return tasks.values() - .stream() - .filter(KubernetesWorkItem::isPending) - .collect(Collectors.toList()); + .stream() + .filter(KubernetesWorkItem::isPending) + .collect(Collectors.toList()); } @Nullable diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 4a3dd322a73c..0ef053a61982 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -32,7 +32,6 @@ import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; @@ -237,17 +236,17 @@ public void test_join_whenExceptionThrown_throwsRuntimeException() } @Test - public void test_doTask_withoutWorkItem_throwsISE() + public void test_doTask_withoutWorkItem_throwsRuntimeException() { Assert.assertThrows( "Task [id] disappeared", - ISE.class, + RuntimeException.class, () -> runner.doTask(task, true) ); } @Test - public void test_doTask_whenShutdownRequested_throwsISE() + public void test_doTask_whenShutdownRequested_throwsRuntimeException() { KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); workItem.shutdown(); @@ -256,7 +255,7 @@ public void test_doTask_whenShutdownRequested_throwsISE() Assert.assertThrows( "Task [id] has been shut down", - ISE.class, + RuntimeException.class, () -> runner.doTask(task, true) ); }