diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index fe2f4be37116..30bea70416bb 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -134,18 +134,16 @@ public Optional streamTaskLog(String taskid, long offset) @Override public ListenableFuture run(Task task) { - synchronized (tasks) { - tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))); - return tasks.get(task.getId()).getResult(); - } + return tasks.computeIfAbsent( + task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))) + ).getResult(); } protected ListenableFuture joinAsync(Task task) { - synchronized (tasks) { - tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))); - return tasks.get(task.getId()).getResult(); - } + return tasks.computeIfAbsent( + task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))) + ).getResult(); } private TaskStatus runTask(Task task) @@ -163,20 +161,18 @@ protected TaskStatus doTask(Task task, boolean run) { KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task); - synchronized (tasks) { - KubernetesWorkItem workItem = tasks.get(task.getId()); - - if (workItem == null) { - throw new ISE("Task [%s] disappeared", task.getId()); - } + KubernetesWorkItem workItem = tasks.get(task.getId()); - if (workItem.isShutdownRequested()) { - throw new ISE("Task [%s] has been shut down", task.getId()); - } + if (workItem == null) { + throw new ISE("Task [%s] disappeared", task.getId()); + } - workItem.setKubernetesPeonLifecycle(peonLifecycle); + if (workItem.isShutdownRequested()) { + throw new ISE("Task [%s] has been shut down", task.getId()); } + workItem.setKubernetesPeonLifecycle(peonLifecycle); + try { TaskStatus taskStatus; if (run) { @@ -202,9 +198,7 @@ protected TaskStatus doTask(Task task, boolean run) } finally { - synchronized (tasks) { - tasks.remove(task.getId()); - } + tasks.remove(task.getId()); } } @@ -322,9 +316,7 @@ public Map getTotalTaskSlotCount() @Override public Collection getKnownTasks() { - synchronized (tasks) { - return Lists.newArrayList(tasks.values()); - } + return Lists.newArrayList(tasks.values()); } @@ -393,23 +385,19 @@ public void registerListener(TaskRunnerListener listener, Executor executor) @Override public Collection getRunningTasks() { - synchronized (tasks) { - return tasks.values() - .stream() - .filter(KubernetesWorkItem::isRunning) - .collect(Collectors.toList()); - } + return tasks.values() + .stream() + .filter(KubernetesWorkItem::isRunning) + .collect(Collectors.toList()); } @Override public Collection getPendingTasks() { - synchronized (tasks) { - return tasks.values() - .stream() - .filter(KubernetesWorkItem::isPending) - .collect(Collectors.toList()); - } + return tasks.values() + .stream() + .filter(KubernetesWorkItem::isPending) + .collect(Collectors.toList()); } @Nullable