From c4be868d935f89a611e96158b0780c1c6a613fab Mon Sep 17 00:00:00 2001 From: YongGang Date: Sun, 23 Jul 2023 20:05:46 -0700 Subject: [PATCH 1/6] Fix race condition in KubernetesTaskRunner when task is added to the map --- .../k8s/overlord/KubernetesTaskRunner.java | 47 +++++++++++-------- .../overlord/KubernetesTaskRunnerTest.java | 8 ++-- 2 files changed, 31 insertions(+), 24 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 30bea70416bb..0169c6d98575 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -65,6 +65,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; /** @@ -100,7 +101,12 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner private final ListeningExecutorService exec; private final HttpClient httpClient; private final PeonLifecycleFactory peonLifecycleFactory; - + /** + * The tasksLock is used to ensure thread safety when adding tasks to the tasks map and when subsequently + * retrieving them from a different thread. Its purpose is to prevent a race condition where a task might + * be retrieved from the tasks map before it has been fully added, which could lead to unexpected behavior. + */ + private final ReentrantLock tasksLock = new ReentrantLock(true); public KubernetesTaskRunner( TaskAdapter adapter, @@ -134,26 +140,20 @@ public Optional streamTaskLog(String taskid, long offset) @Override public ListenableFuture run(Task task) { - return tasks.computeIfAbsent( - task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))) - ).getResult(); - } - - protected ListenableFuture joinAsync(Task task) - { - return tasks.computeIfAbsent( - task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))) - ).getResult(); + return runOrJoinTask(task, true); } - private TaskStatus runTask(Task task) + protected ListenableFuture runOrJoinTask(Task task, boolean run) { - return doTask(task, true); - } - - private TaskStatus joinTask(Task task) - { - return doTask(task, false); + tasksLock.lock(); + try { + return tasks.computeIfAbsent( + task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> doTask(task, run))) + ).getResult(); + } + finally { + tasksLock.unlock(); + } } @VisibleForTesting @@ -161,7 +161,14 @@ protected TaskStatus doTask(Task task, boolean run) { KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task); - KubernetesWorkItem workItem = tasks.get(task.getId()); + KubernetesWorkItem workItem; + tasksLock.lock(); + try { + workItem = tasks.get(task.getId()); + } + finally { + tasksLock.unlock(); + } if (workItem == null) { throw new ISE("Task [%s] disappeared", task.getId()); @@ -273,7 +280,7 @@ public List>> restore() for (Job job : client.getPeonJobs()) { try { Task task = adapter.toTask(job); - tasks.add(Pair.of(task, joinAsync(task))); + tasks.add(Pair.of(task, runOrJoinTask(task, false))); } catch (IOException e) { log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 4a3dd322a73c..e29312984845 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -200,7 +200,7 @@ public void test_join_withoutExistingTask() throws ExecutionException, Interrupt replayAll(); - ListenableFuture future = runner.joinAsync(task); + ListenableFuture future = runner.runOrJoinTask(task, false); Assert.assertEquals(taskStatus, future.get()); verifyAll(); @@ -226,7 +226,7 @@ public void test_join_whenExceptionThrown_throwsRuntimeException() replayAll(); - ListenableFuture future = runner.joinAsync(task); + ListenableFuture future = runner.runOrJoinTask(task, false); Exception e = Assert.assertThrows(ExecutionException.class, future::get); Assert.assertTrue(e.getCause() instanceof RuntimeException); @@ -293,7 +293,7 @@ public void test_restore_withExistingJobs() throws IOException new TestPeonLifecycleFactory(kubernetesPeonLifecycle) ) { @Override - protected ListenableFuture joinAsync(Task task) + protected ListenableFuture runOrJoinTask(Task task, boolean run) { return new KubernetesWorkItem(task, null).getResult(); } @@ -329,7 +329,7 @@ public void test_restore_whenDeserializationExceptionThrown_isIgnored() throws I new TestPeonLifecycleFactory(kubernetesPeonLifecycle) ) { @Override - protected ListenableFuture joinAsync(Task task) + protected ListenableFuture runOrJoinTask(Task task, boolean run) { return new KubernetesWorkItem(task, null).getResult(); } From f270ec5011c698770b54cd695c7c845fcf5c8227 Mon Sep 17 00:00:00 2001 From: YongGang Date: Mon, 24 Jul 2023 11:30:15 -0700 Subject: [PATCH 2/6] Remove lock and introduce Result setter --- .../k8s/overlord/KubernetesTaskRunner.java | 31 ++++------------- .../k8s/overlord/KubernetesWorkItem.java | 22 ++++++++++-- .../overlord/KubernetesTaskRunnerTest.java | 34 +++++++++---------- .../k8s/overlord/KubernetesWorkItemTest.java | 10 +++--- 4 files changed, 48 insertions(+), 49 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 0169c6d98575..9ea0aa555fbf 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -65,7 +65,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; /** @@ -101,12 +100,6 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner private final ListeningExecutorService exec; private final HttpClient httpClient; private final PeonLifecycleFactory peonLifecycleFactory; - /** - * The tasksLock is used to ensure thread safety when adding tasks to the tasks map and when subsequently - * retrieving them from a different thread. Its purpose is to prevent a race condition where a task might - * be retrieved from the tasks map before it has been fully added, which could lead to unexpected behavior. - */ - private final ReentrantLock tasksLock = new ReentrantLock(true); public KubernetesTaskRunner( TaskAdapter adapter, @@ -145,15 +138,11 @@ public ListenableFuture run(Task task) protected ListenableFuture runOrJoinTask(Task task, boolean run) { - tasksLock.lock(); - try { - return tasks.computeIfAbsent( - task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> doTask(task, run))) - ).getResult(); - } - finally { - tasksLock.unlock(); - } + KubernetesWorkItem workItem = tasks.computeIfAbsent( + task.getId(), k -> new KubernetesWorkItem(task) + ); + workItem.setResultIfRequired(exec.submit(() -> doTask(task, run))); + return workItem.getResult(); } @VisibleForTesting @@ -161,15 +150,7 @@ protected TaskStatus doTask(Task task, boolean run) { KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task); - KubernetesWorkItem workItem; - tasksLock.lock(); - try { - workItem = tasks.get(task.getId()); - } - finally { - tasksLock.unlock(); - } - + KubernetesWorkItem workItem = tasks.get(task.getId()); if (workItem == null) { throw new ISE("Task [%s] disappeared", task.getId()); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 0bfbe1afa059..9c44af8b5b6f 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -19,6 +19,7 @@ package org.apache.druid.k8s.overlord; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; @@ -38,10 +39,15 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); private KubernetesPeonLifecycle kubernetesPeonLifecycle = null; + private ListenableFuture statusFuture = null; - public KubernetesWorkItem(Task task, ListenableFuture statusFuture) + /** + * Constructs a new KubernetesWorkItem with the given task. The status future of the task are initially set to null. + * After constructing a new KubernetesWorkItem, the setResultIfRequired method should be called to set the status future. + */ + public KubernetesWorkItem(Task task) { - super(task.getId(), statusFuture); + super(task.getId(), null); this.task = task; } @@ -103,6 +109,18 @@ protected Optional streamTaskLogs() return kubernetesPeonLifecycle.streamLogs(); } + protected synchronized void setResultIfRequired(ListenableFuture statusFuture) + { + this.statusFuture = statusFuture; + } + + @Override + @JsonIgnore + public ListenableFuture getResult() + { + return statusFuture; + } + @Override public TaskLocation getLocation() { diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index e29312984845..3d408cc849f0 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -107,7 +107,7 @@ public void test_streamTaskLog_withoutExistingTask_returnsEmptyOptional() @Test public void test_streamTaskLog_withExistingTask() throws IOException { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) + KubernetesWorkItem workItem = new KubernetesWorkItem(task) { @Override protected Optional streamTaskLogs() @@ -155,7 +155,7 @@ public void test_run_withoutExistingTask() throws IOException, ExecutionExceptio @Test public void test_run_withExistingTask_returnsExistingWorkItem() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); + KubernetesWorkItem workItem = new KubernetesWorkItem(task); runner.tasks.put(task.getId(), workItem); ListenableFuture future = runner.run(task); @@ -211,7 +211,7 @@ public void test_join_withoutExistingTask() throws ExecutionException, Interrupt @Test public void test_join_withExistingTask_returnsExistingWorkItem() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); + KubernetesWorkItem workItem = new KubernetesWorkItem(task); runner.tasks.put(task.getId(), workItem); ListenableFuture future = runner.run(task); @@ -249,7 +249,7 @@ public void test_doTask_withoutWorkItem_throwsISE() @Test public void test_doTask_whenShutdownRequested_throwsISE() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); + KubernetesWorkItem workItem = new KubernetesWorkItem(task); workItem.shutdown(); runner.tasks.put(task.getId(), workItem); @@ -270,7 +270,7 @@ public void test_shutdown_withoutExistingTask() @Test public void test_shutdown_withExistingTask() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task) { @Override protected synchronized void shutdown() { @@ -295,7 +295,7 @@ public void test_restore_withExistingJobs() throws IOException @Override protected ListenableFuture runOrJoinTask(Task task, boolean run) { - return new KubernetesWorkItem(task, null).getResult(); + return new KubernetesWorkItem(task).getResult(); } }; @@ -331,7 +331,7 @@ public void test_restore_whenDeserializationExceptionThrown_isIgnored() throws I @Override protected ListenableFuture runOrJoinTask(Task task, boolean run) { - return new KubernetesWorkItem(task, null).getResult(); + return new KubernetesWorkItem(task).getResult(); } }; @@ -371,7 +371,7 @@ public void test_getTotalTaskSlotCount() @Test public void test_getKnownTasks() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); + KubernetesWorkItem workItem = new KubernetesWorkItem(task); runner.tasks.put(task.getId(), workItem); @@ -385,7 +385,7 @@ public void test_getKnownTasks() public void test_getRunningTasks() { Task pendingTask = NoopTask.create("pending-id", 0); - KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null) { + KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -395,7 +395,7 @@ protected RunnerTaskState getRunnerTaskState() runner.tasks.put(pendingTask.getId(), pendingWorkItem); Task runningTask = NoopTask.create("running-id", 0); - KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null) { + KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -414,7 +414,7 @@ protected RunnerTaskState getRunnerTaskState() public void test_getPendingTasks() { Task pendingTask = NoopTask.create("pending-id", 0); - KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null) { + KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -424,7 +424,7 @@ protected RunnerTaskState getRunnerTaskState() runner.tasks.put(pendingTask.getId(), pendingWorkItem); Task runningTask = NoopTask.create("running-id", 0); - KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null) { + KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -448,7 +448,7 @@ public void test_getRunnerTaskState_withoutExistingTask_returnsNull() @Test public void test_getRunnerTaskState_withExistingTask() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -463,7 +463,7 @@ protected RunnerTaskState getRunnerTaskState() @Test public void test_streamTaskReports_withExistingTask() throws Exception { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task) { @Override public TaskLocation getLocation() { @@ -498,7 +498,7 @@ public void test_streamTaskReports_withoutExistingTask_returnsEmptyOptional() th @Test public void test_streamTaskReports_withUnknownTaskLocation_returnsEmptyOptional() throws Exception { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task) { @Override public TaskLocation getLocation() { @@ -515,7 +515,7 @@ public TaskLocation getLocation() @Test public void test_streamTaskReports_whenInterruptedExceptionThrown_throwsRuntimeException() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task) { @Override public TaskLocation getLocation() { @@ -579,7 +579,7 @@ public InputStream get(long timeout, TimeUnit unit) throws InterruptedException @Test public void test_streamTaskReports_whenExecutionExceptionThrown_throwsRuntimeException() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task) { @Override public TaskLocation getLocation() { diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java index b272230b0795..b8e600383a65 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java @@ -45,7 +45,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport public void setup() { task = NoopTask.create("id", 0); - workItem = new KubernetesWorkItem(task, null); + workItem = new KubernetesWorkItem(task); } @Test @@ -95,7 +95,7 @@ public void test_shutdown_withKubernetesPeonLifecycle() @Test public void test_isPending_withTaskStateWaiting_returnsFalse() { - workItem = new KubernetesWorkItem(task, null) { + workItem = new KubernetesWorkItem(task) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -108,7 +108,7 @@ protected RunnerTaskState getRunnerTaskState() @Test public void test_isPending_withTaskStatePending_returnsTrue() { - workItem = new KubernetesWorkItem(task, null) { + workItem = new KubernetesWorkItem(task) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -121,7 +121,7 @@ protected RunnerTaskState getRunnerTaskState() @Test public void test_isRunning_withTaskStateWaiting_returnsFalse() { - workItem = new KubernetesWorkItem(task, null) { + workItem = new KubernetesWorkItem(task) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -134,7 +134,7 @@ protected RunnerTaskState getRunnerTaskState() @Test public void test_isRunning_withTaskStatePending_returnsTrue() { - workItem = new KubernetesWorkItem(task, null) { + workItem = new KubernetesWorkItem(task) { @Override protected RunnerTaskState getRunnerTaskState() { From 1b57061a412c20abae9d205030e58c12bdf5b521 Mon Sep 17 00:00:00 2001 From: YongGang Date: Tue, 25 Jul 2023 09:36:14 -0700 Subject: [PATCH 3/6] Add back synchronized block in run and doTask methods --- .../k8s/overlord/KubernetesTaskRunner.java | 52 ++++++++++--------- .../k8s/overlord/KubernetesWorkItem.java | 22 +------- .../overlord/KubernetesTaskRunnerTest.java | 34 ++++++------ .../k8s/overlord/KubernetesWorkItemTest.java | 10 ++-- 4 files changed, 52 insertions(+), 66 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 9ea0aa555fbf..346117c2761f 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -101,6 +101,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner private final HttpClient httpClient; private final PeonLifecycleFactory peonLifecycleFactory; + public KubernetesTaskRunner( TaskAdapter adapter, KubernetesTaskRunnerConfig config, @@ -138,11 +139,10 @@ public ListenableFuture run(Task task) protected ListenableFuture runOrJoinTask(Task task, boolean run) { - KubernetesWorkItem workItem = tasks.computeIfAbsent( - task.getId(), k -> new KubernetesWorkItem(task) - ); - workItem.setResultIfRequired(exec.submit(() -> doTask(task, run))); - return workItem.getResult(); + synchronized (tasks) { + tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> doTask(task, run)))); + return tasks.get(task.getId()).getResult(); + } } @VisibleForTesting @@ -150,16 +150,19 @@ protected TaskStatus doTask(Task task, boolean run) { KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task); - KubernetesWorkItem workItem = tasks.get(task.getId()); - if (workItem == null) { - throw new ISE("Task [%s] disappeared", task.getId()); - } + synchronized (tasks) { + KubernetesWorkItem workItem = tasks.get(task.getId()); - if (workItem.isShutdownRequested()) { - throw new ISE("Task [%s] has been shut down", task.getId()); - } + if (workItem == null) { + throw new ISE("Task [%s] disappeared", task.getId()); + } + + if (workItem.isShutdownRequested()) { + throw new ISE("Task [%s] has been shut down", task.getId()); + } - workItem.setKubernetesPeonLifecycle(peonLifecycle); + workItem.setKubernetesPeonLifecycle(peonLifecycle); + } try { TaskStatus taskStatus; @@ -186,7 +189,9 @@ protected TaskStatus doTask(Task task, boolean run) } finally { - tasks.remove(task.getId()); + synchronized (tasks) { + tasks.remove(task.getId()); + } } } @@ -257,17 +262,17 @@ public Optional streamTaskReports(String taskid) throws IOException @Override public List>> restore() { - List>> tasks = new ArrayList<>(); + List>> restoredTasks = new ArrayList<>(); for (Job job : client.getPeonJobs()) { try { Task task = adapter.toTask(job); - tasks.add(Pair.of(task, runOrJoinTask(task, false))); + restoredTasks.add(Pair.of(task, runOrJoinTask(task, false))); } catch (IOException e) { log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); } } - return tasks; + return restoredTasks; } @Override @@ -307,7 +312,6 @@ public Collection getKnownTasks() return Lists.newArrayList(tasks.values()); } - @Override public Optional getScalingStats() { @@ -374,18 +378,18 @@ public void registerListener(TaskRunnerListener listener, Executor executor) public Collection getRunningTasks() { return tasks.values() - .stream() - .filter(KubernetesWorkItem::isRunning) - .collect(Collectors.toList()); + .stream() + .filter(KubernetesWorkItem::isRunning) + .collect(Collectors.toList()); } @Override public Collection getPendingTasks() { return tasks.values() - .stream() - .filter(KubernetesWorkItem::isPending) - .collect(Collectors.toList()); + .stream() + .filter(KubernetesWorkItem::isPending) + .collect(Collectors.toList()); } @Nullable diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 9c44af8b5b6f..0bfbe1afa059 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -19,7 +19,6 @@ package org.apache.druid.k8s.overlord; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; @@ -39,15 +38,10 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); private KubernetesPeonLifecycle kubernetesPeonLifecycle = null; - private ListenableFuture statusFuture = null; - /** - * Constructs a new KubernetesWorkItem with the given task. The status future of the task are initially set to null. - * After constructing a new KubernetesWorkItem, the setResultIfRequired method should be called to set the status future. - */ - public KubernetesWorkItem(Task task) + public KubernetesWorkItem(Task task, ListenableFuture statusFuture) { - super(task.getId(), null); + super(task.getId(), statusFuture); this.task = task; } @@ -109,18 +103,6 @@ protected Optional streamTaskLogs() return kubernetesPeonLifecycle.streamLogs(); } - protected synchronized void setResultIfRequired(ListenableFuture statusFuture) - { - this.statusFuture = statusFuture; - } - - @Override - @JsonIgnore - public ListenableFuture getResult() - { - return statusFuture; - } - @Override public TaskLocation getLocation() { diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 3d408cc849f0..e29312984845 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -107,7 +107,7 @@ public void test_streamTaskLog_withoutExistingTask_returnsEmptyOptional() @Test public void test_streamTaskLog_withExistingTask() throws IOException { - KubernetesWorkItem workItem = new KubernetesWorkItem(task) + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { @Override protected Optional streamTaskLogs() @@ -155,7 +155,7 @@ public void test_run_withoutExistingTask() throws IOException, ExecutionExceptio @Test public void test_run_withExistingTask_returnsExistingWorkItem() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task); + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); runner.tasks.put(task.getId(), workItem); ListenableFuture future = runner.run(task); @@ -211,7 +211,7 @@ public void test_join_withoutExistingTask() throws ExecutionException, Interrupt @Test public void test_join_withExistingTask_returnsExistingWorkItem() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task); + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); runner.tasks.put(task.getId(), workItem); ListenableFuture future = runner.run(task); @@ -249,7 +249,7 @@ public void test_doTask_withoutWorkItem_throwsISE() @Test public void test_doTask_whenShutdownRequested_throwsISE() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task); + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); workItem.shutdown(); runner.tasks.put(task.getId(), workItem); @@ -270,7 +270,7 @@ public void test_shutdown_withoutExistingTask() @Test public void test_shutdown_withExistingTask() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { @Override protected synchronized void shutdown() { @@ -295,7 +295,7 @@ public void test_restore_withExistingJobs() throws IOException @Override protected ListenableFuture runOrJoinTask(Task task, boolean run) { - return new KubernetesWorkItem(task).getResult(); + return new KubernetesWorkItem(task, null).getResult(); } }; @@ -331,7 +331,7 @@ public void test_restore_whenDeserializationExceptionThrown_isIgnored() throws I @Override protected ListenableFuture runOrJoinTask(Task task, boolean run) { - return new KubernetesWorkItem(task).getResult(); + return new KubernetesWorkItem(task, null).getResult(); } }; @@ -371,7 +371,7 @@ public void test_getTotalTaskSlotCount() @Test public void test_getKnownTasks() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task); + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); runner.tasks.put(task.getId(), workItem); @@ -385,7 +385,7 @@ public void test_getKnownTasks() public void test_getRunningTasks() { Task pendingTask = NoopTask.create("pending-id", 0); - KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask) { + KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -395,7 +395,7 @@ protected RunnerTaskState getRunnerTaskState() runner.tasks.put(pendingTask.getId(), pendingWorkItem); Task runningTask = NoopTask.create("running-id", 0); - KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask) { + KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -414,7 +414,7 @@ protected RunnerTaskState getRunnerTaskState() public void test_getPendingTasks() { Task pendingTask = NoopTask.create("pending-id", 0); - KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask) { + KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -424,7 +424,7 @@ protected RunnerTaskState getRunnerTaskState() runner.tasks.put(pendingTask.getId(), pendingWorkItem); Task runningTask = NoopTask.create("running-id", 0); - KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask) { + KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -448,7 +448,7 @@ public void test_getRunnerTaskState_withoutExistingTask_returnsNull() @Test public void test_getRunnerTaskState_withExistingTask() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -463,7 +463,7 @@ protected RunnerTaskState getRunnerTaskState() @Test public void test_streamTaskReports_withExistingTask() throws Exception { - KubernetesWorkItem workItem = new KubernetesWorkItem(task) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { @Override public TaskLocation getLocation() { @@ -498,7 +498,7 @@ public void test_streamTaskReports_withoutExistingTask_returnsEmptyOptional() th @Test public void test_streamTaskReports_withUnknownTaskLocation_returnsEmptyOptional() throws Exception { - KubernetesWorkItem workItem = new KubernetesWorkItem(task) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { @Override public TaskLocation getLocation() { @@ -515,7 +515,7 @@ public TaskLocation getLocation() @Test public void test_streamTaskReports_whenInterruptedExceptionThrown_throwsRuntimeException() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { @Override public TaskLocation getLocation() { @@ -579,7 +579,7 @@ public InputStream get(long timeout, TimeUnit unit) throws InterruptedException @Test public void test_streamTaskReports_whenExecutionExceptionThrown_throwsRuntimeException() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task) { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { @Override public TaskLocation getLocation() { diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java index b8e600383a65..b272230b0795 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java @@ -45,7 +45,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport public void setup() { task = NoopTask.create("id", 0); - workItem = new KubernetesWorkItem(task); + workItem = new KubernetesWorkItem(task, null); } @Test @@ -95,7 +95,7 @@ public void test_shutdown_withKubernetesPeonLifecycle() @Test public void test_isPending_withTaskStateWaiting_returnsFalse() { - workItem = new KubernetesWorkItem(task) { + workItem = new KubernetesWorkItem(task, null) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -108,7 +108,7 @@ protected RunnerTaskState getRunnerTaskState() @Test public void test_isPending_withTaskStatePending_returnsTrue() { - workItem = new KubernetesWorkItem(task) { + workItem = new KubernetesWorkItem(task, null) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -121,7 +121,7 @@ protected RunnerTaskState getRunnerTaskState() @Test public void test_isRunning_withTaskStateWaiting_returnsFalse() { - workItem = new KubernetesWorkItem(task) { + workItem = new KubernetesWorkItem(task, null) { @Override protected RunnerTaskState getRunnerTaskState() { @@ -134,7 +134,7 @@ protected RunnerTaskState getRunnerTaskState() @Test public void test_isRunning_withTaskStatePending_returnsTrue() { - workItem = new KubernetesWorkItem(task) { + workItem = new KubernetesWorkItem(task, null) { @Override protected RunnerTaskState getRunnerTaskState() { From 182da42680242969db846a774f538f4535f63c65 Mon Sep 17 00:00:00 2001 From: YongGang Date: Tue, 25 Jul 2023 14:13:23 -0700 Subject: [PATCH 4/6] move task put to try block --- .../k8s/overlord/KubernetesTaskRunner.java | 26 +++++++++---------- .../overlord/KubernetesTaskRunnerTest.java | 8 +++--- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 346117c2761f..6551cf5fbf82 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -148,23 +148,23 @@ protected ListenableFuture runOrJoinTask(Task task, boolean run) @VisibleForTesting protected TaskStatus doTask(Task task, boolean run) { - KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task); + try { + KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task); - synchronized (tasks) { - KubernetesWorkItem workItem = tasks.get(task.getId()); + synchronized (tasks) { + KubernetesWorkItem workItem = tasks.get(task.getId()); - if (workItem == null) { - throw new ISE("Task [%s] disappeared", task.getId()); - } + if (workItem == null) { + throw new ISE("Task [%s] disappeared", task.getId()); + } - if (workItem.isShutdownRequested()) { - throw new ISE("Task [%s] has been shut down", task.getId()); - } + if (workItem.isShutdownRequested()) { + throw new ISE("Task [%s] has been shut down", task.getId()); + } - workItem.setKubernetesPeonLifecycle(peonLifecycle); - } + workItem.setKubernetesPeonLifecycle(peonLifecycle); + } - try { TaskStatus taskStatus; if (run) { taskStatus = peonLifecycle.run( @@ -182,12 +182,10 @@ protected TaskStatus doTask(Task task, boolean run) return taskStatus; } - catch (Exception e) { log.error(e, "Task [%s] execution caught an exception", task.getId()); throw new RuntimeException(e); } - finally { synchronized (tasks) { tasks.remove(task.getId()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index e29312984845..f779b8154671 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -237,17 +237,17 @@ public void test_join_whenExceptionThrown_throwsRuntimeException() } @Test - public void test_doTask_withoutWorkItem_throwsISE() + public void test_doTask_withoutWorkItem_throwsRuntimeException() { Assert.assertThrows( "Task [id] disappeared", - ISE.class, + RuntimeException.class, () -> runner.doTask(task, true) ); } @Test - public void test_doTask_whenShutdownRequested_throwsISE() + public void test_doTask_whenShutdownRequested_throwsRuntimeException() { KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); workItem.shutdown(); @@ -256,7 +256,7 @@ public void test_doTask_whenShutdownRequested_throwsISE() Assert.assertThrows( "Task [id] has been shut down", - ISE.class, + RuntimeException.class, () -> runner.doTask(task, true) ); } From 38cc459abfdaee1fd788bb0d7916ccc491c161a3 Mon Sep 17 00:00:00 2001 From: YongGang Date: Tue, 25 Jul 2023 16:34:53 -0700 Subject: [PATCH 5/6] fix checkstyle --- .../org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index f779b8154671..53ce8a8d3bc3 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -32,7 +32,6 @@ import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; From 0a78d11127a95b66c3b4c96280d4c662a8604797 Mon Sep 17 00:00:00 2001 From: YongGang Date: Wed, 26 Jul 2023 09:27:00 -0700 Subject: [PATCH 6/6] separate run and join methods --- .../k8s/overlord/KubernetesTaskRunner.java | 23 +++++++++++++++---- .../overlord/KubernetesTaskRunnerTest.java | 8 +++---- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 6551cf5fbf82..12c5bb602977 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -134,17 +134,30 @@ public Optional streamTaskLog(String taskid, long offset) @Override public ListenableFuture run(Task task) { - return runOrJoinTask(task, true); + synchronized (tasks) { + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))) + .getResult(); + } } - protected ListenableFuture runOrJoinTask(Task task, boolean run) + protected ListenableFuture joinAsync(Task task) { synchronized (tasks) { - tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> doTask(task, run)))); - return tasks.get(task.getId()).getResult(); + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))) + .getResult(); } } + private TaskStatus runTask(Task task) + { + return doTask(task, true); + } + + private TaskStatus joinTask(Task task) + { + return doTask(task, false); + } + @VisibleForTesting protected TaskStatus doTask(Task task, boolean run) { @@ -264,7 +277,7 @@ public List>> restore() for (Job job : client.getPeonJobs()) { try { Task task = adapter.toTask(job); - restoredTasks.add(Pair.of(task, runOrJoinTask(task, false))); + restoredTasks.add(Pair.of(task, joinAsync(task))); } catch (IOException e) { log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 53ce8a8d3bc3..0ef053a61982 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -199,7 +199,7 @@ public void test_join_withoutExistingTask() throws ExecutionException, Interrupt replayAll(); - ListenableFuture future = runner.runOrJoinTask(task, false); + ListenableFuture future = runner.joinAsync(task); Assert.assertEquals(taskStatus, future.get()); verifyAll(); @@ -225,7 +225,7 @@ public void test_join_whenExceptionThrown_throwsRuntimeException() replayAll(); - ListenableFuture future = runner.runOrJoinTask(task, false); + ListenableFuture future = runner.joinAsync(task); Exception e = Assert.assertThrows(ExecutionException.class, future::get); Assert.assertTrue(e.getCause() instanceof RuntimeException); @@ -292,7 +292,7 @@ public void test_restore_withExistingJobs() throws IOException new TestPeonLifecycleFactory(kubernetesPeonLifecycle) ) { @Override - protected ListenableFuture runOrJoinTask(Task task, boolean run) + protected ListenableFuture joinAsync(Task task) { return new KubernetesWorkItem(task, null).getResult(); } @@ -328,7 +328,7 @@ public void test_restore_whenDeserializationExceptionThrown_isIgnored() throws I new TestPeonLifecycleFactory(kubernetesPeonLifecycle) ) { @Override - protected ListenableFuture runOrJoinTask(Task task, boolean run) + protected ListenableFuture joinAsync(Task task) { return new KubernetesWorkItem(task, null).getResult(); }