diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 5eb55b097b4b..6e0794a1e45f 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -29,12 +29,15 @@ import org.apache.druid.java.util.common.ISE; import java.io.InputStream; +import java.util.concurrent.atomic.AtomicBoolean; public class KubernetesWorkItem extends TaskRunnerWorkItem { private final Task task; private final KubernetesPeonLifecycle kubernetesPeonLifecycle; + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + public KubernetesWorkItem(Task task, ListenableFuture statusFuture, KubernetesPeonLifecycle kubernetesPeonLifecycle) { super(task.getId(), statusFuture); @@ -42,10 +45,17 @@ public KubernetesWorkItem(Task task, ListenableFuture statusFuture, this.kubernetesPeonLifecycle = kubernetesPeonLifecycle; } - protected synchronized void shutdown() + /** + * Shuts down this work item. Subsequent calls to this method return immediately. + */ + protected void shutdown() { - this.kubernetesPeonLifecycle.startWatchingLogs(); - this.kubernetesPeonLifecycle.shutdown(); + if (isShutdown.compareAndSet(false, true)) { + synchronized (this) { + this.kubernetesPeonLifecycle.startWatchingLogs(); + this.kubernetesPeonLifecycle.shutdown(); + } + } } protected boolean isPending()