From fb4fca9d5b30f416a31d77d26151b836e859223d Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 26 Sep 2025 09:44:16 +0530 Subject: [PATCH 1/3] Make KubernetesWorkItem.shutdown idempotent --- .../druid/k8s/overlord/KubernetesWorkItem.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 5eb55b097b4b..f9f47731532b 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -29,12 +29,15 @@ import org.apache.druid.java.util.common.ISE; import java.io.InputStream; +import java.util.concurrent.atomic.AtomicBoolean; public class KubernetesWorkItem extends TaskRunnerWorkItem { private final Task task; private final KubernetesPeonLifecycle kubernetesPeonLifecycle; + private final AtomicBoolean isShuttingDown = new AtomicBoolean(false); + public KubernetesWorkItem(Task task, ListenableFuture statusFuture, KubernetesPeonLifecycle kubernetesPeonLifecycle) { super(task.getId(), statusFuture); @@ -42,10 +45,15 @@ public KubernetesWorkItem(Task task, ListenableFuture statusFuture, this.kubernetesPeonLifecycle = kubernetesPeonLifecycle; } + /** + * Shuts down this work item. Subsequent calls to this method return immediately. + */ protected synchronized void shutdown() { - this.kubernetesPeonLifecycle.startWatchingLogs(); - this.kubernetesPeonLifecycle.shutdown(); + if (isShuttingDown.compareAndSet(false, true)) { + this.kubernetesPeonLifecycle.startWatchingLogs(); + this.kubernetesPeonLifecycle.shutdown(); + } } protected boolean isPending() From 84c7ff7d68463e37af88254d5ead755e67749a25 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 26 Sep 2025 09:47:45 +0530 Subject: [PATCH 2/3] Use sync block --- .../org/apache/druid/k8s/overlord/KubernetesWorkItem.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index f9f47731532b..88c0a3b67038 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -48,11 +48,13 @@ public KubernetesWorkItem(Task task, ListenableFuture statusFuture, /** * Shuts down this work item. Subsequent calls to this method return immediately. */ - protected synchronized void shutdown() + protected void shutdown() { if (isShuttingDown.compareAndSet(false, true)) { - this.kubernetesPeonLifecycle.startWatchingLogs(); - this.kubernetesPeonLifecycle.shutdown(); + synchronized (this) { + this.kubernetesPeonLifecycle.startWatchingLogs(); + this.kubernetesPeonLifecycle.shutdown(); + } } } From cd22a9177ddf2f4d3b97606300e689c44ac7ad3e Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 26 Sep 2025 17:41:25 +0530 Subject: [PATCH 3/3] Minor fix --- .../org/apache/druid/k8s/overlord/KubernetesWorkItem.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 88c0a3b67038..6e0794a1e45f 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -36,7 +36,7 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem private final Task task; private final KubernetesPeonLifecycle kubernetesPeonLifecycle; - private final AtomicBoolean isShuttingDown = new AtomicBoolean(false); + private final AtomicBoolean isShutdown = new AtomicBoolean(false); public KubernetesWorkItem(Task task, ListenableFuture statusFuture, KubernetesPeonLifecycle kubernetesPeonLifecycle) { @@ -50,7 +50,7 @@ public KubernetesWorkItem(Task task, ListenableFuture statusFuture, */ protected void shutdown() { - if (isShuttingDown.compareAndSet(false, true)) { + if (isShutdown.compareAndSet(false, true)) { synchronized (this) { this.kubernetesPeonLifecycle.startWatchingLogs(); this.kubernetesPeonLifecycle.shutdown();